powered by simpleCommunicator - 2.0.61     © 2026 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / PostgreSQL [игнор отключен] [закрыт для гостей] / Дублирование ивентов PgQ
32 сообщений из 32, показаны все 2 страниц
Дублирование ивентов PgQ
    #38865904
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
С недавних пор возникла ситуация, когда pgq создает дублирующие батчи. Имеются ввиду батчи с разными batch_id, но содержащие идентичные ивенты.

Консумер читает батч и процессит ивенты поодному, вызывая pgq_ext.set_event_done() после обработки каждого ивента( и конечно проверяет is_even_done перед обработкой). После обработки всех ивентов батча, консумер выполняет finish_batch() и берет следующий.

Когда возникает проблема, консумер обрабатывает все ивенты батча 1, делает finish_batch(), finish_batch говорит "WARNING: finish_batch: batch 1 not found", после этого консумер берет батч 2, который содержит все ивенты, уже обработанные в батче 1( тот же event_id, тот же пейлоад)

Сталкивался ли кто то с такой ситуацией?
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38867092
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Есть подозрение, что виновата нестабильность времени на сервере, оказалось ntp был поломан. Мониторим пока.
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868073
Фотография Ёш
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Кактуз,

а какая версия skytools?
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868141
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Версия 3.1.5, если точнее то версия deb пакета 3.1.5-1.pgdg70+1

Код: plsql
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
# SELECT pgq_ext.version();
 version 
---------
 3.1
(1 row)
# SELECT version();
                                           version                                            
----------------------------------------------------------------------------------------------
 PostgreSQL 9.3.5 on x86_64-unknown-linux-gnu, compiled by gcc (Debian 4.7.2-5) 4.7.2, 64-bit
(1 row)
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868144
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
починка ntp проблемы не решила.
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868299
Фотография Misha Tyurin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Кактуз,

> консумер

какой конкретно консумер?

"finish_batch: batch % not found"
это говорит о том, что батча такого не было вам выдано.

опишите подробно схему, с указанием, кто куда и через что подключается, что за консумер, и как вы его запускаете.
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868300
Фотография Misha Tyurin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
и штатно ли работал сервер - база - консумер перед возникновением "дублирования"?
и еще всё таки уточните, как вы видите, что "тот же event_id, тот же пейлоад"?
ну и какую задачу реашет косумер в общем?
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868453
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Misha Tyurin,

Консумер самописный, он логгирует каждый полученный батч и каждый евент батча. Точно известно что батч ему был выдан.
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868455
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Misha Tyurinи штатно ли работал сервер - база - консумер перед возникновением "дублирования"?
и еще всё таки уточните, как вы видите, что "тот же event_id, тот же пейлоад"?
ну и какую задачу реашет косумер в общем?

Все работало штатно.
Я вижу в логах что консумер вычитывает евенты батча, обрабатывает их(он логирует евенты и отсылает их на REST API удаленной системы), после этого finish_batch дает вышеописанный ворнинг. Следующий батч содержит эти же евенты(такие же ev_id, и такой же пайлоад). Отмечу что удаленная система ведет логи, и эти логи потдверждают вышеописанный сценарий.
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868458
Misha Tyurin,

предположим, что кроме триггерной репликации, наши сервера имеют горячие стендбаи (оба--два, и подписчик и публикатор)
предположим, далее, что батч (его приём) накатан на упавший впоследствии праймари подписчика. (ну например).
а на стендбай подписчика -- нет (по какой-то причине).
Может ли в результате батч очиститься из очереди (на паблишере) , а на запущенном в кач-ве праймери стендбае считаться ненакаченным ?

-- очень похоже на такую историю.
(как вариант -- подписчик упал отработав , но не закоммитив накатку батча [батарейка сдохла], а паблишер почему-то её отработал, без всяких стендбаев и т.п. --Имеем -- на подписчике требуется батч, на публикаторе его нет).
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868459
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
общая логика работы такова:

Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
Псевдокод
while true {
    batch=pgq.get_next_batch()
    for event in pgq.get_batch_events(batch) {
        if pgq_ext.is_event_done(event) 
            next;
        process(event);
        pgq_ext.set_event_done(event)
    }
    pgq.finish_batch()
}
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868460
КактузMisha Tyurinи штатно ли работал сервер - база - консумер перед возникновением "дублирования"?
и еще всё таки уточните, как вы видите, что "тот же event_id, тот же пейлоад"?
ну и какую задачу реашет косумер в общем?

Все работало штатно.
Я вижу в логах что консумер вычитывает евенты батча, обрабатывает их(он логирует евенты и отсылает их на REST API удаленной системы), после этого finish_batch дает вышеописанный ворнинг. Следующий батч содержит эти же евенты(такие же ev_id, и такой же пайлоад). Отмечу что удаленная система ведет логи, и эти логи потдверждают вышеописанный сценарий.

вам надо иисчислить потерянный батч, и катануть его дельту руками, и удалить его с подписчика. штатно это делается ресинком, но очень дорого (COPY all на диск + сравнение)
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868463
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
интересуюсьвам надо иисчислить потерянный батч, и катануть его дельту руками, и удалить его с подписчика. штатно это делается ресинком, но очень дорого (COPY all на диск + сравнение)

У меня нет репликации. У меня консумер другим занимается, прочитайте внимательно.
Проблема в том наш консумер обрабатывает евенты батча, его евенты магическим образом перемещаются в следующий.
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868481
Кактуз,
да , я уже поудмал, что у вас не репликация.

под руками нет установленного pgq, -- если я верно помню -- там эти сообщения могут выкидывать plpgsql хранимки pgq-ных схем . если так -- почитайте тексты-- там станет ясно, что на самом деле у вас происходит (а не то, что вы об этом думаете).

если не так -- то придется в исходники pgq лезть -- а это уже сложнее.


я всё таки думаю, что у вас рассогласованное состояние публикатора и подписчика. и таково оно, поскольку где-то (на чьей-то стороне) не закоммитили очередную порцию. (там таки нет диспетчера распределенных транз. или я не прав ? )
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868500
Фотография Misha Tyurin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Кактуз,

по вашему описанию и псевдокоду сложно что-то сказать определенно, так как очень много всего не известного.

у вас вероятно финиш батч не прошел где-то.

обращаю внимание тогда на этот кусок
https://github.com/markokr/skytools/blob/master/sql/pgq/functions/pgq.finish_batch.sql
WARNING: finish_batch...
такой варнинг -- это вот так как не нашли.
но если смотреть
https://github.com/markokr/skytools/blob/master/sql/pgq/functions/pgq.next_batch.sql

-- has already active batch
if batch_id is not null then
return;
end if;

то выглядит так, как будто у вас код где-то консумера развалился или что-то где-то "переключалось"
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868504
Фотография Misha Tyurin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Misha Tyurin,

> у вас вероятно финиш батч не прошел где-то.

это ни при чем.

надо смотреть как вы с транзакциями в вашем консумере обращаетесь.
почти уверен что, вы

1) открывайте транзакцию в источнике и получаете next_batch
2) ! далее не закрываете её
3) что-то делает в приемнике
4) делаете финиш в источнике
5) закрываете транзакцию в источинике

вы из питона работаете? вы уверены, что понимаете как организовали транзакции ваши на обоих концах? там в питоне абстракции курсора-коннекта-транзакции всё замучено слегка.

и вот у вас код где-то "вылетел" между 1 и 5. а так как сиквенс не транзакционен вы имеете айди новое батча, а события старые.


---
ууух, это была заявка на вангу года уже в январе!
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868514
Фотография Misha Tyurin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
точнее, по вашему "логу", вылетает у вас между 1 и 4; т. е. вы делаете финиш уже того, что "отпало" и не закомитилось.

у вас кстати источник и приемник pgq -- это разные базы?

и в каком месте и по какому принципу идет в консумере работа с "удаленная система"?
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868570
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Misha TyurinMisha Tyurin,

> у вас вероятно финиш батч не прошел где-то.

это ни при чем.

надо смотреть как вы с транзакциями в вашем консумере обращаетесь.
почти уверен что, вы

1) открывайте транзакцию в источнике и получаете next_batch
2) ! далее не закрываете её
3) что-то делает в приемнике
4) делаете финиш в источнике
5) закрываете транзакцию в источинике

вы из питона работаете? вы уверены, что понимаете как организовали транзакции ваши на обоих концах? там в питоне абстракции курсора-коннекта-транзакции всё замучено слегка.

и вот у вас код где-то "вылетел" между 1 и 5. а так как сиквенс не транзакционен вы имеете айди новое батча, а события старые.


---
ууух, это была заявка на вангу года уже в январе!

консумер - приложение на руби, которое читает евенты из postgresql. Работа с pgq написана максимально низкоуронево, никакой динамической генерации sql нет.
Потребитель - REST API, с которым происходит работа по http. Для каждого евента в батче, приложение дергает REST API. Если происходит ексепшн любого рода(проблемы с сетью, http ответ отличается от 200OK) приложение падает и отсылает емейл с бектрейсом, и автоматически работу не возобновляет. В случае возникновения проблемы, код между 1 и 5 не вылетает, эксепшнов не происходит, это подтверждается логами на стороне API и логами postgresql где тоже не видно эксепшнов.
Еще раз уточняю что каждый евент метится обработанным путем вызова set_event_done(), кроме того все выполняется в "режиме автокоммита", то есть явно не вызывается begin/commit, то есть каждый вызов sql - внутри своей транзакции.
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868597
Фотография Misha Tyurin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Кактуз,
нет, не так.

> логами postgresql где тоже не видно эксепшнов

при роллбеке никакого експешина не происходит.
вы просто не коммитете next_batch().
как уж это происходит, это вам разбираться. отлаживайте свой код на руби.

еще раз повторю вопрос: у вас источник и приемник -- это одна и та же база? это может быть важно опять же для понимания как у вас транзакция открывается/закрывается.

--
я могу вам предложить отладку на увроне pg

1) мониторте
http://www.postgresql.org/docs/9.4/static/monitoring-stats.html#PG-STAT-DATABASE-VIEW
xact_rollback

2) ! включите логи запросов на сесии работы с pgq ! --- и вы увидете как у ваз вызывается get_next_batch() и get_events() -- там будет видно, одна и та же ли это транзакция
http://www.postgresql.org/docs/9.4/static/runtime-config-logging.html
%x Transaction ID (0 if none is assigned)


и еще вам можно оч много вариантов предложить, но пока это попробуйте
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868610
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Misha TyurinКактуз,
нет, не так.

> логами postgresql где тоже не видно эксепшнов

при роллбеке никакого експешина не происходит.
вы просто не коммитете next_batch().
как уж это происходит, это вам разбираться. отлаживайте свой код на руби.

еще раз повторю вопрос: у вас источник и приемник -- это одна и та же база? это может быть важно опять же для понимания как у вас транзакция открывается/закрывается.

--
я могу вам предложить отладку на увроне pg

1) мониторте
http://www.postgresql.org/docs/9.4/static/monitoring-stats.html#PG-STAT-DATABASE-VIEW
xact_rollback

2) ! включите логи запросов на сесии работы с pgq ! --- и вы увидете как у ваз вызывается get_next_batch() и get_events() -- там будет видно, одна и та же ли это транзакция
http://www.postgresql.org/docs/9.4/static/runtime-config-logging.html
%x Transaction ID (0 if none is assigned)


и еще вам можно оч много вариантов предложить, но пока это попробуйте

Я уже несколько раз написал, что приемник это HTTP REST API - это не база данных, а внешний сервис, работающий по протоколу http, он не поддерживает транзакции и атомарно обрабатывает только один запрос. Именно потому евенты обрабатываются поодному. Транзакции у меня не открываются/закрываются явно(приложение не вызывает begin/commit). Каждый вызов любой хранимки связанной с pgq происходит в своей транзакции, которая неявно начинается и коммитится, в одной транзакции выполняется только один statement. Я отлично знаю про возможности логгирования pg, проблема в том что на данный момент несколько сложно записать логи по причине их объема, а проблема повторяется нечасто и только с одним инстансом. В тестовом окружении, с теми же версиями кода проблема не повторяется. Логгирование SQL, в тестовом окружении не показывает ни единого begin/commit/rollback которые выполняло бы приложение.
Код: sql
1.
2.
3.
4.
5.
6.
2015-01-31 19:37:14.374 UTC-sandbox-1.1.1.1@sandbox_db-54cd2eba.7aa8 LOG:  statement: SELECT pgq.next_batch('routeserver', 'routeserver')
2015-01-31 19:37:14.875 UTC-sandbox-1.1.1.1@sandbox_db-54cd2eba.7aa8 LOG:  statement: SELECT pgq.next_batch('routeserver', 'routeserver')
2015-01-31 19:37:14.881 UTC-sandbox-1.1.1.1@sandbox_db-54cd2eba.7aa8 LOG:  statement: SELECT * FROM pgq.get_batch_events(185641) WHERE pgq_ext.is_event_done('routeserver', 185641, ev_id) = false ORDER BY ev_id
2015-01-31 19:37:14.889 UTC-sandbox-1.1.1.1@sandbox_db-54cd2eba.7aa8 LOG:  statement: SELECT pgq.finish_batch(185641)
2015-01-31 19:37:14.893 UTC-sandbox-1.1.1.1@sandbox_db-54cd2eba.7aa8 LOG:  statement: SELECT pgq.next_batch('routeserver', 'routeserver')
2015-01-31 19:37:15.394 UTC-sandbox-1.1.1.1@sandbox_db-54cd2eba.7aa8 LOG:  statement: SELECT pgq.next_batch('routeserver', 'routeserver')


54cd2eba.7aa8 - id сессии, а не транзакции.

Ваш посыл я понял - next_batch() незакоммичен, а get_batch_events сработал потому что был с ним в единой транзакции, я в 100500й раз проверю этот вариант, но сложно проверить что один и тот же код внезапно стал явно начинать транзакции, если он был написаен чтобы этого не делать, и в тестовом окружении этого не делает. Не пишите пожалуйста очевидных вещей типа, как логгировать запросы в пг.
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868615
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Misha Tyurin,
Специально логи с тестового окружения, где работает этот же код. log_line_prefix = '%m-%u-%h@%d-%c-%x-%v '
У каждой строки свой virtual transaction id. transaction_id везде 0, тк явно транзакция не начата.

Код: sql
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
2015-01-31 19:56:13.749 UTC-sandbox-1.1.1.1@sandbox_db-54cd328d.7c78-0-5/428930 LOG:  statement: SELECT pgq.next_batch('routeserver', 'routeserver')
2015-01-31 19:56:13.780 UTC-sandbox-1.1.1.1@sandbox_db-54cd328d.7c78-0-5/428931 LOG:  statement: SELECT * FROM pgq.get_batch_events(185666) WHERE pgq_ext.is_event_done('routeserver', 185666, ev_id) = false ORDER BY ev_id
2015-01-31 19:56:13.784 UTC-sandbox-1.1.1.1@sandbox_db-54cd328d.7c78-0-5/428932 LOG:  statement: SELECT pgq_ext.is_event_done('routeserver', 185666, '2093145')
2015-01-31 19:56:13.806 UTC-sandbox-1.1.1.1@sandbox_db-54cd328d.7c78-0-5/428933 LOG:  statement: SELECT pgq_ext.set_event_done('routeserver', 185666, '2093145')
2015-01-31 19:56:13.808 UTC-sandbox-1.1.1.1@sandbox_db-54cd328d.7c78-0-5/428934 LOG:  statement: SELECT pgq_ext.is_event_done('routeserver', 185666, '2093146')
2015-01-31 19:56:13.842 UTC-sandbox-1.1.1.1@sandbox_db-54cd328d.7c78-0-5/428935 LOG:  statement: SELECT pgq_ext.set_event_done('routeserver', 185666, '2093146')
2015-01-31 19:56:13.844 UTC-sandbox-1.1.1.1@sandbox_db-54cd328d.7c78-0-5/428936 LOG:  statement: SELECT pgq_ext.is_event_done('routeserver', 185666, '2093147')
2015-01-31 19:56:13.864 UTC-sandbox-1.1.1.1@sandbox_db-54cd328d.7c78-0-5/428937 LOG:  statement: SELECT pgq_ext.set_event_done('routeserver', 185666, '2093147')
2015-01-31 19:56:13.866 UTC-sandbox-1.1.1.1@sandbox_db-54cd328d.7c78-0-5/428938 LOG:  statement: SELECT pgq_ext.is_event_done('routeserver', 185666, '2093148')
2015-01-31 19:56:13.899 UTC-sandbox-1.1.1.1@sandbox_db-54cd328d.7c78-0-5/428939 LOG:  statement: SELECT pgq_ext.set_event_done('routeserver', 185666, '2093148')
2015-01-31 19:56:13.902 UTC-sandbox-1.1.1.1@sandbox_db-54cd328d.7c78-0-5/428940 LOG:  statement: SELECT pgq_ext.is_event_done('routeserver', 185666, '2093149')
2015-01-31 19:56:13.923 UTC-sandbox-1.1.1.1@sandbox_db-54cd328d.7c78-0-5/428941 LOG:  statement: SELECT pgq_ext.set_event_done('routeserver', 185666, '2093149')
2015-01-31 19:56:13.926 UTC-sandbox-1.1.1.1@sandbox_db-54cd328d.7c78-0-5/428942 LOG:  statement: SELECT pgq.finish_batch(185666)
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868621
Фотография Misha Tyurin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Кактуз,

у вас одна база. тааак.
а через что и как вы коннектитись к pg? есть ли разница между боевым окружением и тестовым?

какая там среда, библиотеки? как там "автокоммит" выставляется?
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868623
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Misha TyurinКактуз,

у вас одна база. тааак.
а через что и как вы коннектитись к pg? есть ли разница между боевым окружением и тестовым?

какая там среда, библиотеки? как там "автокоммит" выставляется?

Автокоммит по дефолту, используется ActiveRecord, но вся работа с pgq сделана сырыми запросами, без всякого "интеллекта" фреймворка.
Разница продакшна и сендбокса только в данных и нагрузке. Ну и в том что на одном из продакшн инстансов наблюдается вышеописанная проблема.
Когда проблема возникла я сразу начал проверять именно приложение, однако в нем проблем на нашел.
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868626
Фотография Misha Tyurin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Кактуз,

ну пока не сильно картина проясняется.

> на одном из продакшн инстансов

вот там и надо логи смотреть.
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868636
Фотография Misha Tyurin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
SELECT pgq.next_batch('routeserver', 'routeserver')
SELECT pgq.next_batch('routeserver', 'routeserver')
SELECT * FROM pgq.get_batch_events(185641) WHERE pgq_ext.is_event_done('routeserver', 185641, ev_id) = false ORDER BY ev_id
SELECT pgq.finish_batch(185641)
SELECT pgq.next_batch('routeserver', 'routeserver')
SELECT pgq.next_batch('routeserver', 'routeserver')


вот это тоже у вас какой-то интересный пример. это реальный лог?
где обработка то событий? или вы её вы просто не привели?

батч айди вернулся, а события уже все были отработаны?
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868776
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Misha TyurinSELECT pgq.next_batch('routeserver', 'routeserver')
SELECT pgq.next_batch('routeserver', 'routeserver')
SELECT * FROM pgq.get_batch_events(185641) WHERE pgq_ext.is_event_done('routeserver', 185641, ev_id) = false ORDER BY ev_id
SELECT pgq.finish_batch(185641)
SELECT pgq.next_batch('routeserver', 'routeserver')
SELECT pgq.next_batch('routeserver', 'routeserver')


вот это тоже у вас какой-то интересный пример. это реальный лог?
где обработка то событий? или вы её вы просто не привели?

батч айди вернулся, а события уже все были отработаны?

Это реальный лог. Я уже не в первый раз пишу, что обработка это отсылка http запросов на удаленный сервер. Происходит она между

Код: sql
1.
SELECT * FROM pgq.get_batch_events(185641) WHERE pgq_ext.is_event_done('routeserver', 185641, ev_id) = false ORDER BY ev_id


и
Код: sql
1.
SELECT pgq.finish_batch(185641)


Естественно что в логи постгреса не пишется ничего связанного с http.
set_event_done() тут не используется, тк в этом батче был только один евент. В случае когда в одном батче много евентов, используется set_event_done()
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868802
Фотография Misha Tyurin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Кактуз,

> set_event_done() тут не используется, тк в этом батче был только один евент
чет как-то сложно всё у вас.

ждем логи...
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38869141
КактузС недавних пор возникла ситуация, когда pgq создает дублирующие батчи. Имеются ввиду батчи с разными batch_id, но содержащие идентичные ивенты.

Консумер читает батч и процессит ивенты поодному, вызывая pgq_ext.set_event_done() после обработки каждого ивента( и конечно проверяет is_even_done перед обработкой). После обработки всех ивентов батча, консумер выполняет finish_batch() и берет следующий.

Когда возникает проблема, консумер обрабатывает все ивенты батча 1, делает finish_batch(), finish_batch говорит "WARNING: finish_batch: batch 1 not found", после этого консумер берет батч 2, который содержит все ивенты, уже обработанные в батче 1( тот же event_id, тот же пейлоад)

Сталкивался ли кто то с такой ситуацией?

кактус, вы исходник-то читали?

Код: plsql
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
-- Function: pgq.finish_batch(bigint)

-- DROP FUNCTION pgq.finish_batch(bigint);

CREATE OR REPLACE FUNCTION pgq.finish_batch(x_batch_id bigint)
  RETURNS integer AS
$BODY$
-- ----------------------------------------------------------------------
-- Function: pgq.finish_batch(1)
--
--      Closes a batch.  No more operations can be done with events
--      of this batch.
--
-- Parameters:
--      x_batch_id      - id of batch.
--
-- Returns:
--      1 if batch was found, 0 otherwise.
-- Calls:
--      None
-- Tables directly manipulated:
--      update - pgq.subscription
-- ----------------------------------------------------------------------
begin
    update pgq.subscription
        set sub_active = now(),
            sub_last_tick = sub_next_tick,
            sub_next_tick = null,
            sub_batch = null
        where sub_batch = x_batch_id;
    if not found then
        raise warning 'finish_batch: batch % not found', x_batch_id;
        return 0;
    end if;

    return 1;
end;
$BODY$
  LANGUAGE plpgsql VOLATILE SECURITY DEFINER
  COST 100;
ALTER FUNCTION pgq.finish_batch(bigint)
  OWNER TO postgres;




-- откуда мораль, вам просто нечего тут апдейтить ( в pgq.subscription нет батча where sub_batch = x_batch_id;).
Почему это проиходит -- вопрос к вам, а не к pgq.

я например в вашем псевдокоде вижу, что вы берете pgq.get_next_batch()
а там, битым словом, (через pgq.next_batch_info<<--pgq.next_batch_custom) -- тащится именно pgq.subscription.sub_batch

Код: sql
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
-- Function: pgq.next_batch_custom(text, text, interval, integer, interval)

-- DROP FUNCTION pgq.next_batch_custom(text, text, interval, integer, interval);

CREATE OR REPLACE FUNCTION pgq.next_batch_custom(IN i_queue_name text, IN i_consumer_name text, IN i_min_lag interval, IN i_min_count integer, IN i_min_interval interval, OUT batch_id bigint, OUT cur_tick_id bigint, OUT prev_tick_id bigint, OUT cur_tick_time timestamp with time zone, OUT prev_tick_time timestamp with time zone, OUT cur_tick_event_seq bigint, OUT prev_tick_event_seq bigint)
  RETURNS record AS
$BODY$
-- ----------------------------------------------------------------------
-- Function: pgq.next_batch_custom(5)
--
--      Makes next block of events active.  Block size can be tuned
--      with i_min_count, i_min_interval parameters.  Events age can
--      be tuned with i_min_lag.
--
--      If it returns NULL, there is no events available in queue.
--      Consumer should sleep then.
--
--      The values from event_id sequence may give hint how big the
--      batch may be.  But they are inexact, they do not give exact size.
--      Client *MUST NOT* use them to detect whether the batch contains any
--      events at all - the values are unfit for that purpose.
--
-- Note:
--      i_min_lag together with i_min_interval/i_min_count is inefficient.
--
-- Parameters:
--      i_queue_name        - Name of the queue
--      i_consumer_name     - Name of the consumer
--      i_min_lag           - Consumer wants events older than that
--      i_min_count         - Consumer wants batch to contain at least this many events
--      i_min_interval      - Consumer wants batch to cover at least this much time
--
-- Returns:
--      batch_id            - Batch ID or NULL if there are no more events available.
--      cur_tick_id         - End tick id.
--      cur_tick_time       - End tick time.
--      cur_tick_event_seq  - Value from event id sequence at the time tick was issued.
--      prev_tick_id        - Start tick id.
--      prev_tick_time      - Start tick time.
--      prev_tick_event_seq - value from event id sequence at the time tick was issued.
-- Calls:
--      pgq.insert_event_raw(11)
-- Tables directly manipulated:
--      update - pgq.subscription
-- ----------------------------------------------------------------------
declare
    errmsg          text;
    queue_id        integer;
    sub_id          integer;
    cons_id         integer;
begin
    select s.sub_queue, s.sub_consumer, s.sub_id, s.sub_batch,
            t1.tick_id, t1.tick_time, t1.tick_event_seq,
            t2.tick_id, t2.tick_time, t2.tick_event_seq
        into queue_id, cons_id, sub_id, batch_id,
             prev_tick_id, prev_tick_time, prev_tick_event_seq,
             cur_tick_id, cur_tick_time, cur_tick_event_seq
        from pgq.consumer c,
             pgq.queue q,
             pgq.subscription s
             left join pgq.tick t1
                on (t1.tick_queue = s.sub_queue
                    and t1.tick_id = s.sub_last_tick)
             left join pgq.tick t2
                on (t2.tick_queue = s.sub_queue
                    and t2.tick_id = s.sub_next_tick)
        where q.queue_name = i_queue_name
          and c.co_name = i_consumer_name
          and s.sub_queue = q.queue_id
          and s.sub_consumer = c.co_id;
    if not found then
        errmsg := 'Not subscriber to queue: '
            || coalesce(i_queue_name, 'NULL')
            || '/'
            || coalesce(i_consumer_name, 'NULL');
        raise exception '%', errmsg;
    end if;

    -- sanity check
    if prev_tick_id is null then
        raise exception 'PgQ corruption: Consumer % on queue % does not see tick %', i_consumer_name, i_queue_name, prev_tick_id;
    end if;

    -- has already active batch
    if batch_id is not null then
        return;
    end if;

    if i_min_interval is null and i_min_count is null then
        -- find next tick
        select tick_id, tick_time, tick_event_seq
            into cur_tick_id, cur_tick_time, cur_tick_event_seq
            from pgq.tick
            where tick_id > prev_tick_id
              and tick_queue = queue_id
            order by tick_queue asc, tick_id asc
            limit 1;
    else
        -- find custom tick
        select next_tick_id, next_tick_time, next_tick_seq
          into cur_tick_id, cur_tick_time, cur_tick_event_seq
          from pgq.find_tick_helper(queue_id, prev_tick_id,
                                    prev_tick_time, prev_tick_event_seq,
                                    i_min_count, i_min_interval);
    end if;

    if i_min_lag is not null then
        -- enforce min lag
        if now() - cur_tick_time < i_min_lag then
            cur_tick_id := NULL;
            cur_tick_time := NULL;
            cur_tick_event_seq := NULL;
        end if;
    end if;

    if cur_tick_id is null then
        -- nothing to do
        prev_tick_id := null;
        prev_tick_time := null;
        prev_tick_event_seq := null;
        return;
    end if;

    -- get next batch
    batch_id := nextval('pgq.batch_id_seq');
    update pgq.subscription
        set sub_batch = batch_id,
            sub_next_tick = cur_tick_id,
            sub_active = now()
        where sub_queue = queue_id
          and sub_consumer = cons_id;
    return;
end;
$BODY$
  LANGUAGE plpgsql VOLATILE SECURITY DEFINER
  COST 100;
ALTER FUNCTION pgq.next_batch_custom(text, text, interval, integer, interval)
  OWNER TO postgres;



-- откуда ровно 2 3 версии --

1. где-то в середине между {=pgq.get_next_batch()} и {pgq.finish_batch()} вы прибиваете (дропаете, или апдейтите ему id) этот sub_batch = x_batch_id; в pgq.subscripton-е. [маловероятно, но вчитайтесь в process()]

2. вы читаете незакоммиченный next_batch в одной транзакции (так и незакомиченной|или rollback-нутой), а финишируете - в другой (где тот-же батч не виден в subscription). [наиболее вероятно, но вы божитесь, что это не так]

3. вы, в прикладухе (см. ваш код), теряете значние x_batch_id, и передаете в финиш то, чего там нет и не было на момент get_.

т.ч. посмотрите на ВАШ код внимательнее -- кто-то пришибает вам батч [или портит его id] между get и finish.
есть фантазия 4. -- вы берете батч в одной базульке, а пытаетесь финишировать в другой, но у вас якобы нет второй бд
как-то так.

ах, да, -- версия 5.: у вас множественные обработчики очереди. И один из них успевает отработать батч (а pgq успевает от него избавиться в subscription) -- и только тут второй обработчик той же очереди подходит к финишу
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38869619
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
интересуюсь-- откуда ровно 2 3 версии --

1. где-то в середине между {=pgq.get_next_batch()} и {pgq.finish_batch()} вы прибиваете (дропаете, или апдейтите ему id) этот sub_batch = x_batch_id; в pgq.subscripton-е. [маловероятно, но вчитайтесь в process()]

2. вы читаете незакоммиченный next_batch в одной транзакции (так и незакомиченной|или rollback-нутой), а финишируете - в другой (где тот-же батч не виден в subscription). [наиболее вероятно, но вы божитесь, что это не так]

3. вы, в прикладухе (см. ваш код), теряете значние x_batch_id, и передаете в финиш то, чего там нет и не было на момент get_.

т.ч. посмотрите на ВАШ код внимательнее -- кто-то пришибает вам батч [или портит его id] между get и finish.
есть фантазия 4. -- вы берете батч в одной базульке, а пытаетесь финишировать в другой, но у вас якобы нет второй бд
как-то так.

ах, да, -- версия 5.: у вас множественные обработчики очереди. И один из них успевает отработать батч (а pgq успевает от него избавиться в subscription) -- и только тут второй обработчик той же очереди подходит к финишу

Исходники я читал. Ваши версии очевидны, они выше уже обсуждались и были проверены. Приложение-консумер точно не портит pgq.subscripton, мы уже копаем в сторону других клиентов базы(мне эта версия кажется наиболее вероятной) ну и идем в сторону "записать гору SQL логов с продакшна". Когда это получится, надеюсь что-то будет видно. Сообщу о результатах.
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38869640
КактузПриложение-консумер точно не портит pgq.subscripton<>.этого мало.
у вас есть защита от запуска 2-х (и более) экземляров консумера ?

//"они оба не портят", но второй экземпляр будет именно так ругаться -- если до него первый отфиниширует,и батч будет почикан из pgq.subscription
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38869751
Фотография Misha Tyurin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Кактуз,

Зачем гору логов ?? Сделайте логи только на сессии работы с pgq
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38870468
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Misha TyurinКактуз,

Зачем гору логов ?? Сделайте логи только на сессии работы с pgq

Сессия работы с pgq - это и есть гора логов. Проблема редко возникает.
...
Рейтинг: 0 / 0
32 сообщений из 32, показаны все 2 страниц
Форумы / PostgreSQL [игнор отключен] [закрыт для гостей] / Дублирование ивентов PgQ
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]