powered by simpleCommunicator - 2.0.30     © 2024 Programmizd 02
Map
Форумы / Java [игнор отключен] [закрыт для гостей] / Тяпничный CDC/JMS
24 сообщений из 49, страница 2 из 2
Тяпничный CDC/JMS
    #39921634
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton, при удалении будет ошибка.

см.
TG_OP = 'DELETE' ... NEW. ... RETURN NEW ...
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921657
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Victor Nevsky,

О. Поподробнее. В скриптогенераторе ошибка?
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921663
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton, да.
должно быть типа такого

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
package mayton.watchdog.pg

import java.io.{FileOutputStream, PrintWriter}
import java.sql.Connection

import mayton.watchdog.pg.Utils._

import scala.collection.mutable

object PgWatchdogEAV {

  def EAV_LOG = "eav_log"
  def TRIGGER_SUFFIX = "_trigger"
  def FUNC_SUFFIX = "_func"

  def createDefaultEAVLog(script : PrintWriter) : Unit = {
    script.println(
      s"""|CREATE TABLE $EAV_LOG (
          |  TS          TIMESTAMP   NOT NULL,
          |  OPERATION   CHAR(1)     NOT NULL CHECK (OPERATION IN ('I','U','D')),
          |  TABLE_NAME  VARCHAR(64) NOT NULL,
          |  COLUMN_NAME VARCHAR(64) NOT NULL,
          |  COL_VALUE   TEXT);
          |
          |  """.stripMargin)
  }

  def eavTriggersFunctionScript(connection: Connection, script: PrintWriter) : Unit = {
    for(tableName <- tables(connection).filter(p => p != EAV_LOG)) {
      val columnDefinitions : List[ColumnDefinition] = getColumnNames(connection, tableName)
      eavFunctionScript(script, tableName, columnDefinitions)
    }
  }

  def eavFunctionScript(script : PrintWriter, tableName : String, columnDefinitions : List[ColumnDefinition]) : Unit = {

    script.print(
      s"""CREATE OR REPLACE FUNCTION $tableName$FUNC_SUFFIX() RETURNS TRIGGER AS $$$$
         |BEGIN
         |  IF TG_OP = 'INSERT' THEN
         |""".stripMargin)

    for(columnDefinition <- columnDefinitions) {
      script.print(s"    IF NEW.${columnDefinition.columnName} IS NOT NULL THEN\n")
      script.print(s"      INSERT INTO $EAV_LOG(TS, OPERATION, TABLE_NAME, COLUMN_NAME, COL_VALUE) VALUES(CURRENT_TIMESTAMP, 'I', '$tableName', '${columnDefinition.columnName}', NEW.${columnDefinition.columnName});\n")
      script.print(s"    END IF;\n")
      script.print(s"    RETURN NEW;\n")
    }

    script.printf("  ELSIF TG_OP = 'UPDATE' THEN\n")
    for(columnDefinition <- columnDefinitions) {
      script.print(s"    IF NEW.${columnDefinition.columnName} IS NOT NULL THEN\n")
      script.print(s"      INSERT INTO $EAV_LOG(TS, OPERATION, TABLE_NAME, COLUMN_NAME, COL_VALUE) VALUES(CURRENT_TIMESTAMP, 'U', '$tableName', '${columnDefinition.columnName}', NEW.${columnDefinition.columnName});\n")
      script.print(s"    END IF;\n")
      script.print(s"    RETURN NEW;\n")
    }

    script.printf("  ELSIF TG_OP = 'DELETE' THEN\n")
    for(columnDefinition <- columnDefinitions) {
      script.print(s"    IF OLD.${columnDefinition.columnName} IS NOT NULL THEN\n")
      script.print(s"      INSERT INTO $EAV_LOG(TS, OPERATION, TABLE_NAME, COLUMN_NAME, COL_VALUE) VALUES(CURRENT_TIMESTAMP, 'D', '$tableName', '${columnDefinition.columnName}', OLD.${columnDefinition.columnName});\n")
      script.print(s"    END IF;\n")
      script.print(s"    RETURN OLD;\n")
    }

    script.print(
     s"""|  END IF;
         |--RETURN NEW;
         |END;
         |$$$$
         |LANGUAGE plpgsql;
         |
         |""".stripMargin)

    script.print(
      s"""
         |CREATE TRIGGER $tableName$TRIGGER_SUFFIX AFTER UPDATE OR INSERT OR DELETE
         |   ON $tableName
         |   FOR EACH ROW EXECUTE PROCEDURE $tableName$FUNC_SUFFIX();
         |
         |""".stripMargin)
  }


  def process(connection: Connection, script : PrintWriter) : Boolean = {
    createDefaultEAVLog(script)
    eavTriggersFunctionScript(connection, script)
    true
  }

  def main(arg : Array[String]) : Unit = {
    val props: mutable.Map[String, String] = toScalaMutableMap(tryToLoadSensitiveProperties())
    val host = props("host")
    val port = props("port")
    val database = props("database")
    val connection = createConnection(s"jdbc:postgresql://$host:$port/$database", props("user"), props("password"))
    val script = new PrintWriter(new FileOutputStream("out/pg-watchdog-tables-eav.sql"))
    process(connection, script)
    script.close()
  }

}

...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921665
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Victor Nevsky,

О спасибо. Проверю. Пофикшу.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921681
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
Такие события - нечастые в системе. Мы будем рассчитывать на классический OLTP а котором подобных апдейтов по дизайну нет.
ну, например, юз кейс - Рассчет зарплаты сотрудников.
Будет куча ненужных событий. Сначала в журнале, потом в триггерах.
...
Да. Ограничения Модели надо обязательно в ТЗ.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921724
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
PetroNotC Sharp
mayton
Такие события - нечастые в системе. Мы будем рассчитывать на классический OLTP а котором подобных апдейтов по дизайну нет.
ну, например, юз кейс - Рассчет зарплаты сотрудников.
Будет куча ненужных событий. Сначала в журнале, потом в триггерах.
...
Да. Ограничения Модели надо обязательно в ТЗ.

Замечание справедливое. Но скажи. В состоянии ли будем мы поддержать ВСЕ продуктовые шаблоны использования
бд. Такие как HR/CRM/ERP ?

Я считаю что нет. Поэтому давай пока ограничимся тем что у нас есть две неизвестных гетеоренных БД (master/slave).
Магазин. И копия магазина. В них есть какая-то нагрузка. Но она невелика. Тоесть массовых апдейтов пока не предполагается.

По поводу того что изменения 500 000 строк завалят нашу систему. Это хороший бенчмарк (особенно для MQ)
системы и мы обязательно этот кейс смоделируем.

Но это не будет главным поинтом или главным требованием.
Чуть позже я опишу еще одно дополнение в виде рекомендации.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921727
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton,
Да. Согласен.
Просто в бд все продумано. И есть флаг в триггере - реагировать на каждую строку оператора update или Один раз на один такой оператор.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921740
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Ты имеешь в виду уровень строки/оператора?

Код: plsql
1.
2.
3.
4.
5.
6.
7.
CREATE [ CONSTRAINT ] TRIGGER имя { BEFORE | AFTER | INSTEAD OF } { событие [ OR ... ] }
    ON имя_таблицы
    [ FROM ссылающаяся_таблица ]
    [ NOT DEFERRABLE | [ DEFERRABLE ] [ INITIALLY IMMEDIATE | INITIALLY DEFERRED ] ]
    [ FOR [ EACH ] { ROW | STATEMENT } ]
    [ WHEN ( условие ) ]
    EXECUTE PROCEDURE имя_функции ( аргументы )



Да надо над этим подумать.

Мне изначально хотелось создать систему в которой целевая БД (Slave) могла очень сильно отличаться.
По сути она может быть даже NoSQL.

Мастер - умный. Slave - глупый. Slave - просто глотает JmsMessages потоком и применяет их не думая
и не делая процессинг.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921808
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton, зачем вообще в данной задаче аудит на триггерах?
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921816
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Victor Nevsky, это не аудит.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921822
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Victor Nevsky
mayton, зачем вообще в данной задаче аудит на триггерах?
ну хочется ему. Для гимнастики ума.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921838
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Еще альтнативный вариант. Я думал о системе в которой нет триггеров но есть некий timestamp
последнего действия над data-rows.

Предполалается что мы трекаем только INSERT/UPDATE.

Для детектирования-же удалённых строк надо будет делать какое-то соединение между Master/Slave табличками
чтобы понять какие из них исчезли.

Но это - отдельным топиком.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921861
MazoHist
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
Еще альтнативный вариант. Я думал о системе в которой нет триггеров но есть некий timestamp
последнего действия над data-rows.

Предполалается что мы трекаем только INSERT/UPDATE.

Для детектирования-же удалённых строк надо будет делать какое-то соединение между Master/Slave табличками
чтобы понять какие из них исчезли.

Но это - отдельным топиком.

Это репликация снимками - на приемник гоним полную копию источника и сравниваем что менялось (primary/unique key надо бы, да). Использование timestamp без снимков чревато потерянными изменениями (я не встречал БД которая сохраняет именно время фиксации)
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921864
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
MazoHist
mayton
Еще альтнативный вариант. Я думал о системе в которой нет триггеров но есть некий timestamp
последнего действия над data-rows.

Предполалается что мы трекаем только INSERT/UPDATE.

Для детектирования-же удалённых строк надо будет делать какое-то соединение между Master/Slave табличками
чтобы понять какие из них исчезли.

Но это - отдельным топиком.

Это репликация снимками - на приемник гоним полную копию источника и сравниваем что менялось (primary/unique key надо бы, да). Использование timestamp без снимков чревато потерянными изменениями (я не встречал БД которая сохраняет именно время фиксации)

Интересно еще было бы рассмотреть как работает rsync или в частности Merkle-tree.
Если получится - то можно будет выделять группы строк которые изменились и передавать
только их.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921889
iOracleDev
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Victor Nevsky
mayton, зачем вообще в данной задаче аудит на триггерах?

У этой задачи цель одна))
mayton
Go-go кодить.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921893
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Эхех. Никому сабж неинтересен. Попрошу модера закрыть. Позже подниму топик с другой CDC.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921899
iOracleDev
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton,

Ты себе представляешь нагрузку на ровном месте, которую дашь на СУБД? При том что каждая СУБД имеет
свои инструменты, работающие на низком уровне, такой Франкенштейн никому не нужен, разве что поигратья
и насобирать граблей.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921907
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton,
iOracleDev верно сказал.
И потом, представь что на пятом операторе в одной транзакции по БЛ сработал роллбэк отмена.
Такое часто бывает.
База удачно все откатит. А твой триггер уже отработал и послал сообщение.
Его не вернуть будет.
Потом, транзакции могут быть длинные. 20 минут например. Что с этим делать если до коммита это не изменения базы. А коммит может и не быть вообще.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39921909
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
СУБД свой мир. И седствами java там не развернуться.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39922019
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
пример тормозного говна)))

postgresql+h2database+ha-jdbc
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
200.
201.
202.
203.
204.
205.
206.
207.
208.
209.
210.
211.
212.
213.
214.
215.
216.
217.
218.
219.
220.
221.
222.
223.
224.
225.
226.
227.
228.
229.
230.
231.
232.
233.
234.
235.
236.
237.
package ha.jdbc_test;

public class HAJDBC_Test {

    private static final java.util.ArrayList<String> URLS = new java.util.ArrayList<String>() {
        {
            add("jdbc:h2:./test_h2;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE;USER=sa;PASSWORD=");
            add("jdbc:postgresql://127.0.0.1/test_pg?user=postgres&password=secret");
        }
    };

    public HAJDBC_Test() {
    }

    public static void main(String[] args) throws Exception {

        long t = System.currentTimeMillis();

        HAJDBC_Test ha = new HAJDBC_Test();

        for (String url : URLS) {
            try (java.sql.Connection c = java.sql.DriverManager.getConnection(url)) {
                ha.initBase(c);
            }
        }

        ha.initCluster();

        ha.loadData(ha.getConnection());

        Thread.sleep(1000);

        System.out.println("-------CLUSTER--------------------------------------");
        ha.checkData(ha.getConnection());
        System.out.println("----------------------------------------------------");
        

        ha.closeConnecton();

        for (String url : URLS) {
            try (java.sql.Connection c = java.sql.DriverManager.getConnection(url)) {
                ha.checkData(c);
            }
        }
        System.out.println("finish ... " + (System.currentTimeMillis() - t) + " ms");

        //Thread.sleep(1000 * 60 * 5);

        System.exit(0);

    }

    public void initBases() throws Exception {

        for (String url : URLS) {
            long t0 = System.currentTimeMillis();
            try (java.sql.Connection c = java.sql.DriverManager.getConnection(url)) {
                c.createStatement().execute("DROP TABLE IF EXISTS tb_test");
                c.createStatement().execute("CREATE TABLE IF NOT EXISTS tb_test ()");

                System.out.println("init " + c.getMetaData().getURL() + " " + (System.currentTimeMillis() - t0));

            } catch (java.sql.SQLException ex) {
                ex.printStackTrace();
                Thread.sleep(1000);
            }

        }
    }

    public void initCluster() throws InterruptedException {
        try {

            java.util.ArrayList<net.sf.hajdbc.sql.DriverDatabase> bases = new java.util.ArrayList<net.sf.hajdbc.sql.DriverDatabase>();

            int i = 0;
            for (String url : URLS) {
                net.sf.hajdbc.sql.DriverDatabase base = new net.sf.hajdbc.sql.DriverDatabase();
                base.setId("base" + i++);
                base.setLocation(url);

                bases.add(base);
            }

            net.sf.hajdbc.sql.DriverDatabaseClusterConfiguration config = new net.sf.hajdbc.sql.DriverDatabaseClusterConfiguration();

            config.setDatabases(bases);

            config.setDialectFactory(new net.sf.hajdbc.dialect.postgresql.PostgreSQLDialectFactory());

            config.setTransactionMode(net.sf.hajdbc.sql.TransactionModeEnum.SERIAL);

            config.setCurrentDateEvaluationEnabled(true);
            config.setCurrentTimeEvaluationEnabled(true);
            config.setCurrentTimestampEvaluationEnabled(true);
            config.setRandEvaluationEnabled(true);
            config.setSequenceDetectionEnabled(true);
            config.setIdentityColumnDetectionEnabled(true);

            config.setDatabaseMetaDataCacheFactory(new net.sf.hajdbc.cache.eager.SharedEagerDatabaseMetaDataCacheFactory());
            config.setBalancerFactory(new net.sf.hajdbc.balancer.simple.SimpleBalancerFactory());
            config.setStateManagerFactory(new net.sf.hajdbc.state.simple.SimpleStateManagerFactory());
            //config.setAutoActivationExpression(new net.sf.hajdbc.util.concurrent.cron.CronExpression("0 0/1 * 1/1 * ? *"));

            config.setDefaultSynchronizationStrategy("full");

            java.util.ArrayList<net.sf.hajdbc.SynchronizationStrategy> listStrategy = new java.util.ArrayList<net.sf.hajdbc.SynchronizationStrategy>() {
                {
                    add(new net.sf.hajdbc.sync.FullSynchronizationStrategy());
                    add(new net.sf.hajdbc.sync.DifferentialSynchronizationStrategy());
                    add(new net.sf.hajdbc.sync.FastDifferentialSynchronizationStrategy());
                    add(new net.sf.hajdbc.sync.PerTableSynchronizationStrategy(new net.sf.hajdbc.sync.FullSynchronizationStrategy()));
                    add(new net.sf.hajdbc.sync.PassiveSynchronizationStrategy());
                }
            };

            config.setSynchronizationStrategyMap(new java.util.TreeMap<>() {
                {
                    for (net.sf.hajdbc.SynchronizationStrategy e : listStrategy) {
                        put(e.getId(), e);
                    }
                }
            });

            net.sf.hajdbc.sql.Driver.setConfigurationFactory("cluster", new net.sf.hajdbc.SimpleDatabaseClusterConfigurationFactory<java.sql.Driver, net.sf.hajdbc.sql.DriverDatabase>(config));

            connection = java.sql.DriverManager.getConnection("jdbc:ha-jdbc:cluster", "sa", "");

        } catch (Exception ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }
    }

    private java.sql.Connection connection;

    private java.sql.Connection getConnection() {
        return connection;
    }

    private void closeConnecton() throws InterruptedException {

        //DatabaseCluster.stop();
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }
    }

    public void checkData(java.sql.Connection connection) throws InterruptedException {
        try {
            System.out.println("check data: " + connection.getCatalog());

            try (java.sql.ResultSet rsTables = connection.createStatement().executeQuery(""
                    + "SELECT * "
                    + "  FROM information_schema.tables"
                    + " ORDER BY table_name")) {

                while (rsTables.next()) {
                    String tableName = rsTables.getString("table_name");
                    if (tableName.toLowerCase().startsWith("tb_test")) {
                        System.out.println("\t" + rsTables.getString("table_name"));

                        try (java.sql.ResultSet rs = connection.createStatement().executeQuery(""
                                + "SELECT  * "
                                + "  FROM " + tableName
                                + " ORDER BY 1")) {

                            java.sql.ResultSetMetaData metaData = rs.getMetaData();
                            for (int col = 0; col < metaData.getColumnCount(); col++) {
                                if (col > 0) {
                                    System.out.print(", ");
                                }
                                System.out.printf("%s (%s)", metaData.getColumnName(col + 1), metaData.getColumnTypeName(col + 1));
                            }
                            System.out.println();
                            while (rs.next()) {
                                for (int col = 0; col < metaData.getColumnCount(); col++) {
                                    if (col > 0) {
                                        System.out.print("\t");
                                    }
                                    System.out.printf("%s", rs.getString(metaData.getColumnName(col + 1)));
                                }
                                System.out.println();
                            }
                        }
                    }

                }
            }
            System.out.println("--------------------");
        } catch (java.sql.SQLException ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }

    }

    private void initBase(java.sql.Connection connection) throws InterruptedException {
        try {
            System.out.println("Init base: " + connection.getCatalog());

            connection.createStatement().execute("DROP TABLE IF EXISTS tb_test");
            //connection.createStatement().execute("CREATE SCHEMA IF NOT EXISTS PUBLIC");
            connection.createStatement().execute("CREATE TABLE IF NOT EXISTS tb_test (id serial NOT NULL, CONSTRAINT pk_test PRIMARY KEY (id))");
            connection.createStatement().execute("ALTER TABLE tb_test ADD COLUMN IF NOT EXISTS verstamp timestamp DEFAULT NOW()");
            connection.createStatement().execute("ALTER TABLE tb_test ADD COLUMN IF NOT EXISTS data varchar");

        } catch (java.sql.SQLException ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }

    }

    private void loadData(java.sql.Connection connection) throws Exception {
        try {
            connection.createStatement().execute(""
                    + "INSERT INTO tb_test (data) "
                    + "             VALUES (CAST(NOW() AS varchar))");

            connection.createStatement().execute(""
                    + "INSERT INTO tb_test (data) "
                    + "             VALUES (CAST(RANDOM() AS varchar))");

        } catch (java.sql.SQLException ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }

    }

}

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
Init base: test_h2
Init base: test_pg
-------CLUSTER--------------------------------------
check data: test_h2
	tb_test
id (INTEGER), verstamp (TIMESTAMP), data (VARCHAR)
1	2020-02-03 18:05:27.395123	2020-02-03 18:05:27.386
2	2020-02-03 18:05:27.424016	0.40035240110768167
--------------------
----------------------------------------------------
check data: test_h2
	tb_test
id (INTEGER), verstamp (TIMESTAMP), data (VARCHAR)
1	2020-02-03 18:05:27.395123	2020-02-03 18:05:27.386
2	2020-02-03 18:05:27.424016	0.40035240110768167
--------------------
check data: test_pg
	tb_test
id (serial), verstamp (timestamp), data (varchar)
1	2020-02-03 18:05:27.398833	2020-02-03 18:05:27.386
2	2020-02-03 18:05:27.424845	0.40035240110768167
--------------------

+покоцал иcходники h2 (небыло show search_path )
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
private Prepared parseShow() {
...
        } else if (readIf("SEARCH_PATH")) {
            buff.append("'");
            if (session.getSchemaSearchPath() == null || session.getSchemaSearchPath().length == 0) {
                buff.append(session.getCurrentSchemaName());
            } else {
                for (String sn : session.getSchemaSearchPath()) {
                    buff.append(sn);
                    buff.append(", ");
                }
                buff.setLength(buff.length() - 2);
            }
            buff.append("' AS SEARCH_PATH FROM DUAL");
...

...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39922021
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Victor Nevsky, о. Шикарно. Посмотрю вечером.
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39922099
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
наткнулся на проводки)))

тест запилю))) (postgresql+h2database+ha-jdbc)

интересно ... откудова такие цифры ... всё-таки 10 лямов ....
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39922103
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
запросы о проводках ....
нда.а.а.а . . . результаты не приближённые ...
HAJDBC_Test
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
200.
201.
202.
203.
204.
205.
206.
207.
208.
209.
210.
211.
212.
213.
214.
215.
216.
217.
218.
219.
220.
221.
222.
223.
224.
225.
226.
227.
228.
229.
230.
231.
232.
233.
234.
235.
236.
237.
238.
239.
240.
241.
242.
243.
244.
245.
246.
247.
248.
249.
250.
251.
252.
253.
254.
255.
256.
257.
258.
259.
260.
261.
262.
263.
264.
265.
266.
267.
268.
269.
270.
271.
272.
273.
274.
275.
276.
277.
278.
279.
280.
281.
282.
283.
284.
285.
286.
287.
288.
289.
290.
291.
292.
293.
294.
295.
296.
297.
298.
299.
300.
301.
302.
303.
304.
305.
306.
307.
308.
309.
310.
311.
312.
313.
314.
315.
316.
317.
318.
319.
package ha.jdbc_test;

public class HAJDBC_Test {

    private static final java.util.ArrayList<String> URLS = new java.util.ArrayList<String>() {
        {
            add("jdbc:postgresql://127.0.0.1/test_pg?user=postgres&password=secret");
            add("jdbc:h2:./test_h2;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE;USER=sa;PASSWORD=");
        }
    };

    public HAJDBC_Test() {
    }

    public static void main(String[] args) throws Exception {

        long t = System.currentTimeMillis();

        HAJDBC_Test ha = new HAJDBC_Test();

        for (String url : URLS) {
            try (java.sql.Connection c = java.sql.DriverManager.getConnection(url)) {
                ha.initBase(c);
            }
        }

        ha.initCluster();
        ha.loadData(ha.getConnection());

        System.out.println("=======CLUSTER=================================================");
        ha.checkData(ha.getConnection());
        ha.testProvodki(ha.getConnection());
        ha.closeConnecton();
        System.out.println("===============================================================");

        for (String url : URLS) {
            try (java.sql.Connection c = java.sql.DriverManager.getConnection(url)) {
                ha.checkData(c);
            }
        }

        System.out.println("old");
        System.out.println("old");
        System.out.println("old");
        System.out.println("new");

        System.out.println("=======SINGLE DATABASE=========================================");
        for (String url : URLS) {
            try (java.sql.Connection c = java.sql.DriverManager.getConnection(url)) {
                ha.testProvodki(c);
            }
        }

        System.out.println("finish ... " + (System.currentTimeMillis() - t) + " ms");

        //Thread.sleep(1000 * 60 * 5);
        System.exit(0);

    }

    public void initBases() throws Exception {

        for (String url : URLS) {
            long t0 = System.currentTimeMillis();
            try (java.sql.Connection c = java.sql.DriverManager.getConnection(url)) {
                c.createStatement().execute("DROP TABLE IF EXISTS tb_test");
                c.createStatement().execute("CREATE TABLE IF NOT EXISTS tb_test ()");

                System.out.println("init " + c.getMetaData().getURL() + " " + (System.currentTimeMillis() - t0));

            } catch (java.sql.SQLException ex) {
                ex.printStackTrace();
                Thread.sleep(1000);
            }

        }
    }

    public void initCluster() throws InterruptedException {
        try {

            java.util.ArrayList<net.sf.hajdbc.sql.DriverDatabase> bases = new java.util.ArrayList<net.sf.hajdbc.sql.DriverDatabase>();

            int i = 0;
            for (String url : URLS) {
                net.sf.hajdbc.sql.DriverDatabase base = new net.sf.hajdbc.sql.DriverDatabase();
                base.setId("base" + i++);
                base.setLocation(url);

                bases.add(base);
            }

            net.sf.hajdbc.sql.DriverDatabaseClusterConfiguration config = new net.sf.hajdbc.sql.DriverDatabaseClusterConfiguration();

            config.setDatabases(bases);

            config.setDialectFactory(new net.sf.hajdbc.dialect.postgresql.PostgreSQLDialectFactory());

            config.setTransactionMode(net.sf.hajdbc.sql.TransactionModeEnum.SERIAL);

            config.setCurrentDateEvaluationEnabled(true);
            config.setCurrentTimeEvaluationEnabled(true);
            config.setCurrentTimestampEvaluationEnabled(true);
            config.setRandEvaluationEnabled(true);
            config.setSequenceDetectionEnabled(true);
            config.setIdentityColumnDetectionEnabled(true);

            config.setDatabaseMetaDataCacheFactory(new net.sf.hajdbc.cache.eager.SharedEagerDatabaseMetaDataCacheFactory());
            config.setBalancerFactory(new net.sf.hajdbc.balancer.simple.SimpleBalancerFactory());
            config.setStateManagerFactory(new net.sf.hajdbc.state.simple.SimpleStateManagerFactory());
            //config.setAutoActivationExpression(new net.sf.hajdbc.util.concurrent.cron.CronExpression("0 0/1 * 1/1 * ? *"));

            config.setDefaultSynchronizationStrategy("full");

            java.util.ArrayList<net.sf.hajdbc.SynchronizationStrategy> listStrategy = new java.util.ArrayList<net.sf.hajdbc.SynchronizationStrategy>() {
                {
                    add(new net.sf.hajdbc.sync.FullSynchronizationStrategy());
                    add(new net.sf.hajdbc.sync.DifferentialSynchronizationStrategy());
                    add(new net.sf.hajdbc.sync.FastDifferentialSynchronizationStrategy());
                    add(new net.sf.hajdbc.sync.PerTableSynchronizationStrategy(new net.sf.hajdbc.sync.FullSynchronizationStrategy()));
                    add(new net.sf.hajdbc.sync.PassiveSynchronizationStrategy());
                }
            };

            config.setSynchronizationStrategyMap(new java.util.TreeMap<>() {
                {
                    for (net.sf.hajdbc.SynchronizationStrategy e : listStrategy) {
                        put(e.getId(), e);
                    }
                }
            });

            net.sf.hajdbc.sql.Driver.setConfigurationFactory("cluster", new net.sf.hajdbc.SimpleDatabaseClusterConfigurationFactory<java.sql.Driver, net.sf.hajdbc.sql.DriverDatabase>(config));

            connection = java.sql.DriverManager.getConnection("jdbc:ha-jdbc:cluster", "sa", "");

        } catch (Exception ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }
    }

    private java.sql.Connection connection;

    private java.sql.Connection getConnection() {
        return connection;
    }

    private void closeConnecton() throws InterruptedException {

        //DatabaseCluster.stop();
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }
    }

    public void checkData(java.sql.Connection connection) throws InterruptedException {
        try {
            System.out.println("check data: " + connection.getCatalog());

            try (java.sql.ResultSet rsTables = connection.createStatement().executeQuery(""
                    + "SELECT * "
                    + "  FROM information_schema.tables"
                    + " ORDER BY table_name")) {

                while (rsTables.next()) {
                    String tableName = rsTables.getString("table_name");
                    if (tableName.toLowerCase().startsWith("tb_test")) {
                        System.out.println("\t" + rsTables.getString("table_name"));

                        try (java.sql.ResultSet rs = connection.createStatement().executeQuery(""
                                + "SELECT  * "
                                + "  FROM " + tableName
                                + " ORDER BY 1")) {

                            java.sql.ResultSetMetaData metaData = rs.getMetaData();
                            for (int col = 0; col < metaData.getColumnCount(); col++) {
                                if (col > 0) {
                                    System.out.print(", ");
                                }
                                System.out.printf("%s (%s)", metaData.getColumnName(col + 1), metaData.getColumnTypeName(col + 1));
                            }
                            System.out.println();
                            while (rs.next()) {
                                for (int col = 0; col < metaData.getColumnCount(); col++) {
                                    if (col > 0) {
                                        System.out.print("\t");
                                    }
                                    System.out.printf("%s", rs.getString(metaData.getColumnName(col + 1)));
                                }
                                System.out.println();
                            }
                        }
                    }

                }
            }
            System.out.println("--------------------");
        } catch (java.sql.SQLException ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }

    }

    public void initBase(java.sql.Connection connection) throws InterruptedException {
        try {
            System.out.println("Init base: " + connection.getCatalog());

            connection.createStatement().execute("DROP TABLE IF EXISTS tb_test");
            //connection.createStatement().execute("CREATE SCHEMA IF NOT EXISTS PUBLIC");
            connection.createStatement().execute("CREATE TABLE IF NOT EXISTS tb_test (id serial NOT NULL, CONSTRAINT pk_test PRIMARY KEY (id))");
            connection.createStatement().execute("ALTER TABLE tb_test ADD COLUMN IF NOT EXISTS verstamp timestamp DEFAULT NOW()");
            connection.createStatement().execute("ALTER TABLE tb_test ADD COLUMN IF NOT EXISTS data varchar");

        } catch (java.sql.SQLException ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }

    }

    public void loadData(java.sql.Connection connection) throws Exception {
        try {
            connection.createStatement().execute(""
                    + "INSERT INTO tb_test (data) "
                    + "             VALUES (CAST(NOW() AS varchar))");

            connection.createStatement().execute(""
                    + "INSERT INTO tb_test (data) "
                    + "             VALUES (CAST(RANDOM() AS varchar))");

        } catch (java.sql.SQLException ex) {
            ex.printStackTrace();
            Thread.sleep(1000);
        }

    }

    public void testProvodki(java.sql.Connection connection) throws Exception {

        long t_startTest = System.currentTimeMillis();

        System.out.println("===============================================================");
        System.out.println("!!!!!!!!!!!!!!!!!testProvodki!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");

        System.out.println(" start testProvodki: " + connection.getCatalog() + " " + t_startTest);

        /*, tmp_temp*/ /*need fix source h2 multiple drop*/
        connection.createStatement().execute(""
                + "DROP TABLE IF EXISTS tmp_provodki");
        connection.createStatement().execute(""
                + "DROP TABLE IF EXISTS tmp_temp");

        /*unlogged*/ /*need fix source unlogged/temp table*/
        connection.createStatement().execute(""
                + "CREATE TABLE tmp_provodki \n"
                + "   (from_id  smallint,\n"
                + "    to_id    smallint,\n"
                + "    amount   bigint);"
        );

        long t_startInsert = System.currentTimeMillis();

        System.out.println("     start insert tmp_provodki: " + connection.getCatalog() + " " + (t_startInsert));

        //generate_series(1, 1000000, 1) //--need fix hajdbc --#%$#@Q!#~!@#~ map foreach bath memory
        //connection.createStatement().execute("INSERT INTO tmp_provodki (SELECT random()*89  + 1, random()*89 + 1, random()*100 FROM generate_series(1, 1000000, 1))");
        //"nj rfgtw"
        java.sql.PreparedStatement stmt = connection.prepareStatement("INSERT INTO tmp_provodki VALUES (?, ?, ?)");

        for (int i = 0; i < 10_000_000; i++) {
            stmt.setInt(1, (int) (Math.random() * 89));
            stmt.setInt(2, (int) (Math.random() * 89));
            stmt.setInt(3, (int) (Math.random() * 100));
            stmt.addBatch();
            if (i % 10_000 == 0) {
                stmt.executeBatch();
            }
        }

        //java.sql.Statement stmt = connection.createStatement();
        //for (int i = 0; i < 10; i++) {
        //    stmt.addBatch(String.format("INSERT INTO tmp_provodki VALUES (%d, %d, %d)", (int) (Math.random() * 89), (int) (Math.random() * 89), (int) (Math.random() * 100)));
        //}
        stmt.executeBatch();
        System.out.println("     finish insert tmp_provodki: " + connection.getCatalog() + " " + (System.currentTimeMillis() - t_startInsert) + "");

        System.out.println("     __________________________________________________________");

        long t_startGroup = System.currentTimeMillis();
        System.out.println("         start group tmp_provodki: " + connection.getCatalog() + " >>>" + System.currentTimeMillis());
        connection.createStatement().execute(""
                + ""
                + "CREATE TEMP TABLE results AS ( \n"
                + "SELECT q.from_id, pos - neg    \n"
                + "   FROM (SELECT from_id, sum(amount) as neg           \n"
                + "          FROM tmp_provodki          \n"
                + "         GROUP BY from_id) AS q   \n"
                + "   LEFT JOIN (SELECT to_id, sum(amount) as pos\n"
                + "                FROM tmp_provodki\n"
                + "               GROUP BY to_id) AS w ON q.from_id = w.to_id\n"
                + " )"
        );
        System.out.println("         finish group tmp_provodki: " + connection.getCatalog() + " " + (System.currentTimeMillis() - t_startInsert) + "");
        System.out.println("     __________________________________________________________");
        System.out.println("finish testProvodki: " + connection.getCatalog() + " " + (System.currentTimeMillis() - t_startTest) + "");
        System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
        System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        System.out.println("===============================================================");
    }

}

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
Init base: test_pg
Init base: test_h2
=======CLUSTER=================================================
check data: test_pg
	tb_test
id (serial), verstamp (timestamp), data (varchar)
1	2020-02-04 02:50:49.232158	2020-02-04 02:50:49.214
2	2020-02-04 02:50:49.262206	0.758233148350979
--------------------
===============================================================
!!!!!!!!!!!!!!!!!testProvodki!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 start testProvodki: test_pg 1580777449365
     start insert tmp_provodki: test_pg 1580777449454
     finish insert tmp_provodki: test_pg 489547
     __________________________________________________________
         start group tmp_provodki: test_pg >>>1580777939008
         finish group tmp_provodki: test_pg 853025
     __________________________________________________________
finish testProvodki: test_pg 853115
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
===============================================================
===============================================================
check data: test_pg
	tb_test
id (serial), verstamp (timestamp), data (varchar)
1	2020-02-04 02:50:49.232158	2020-02-04 02:50:49.214
2	2020-02-04 02:50:49.262206	0.758233148350979
--------------------
check data: test_h2
	tb_test
id (INTEGER), verstamp (TIMESTAMP), data (VARCHAR)
1	2020-02-04 02:50:49.254813	2020-02-04 02:50:49.214
2	2020-02-04 02:50:49.270303	0.758233148350979
--------------------
old
old
old
new
=======SINGLE DATABASE=========================================
===============================================================
!!!!!!!!!!!!!!!!!testProvodki!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 start testProvodki: test_pg 1580778303896
     start insert tmp_provodki: test_pg 1580778304007
     finish insert tmp_provodki: test_pg 446809
     __________________________________________________________
         start group tmp_provodki: test_pg >>>1580778750816
         finish group tmp_provodki: test_pg 451518
     __________________________________________________________
finish testProvodki: test_pg 451629
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
===============================================================
===============================================================
!!!!!!!!!!!!!!!!!testProvodki!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 start testProvodki: test_h2 1580778755550
     start insert tmp_provodki: test_h2 1580778755750
     finish insert tmp_provodki: test_h2 47468
     __________________________________________________________
         start group tmp_provodki: test_h2 >>>1580778803218
         finish group tmp_provodki: test_h2 425039
     __________________________________________________________
finish testProvodki: test_h2 425239
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
===============================================================
finish ... 1737669 ms
...
Рейтинг: 0 / 0
Тяпничный CDC/JMS
    #39922173
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Не посмотрел. Устал вчера. Ох уж этот ентерпрайз...
...
Рейтинг: 0 / 0
24 сообщений из 49, страница 2 из 2
Форумы / Java [игнор отключен] [закрыт для гостей] / Тяпничный CDC/JMS
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]