powered by simpleCommunicator - 2.0.30     © 2024 Programmizd 02
Map
Форумы / Java [игнор отключен] [закрыт для гостей] / Тяпничный CDC/JMS
49 сообщений из 49, показаны все 2 страниц
Тяпничный CDC/JMS
    #39921165
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Привет котаны-братаны.

По мотивам Мониторинг изменений в базах данных

Буду краток.

Две базы. Два приложения. И 1 месседж брокер.

Запускаем мастера и подчиеннного.

Код: java
1.
$ java CDCMaster --jdbc-url "jdbc:postgresql://host1:5432/base1" --broker "tcp://broker1:61616"


Код: java
1.
$ java CDCSlave --jdbc-url "jdbc:postgresql://host2:5432/base1" --broker "tcp://broker1:61616"


Вобщем надо как-то так исхитриться чтоб изменения мастера попали в Slave.

Топология сети - любая.
База - любая (postgres как пример)
Брокер - любой.
Требований по SLA нету. Как сделаете так и будет.
Изменения - только DML (Insert/Update/Delete)


Go-go кодить.

P.S. Ну триггерочки и служебные таблички там уж сами посоздавайте если надо.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921176
Leonid Kudryavtsev
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton

Go-go кодить.

нафига?
https://google.gik-team.com/?q=postgresql master slave replication

Ты же хорошо представляешь, сколько строк кода и сколько такая система реально стоит.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921186
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
не вникал
http://ha-jdbc.org/doc.html
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921187
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Leonid Kudryavtsev, сколько?
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921198
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Понты делов. К воскресенье будет POC.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921206
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
Понты делов. К воскресенье будет POC.

Прям со сплит брейном и поддержкой оффлайн? И чтоб ни единого разрыва!
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921208
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Вы уже процедуру main написали? А?
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921209
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Leonid Kudryavtsev
mayton

Go-go кодить.

нафига?
https://google.gik-team.com/?q=postgresql master slave replication

Ты же хорошо представляешь, сколько строк кода и сколько такая система реально стоит.

Абстрактный JDBC источник.

Я-же писал в задании.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921210
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Victor Nevsky
не вникал
http://ha-jdbc.org/doc.html

Если это тоже что и XA - то это тормозной кусок говна. Синхронная фиксация в две системы. Тормоз еще тот.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921214
iOracleDev
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
База - любая (postgres как пример)
...
P.S. Ну триггерочки и служебные таблички там уж сами посоздавайте если надо.

Бессмысленное занятие, каждая уважающая себя СУБД уже имеет механизмы репликации
работающие на уровне ядра, триггерочками получится тормозной кусок говна))

Хотя если данных не много и репликация почтовая, то может и пригодится поделье,
но даже в этом случае нативная для СУБД часть будет составлять основной объем кода,
на промежуточном уровне будет только забор готовых upsert/delete и применение их
на другой инстанс в нужном порядке. Основная проблема будет в том, что триггеры на операции
срабатывают непосредственно при проведении операции, а фиксация происходит позже и триггера
уровня строки на фиксацию нет.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921216
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
iOracleDev,

Ты серьёзно так напрягся? Чувак. Это пятничный топик. Это брейншторм. Это челлендж.

Здесь не ищут коробочных решений!!! Go-go!!
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921218
iOracleDev
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton,

Смотрю с интересом на то что получится, особенно на часть касающуюся СУБД))
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921235
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Не стоит забывать о проблемах связанных с текущим временем. Если запрос содержит insert currentSystemTime или random - в общем нечто недетерминированное, то просто прогнать инсерт на реплике не получится
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921247
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
Если это тоже что и XA - то это тормозной кусок говна. Синхронная фиксация в две системы. Тормоз еще тот.
Подписался:)
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921273
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник
Не стоит забывать о проблемах связанных с текущим временем. Если запрос содержит insert currentSystemTime или random - в общем нечто недетерминированное, то просто прогнать инсерт на реплике не получится

А что будет видеть триггер?
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921277
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
В качестве первого закидона. Этот код я писал еще летом. И это еще не CDC.

Это подготовка схемы БД к созданию служебных таблиц и триггеров.
Код - требует улучшений. В частности - отвязки от Постгреса и привязки к генерализованной
СУБД.

Есть еще второй вариант такого-же CDC где EAV табличка заменяется на множество типизированных.

Код: javascript
1.
2.
3.
4.
5.
6.
package mayton.watchdog.pg

case class ColumnDefinition( val columnName : String,
                             val dataType   : String,
                             val characterMaximumLength : Int,
                             val isNullable : Boolean)



Код: javascript
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
package mayton.watchdog.pg

import java.io.{FileOutputStream, PrintWriter}
import java.sql.Connection

import mayton.watchdog.pg.Utils._

import scala.collection.mutable

object PgWatchdogEAV {

  def EAV_LOG = "eav_log"
  def TRIGGER_SUFFIX = "_trigger"
  def FUNC_SUFFIX = "_func"

  def createDefaultEAVLog(script : PrintWriter) : Unit = {
    script.println(
      s"""|CREATE TABLE $EAV_LOG (
          |  TS          TIMESTAMP   NOT NULL,
          |  OPERATION   CHAR(1)     NOT NULL CHECK (OPERATION IN ('I','U','D')),
          |  TABLE_NAME  VARCHAR(64) NOT NULL,
          |  COLUMN_NAME VARCHAR(64) NOT NULL,
          |  COL_VALUE   TEXT);
          |
          |  """.stripMargin)
  }

  def eavTriggersFunctionScript(connection: Connection, script: PrintWriter) : Unit = {
    for(tableName <- tables(connection).filter(p => p != EAV_LOG)) {
      val columnDefinitions : List[ColumnDefinition] = getColumnNames(connection, tableName)
      eavFunctionScript(script, tableName, columnDefinitions)
    }
  }

  def eavFunctionScript(script : PrintWriter, tableName : String, columnDefinitions : List[ColumnDefinition]) : Unit = {

    script.print(
      s"""CREATE OR REPLACE FUNCTION $tableName$FUNC_SUFFIX() RETURNS TRIGGER AS $$$$
         |BEGIN
         |  IF TG_OP = 'INSERT' THEN
         |""".stripMargin)

    for(columnDefinition <- columnDefinitions) {
      script.print(s"    IF NEW.${columnDefinition.columnName} IS NOT NULL THEN\n")
      script.print(s"      INSERT INTO $EAV_LOG(TS, OPERATION, TABLE_NAME, COLUMN_NAME, COL_VALUE) VALUES(CURRENT_TIMESTAMP, 'I', '$tableName', '${columnDefinition.columnName}', NEW.${columnDefinition.columnName});\n")
      script.print(s"    END IF;\n")
    }

    script.printf("  ELSIF TG_OP = 'UPDATE' THEN\n")
    for(columnDefinition <- columnDefinitions) {
      script.print(s"    IF NEW.${columnDefinition.columnName} IS NOT NULL THEN\n")
      script.print(s"      INSERT INTO $EAV_LOG(TS, OPERATION, TABLE_NAME, COLUMN_NAME, COL_VALUE) VALUES(CURRENT_TIMESTAMP, 'U', '$tableName', '${columnDefinition.columnName}', NEW.${columnDefinition.columnName});\n")
      script.print(s"    END IF;\n")
    }

    script.printf("  ELSIF TG_OP = 'DELETE' THEN\n")
    for(columnDefinition <- columnDefinitions) {
      script.print(s"    IF NEW.${columnDefinition.columnName} IS NOT NULL THEN\n")
      script.print(s"      INSERT INTO $EAV_LOG(TS, OPERATION, TABLE_NAME, COLUMN_NAME, COL_VALUE) VALUES(CURRENT_TIMESTAMP, 'D', '$tableName', '${columnDefinition.columnName}', NEW.${columnDefinition.columnName});\n")
      script.print(s"    END IF;\n")
    }

    script.print(
     s"""|  END IF;
         |RETURN NEW;
         |END;
         |$$$$
         |LANGUAGE plpgsql;
         |
         |""".stripMargin)

    script.print(
      s"""
         |CREATE TRIGGER $tableName$TRIGGER_SUFFIX AFTER UPDATE OR INSERT OR DELETE
         |   ON $tableName
         |   FOR EACH ROW EXECUTE PROCEDURE $tableName$FUNC_SUFFIX();
         |
         |""".stripMargin)
  }


  def process(connection: Connection, script : PrintWriter) : Boolean = {
    createDefaultEAVLog(script)
    eavTriggersFunctionScript(connection, script)
    true
  }

  def main(arg : Array[String]) : Unit = {
    val props: mutable.Map[String, String] = toScalaMutableMap(tryToLoadSensitiveProperties())
    val host = props("host")
    val port = props("port")
    val database = props("database")
    val connection = createConnection(s"jdbc:postgresql://$host:$port/$database", props("user"), props("password"))
    val script = new PrintWriter(new FileOutputStream("out/pg-watchdog-tables-eav.sql"))
    process(connection, script)
    script.close()
  }

}



Код: javascript
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
package mayton.watchdog.pg

import java.io.{File, FileInputStream}
import java.sql.{Connection, DriverManager}
import java.util.Properties

import mayton.watchdog.pg.PgWatchdogTables.TABLE_PREFIX

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

object Utils {

  def getColumnNames(connection: Connection, tableName: String) : List[ColumnDefinition] = {
    var listBuffer = ListBuffer[ColumnDefinition]()
    val statement = connection.createStatement()
    val resultSet = statement.executeQuery(
      s"""SELECT
         |  ordinal_position,
         |  column_name,
         |  data_type,
         |  character_maximum_length,
         |  is_nullable
         |FROM information_schema.columns WHERE
         |  table_schema   = current_schema()
         |  AND table_name = '$tableName'
         |  ORDER BY ordinal_position
         |
         |  """.stripMargin
    )
    while(resultSet.next()){
      listBuffer += ColumnDefinition(
        resultSet.getString("column_name"),
        resultSet.getString("data_type"),
        resultSet.getInt("character_maximum_length"),
        resultSet.getBoolean("is_nullable")
      )
    }
    resultSet.close()
    statement.close()
    listBuffer.toList
  }

  def tables(connection: Connection) : List[String] = {
    var listBuffer = ListBuffer[String]()
    val statement = connection.createStatement()
    val resultSet = statement.executeQuery(
      "SELECT table_name FROM information_schema.tables WHERE table_schema NOT IN ('information_schema', 'pg_catalog') AND table_type = 'BASE TABLE'")

    while(resultSet.next()) {
      listBuffer += resultSet.getString("table_name")
    }
    resultSet.close()
    statement.close()
    listBuffer.toList
  }

  def createConnection(url : String, user : String, password : String) : Connection = {
    val props = new Properties
    props.setProperty("user", user)
    props.setProperty("password", password)
    DriverManager.getConnection(url, props)
  }

  def tryToLoadSensitiveProperties() : Properties = {
    val props = new Properties()
    if (new File("sensitive.properties").exists()) {
      props.load(new FileInputStream("sensitive.properties"))
    } else {
      props.put("host",     "localhost")
      props.put("port",     "5432")
      props.put("database", "postgres")
      props.put("user",     "postgres")
      props.put("password", "postgres123")
    }
    props
  }

  def toScalaMutableMap(props : Properties) : mutable.Map[String, String] = {
    var map : mutable.Map[String, String] = new mutable.HashMap[String,String]()
    import scala.collection.JavaConverters._
    props.entrySet().asScala.foreach {
      entry => map += ((entry.getKey.asInstanceOf[String], entry.getValue.asInstanceOf[String]))
    }
    map
  }
}

...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921279
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Всё. Больше никаких тузов в рукаве у меня нет. Надо садиться и писать чортов код.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921306
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Думаю над docker-compose. Исключительно в целях тестирования.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921393
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Docker-compose. Пока вот так вот. Еще не тестил.

Возможно не хватает некоторых env-переменных для ApacheMQ. Логины-пароли там. Админка и прочее.
Вобщем добавте кому не лень.

Здесь /bigdata - это просто мой локальный путь где диск толстый и есть место для флуда. Замените его соотв
на ваш путь как будет удобно.

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
version: '3.7'

services:
  db1:
    build: .
    ports:
      - "5432:5433"
    volumes:
      - /bigdata/cdc-jms/db1:/var/lib/postgresql/data
    environment:
      FLASK_ENV: development
    image: "postgres:12.1"

  db2:
    build: .
    ports:
      - "5432:5434"
    volumes:
      - /bigdata/cdc-jms/db2:/var/lib/postgresql/data
    environment:
      FLASK_ENV: development
    image: "postgres:12.1"

# http://localhost:8161/admin
  activemq:
    build: .
    ports:
      - "8161:8162"
      - "61616:61617"
      - "61613:61614"
    volumes:
      - /bigdata/cdc-jms/activemq/data:/data
      - /bigdata/cdc-jms/activemq/log:/var/log/activemq
    environment:
      FLASK_ENV: development
    image: "webcenter/activemq"
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921395
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Дополнение №1 к базовому заданию.

Для EAV-модели. Что делать с транзакциями на удаление строк в том случае если в таблице нет PK ?
Скорее всего - ничего.

Придется констатировать что мы не можем удалять такие строки на Slave системе.

Итого: игнорируем delete/update для таблиц без PK.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921403
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Вобщем в докер-композиции я всё напутал. Порты не те.

Вот так работает. Можно уже заходить на админский порт MQ и смотреть чо как.
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
version: '3.5'

services:
  db1:
    image: "postgres:12.1"
    ports:
      - "15432:5432"
    volumes:
      - /bigdata/cdc-jms/db1:/var/lib/postgresql/data
    environment:
      FLASK_ENV: development

  db2:
    image: "postgres:12.1"
    ports:
      - "15433:5432"
    volumes:
      - /bigdata/cdc-jms/db2:/var/lib/postgresql/data
    environment:
      FLASK_ENV: development

# http://localhost:8161/admin
  activemq:
    image: "webcenter/activemq"
    ports:
      - "8162:8161"
      - "61617:61616"
      - "61614:61613"
    volumes:
      - /bigdata/cdc-jms/activemq/data:/data
      - /bigdata/cdc-jms/activemq/log:/var/log/activemq
    environment:
      ACTIVEMQ_ADMIN_LOGIN : admin
      ACTIVEMQ_ADMIN_PASSWORD : admin123


Еще надо придумать как автоматизировать создание и наполнение БД.

Здесь я пока - пас. Надеюсь что у мемберов есть под рукой какая-то учебная базячка.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921434
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Дополнение к базовому заданию №2

Как передавать информацию о первичных ключах? Хотелось бы чтобы логика Slave(consumer)
была максимально примитивной. Consumer должен просто принимать JMS/Json сообщения из канала
и просто их трансформировать в целевой DML для целевой DBMS.

(кстати из этого вытекает бонус. Тип DBMS может быть разный)


При этом для формирования предложения WHERE он не должен заглядывать ни в какие справочники
таблиц и ключей. Фактически JMS сообщение должно уже содержать эту информацию.

Consumer - простой и деревянный.
Producer - умный.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921533
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Неутешительные итоги. Насчет POC.
Есть некий каркас вида двух консольных приложений.
Master.java / Slave.java которые просто стартуют
и умеют распознавать входные аргументы.

И есть парочка сущностей типа EavEntity и JMSEntity.
Первая нужна просто для маппинга таблички

Код: java
1.
2.
3.
4.
5.
6.
7.
CREATE TABLE EAV_LOG (
          TS          TIMESTAMP   NOT NULL,
          OPERATION   CHAR(1)     NOT NULL CHECK (OPERATION IN ('I','U','D')),
          TABLE_NAME  VARCHAR(64) NOT NULL,
          COLUMN_NAME VARCHAR(64) NOT NULL,
          COL_VALUE   TEXT
);


И вторая будет по сути месседжем. Ее еще надо будет доработать для признаков первичных ключей.

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
@Immutable
public final class JmsEntity {

    public final String ts;
    public final String operation;
    public final String tableName;
    public final List<Pair<String, String>> fields;

    public JmsEntity(@NotNull String ts, @NotNull String operation, @NotNull String tableName, @NotNull List<Pair<String, String>> fields) {
        this.ts = ts;
        this.operation = operation;
        this.tableName = tableName;
        this.fields = fields;
    }
...



В каркас встроенные фрагменты учебных туториалов из набора ApacheMQ (для быстрого старта).
В принципе они должны поднятся в интеграции с env.

+Создана конфигурация docker-compose которая поднимает environment из двух Постгресов и 1 Apache Active MQ
последних версий.

Постгресы я брал только лишь для того чтобы привязаться к какой-никакой реляционной внешней системе.
Было пофиг что брать и я как и обещал буду строить независимую от dbms систему. В крайнем случае
придется написать сет бриджей или адаптеров диалектов SQL так же как это делают Hibernate/Batis.
Я просто всячески оттягиваю этот момент чтоб он не мешал на старте.

+По мелочи. Добавлены логгеры и их конфигурации. SQL скрипик и докер-скриптик.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921624
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton,
Тормозить будет.
При update таблички на сотню тысяч по where date > ХХХ
будет событий 50000. И все встанет.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921633
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Нас это остановит?

Такие события - нечастые в системе. Мы будем рассчитывать на классический OLTP а котором подобных апдейтов по дизайну нет.

Опишу это как ещё одно дополнение к ТЗ.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921634
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton, при удалении будет ошибка.

см.
TG_OP = 'DELETE' ... NEW. ... RETURN NEW ...
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921657
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Victor Nevsky,

О. Поподробнее. В скриптогенераторе ошибка?
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921663
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton, да.
должно быть типа такого

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
package mayton.watchdog.pg

import java.io.{FileOutputStream, PrintWriter}
import java.sql.Connection

import mayton.watchdog.pg.Utils._

import scala.collection.mutable

object PgWatchdogEAV {

  def EAV_LOG = "eav_log"
  def TRIGGER_SUFFIX = "_trigger"
  def FUNC_SUFFIX = "_func"

  def createDefaultEAVLog(script : PrintWriter) : Unit = {
    script.println(
      s"""|CREATE TABLE $EAV_LOG (
          |  TS          TIMESTAMP   NOT NULL,
          |  OPERATION   CHAR(1)     NOT NULL CHECK (OPERATION IN ('I','U','D')),
          |  TABLE_NAME  VARCHAR(64) NOT NULL,
          |  COLUMN_NAME VARCHAR(64) NOT NULL,
          |  COL_VALUE   TEXT);
          |
          |  """.stripMargin)
  }

  def eavTriggersFunctionScript(connection: Connection, script: PrintWriter) : Unit = {
    for(tableName <- tables(connection).filter(p => p != EAV_LOG)) {
      val columnDefinitions : List[ColumnDefinition] = getColumnNames(connection, tableName)
      eavFunctionScript(script, tableName, columnDefinitions)
    }
  }

  def eavFunctionScript(script : PrintWriter, tableName : String, columnDefinitions : List[ColumnDefinition]) : Unit = {

    script.print(
      s"""CREATE OR REPLACE FUNCTION $tableName$FUNC_SUFFIX() RETURNS TRIGGER AS $$$$
         |BEGIN
         |  IF TG_OP = 'INSERT' THEN
         |""".stripMargin)

    for(columnDefinition <- columnDefinitions) {
      script.print(s"    IF NEW.${columnDefinition.columnName} IS NOT NULL THEN\n")
      script.print(s"      INSERT INTO $EAV_LOG(TS, OPERATION, TABLE_NAME, COLUMN_NAME, COL_VALUE) VALUES(CURRENT_TIMESTAMP, 'I', '$tableName', '${columnDefinition.columnName}', NEW.${columnDefinition.columnName});\n")
      script.print(s"    END IF;\n")
      script.print(s"    RETURN NEW;\n")
    }

    script.printf("  ELSIF TG_OP = 'UPDATE' THEN\n")
    for(columnDefinition <- columnDefinitions) {
      script.print(s"    IF NEW.${columnDefinition.columnName} IS NOT NULL THEN\n")
      script.print(s"      INSERT INTO $EAV_LOG(TS, OPERATION, TABLE_NAME, COLUMN_NAME, COL_VALUE) VALUES(CURRENT_TIMESTAMP, 'U', '$tableName', '${columnDefinition.columnName}', NEW.${columnDefinition.columnName});\n")
      script.print(s"    END IF;\n")
      script.print(s"    RETURN NEW;\n")
    }

    script.printf("  ELSIF TG_OP = 'DELETE' THEN\n")
    for(columnDefinition <- columnDefinitions) {
      script.print(s"    IF OLD.${columnDefinition.columnName} IS NOT NULL THEN\n")
      script.print(s"      INSERT INTO $EAV_LOG(TS, OPERATION, TABLE_NAME, COLUMN_NAME, COL_VALUE) VALUES(CURRENT_TIMESTAMP, 'D', '$tableName', '${columnDefinition.columnName}', OLD.${columnDefinition.columnName});\n")
      script.print(s"    END IF;\n")
      script.print(s"    RETURN OLD;\n")
    }

    script.print(
     s"""|  END IF;
         |--RETURN NEW;
         |END;
         |$$$$
         |LANGUAGE plpgsql;
         |
         |""".stripMargin)

    script.print(
      s"""
         |CREATE TRIGGER $tableName$TRIGGER_SUFFIX AFTER UPDATE OR INSERT OR DELETE
         |   ON $tableName
         |   FOR EACH ROW EXECUTE PROCEDURE $tableName$FUNC_SUFFIX();
         |
         |""".stripMargin)
  }


  def process(connection: Connection, script : PrintWriter) : Boolean = {
    createDefaultEAVLog(script)
    eavTriggersFunctionScript(connection, script)
    true
  }

  def main(arg : Array[String]) : Unit = {
    val props: mutable.Map[String, String] = toScalaMutableMap(tryToLoadSensitiveProperties())
    val host = props("host")
    val port = props("port")
    val database = props("database")
    val connection = createConnection(s"jdbc:postgresql://$host:$port/$database", props("user"), props("password"))
    val script = new PrintWriter(new FileOutputStream("out/pg-watchdog-tables-eav.sql"))
    process(connection, script)
    script.close()
  }

}

...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921665
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Victor Nevsky,

О спасибо. Проверю. Пофикшу.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921681
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
Такие события - нечастые в системе. Мы будем рассчитывать на классический OLTP а котором подобных апдейтов по дизайну нет.
ну, например, юз кейс - Рассчет зарплаты сотрудников.
Будет куча ненужных событий. Сначала в журнале, потом в триггерах.
...
Да. Ограничения Модели надо обязательно в ТЗ.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921724
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
PetroNotC Sharp
mayton
Такие события - нечастые в системе. Мы будем рассчитывать на классический OLTP а котором подобных апдейтов по дизайну нет.
ну, например, юз кейс - Рассчет зарплаты сотрудников.
Будет куча ненужных событий. Сначала в журнале, потом в триггерах.
...
Да. Ограничения Модели надо обязательно в ТЗ.

Замечание справедливое. Но скажи. В состоянии ли будем мы поддержать ВСЕ продуктовые шаблоны использования
бд. Такие как HR/CRM/ERP ?

Я считаю что нет. Поэтому давай пока ограничимся тем что у нас есть две неизвестных гетеоренных БД (master/slave).
Магазин. И копия магазина. В них есть какая-то нагрузка. Но она невелика. Тоесть массовых апдейтов пока не предполагается.

По поводу того что изменения 500 000 строк завалят нашу систему. Это хороший бенчмарк (особенно для MQ)
системы и мы обязательно этот кейс смоделируем.

Но это не будет главным поинтом или главным требованием.
Чуть позже я опишу еще одно дополнение в виде рекомендации.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921727
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton,
Да. Согласен.
Просто в бд все продумано. И есть флаг в триггере - реагировать на каждую строку оператора update или Один раз на один такой оператор.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921740
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Ты имеешь в виду уровень строки/оператора?

Код: plsql
1.
2.
3.
4.
5.
6.
7.
CREATE [ CONSTRAINT ] TRIGGER имя { BEFORE | AFTER | INSTEAD OF } { событие [ OR ... ] }
    ON имя_таблицы
    [ FROM ссылающаяся_таблица ]
    [ NOT DEFERRABLE | [ DEFERRABLE ] [ INITIALLY IMMEDIATE | INITIALLY DEFERRED ] ]
    [ FOR [ EACH ] { ROW | STATEMENT } ]
    [ WHEN ( условие ) ]
    EXECUTE PROCEDURE имя_функции ( аргументы )



Да надо над этим подумать.

Мне изначально хотелось создать систему в которой целевая БД (Slave) могла очень сильно отличаться.
По сути она может быть даже NoSQL.

Мастер - умный. Slave - глупый. Slave - просто глотает JmsMessages потоком и применяет их не думая
и не делая процессинг.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921808
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton, зачем вообще в данной задаче аудит на триггерах?
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921816
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Victor Nevsky, это не аудит.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921822
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Victor Nevsky
mayton, зачем вообще в данной задаче аудит на триггерах?
ну хочется ему. Для гимнастики ума.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921838
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Еще альтнативный вариант. Я думал о системе в которой нет триггеров но есть некий timestamp
последнего действия над data-rows.

Предполалается что мы трекаем только INSERT/UPDATE.

Для детектирования-же удалённых строк надо будет делать какое-то соединение между Master/Slave табличками
чтобы понять какие из них исчезли.

Но это - отдельным топиком.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921861
MazoHist
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
Еще альтнативный вариант. Я думал о системе в которой нет триггеров но есть некий timestamp
последнего действия над data-rows.

Предполалается что мы трекаем только INSERT/UPDATE.

Для детектирования-же удалённых строк надо будет делать какое-то соединение между Master/Slave табличками
чтобы понять какие из них исчезли.

Но это - отдельным топиком.

Это репликация снимками - на приемник гоним полную копию источника и сравниваем что менялось (primary/unique key надо бы, да). Использование timestamp без снимков чревато потерянными изменениями (я не встречал БД которая сохраняет именно время фиксации)
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921864
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
MazoHist
mayton
Еще альтнативный вариант. Я думал о системе в которой нет триггеров но есть некий timestamp
последнего действия над data-rows.

Предполалается что мы трекаем только INSERT/UPDATE.

Для детектирования-же удалённых строк надо будет делать какое-то соединение между Master/Slave табличками
чтобы понять какие из них исчезли.

Но это - отдельным топиком.

Это репликация снимками - на приемник гоним полную копию источника и сравниваем что менялось (primary/unique key надо бы, да). Использование timestamp без снимков чревато потерянными изменениями (я не встречал БД которая сохраняет именно время фиксации)

Интересно еще было бы рассмотреть как работает rsync или в частности Merkle-tree.
Если получится - то можно будет выделять группы строк которые изменились и передавать
только их.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921889
iOracleDev
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Victor Nevsky
mayton, зачем вообще в данной задаче аудит на триггерах?

У этой задачи цель одна))
mayton
Go-go кодить.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921893
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Эхех. Никому сабж неинтересен. Попрошу модера закрыть. Позже подниму топик с другой CDC.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921899
iOracleDev
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton,

Ты себе представляешь нагрузку на ровном месте, которую дашь на СУБД? При том что каждая СУБД имеет
свои инструменты, работающие на низком уровне, такой Франкенштейн никому не нужен, разве что поигратья
и насобирать граблей.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921907
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton,
iOracleDev верно сказал.
И потом, представь что на пятом операторе в одной транзакции по БЛ сработал роллбэк отмена.
Такое часто бывает.
База удачно все откатит. А твой триггер уже отработал и послал сообщение.
Его не вернуть будет.
Потом, транзакции могут быть длинные. 20 минут например. Что с этим делать если до коммита это не изменения базы. А коммит может и не быть вообще.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921909
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
СУБД свой мир. И седствами java там не развернуться.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39922019
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
пример тормозного говна)))

postgresql+h2database+ha-jdbc
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
200.
201.
202.
203.
204.
205.
206.
207.
208.
209.
210.
211.
212.
213.
214.
215.
216.
217.
218.
219.
220.
221.
222.
223.
224.
225.
226.
227.
228.
229.
230.
231.
232.
233.
234.
235.
236.
237.
package ha.jdbc_test;

public class HAJDBC_Test {

    private static final java.util.ArrayList<String> URLS = new java.util.ArrayList<String>() {
        {
            add("jdbc:h2:./test_h2;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE;USER=sa;PASSWORD=");
            add("jdbc:postgresql://127.0.0.1/test_pg?user=postgres&password=secret");
        }
    };

    public HAJDBC_Test() {
    }

    public static void main(String[] args) throws Exception {

        long t = System.currentTimeMillis();

        HAJDBC_Test ha = new HAJDBC_Test();

        for (String url : URLS) {
            try (java.sql.Connection c = java.sql.DriverManager.getConnection(url)) {
                ha.initBase(c);
            }
        }

        ha.initCluster();

        ha.loadData(ha.getConnection());

        Thread.sleep(1000);

        System.out.println("-------CLUSTER--------------------------------------");
        ha.checkData(ha.getConnection());
        System.out.println("----------------------------------------------------");
        

        ha.closeConnecton();

        for (String url : URLS) {
            try (java.sql.Connection c = java.sql.DriverManager.getConnection(url)) {
                ha.checkData(c);
            }
        }
        System.out.println("finish ... " + (System.currentTimeMillis() - t) + " ms");

        //Thread.sleep(1000 * 60 * 5);

        System.exit(0);

    }

    public void initBases() throws Exception {

        for (String url : URLS) {
            long t0 = System.currentTimeMillis();
            try (java.sql.Connection c = java.sql.DriverManager.getConnection(url)) {
                c.createStatement().execute("DROP TABLE IF EXISTS tb_test");
                c.createStatement().execute("CREATE TABLE IF NOT EXISTS tb_test ()");

                System.out.println("init " + c.getMetaData().getURL() + " " + (System.currentTimeMillis() - t0));

            } catch (java.sql.SQLException ex) {
                ex.printStackTrace();
                Thread.sleep(1000);
            }

        }
    }

    public void initCluster() throws InterruptedException {
        try {

            java.util.ArrayList<net.sf.hajdbc.sql.DriverDatabase> bases = new java.util.ArrayList<net.sf.hajdbc.sql.DriverDatabase>();

            int i = 0;
            for (String url : URLS) {
                net.sf.hajdbc.sql.DriverDatabase base = new net.sf.hajdbc.sql.DriverDatabase();
                base.setId("base" + i++);
                base.setLocation(url);

                bases.add(base);
            }

            net.sf.hajdbc.sql.DriverDatabaseClusterConfiguration config = new net.sf.hajdbc.sql.DriverDatabaseClusterConfiguration();

            config.setDatabases(bases);

            config.setDialectFactory(new net.sf.hajdbc.dialect.postgresql.PostgreSQLDialectFactory());

            config.setTransactionMode(net.sf.hajdbc.sql.TransactionModeEnum.SERIAL);

            config.setCurrentDateEvaluationEnabled(true);
            config.setCurrentTimeEvaluationEnabled(true);
            config.setCurrentTimestampEvaluationEnabled(true);
            config.setRandEvaluationEnabled(true);
            config.setSequenceDetectionEnabled(true);
            config.setIdentityColumnDetectionEnabled(true);

            config.setDatabaseMetaDataCacheFactory(new net.sf.hajdbc.cache.eager.SharedEagerDatabaseMetaDataCacheFactory());
            config.setBalancerFactory(new net.sf.hajdbc.balancer.simple.SimpleBalancerFactory());
            config.setStateManagerFactory(new net.sf.hajdbc.state.simple.SimpleStateManagerFactory());
            //config.setAutoActivationExpression(new net.sf.hajdbc.util.concurrent.cron.CronExpression("0 0/1 * 1/1 * ? *"));

            config.setDefaultSynchronizationStrategy("full");

            java.util.ArrayList<net.sf.hajdbc.SynchronizationStrategy> listStrategy = new java.util.ArrayList<net.sf.hajdbc.SynchronizationStrategy>() {
                {
                    add(new net.sf.hajdbc.sync.FullSynchronizationStrategy());
                    add(new net.sf.hajdbc.sync.DifferentialSynchronizationStrategy());
                    add(new net.sf.hajdbc.sync.FastDifferentialSynchronizationStrategy());
                    add(new net.sf.hajdbc.sync.PerTableSynchronizationStrategy(new net.sf.hajdbc.sync.FullSynchronizationStrategy()));
                    add(new net.sf.hajdbc.sync.PassiveSynchronizationStrategy());
                }
            };

            config.setSynchronizationStrategyMap(new java.util.TreeMap<>() {
                {
                    for (net.sf.hajdbc.SynchronizationStrategy e : listStrategy) {
                        put(e.getId(), e);
                    }
                }
            });

            net.sf.hajdbc.sql.Driver.setConfigurationFactory("cluster", new net.sf.hajdbc.SimpleDatabaseClusterConfigurationFactory<java.sql.Driver, net.sf.hajdbc.sql.DriverDatabase>(config));

            connection = java.sql.DriverManager.getConnection("jdbc:ha-jdbc:cluster", "sa", "");

        } catch (Exception ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }
    }

    private java.sql.Connection connection;

    private java.sql.Connection getConnection() {
        return connection;
    }

    private void closeConnecton() throws InterruptedException {

        //DatabaseCluster.stop();
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }
    }

    public void checkData(java.sql.Connection connection) throws InterruptedException {
        try {
            System.out.println("check data: " + connection.getCatalog());

            try (java.sql.ResultSet rsTables = connection.createStatement().executeQuery(""
                    + "SELECT * "
                    + "  FROM information_schema.tables"
                    + " ORDER BY table_name")) {

                while (rsTables.next()) {
                    String tableName = rsTables.getString("table_name");
                    if (tableName.toLowerCase().startsWith("tb_test")) {
                        System.out.println("\t" + rsTables.getString("table_name"));

                        try (java.sql.ResultSet rs = connection.createStatement().executeQuery(""
                                + "SELECT  * "
                                + "  FROM " + tableName
                                + " ORDER BY 1")) {

                            java.sql.ResultSetMetaData metaData = rs.getMetaData();
                            for (int col = 0; col < metaData.getColumnCount(); col++) {
                                if (col > 0) {
                                    System.out.print(", ");
                                }
                                System.out.printf("%s (%s)", metaData.getColumnName(col + 1), metaData.getColumnTypeName(col + 1));
                            }
                            System.out.println();
                            while (rs.next()) {
                                for (int col = 0; col < metaData.getColumnCount(); col++) {
                                    if (col > 0) {
                                        System.out.print("\t");
                                    }
                                    System.out.printf("%s", rs.getString(metaData.getColumnName(col + 1)));
                                }
                                System.out.println();
                            }
                        }
                    }

                }
            }
            System.out.println("--------------------");
        } catch (java.sql.SQLException ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }

    }

    private void initBase(java.sql.Connection connection) throws InterruptedException {
        try {
            System.out.println("Init base: " + connection.getCatalog());

            connection.createStatement().execute("DROP TABLE IF EXISTS tb_test");
            //connection.createStatement().execute("CREATE SCHEMA IF NOT EXISTS PUBLIC");
            connection.createStatement().execute("CREATE TABLE IF NOT EXISTS tb_test (id serial NOT NULL, CONSTRAINT pk_test PRIMARY KEY (id))");
            connection.createStatement().execute("ALTER TABLE tb_test ADD COLUMN IF NOT EXISTS verstamp timestamp DEFAULT NOW()");
            connection.createStatement().execute("ALTER TABLE tb_test ADD COLUMN IF NOT EXISTS data varchar");

        } catch (java.sql.SQLException ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }

    }

    private void loadData(java.sql.Connection connection) throws Exception {
        try {
            connection.createStatement().execute(""
                    + "INSERT INTO tb_test (data) "
                    + "             VALUES (CAST(NOW() AS varchar))");

            connection.createStatement().execute(""
                    + "INSERT INTO tb_test (data) "
                    + "             VALUES (CAST(RANDOM() AS varchar))");

        } catch (java.sql.SQLException ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }

    }

}

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
Init base: test_h2
Init base: test_pg
-------CLUSTER--------------------------------------
check data: test_h2
	tb_test
id (INTEGER), verstamp (TIMESTAMP), data (VARCHAR)
1	2020-02-03 18:05:27.395123	2020-02-03 18:05:27.386
2	2020-02-03 18:05:27.424016	0.40035240110768167
--------------------
----------------------------------------------------
check data: test_h2
	tb_test
id (INTEGER), verstamp (TIMESTAMP), data (VARCHAR)
1	2020-02-03 18:05:27.395123	2020-02-03 18:05:27.386
2	2020-02-03 18:05:27.424016	0.40035240110768167
--------------------
check data: test_pg
	tb_test
id (serial), verstamp (timestamp), data (varchar)
1	2020-02-03 18:05:27.398833	2020-02-03 18:05:27.386
2	2020-02-03 18:05:27.424845	0.40035240110768167
--------------------

+покоцал иcходники h2 (небыло show search_path )
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
private Prepared parseShow() {
...
        } else if (readIf("SEARCH_PATH")) {
            buff.append("'");
            if (session.getSchemaSearchPath() == null || session.getSchemaSearchPath().length == 0) {
                buff.append(session.getCurrentSchemaName());
            } else {
                for (String sn : session.getSchemaSearchPath()) {
                    buff.append(sn);
                    buff.append(", ");
                }
                buff.setLength(buff.length() - 2);
            }
            buff.append("' AS SEARCH_PATH FROM DUAL");
...

...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39922021
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Victor Nevsky, о. Шикарно. Посмотрю вечером.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39922099
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
наткнулся на проводки)))

тест запилю))) (postgresql+h2database+ha-jdbc)

интересно ... откудова такие цифры ... всё-таки 10 лямов ....
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39922103
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
запросы о проводках ....
нда.а.а.а . . . результаты не приближённые ...
HAJDBC_Test
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
200.
201.
202.
203.
204.
205.
206.
207.
208.
209.
210.
211.
212.
213.
214.
215.
216.
217.
218.
219.
220.
221.
222.
223.
224.
225.
226.
227.
228.
229.
230.
231.
232.
233.
234.
235.
236.
237.
238.
239.
240.
241.
242.
243.
244.
245.
246.
247.
248.
249.
250.
251.
252.
253.
254.
255.
256.
257.
258.
259.
260.
261.
262.
263.
264.
265.
266.
267.
268.
269.
270.
271.
272.
273.
274.
275.
276.
277.
278.
279.
280.
281.
282.
283.
284.
285.
286.
287.
288.
289.
290.
291.
292.
293.
294.
295.
296.
297.
298.
299.
300.
301.
302.
303.
304.
305.
306.
307.
308.
309.
310.
311.
312.
313.
314.
315.
316.
317.
318.
319.
package ha.jdbc_test;

public class HAJDBC_Test {

    private static final java.util.ArrayList<String> URLS = new java.util.ArrayList<String>() {
        {
            add("jdbc:postgresql://127.0.0.1/test_pg?user=postgres&password=secret");
            add("jdbc:h2:./test_h2;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE;USER=sa;PASSWORD=");
        }
    };

    public HAJDBC_Test() {
    }

    public static void main(String[] args) throws Exception {

        long t = System.currentTimeMillis();

        HAJDBC_Test ha = new HAJDBC_Test();

        for (String url : URLS) {
            try (java.sql.Connection c = java.sql.DriverManager.getConnection(url)) {
                ha.initBase(c);
            }
        }

        ha.initCluster();
        ha.loadData(ha.getConnection());

        System.out.println("=======CLUSTER=================================================");
        ha.checkData(ha.getConnection());
        ha.testProvodki(ha.getConnection());
        ha.closeConnecton();
        System.out.println("===============================================================");

        for (String url : URLS) {
            try (java.sql.Connection c = java.sql.DriverManager.getConnection(url)) {
                ha.checkData(c);
            }
        }

        System.out.println("old");
        System.out.println("old");
        System.out.println("old");
        System.out.println("new");

        System.out.println("=======SINGLE DATABASE=========================================");
        for (String url : URLS) {
            try (java.sql.Connection c = java.sql.DriverManager.getConnection(url)) {
                ha.testProvodki(c);
            }
        }

        System.out.println("finish ... " + (System.currentTimeMillis() - t) + " ms");

        //Thread.sleep(1000 * 60 * 5);
        System.exit(0);

    }

    public void initBases() throws Exception {

        for (String url : URLS) {
            long t0 = System.currentTimeMillis();
            try (java.sql.Connection c = java.sql.DriverManager.getConnection(url)) {
                c.createStatement().execute("DROP TABLE IF EXISTS tb_test");
                c.createStatement().execute("CREATE TABLE IF NOT EXISTS tb_test ()");

                System.out.println("init " + c.getMetaData().getURL() + " " + (System.currentTimeMillis() - t0));

            } catch (java.sql.SQLException ex) {
                ex.printStackTrace();
                Thread.sleep(1000);
            }

        }
    }

    public void initCluster() throws InterruptedException {
        try {

            java.util.ArrayList<net.sf.hajdbc.sql.DriverDatabase> bases = new java.util.ArrayList<net.sf.hajdbc.sql.DriverDatabase>();

            int i = 0;
            for (String url : URLS) {
                net.sf.hajdbc.sql.DriverDatabase base = new net.sf.hajdbc.sql.DriverDatabase();
                base.setId("base" + i++);
                base.setLocation(url);

                bases.add(base);
            }

            net.sf.hajdbc.sql.DriverDatabaseClusterConfiguration config = new net.sf.hajdbc.sql.DriverDatabaseClusterConfiguration();

            config.setDatabases(bases);

            config.setDialectFactory(new net.sf.hajdbc.dialect.postgresql.PostgreSQLDialectFactory());

            config.setTransactionMode(net.sf.hajdbc.sql.TransactionModeEnum.SERIAL);

            config.setCurrentDateEvaluationEnabled(true);
            config.setCurrentTimeEvaluationEnabled(true);
            config.setCurrentTimestampEvaluationEnabled(true);
            config.setRandEvaluationEnabled(true);
            config.setSequenceDetectionEnabled(true);
            config.setIdentityColumnDetectionEnabled(true);

            config.setDatabaseMetaDataCacheFactory(new net.sf.hajdbc.cache.eager.SharedEagerDatabaseMetaDataCacheFactory());
            config.setBalancerFactory(new net.sf.hajdbc.balancer.simple.SimpleBalancerFactory());
            config.setStateManagerFactory(new net.sf.hajdbc.state.simple.SimpleStateManagerFactory());
            //config.setAutoActivationExpression(new net.sf.hajdbc.util.concurrent.cron.CronExpression("0 0/1 * 1/1 * ? *"));

            config.setDefaultSynchronizationStrategy("full");

            java.util.ArrayList<net.sf.hajdbc.SynchronizationStrategy> listStrategy = new java.util.ArrayList<net.sf.hajdbc.SynchronizationStrategy>() {
                {
                    add(new net.sf.hajdbc.sync.FullSynchronizationStrategy());
                    add(new net.sf.hajdbc.sync.DifferentialSynchronizationStrategy());
                    add(new net.sf.hajdbc.sync.FastDifferentialSynchronizationStrategy());
                    add(new net.sf.hajdbc.sync.PerTableSynchronizationStrategy(new net.sf.hajdbc.sync.FullSynchronizationStrategy()));
                    add(new net.sf.hajdbc.sync.PassiveSynchronizationStrategy());
                }
            };

            config.setSynchronizationStrategyMap(new java.util.TreeMap<>() {
                {
                    for (net.sf.hajdbc.SynchronizationStrategy e : listStrategy) {
                        put(e.getId(), e);
                    }
                }
            });

            net.sf.hajdbc.sql.Driver.setConfigurationFactory("cluster", new net.sf.hajdbc.SimpleDatabaseClusterConfigurationFactory<java.sql.Driver, net.sf.hajdbc.sql.DriverDatabase>(config));

            connection = java.sql.DriverManager.getConnection("jdbc:ha-jdbc:cluster", "sa", "");

        } catch (Exception ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }
    }

    private java.sql.Connection connection;

    private java.sql.Connection getConnection() {
        return connection;
    }

    private void closeConnecton() throws InterruptedException {

        //DatabaseCluster.stop();
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }
    }

    public void checkData(java.sql.Connection connection) throws InterruptedException {
        try {
            System.out.println("check data: " + connection.getCatalog());

            try (java.sql.ResultSet rsTables = connection.createStatement().executeQuery(""
                    + "SELECT * "
                    + "  FROM information_schema.tables"
                    + " ORDER BY table_name")) {

                while (rsTables.next()) {
                    String tableName = rsTables.getString("table_name");
                    if (tableName.toLowerCase().startsWith("tb_test")) {
                        System.out.println("\t" + rsTables.getString("table_name"));

                        try (java.sql.ResultSet rs = connection.createStatement().executeQuery(""
                                + "SELECT  * "
                                + "  FROM " + tableName
                                + " ORDER BY 1")) {

                            java.sql.ResultSetMetaData metaData = rs.getMetaData();
                            for (int col = 0; col < metaData.getColumnCount(); col++) {
                                if (col > 0) {
                                    System.out.print(", ");
                                }
                                System.out.printf("%s (%s)", metaData.getColumnName(col + 1), metaData.getColumnTypeName(col + 1));
                            }
                            System.out.println();
                            while (rs.next()) {
                                for (int col = 0; col < metaData.getColumnCount(); col++) {
                                    if (col > 0) {
                                        System.out.print("\t");
                                    }
                                    System.out.printf("%s", rs.getString(metaData.getColumnName(col + 1)));
                                }
                                System.out.println();
                            }
                        }
                    }

                }
            }
            System.out.println("--------------------");
        } catch (java.sql.SQLException ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }

    }

    public void initBase(java.sql.Connection connection) throws InterruptedException {
        try {
            System.out.println("Init base: " + connection.getCatalog());

            connection.createStatement().execute("DROP TABLE IF EXISTS tb_test");
            //connection.createStatement().execute("CREATE SCHEMA IF NOT EXISTS PUBLIC");
            connection.createStatement().execute("CREATE TABLE IF NOT EXISTS tb_test (id serial NOT NULL, CONSTRAINT pk_test PRIMARY KEY (id))");
            connection.createStatement().execute("ALTER TABLE tb_test ADD COLUMN IF NOT EXISTS verstamp timestamp DEFAULT NOW()");
            connection.createStatement().execute("ALTER TABLE tb_test ADD COLUMN IF NOT EXISTS data varchar");

        } catch (java.sql.SQLException ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }

    }

    public void loadData(java.sql.Connection connection) throws Exception {
        try {
            connection.createStatement().execute(""
                    + "INSERT INTO tb_test (data) "
                    + "             VALUES (CAST(NOW() AS varchar))");

            connection.createStatement().execute(""
                    + "INSERT INTO tb_test (data) "
                    + "             VALUES (CAST(RANDOM() AS varchar))");

        } catch (java.sql.SQLException ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }

    }

    public void testProvodki(java.sql.Connection connection) throws Exception {

        long t_startTest = System.currentTimeMillis();

        System.out.println("===============================================================");
        System.out.println("!!!!!!!!!!!!!!!!!testProvodki!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");

        System.out.println(" start testProvodki: " + connection.getCatalog() + " " + t_startTest);

        /*, tmp_temp*/ /*need fix source h2 multiple drop*/
        connection.createStatement().execute(""
                + "DROP TABLE IF EXISTS tmp_provodki");
        connection.createStatement().execute(""
                + "DROP TABLE IF EXISTS tmp_temp");

        /*unlogged*/ /*need fix source unlogged/temp table*/
        connection.createStatement().execute(""
                + "CREATE TABLE tmp_provodki \n"
                + "   (from_id  smallint,\n"
                + "    to_id    smallint,\n"
                + "    amount   bigint);"
        );

        long t_startInsert = System.currentTimeMillis();

        System.out.println("     start insert tmp_provodki: " + connection.getCatalog() + " " + (t_startInsert));

        //generate_series(1, 1000000, 1) //--need fix hajdbc --#%$#@Q!#~!@#~ map foreach bath memory
        //connection.createStatement().execute("INSERT INTO tmp_provodki (SELECT random()*89  + 1, random()*89 + 1, random()*100 FROM generate_series(1, 1000000, 1))");
        //"nj rfgtw"
        java.sql.PreparedStatement stmt = connection.prepareStatement("INSERT INTO tmp_provodki VALUES (?, ?, ?)");

        for (int i = 0; i < 10_000_000; i++) {
            stmt.setInt(1, (int) (Math.random() * 89));
            stmt.setInt(2, (int) (Math.random() * 89));
            stmt.setInt(3, (int) (Math.random() * 100));
            stmt.addBatch();
            if (i % 10_000 == 0) {
                stmt.executeBatch();
            }
        }

        //java.sql.Statement stmt = connection.createStatement();
        //for (int i = 0; i < 10; i++) {
        //    stmt.addBatch(String.format("INSERT INTO tmp_provodki VALUES (%d, %d, %d)", (int) (Math.random() * 89), (int) (Math.random() * 89), (int) (Math.random() * 100)));
        //}
        stmt.executeBatch();
        System.out.println("     finish insert tmp_provodki: " + connection.getCatalog() + " " + (System.currentTimeMillis() - t_startInsert) + "");

        System.out.println("     __________________________________________________________");

        long t_startGroup = System.currentTimeMillis();
        System.out.println("         start group tmp_provodki: " + connection.getCatalog() + " >>>" + System.currentTimeMillis());
        connection.createStatement().execute(""
                + ""
                + "CREATE TEMP TABLE results AS ( \n"
                + "SELECT q.from_id, pos - neg    \n"
                + "   FROM (SELECT from_id, sum(amount) as neg           \n"
                + "          FROM tmp_provodki          \n"
                + "         GROUP BY from_id) AS q   \n"
                + "   LEFT JOIN (SELECT to_id, sum(amount) as pos\n"
                + "                FROM tmp_provodki\n"
                + "               GROUP BY to_id) AS w ON q.from_id = w.to_id\n"
                + " )"
        );
        System.out.println("         finish group tmp_provodki: " + connection.getCatalog() + " " + (System.currentTimeMillis() - t_startInsert) + "");
        System.out.println("     __________________________________________________________");
        System.out.println("finish testProvodki: " + connection.getCatalog() + " " + (System.currentTimeMillis() - t_startTest) + "");
        System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
        System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        System.out.println("===============================================================");
    }

}

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
Init base: test_pg
Init base: test_h2
=======CLUSTER=================================================
check data: test_pg
	tb_test
id (serial), verstamp (timestamp), data (varchar)
1	2020-02-04 02:50:49.232158	2020-02-04 02:50:49.214
2	2020-02-04 02:50:49.262206	0.758233148350979
--------------------
===============================================================
!!!!!!!!!!!!!!!!!testProvodki!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 start testProvodki: test_pg 1580777449365
     start insert tmp_provodki: test_pg 1580777449454
     finish insert tmp_provodki: test_pg 489547
     __________________________________________________________
         start group tmp_provodki: test_pg >>>1580777939008
         finish group tmp_provodki: test_pg 853025
     __________________________________________________________
finish testProvodki: test_pg 853115
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
===============================================================
===============================================================
check data: test_pg
	tb_test
id (serial), verstamp (timestamp), data (varchar)
1	2020-02-04 02:50:49.232158	2020-02-04 02:50:49.214
2	2020-02-04 02:50:49.262206	0.758233148350979
--------------------
check data: test_h2
	tb_test
id (INTEGER), verstamp (TIMESTAMP), data (VARCHAR)
1	2020-02-04 02:50:49.254813	2020-02-04 02:50:49.214
2	2020-02-04 02:50:49.270303	0.758233148350979
--------------------
old
old
old
new
=======SINGLE DATABASE=========================================
===============================================================
!!!!!!!!!!!!!!!!!testProvodki!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 start testProvodki: test_pg 1580778303896
     start insert tmp_provodki: test_pg 1580778304007
     finish insert tmp_provodki: test_pg 446809
     __________________________________________________________
         start group tmp_provodki: test_pg >>>1580778750816
         finish group tmp_provodki: test_pg 451518
     __________________________________________________________
finish testProvodki: test_pg 451629
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
===============================================================
===============================================================
!!!!!!!!!!!!!!!!!testProvodki!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 start testProvodki: test_h2 1580778755550
     start insert tmp_provodki: test_h2 1580778755750
     finish insert tmp_provodki: test_h2 47468
     __________________________________________________________
         start group tmp_provodki: test_h2 >>>1580778803218
         finish group tmp_provodki: test_h2 425039
     __________________________________________________________
finish testProvodki: test_h2 425239
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
===============================================================
finish ... 1737669 ms
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39922173
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Не посмотрел. Устал вчера. Ох уж этот ентерпрайз...
...
Рейтинг: 0 / 0
49 сообщений из 49, показаны все 2 страниц
Форумы / Java [игнор отключен] [закрыт для гостей] / Тяпничный CDC/JMS
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]