Гость
Map
Форумы / Java [игнор отключен] [закрыт для гостей] / Тяпничный CDC/JMS / 25 сообщений из 49, страница 1 из 2
31.01.2020, 20:29
    #39921165
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
Привет котаны-братаны.

По мотивам Мониторинг изменений в базах данных

Буду краток.

Две базы. Два приложения. И 1 месседж брокер.

Запускаем мастера и подчиеннного.

Код: java
1.
$ java CDCMaster --jdbc-url "jdbc:postgresql://host1:5432/base1" --broker "tcp://broker1:61616"


Код: java
1.
$ java CDCSlave --jdbc-url "jdbc:postgresql://host2:5432/base1" --broker "tcp://broker1:61616"


Вобщем надо как-то так исхитриться чтоб изменения мастера попали в Slave.

Топология сети - любая.
База - любая (postgres как пример)
Брокер - любой.
Требований по SLA нету. Как сделаете так и будет.
Изменения - только DML (Insert/Update/Delete)


Go-go кодить.

P.S. Ну триггерочки и служебные таблички там уж сами посоздавайте если надо.
...
Рейтинг: 0 / 0
31.01.2020, 20:45
    #39921176
Leonid Kudryavtsev
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
mayton

Go-go кодить.

нафига?
https://google.gik-team.com/?q=postgresql master slave replication

Ты же хорошо представляешь, сколько строк кода и сколько такая система реально стоит.
...
Рейтинг: 0 / 0
31.01.2020, 21:13
    #39921186
Victor Nevsky
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
не вникал
http://ha-jdbc.org/doc.html
...
Рейтинг: 0 / 0
31.01.2020, 21:14
    #39921187
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
Leonid Kudryavtsev, сколько?
...
Рейтинг: 0 / 0
31.01.2020, 21:57
    #39921198
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
Понты делов. К воскресенье будет POC.
...
Рейтинг: 0 / 0
31.01.2020, 22:16
    #39921206
забыл ник
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
mayton
Понты делов. К воскресенье будет POC.

Прям со сплит брейном и поддержкой оффлайн? И чтоб ни единого разрыва!
...
Рейтинг: 0 / 0
31.01.2020, 22:17
    #39921208
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
Вы уже процедуру main написали? А?
...
Рейтинг: 0 / 0
31.01.2020, 22:22
    #39921209
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
Leonid Kudryavtsev
mayton

Go-go кодить.

нафига?
https://google.gik-team.com/?q=postgresql master slave replication

Ты же хорошо представляешь, сколько строк кода и сколько такая система реально стоит.

Абстрактный JDBC источник.

Я-же писал в задании.
...
Рейтинг: 0 / 0
31.01.2020, 22:25
    #39921210
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
Victor Nevsky
не вникал
http://ha-jdbc.org/doc.html

Если это тоже что и XA - то это тормозной кусок говна. Синхронная фиксация в две системы. Тормоз еще тот.
...
Рейтинг: 0 / 0
31.01.2020, 22:41
    #39921214
iOracleDev
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
mayton
База - любая (postgres как пример)
...
P.S. Ну триггерочки и служебные таблички там уж сами посоздавайте если надо.

Бессмысленное занятие, каждая уважающая себя СУБД уже имеет механизмы репликации
работающие на уровне ядра, триггерочками получится тормозной кусок говна))

Хотя если данных не много и репликация почтовая, то может и пригодится поделье,
но даже в этом случае нативная для СУБД часть будет составлять основной объем кода,
на промежуточном уровне будет только забор готовых upsert/delete и применение их
на другой инстанс в нужном порядке. Основная проблема будет в том, что триггеры на операции
срабатывают непосредственно при проведении операции, а фиксация происходит позже и триггера
уровня строки на фиксацию нет.
...
Рейтинг: 0 / 0
31.01.2020, 22:44
    #39921216
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
iOracleDev,

Ты серьёзно так напрягся? Чувак. Это пятничный топик. Это брейншторм. Это челлендж.

Здесь не ищут коробочных решений!!! Go-go!!
...
Рейтинг: 0 / 0
31.01.2020, 22:49
    #39921218
iOracleDev
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
mayton,

Смотрю с интересом на то что получится, особенно на часть касающуюся СУБД))
...
Рейтинг: 0 / 0
31.01.2020, 23:50
    #39921235
забыл ник
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
Не стоит забывать о проблемах связанных с текущим временем. Если запрос содержит insert currentSystemTime или random - в общем нечто недетерминированное, то просто прогнать инсерт на реплике не получится
...
Рейтинг: 0 / 0
01.02.2020, 00:44
    #39921247
Victor Nevsky
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
mayton
Если это тоже что и XA - то это тормозной кусок говна. Синхронная фиксация в две системы. Тормоз еще тот.
Подписался:)
...
Рейтинг: 0 / 0
01.02.2020, 10:57
    #39921273
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
забыл ник
Не стоит забывать о проблемах связанных с текущим временем. Если запрос содержит insert currentSystemTime или random - в общем нечто недетерминированное, то просто прогнать инсерт на реплике не получится

А что будет видеть триггер?
...
Рейтинг: 0 / 0
01.02.2020, 11:21
    #39921277
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
В качестве первого закидона. Этот код я писал еще летом. И это еще не CDC.

Это подготовка схемы БД к созданию служебных таблиц и триггеров.
Код - требует улучшений. В частности - отвязки от Постгреса и привязки к генерализованной
СУБД.

Есть еще второй вариант такого-же CDC где EAV табличка заменяется на множество типизированных.

Код: javascript
1.
2.
3.
4.
5.
6.
package mayton.watchdog.pg

case class ColumnDefinition( val columnName : String,
                             val dataType   : String,
                             val characterMaximumLength : Int,
                             val isNullable : Boolean)



Код: javascript
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
package mayton.watchdog.pg

import java.io.{FileOutputStream, PrintWriter}
import java.sql.Connection

import mayton.watchdog.pg.Utils._

import scala.collection.mutable

object PgWatchdogEAV {

  def EAV_LOG = "eav_log"
  def TRIGGER_SUFFIX = "_trigger"
  def FUNC_SUFFIX = "_func"

  def createDefaultEAVLog(script : PrintWriter) : Unit = {
    script.println(
      s"""|CREATE TABLE $EAV_LOG (
          |  TS          TIMESTAMP   NOT NULL,
          |  OPERATION   CHAR(1)     NOT NULL CHECK (OPERATION IN ('I','U','D')),
          |  TABLE_NAME  VARCHAR(64) NOT NULL,
          |  COLUMN_NAME VARCHAR(64) NOT NULL,
          |  COL_VALUE   TEXT);
          |
          |  """.stripMargin)
  }

  def eavTriggersFunctionScript(connection: Connection, script: PrintWriter) : Unit = {
    for(tableName <- tables(connection).filter(p => p != EAV_LOG)) {
      val columnDefinitions : List[ColumnDefinition] = getColumnNames(connection, tableName)
      eavFunctionScript(script, tableName, columnDefinitions)
    }
  }

  def eavFunctionScript(script : PrintWriter, tableName : String, columnDefinitions : List[ColumnDefinition]) : Unit = {

    script.print(
      s"""CREATE OR REPLACE FUNCTION $tableName$FUNC_SUFFIX() RETURNS TRIGGER AS $$$$
         |BEGIN
         |  IF TG_OP = 'INSERT' THEN
         |""".stripMargin)

    for(columnDefinition <- columnDefinitions) {
      script.print(s"    IF NEW.${columnDefinition.columnName} IS NOT NULL THEN\n")
      script.print(s"      INSERT INTO $EAV_LOG(TS, OPERATION, TABLE_NAME, COLUMN_NAME, COL_VALUE) VALUES(CURRENT_TIMESTAMP, 'I', '$tableName', '${columnDefinition.columnName}', NEW.${columnDefinition.columnName});\n")
      script.print(s"    END IF;\n")
    }

    script.printf("  ELSIF TG_OP = 'UPDATE' THEN\n")
    for(columnDefinition <- columnDefinitions) {
      script.print(s"    IF NEW.${columnDefinition.columnName} IS NOT NULL THEN\n")
      script.print(s"      INSERT INTO $EAV_LOG(TS, OPERATION, TABLE_NAME, COLUMN_NAME, COL_VALUE) VALUES(CURRENT_TIMESTAMP, 'U', '$tableName', '${columnDefinition.columnName}', NEW.${columnDefinition.columnName});\n")
      script.print(s"    END IF;\n")
    }

    script.printf("  ELSIF TG_OP = 'DELETE' THEN\n")
    for(columnDefinition <- columnDefinitions) {
      script.print(s"    IF NEW.${columnDefinition.columnName} IS NOT NULL THEN\n")
      script.print(s"      INSERT INTO $EAV_LOG(TS, OPERATION, TABLE_NAME, COLUMN_NAME, COL_VALUE) VALUES(CURRENT_TIMESTAMP, 'D', '$tableName', '${columnDefinition.columnName}', NEW.${columnDefinition.columnName});\n")
      script.print(s"    END IF;\n")
    }

    script.print(
     s"""|  END IF;
         |RETURN NEW;
         |END;
         |$$$$
         |LANGUAGE plpgsql;
         |
         |""".stripMargin)

    script.print(
      s"""
         |CREATE TRIGGER $tableName$TRIGGER_SUFFIX AFTER UPDATE OR INSERT OR DELETE
         |   ON $tableName
         |   FOR EACH ROW EXECUTE PROCEDURE $tableName$FUNC_SUFFIX();
         |
         |""".stripMargin)
  }


  def process(connection: Connection, script : PrintWriter) : Boolean = {
    createDefaultEAVLog(script)
    eavTriggersFunctionScript(connection, script)
    true
  }

  def main(arg : Array[String]) : Unit = {
    val props: mutable.Map[String, String] = toScalaMutableMap(tryToLoadSensitiveProperties())
    val host = props("host")
    val port = props("port")
    val database = props("database")
    val connection = createConnection(s"jdbc:postgresql://$host:$port/$database", props("user"), props("password"))
    val script = new PrintWriter(new FileOutputStream("out/pg-watchdog-tables-eav.sql"))
    process(connection, script)
    script.close()
  }

}



Код: javascript
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
package mayton.watchdog.pg

import java.io.{File, FileInputStream}
import java.sql.{Connection, DriverManager}
import java.util.Properties

import mayton.watchdog.pg.PgWatchdogTables.TABLE_PREFIX

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

object Utils {

  def getColumnNames(connection: Connection, tableName: String) : List[ColumnDefinition] = {
    var listBuffer = ListBuffer[ColumnDefinition]()
    val statement = connection.createStatement()
    val resultSet = statement.executeQuery(
      s"""SELECT
         |  ordinal_position,
         |  column_name,
         |  data_type,
         |  character_maximum_length,
         |  is_nullable
         |FROM information_schema.columns WHERE
         |  table_schema   = current_schema()
         |  AND table_name = '$tableName'
         |  ORDER BY ordinal_position
         |
         |  """.stripMargin
    )
    while(resultSet.next()){
      listBuffer += ColumnDefinition(
        resultSet.getString("column_name"),
        resultSet.getString("data_type"),
        resultSet.getInt("character_maximum_length"),
        resultSet.getBoolean("is_nullable")
      )
    }
    resultSet.close()
    statement.close()
    listBuffer.toList
  }

  def tables(connection: Connection) : List[String] = {
    var listBuffer = ListBuffer[String]()
    val statement = connection.createStatement()
    val resultSet = statement.executeQuery(
      "SELECT table_name FROM information_schema.tables WHERE table_schema NOT IN ('information_schema', 'pg_catalog') AND table_type = 'BASE TABLE'")

    while(resultSet.next()) {
      listBuffer += resultSet.getString("table_name")
    }
    resultSet.close()
    statement.close()
    listBuffer.toList
  }

  def createConnection(url : String, user : String, password : String) : Connection = {
    val props = new Properties
    props.setProperty("user", user)
    props.setProperty("password", password)
    DriverManager.getConnection(url, props)
  }

  def tryToLoadSensitiveProperties() : Properties = {
    val props = new Properties()
    if (new File("sensitive.properties").exists()) {
      props.load(new FileInputStream("sensitive.properties"))
    } else {
      props.put("host",     "localhost")
      props.put("port",     "5432")
      props.put("database", "postgres")
      props.put("user",     "postgres")
      props.put("password", "postgres123")
    }
    props
  }

  def toScalaMutableMap(props : Properties) : mutable.Map[String, String] = {
    var map : mutable.Map[String, String] = new mutable.HashMap[String,String]()
    import scala.collection.JavaConverters._
    props.entrySet().asScala.foreach {
      entry => map += ((entry.getKey.asInstanceOf[String], entry.getValue.asInstanceOf[String]))
    }
    map
  }
}

...
Рейтинг: 0 / 0
01.02.2020, 11:26
    #39921279
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
Всё. Больше никаких тузов в рукаве у меня нет. Надо садиться и писать чортов код.
...
Рейтинг: 0 / 0
01.02.2020, 13:42
    #39921306
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
Думаю над docker-compose. Исключительно в целях тестирования.
...
Рейтинг: 0 / 0
02.02.2020, 00:31
    #39921393
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
Docker-compose. Пока вот так вот. Еще не тестил.

Возможно не хватает некоторых env-переменных для ApacheMQ. Логины-пароли там. Админка и прочее.
Вобщем добавте кому не лень.

Здесь /bigdata - это просто мой локальный путь где диск толстый и есть место для флуда. Замените его соотв
на ваш путь как будет удобно.

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
version: '3.7'

services:
  db1:
    build: .
    ports:
      - "5432:5433"
    volumes:
      - /bigdata/cdc-jms/db1:/var/lib/postgresql/data
    environment:
      FLASK_ENV: development
    image: "postgres:12.1"

  db2:
    build: .
    ports:
      - "5432:5434"
    volumes:
      - /bigdata/cdc-jms/db2:/var/lib/postgresql/data
    environment:
      FLASK_ENV: development
    image: "postgres:12.1"

# http://localhost:8161/admin
  activemq:
    build: .
    ports:
      - "8161:8162"
      - "61616:61617"
      - "61613:61614"
    volumes:
      - /bigdata/cdc-jms/activemq/data:/data
      - /bigdata/cdc-jms/activemq/log:/var/log/activemq
    environment:
      FLASK_ENV: development
    image: "webcenter/activemq"
...
Рейтинг: 0 / 0
02.02.2020, 00:38
    #39921395
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
Дополнение №1 к базовому заданию.

Для EAV-модели. Что делать с транзакциями на удаление строк в том случае если в таблице нет PK ?
Скорее всего - ничего.

Придется констатировать что мы не можем удалять такие строки на Slave системе.

Итого: игнорируем delete/update для таблиц без PK.
...
Рейтинг: 0 / 0
02.02.2020, 01:36
    #39921403
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
Вобщем в докер-композиции я всё напутал. Порты не те.

Вот так работает. Можно уже заходить на админский порт MQ и смотреть чо как.
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
version: '3.5'

services:
  db1:
    image: "postgres:12.1"
    ports:
      - "15432:5432"
    volumes:
      - /bigdata/cdc-jms/db1:/var/lib/postgresql/data
    environment:
      FLASK_ENV: development

  db2:
    image: "postgres:12.1"
    ports:
      - "15433:5432"
    volumes:
      - /bigdata/cdc-jms/db2:/var/lib/postgresql/data
    environment:
      FLASK_ENV: development

# http://localhost:8161/admin
  activemq:
    image: "webcenter/activemq"
    ports:
      - "8162:8161"
      - "61617:61616"
      - "61614:61613"
    volumes:
      - /bigdata/cdc-jms/activemq/data:/data
      - /bigdata/cdc-jms/activemq/log:/var/log/activemq
    environment:
      ACTIVEMQ_ADMIN_LOGIN : admin
      ACTIVEMQ_ADMIN_PASSWORD : admin123


Еще надо придумать как автоматизировать создание и наполнение БД.

Здесь я пока - пас. Надеюсь что у мемберов есть под рукой какая-то учебная базячка.
...
Рейтинг: 0 / 0
02.02.2020, 11:42
    #39921434
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
Дополнение к базовому заданию №2

Как передавать информацию о первичных ключах? Хотелось бы чтобы логика Slave(consumer)
была максимально примитивной. Consumer должен просто принимать JMS/Json сообщения из канала
и просто их трансформировать в целевой DML для целевой DBMS.

(кстати из этого вытекает бонус. Тип DBMS может быть разный)


При этом для формирования предложения WHERE он не должен заглядывать ни в какие справочники
таблиц и ключей. Фактически JMS сообщение должно уже содержать эту информацию.

Consumer - простой и деревянный.
Producer - умный.
...
Рейтинг: 0 / 0
02.02.2020, 21:43
    #39921533
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
Неутешительные итоги. Насчет POC.
Есть некий каркас вида двух консольных приложений.
Master.java / Slave.java которые просто стартуют
и умеют распознавать входные аргументы.

И есть парочка сущностей типа EavEntity и JMSEntity.
Первая нужна просто для маппинга таблички

Код: java
1.
2.
3.
4.
5.
6.
7.
CREATE TABLE EAV_LOG (
          TS          TIMESTAMP   NOT NULL,
          OPERATION   CHAR(1)     NOT NULL CHECK (OPERATION IN ('I','U','D')),
          TABLE_NAME  VARCHAR(64) NOT NULL,
          COLUMN_NAME VARCHAR(64) NOT NULL,
          COL_VALUE   TEXT
);


И вторая будет по сути месседжем. Ее еще надо будет доработать для признаков первичных ключей.

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
@Immutable
public final class JmsEntity {

    public final String ts;
    public final String operation;
    public final String tableName;
    public final List<Pair<String, String>> fields;

    public JmsEntity(@NotNull String ts, @NotNull String operation, @NotNull String tableName, @NotNull List<Pair<String, String>> fields) {
        this.ts = ts;
        this.operation = operation;
        this.tableName = tableName;
        this.fields = fields;
    }
...



В каркас встроенные фрагменты учебных туториалов из набора ApacheMQ (для быстрого старта).
В принципе они должны поднятся в интеграции с env.

+Создана конфигурация docker-compose которая поднимает environment из двух Постгресов и 1 Apache Active MQ
последних версий.

Постгресы я брал только лишь для того чтобы привязаться к какой-никакой реляционной внешней системе.
Было пофиг что брать и я как и обещал буду строить независимую от dbms систему. В крайнем случае
придется написать сет бриджей или адаптеров диалектов SQL так же как это делают Hibernate/Batis.
Я просто всячески оттягиваю этот момент чтоб он не мешал на старте.

+По мелочи. Добавлены логгеры и их конфигурации. SQL скрипик и докер-скриптик.
...
Рейтинг: 0 / 0
03.02.2020, 09:10
    #39921624
PetroNotC Sharp
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
mayton,
Тормозить будет.
При update таблички на сотню тысяч по where date > ХХХ
будет событий 50000. И все встанет.
...
Рейтинг: 0 / 0
03.02.2020, 09:40
    #39921633
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Тяпничный CDC/JMS
Нас это остановит?

Такие события - нечастые в системе. Мы будем рассчитывать на классический OLTP а котором подобных апдейтов по дизайну нет.

Опишу это как ещё одно дополнение к ТЗ.
...
Рейтинг: 0 / 0
Форумы / Java [игнор отключен] [закрыт для гостей] / Тяпничный CDC/JMS / 25 сообщений из 49, страница 1 из 2
Целевая тема:
Создать новую тему:
Автор:
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]