powered by simpleCommunicator - 2.0.30     © 2024 Programmizd 02
Map
Форумы / Java [игнор отключен] [закрыт для гостей] / Тяпничный CDC/JMS
25 сообщений из 49, страница 1 из 2
Тяпничный CDC/JMS
    #39921165
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Привет котаны-братаны.

По мотивам Мониторинг изменений в базах данных

Буду краток.

Две базы. Два приложения. И 1 месседж брокер.

Запускаем мастера и подчиеннного.

Код: java
1.
$ java CDCMaster --jdbc-url "jdbc:postgresql://host1:5432/base1" --broker "tcp://broker1:61616"


Код: java
1.
$ java CDCSlave --jdbc-url "jdbc:postgresql://host2:5432/base1" --broker "tcp://broker1:61616"


Вобщем надо как-то так исхитриться чтоб изменения мастера попали в Slave.

Топология сети - любая.
База - любая (postgres как пример)
Брокер - любой.
Требований по SLA нету. Как сделаете так и будет.
Изменения - только DML (Insert/Update/Delete)


Go-go кодить.

P.S. Ну триггерочки и служебные таблички там уж сами посоздавайте если надо.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921176
Leonid Kudryavtsev
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton

Go-go кодить.

нафига?
https://google.gik-team.com/?q=postgresql master slave replication

Ты же хорошо представляешь, сколько строк кода и сколько такая система реально стоит.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921186
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
не вникал
http://ha-jdbc.org/doc.html
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921187
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Leonid Kudryavtsev, сколько?
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921198
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Понты делов. К воскресенье будет POC.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921206
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
Понты делов. К воскресенье будет POC.

Прям со сплит брейном и поддержкой оффлайн? И чтоб ни единого разрыва!
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921208
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Вы уже процедуру main написали? А?
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921209
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Leonid Kudryavtsev
mayton

Go-go кодить.

нафига?
https://google.gik-team.com/?q=postgresql master slave replication

Ты же хорошо представляешь, сколько строк кода и сколько такая система реально стоит.

Абстрактный JDBC источник.

Я-же писал в задании.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921210
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Victor Nevsky
не вникал
http://ha-jdbc.org/doc.html

Если это тоже что и XA - то это тормозной кусок говна. Синхронная фиксация в две системы. Тормоз еще тот.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921214
iOracleDev
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
База - любая (postgres как пример)
...
P.S. Ну триггерочки и служебные таблички там уж сами посоздавайте если надо.

Бессмысленное занятие, каждая уважающая себя СУБД уже имеет механизмы репликации
работающие на уровне ядра, триггерочками получится тормозной кусок говна))

Хотя если данных не много и репликация почтовая, то может и пригодится поделье,
но даже в этом случае нативная для СУБД часть будет составлять основной объем кода,
на промежуточном уровне будет только забор готовых upsert/delete и применение их
на другой инстанс в нужном порядке. Основная проблема будет в том, что триггеры на операции
срабатывают непосредственно при проведении операции, а фиксация происходит позже и триггера
уровня строки на фиксацию нет.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921216
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
iOracleDev,

Ты серьёзно так напрягся? Чувак. Это пятничный топик. Это брейншторм. Это челлендж.

Здесь не ищут коробочных решений!!! Go-go!!
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921218
iOracleDev
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton,

Смотрю с интересом на то что получится, особенно на часть касающуюся СУБД))
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921235
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Не стоит забывать о проблемах связанных с текущим временем. Если запрос содержит insert currentSystemTime или random - в общем нечто недетерминированное, то просто прогнать инсерт на реплике не получится
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921247
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
Если это тоже что и XA - то это тормозной кусок говна. Синхронная фиксация в две системы. Тормоз еще тот.
Подписался:)
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921273
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник
Не стоит забывать о проблемах связанных с текущим временем. Если запрос содержит insert currentSystemTime или random - в общем нечто недетерминированное, то просто прогнать инсерт на реплике не получится

А что будет видеть триггер?
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921277
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
В качестве первого закидона. Этот код я писал еще летом. И это еще не CDC.

Это подготовка схемы БД к созданию служебных таблиц и триггеров.
Код - требует улучшений. В частности - отвязки от Постгреса и привязки к генерализованной
СУБД.

Есть еще второй вариант такого-же CDC где EAV табличка заменяется на множество типизированных.

Код: javascript
1.
2.
3.
4.
5.
6.
package mayton.watchdog.pg

case class ColumnDefinition( val columnName : String,
                             val dataType   : String,
                             val characterMaximumLength : Int,
                             val isNullable : Boolean)



Код: javascript
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
package mayton.watchdog.pg

import java.io.{FileOutputStream, PrintWriter}
import java.sql.Connection

import mayton.watchdog.pg.Utils._

import scala.collection.mutable

object PgWatchdogEAV {

  def EAV_LOG = "eav_log"
  def TRIGGER_SUFFIX = "_trigger"
  def FUNC_SUFFIX = "_func"

  def createDefaultEAVLog(script : PrintWriter) : Unit = {
    script.println(
      s"""|CREATE TABLE $EAV_LOG (
          |  TS          TIMESTAMP   NOT NULL,
          |  OPERATION   CHAR(1)     NOT NULL CHECK (OPERATION IN ('I','U','D')),
          |  TABLE_NAME  VARCHAR(64) NOT NULL,
          |  COLUMN_NAME VARCHAR(64) NOT NULL,
          |  COL_VALUE   TEXT);
          |
          |  """.stripMargin)
  }

  def eavTriggersFunctionScript(connection: Connection, script: PrintWriter) : Unit = {
    for(tableName <- tables(connection).filter(p => p != EAV_LOG)) {
      val columnDefinitions : List[ColumnDefinition] = getColumnNames(connection, tableName)
      eavFunctionScript(script, tableName, columnDefinitions)
    }
  }

  def eavFunctionScript(script : PrintWriter, tableName : String, columnDefinitions : List[ColumnDefinition]) : Unit = {

    script.print(
      s"""CREATE OR REPLACE FUNCTION $tableName$FUNC_SUFFIX() RETURNS TRIGGER AS $$$$
         |BEGIN
         |  IF TG_OP = 'INSERT' THEN
         |""".stripMargin)

    for(columnDefinition <- columnDefinitions) {
      script.print(s"    IF NEW.${columnDefinition.columnName} IS NOT NULL THEN\n")
      script.print(s"      INSERT INTO $EAV_LOG(TS, OPERATION, TABLE_NAME, COLUMN_NAME, COL_VALUE) VALUES(CURRENT_TIMESTAMP, 'I', '$tableName', '${columnDefinition.columnName}', NEW.${columnDefinition.columnName});\n")
      script.print(s"    END IF;\n")
    }

    script.printf("  ELSIF TG_OP = 'UPDATE' THEN\n")
    for(columnDefinition <- columnDefinitions) {
      script.print(s"    IF NEW.${columnDefinition.columnName} IS NOT NULL THEN\n")
      script.print(s"      INSERT INTO $EAV_LOG(TS, OPERATION, TABLE_NAME, COLUMN_NAME, COL_VALUE) VALUES(CURRENT_TIMESTAMP, 'U', '$tableName', '${columnDefinition.columnName}', NEW.${columnDefinition.columnName});\n")
      script.print(s"    END IF;\n")
    }

    script.printf("  ELSIF TG_OP = 'DELETE' THEN\n")
    for(columnDefinition <- columnDefinitions) {
      script.print(s"    IF NEW.${columnDefinition.columnName} IS NOT NULL THEN\n")
      script.print(s"      INSERT INTO $EAV_LOG(TS, OPERATION, TABLE_NAME, COLUMN_NAME, COL_VALUE) VALUES(CURRENT_TIMESTAMP, 'D', '$tableName', '${columnDefinition.columnName}', NEW.${columnDefinition.columnName});\n")
      script.print(s"    END IF;\n")
    }

    script.print(
     s"""|  END IF;
         |RETURN NEW;
         |END;
         |$$$$
         |LANGUAGE plpgsql;
         |
         |""".stripMargin)

    script.print(
      s"""
         |CREATE TRIGGER $tableName$TRIGGER_SUFFIX AFTER UPDATE OR INSERT OR DELETE
         |   ON $tableName
         |   FOR EACH ROW EXECUTE PROCEDURE $tableName$FUNC_SUFFIX();
         |
         |""".stripMargin)
  }


  def process(connection: Connection, script : PrintWriter) : Boolean = {
    createDefaultEAVLog(script)
    eavTriggersFunctionScript(connection, script)
    true
  }

  def main(arg : Array[String]) : Unit = {
    val props: mutable.Map[String, String] = toScalaMutableMap(tryToLoadSensitiveProperties())
    val host = props("host")
    val port = props("port")
    val database = props("database")
    val connection = createConnection(s"jdbc:postgresql://$host:$port/$database", props("user"), props("password"))
    val script = new PrintWriter(new FileOutputStream("out/pg-watchdog-tables-eav.sql"))
    process(connection, script)
    script.close()
  }

}



Код: javascript
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
package mayton.watchdog.pg

import java.io.{File, FileInputStream}
import java.sql.{Connection, DriverManager}
import java.util.Properties

import mayton.watchdog.pg.PgWatchdogTables.TABLE_PREFIX

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

object Utils {

  def getColumnNames(connection: Connection, tableName: String) : List[ColumnDefinition] = {
    var listBuffer = ListBuffer[ColumnDefinition]()
    val statement = connection.createStatement()
    val resultSet = statement.executeQuery(
      s"""SELECT
         |  ordinal_position,
         |  column_name,
         |  data_type,
         |  character_maximum_length,
         |  is_nullable
         |FROM information_schema.columns WHERE
         |  table_schema   = current_schema()
         |  AND table_name = '$tableName'
         |  ORDER BY ordinal_position
         |
         |  """.stripMargin
    )
    while(resultSet.next()){
      listBuffer += ColumnDefinition(
        resultSet.getString("column_name"),
        resultSet.getString("data_type"),
        resultSet.getInt("character_maximum_length"),
        resultSet.getBoolean("is_nullable")
      )
    }
    resultSet.close()
    statement.close()
    listBuffer.toList
  }

  def tables(connection: Connection) : List[String] = {
    var listBuffer = ListBuffer[String]()
    val statement = connection.createStatement()
    val resultSet = statement.executeQuery(
      "SELECT table_name FROM information_schema.tables WHERE table_schema NOT IN ('information_schema', 'pg_catalog') AND table_type = 'BASE TABLE'")

    while(resultSet.next()) {
      listBuffer += resultSet.getString("table_name")
    }
    resultSet.close()
    statement.close()
    listBuffer.toList
  }

  def createConnection(url : String, user : String, password : String) : Connection = {
    val props = new Properties
    props.setProperty("user", user)
    props.setProperty("password", password)
    DriverManager.getConnection(url, props)
  }

  def tryToLoadSensitiveProperties() : Properties = {
    val props = new Properties()
    if (new File("sensitive.properties").exists()) {
      props.load(new FileInputStream("sensitive.properties"))
    } else {
      props.put("host",     "localhost")
      props.put("port",     "5432")
      props.put("database", "postgres")
      props.put("user",     "postgres")
      props.put("password", "postgres123")
    }
    props
  }

  def toScalaMutableMap(props : Properties) : mutable.Map[String, String] = {
    var map : mutable.Map[String, String] = new mutable.HashMap[String,String]()
    import scala.collection.JavaConverters._
    props.entrySet().asScala.foreach {
      entry => map += ((entry.getKey.asInstanceOf[String], entry.getValue.asInstanceOf[String]))
    }
    map
  }
}

...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921279
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Всё. Больше никаких тузов в рукаве у меня нет. Надо садиться и писать чортов код.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921306
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Думаю над docker-compose. Исключительно в целях тестирования.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921393
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Docker-compose. Пока вот так вот. Еще не тестил.

Возможно не хватает некоторых env-переменных для ApacheMQ. Логины-пароли там. Админка и прочее.
Вобщем добавте кому не лень.

Здесь /bigdata - это просто мой локальный путь где диск толстый и есть место для флуда. Замените его соотв
на ваш путь как будет удобно.

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
version: '3.7'

services:
  db1:
    build: .
    ports:
      - "5432:5433"
    volumes:
      - /bigdata/cdc-jms/db1:/var/lib/postgresql/data
    environment:
      FLASK_ENV: development
    image: "postgres:12.1"

  db2:
    build: .
    ports:
      - "5432:5434"
    volumes:
      - /bigdata/cdc-jms/db2:/var/lib/postgresql/data
    environment:
      FLASK_ENV: development
    image: "postgres:12.1"

# http://localhost:8161/admin
  activemq:
    build: .
    ports:
      - "8161:8162"
      - "61616:61617"
      - "61613:61614"
    volumes:
      - /bigdata/cdc-jms/activemq/data:/data
      - /bigdata/cdc-jms/activemq/log:/var/log/activemq
    environment:
      FLASK_ENV: development
    image: "webcenter/activemq"
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921395
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Дополнение №1 к базовому заданию.

Для EAV-модели. Что делать с транзакциями на удаление строк в том случае если в таблице нет PK ?
Скорее всего - ничего.

Придется констатировать что мы не можем удалять такие строки на Slave системе.

Итого: игнорируем delete/update для таблиц без PK.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921403
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Вобщем в докер-композиции я всё напутал. Порты не те.

Вот так работает. Можно уже заходить на админский порт MQ и смотреть чо как.
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
version: '3.5'

services:
  db1:
    image: "postgres:12.1"
    ports:
      - "15432:5432"
    volumes:
      - /bigdata/cdc-jms/db1:/var/lib/postgresql/data
    environment:
      FLASK_ENV: development

  db2:
    image: "postgres:12.1"
    ports:
      - "15433:5432"
    volumes:
      - /bigdata/cdc-jms/db2:/var/lib/postgresql/data
    environment:
      FLASK_ENV: development

# http://localhost:8161/admin
  activemq:
    image: "webcenter/activemq"
    ports:
      - "8162:8161"
      - "61617:61616"
      - "61614:61613"
    volumes:
      - /bigdata/cdc-jms/activemq/data:/data
      - /bigdata/cdc-jms/activemq/log:/var/log/activemq
    environment:
      ACTIVEMQ_ADMIN_LOGIN : admin
      ACTIVEMQ_ADMIN_PASSWORD : admin123


Еще надо придумать как автоматизировать создание и наполнение БД.

Здесь я пока - пас. Надеюсь что у мемберов есть под рукой какая-то учебная базячка.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921434
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Дополнение к базовому заданию №2

Как передавать информацию о первичных ключах? Хотелось бы чтобы логика Slave(consumer)
была максимально примитивной. Consumer должен просто принимать JMS/Json сообщения из канала
и просто их трансформировать в целевой DML для целевой DBMS.

(кстати из этого вытекает бонус. Тип DBMS может быть разный)


При этом для формирования предложения WHERE он не должен заглядывать ни в какие справочники
таблиц и ключей. Фактически JMS сообщение должно уже содержать эту информацию.

Consumer - простой и деревянный.
Producer - умный.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921533
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Неутешительные итоги. Насчет POC.
Есть некий каркас вида двух консольных приложений.
Master.java / Slave.java которые просто стартуют
и умеют распознавать входные аргументы.

И есть парочка сущностей типа EavEntity и JMSEntity.
Первая нужна просто для маппинга таблички

Код: java
1.
2.
3.
4.
5.
6.
7.
CREATE TABLE EAV_LOG (
          TS          TIMESTAMP   NOT NULL,
          OPERATION   CHAR(1)     NOT NULL CHECK (OPERATION IN ('I','U','D')),
          TABLE_NAME  VARCHAR(64) NOT NULL,
          COLUMN_NAME VARCHAR(64) NOT NULL,
          COL_VALUE   TEXT
);


И вторая будет по сути месседжем. Ее еще надо будет доработать для признаков первичных ключей.

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
@Immutable
public final class JmsEntity {

    public final String ts;
    public final String operation;
    public final String tableName;
    public final List<Pair<String, String>> fields;

    public JmsEntity(@NotNull String ts, @NotNull String operation, @NotNull String tableName, @NotNull List<Pair<String, String>> fields) {
        this.ts = ts;
        this.operation = operation;
        this.tableName = tableName;
        this.fields = fields;
    }
...



В каркас встроенные фрагменты учебных туториалов из набора ApacheMQ (для быстрого старта).
В принципе они должны поднятся в интеграции с env.

+Создана конфигурация docker-compose которая поднимает environment из двух Постгресов и 1 Apache Active MQ
последних версий.

Постгресы я брал только лишь для того чтобы привязаться к какой-никакой реляционной внешней системе.
Было пофиг что брать и я как и обещал буду строить независимую от dbms систему. В крайнем случае
придется написать сет бриджей или адаптеров диалектов SQL так же как это делают Hibernate/Batis.
Я просто всячески оттягиваю этот момент чтоб он не мешал на старте.

+По мелочи. Добавлены логгеры и их конфигурации. SQL скрипик и докер-скриптик.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921624
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton,
Тормозить будет.
При update таблички на сотню тысяч по where date > ХХХ
будет событий 50000. И все встанет.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921633
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Нас это остановит?

Такие события - нечастые в системе. Мы будем рассчитывать на классический OLTP а котором подобных апдейтов по дизайну нет.

Опишу это как ещё одно дополнение к ТЗ.
...
Рейтинг: 0 / 0
25 сообщений из 49, страница 1 из 2
Форумы / Java [игнор отключен] [закрыт для гостей] / Тяпничный CDC/JMS
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]