powered by simpleCommunicator - 2.0.61     © 2026 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / PostgreSQL [игнор отключен] [закрыт для гостей] / Дублирование ивентов PgQ
7 сообщений из 32, страница 2 из 2
Дублирование ивентов PgQ
    #38868776
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Misha TyurinSELECT pgq.next_batch('routeserver', 'routeserver')
SELECT pgq.next_batch('routeserver', 'routeserver')
SELECT * FROM pgq.get_batch_events(185641) WHERE pgq_ext.is_event_done('routeserver', 185641, ev_id) = false ORDER BY ev_id
SELECT pgq.finish_batch(185641)
SELECT pgq.next_batch('routeserver', 'routeserver')
SELECT pgq.next_batch('routeserver', 'routeserver')


вот это тоже у вас какой-то интересный пример. это реальный лог?
где обработка то событий? или вы её вы просто не привели?

батч айди вернулся, а события уже все были отработаны?

Это реальный лог. Я уже не в первый раз пишу, что обработка это отсылка http запросов на удаленный сервер. Происходит она между

Код: sql
1.
SELECT * FROM pgq.get_batch_events(185641) WHERE pgq_ext.is_event_done('routeserver', 185641, ev_id) = false ORDER BY ev_id


и
Код: sql
1.
SELECT pgq.finish_batch(185641)


Естественно что в логи постгреса не пишется ничего связанного с http.
set_event_done() тут не используется, тк в этом батче был только один евент. В случае когда в одном батче много евентов, используется set_event_done()
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38868802
Фотография Misha Tyurin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Кактуз,

> set_event_done() тут не используется, тк в этом батче был только один евент
чет как-то сложно всё у вас.

ждем логи...
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38869141
КактузС недавних пор возникла ситуация, когда pgq создает дублирующие батчи. Имеются ввиду батчи с разными batch_id, но содержащие идентичные ивенты.

Консумер читает батч и процессит ивенты поодному, вызывая pgq_ext.set_event_done() после обработки каждого ивента( и конечно проверяет is_even_done перед обработкой). После обработки всех ивентов батча, консумер выполняет finish_batch() и берет следующий.

Когда возникает проблема, консумер обрабатывает все ивенты батча 1, делает finish_batch(), finish_batch говорит "WARNING: finish_batch: batch 1 not found", после этого консумер берет батч 2, который содержит все ивенты, уже обработанные в батче 1( тот же event_id, тот же пейлоад)

Сталкивался ли кто то с такой ситуацией?

кактус, вы исходник-то читали?

Код: plsql
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
-- Function: pgq.finish_batch(bigint)

-- DROP FUNCTION pgq.finish_batch(bigint);

CREATE OR REPLACE FUNCTION pgq.finish_batch(x_batch_id bigint)
  RETURNS integer AS
$BODY$
-- ----------------------------------------------------------------------
-- Function: pgq.finish_batch(1)
--
--      Closes a batch.  No more operations can be done with events
--      of this batch.
--
-- Parameters:
--      x_batch_id      - id of batch.
--
-- Returns:
--      1 if batch was found, 0 otherwise.
-- Calls:
--      None
-- Tables directly manipulated:
--      update - pgq.subscription
-- ----------------------------------------------------------------------
begin
    update pgq.subscription
        set sub_active = now(),
            sub_last_tick = sub_next_tick,
            sub_next_tick = null,
            sub_batch = null
        where sub_batch = x_batch_id;
    if not found then
        raise warning 'finish_batch: batch % not found', x_batch_id;
        return 0;
    end if;

    return 1;
end;
$BODY$
  LANGUAGE plpgsql VOLATILE SECURITY DEFINER
  COST 100;
ALTER FUNCTION pgq.finish_batch(bigint)
  OWNER TO postgres;




-- откуда мораль, вам просто нечего тут апдейтить ( в pgq.subscription нет батча where sub_batch = x_batch_id;).
Почему это проиходит -- вопрос к вам, а не к pgq.

я например в вашем псевдокоде вижу, что вы берете pgq.get_next_batch()
а там, битым словом, (через pgq.next_batch_info<<--pgq.next_batch_custom) -- тащится именно pgq.subscription.sub_batch

Код: sql
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
-- Function: pgq.next_batch_custom(text, text, interval, integer, interval)

-- DROP FUNCTION pgq.next_batch_custom(text, text, interval, integer, interval);

CREATE OR REPLACE FUNCTION pgq.next_batch_custom(IN i_queue_name text, IN i_consumer_name text, IN i_min_lag interval, IN i_min_count integer, IN i_min_interval interval, OUT batch_id bigint, OUT cur_tick_id bigint, OUT prev_tick_id bigint, OUT cur_tick_time timestamp with time zone, OUT prev_tick_time timestamp with time zone, OUT cur_tick_event_seq bigint, OUT prev_tick_event_seq bigint)
  RETURNS record AS
$BODY$
-- ----------------------------------------------------------------------
-- Function: pgq.next_batch_custom(5)
--
--      Makes next block of events active.  Block size can be tuned
--      with i_min_count, i_min_interval parameters.  Events age can
--      be tuned with i_min_lag.
--
--      If it returns NULL, there is no events available in queue.
--      Consumer should sleep then.
--
--      The values from event_id sequence may give hint how big the
--      batch may be.  But they are inexact, they do not give exact size.
--      Client *MUST NOT* use them to detect whether the batch contains any
--      events at all - the values are unfit for that purpose.
--
-- Note:
--      i_min_lag together with i_min_interval/i_min_count is inefficient.
--
-- Parameters:
--      i_queue_name        - Name of the queue
--      i_consumer_name     - Name of the consumer
--      i_min_lag           - Consumer wants events older than that
--      i_min_count         - Consumer wants batch to contain at least this many events
--      i_min_interval      - Consumer wants batch to cover at least this much time
--
-- Returns:
--      batch_id            - Batch ID or NULL if there are no more events available.
--      cur_tick_id         - End tick id.
--      cur_tick_time       - End tick time.
--      cur_tick_event_seq  - Value from event id sequence at the time tick was issued.
--      prev_tick_id        - Start tick id.
--      prev_tick_time      - Start tick time.
--      prev_tick_event_seq - value from event id sequence at the time tick was issued.
-- Calls:
--      pgq.insert_event_raw(11)
-- Tables directly manipulated:
--      update - pgq.subscription
-- ----------------------------------------------------------------------
declare
    errmsg          text;
    queue_id        integer;
    sub_id          integer;
    cons_id         integer;
begin
    select s.sub_queue, s.sub_consumer, s.sub_id, s.sub_batch,
            t1.tick_id, t1.tick_time, t1.tick_event_seq,
            t2.tick_id, t2.tick_time, t2.tick_event_seq
        into queue_id, cons_id, sub_id, batch_id,
             prev_tick_id, prev_tick_time, prev_tick_event_seq,
             cur_tick_id, cur_tick_time, cur_tick_event_seq
        from pgq.consumer c,
             pgq.queue q,
             pgq.subscription s
             left join pgq.tick t1
                on (t1.tick_queue = s.sub_queue
                    and t1.tick_id = s.sub_last_tick)
             left join pgq.tick t2
                on (t2.tick_queue = s.sub_queue
                    and t2.tick_id = s.sub_next_tick)
        where q.queue_name = i_queue_name
          and c.co_name = i_consumer_name
          and s.sub_queue = q.queue_id
          and s.sub_consumer = c.co_id;
    if not found then
        errmsg := 'Not subscriber to queue: '
            || coalesce(i_queue_name, 'NULL')
            || '/'
            || coalesce(i_consumer_name, 'NULL');
        raise exception '%', errmsg;
    end if;

    -- sanity check
    if prev_tick_id is null then
        raise exception 'PgQ corruption: Consumer % on queue % does not see tick %', i_consumer_name, i_queue_name, prev_tick_id;
    end if;

    -- has already active batch
    if batch_id is not null then
        return;
    end if;

    if i_min_interval is null and i_min_count is null then
        -- find next tick
        select tick_id, tick_time, tick_event_seq
            into cur_tick_id, cur_tick_time, cur_tick_event_seq
            from pgq.tick
            where tick_id > prev_tick_id
              and tick_queue = queue_id
            order by tick_queue asc, tick_id asc
            limit 1;
    else
        -- find custom tick
        select next_tick_id, next_tick_time, next_tick_seq
          into cur_tick_id, cur_tick_time, cur_tick_event_seq
          from pgq.find_tick_helper(queue_id, prev_tick_id,
                                    prev_tick_time, prev_tick_event_seq,
                                    i_min_count, i_min_interval);
    end if;

    if i_min_lag is not null then
        -- enforce min lag
        if now() - cur_tick_time < i_min_lag then
            cur_tick_id := NULL;
            cur_tick_time := NULL;
            cur_tick_event_seq := NULL;
        end if;
    end if;

    if cur_tick_id is null then
        -- nothing to do
        prev_tick_id := null;
        prev_tick_time := null;
        prev_tick_event_seq := null;
        return;
    end if;

    -- get next batch
    batch_id := nextval('pgq.batch_id_seq');
    update pgq.subscription
        set sub_batch = batch_id,
            sub_next_tick = cur_tick_id,
            sub_active = now()
        where sub_queue = queue_id
          and sub_consumer = cons_id;
    return;
end;
$BODY$
  LANGUAGE plpgsql VOLATILE SECURITY DEFINER
  COST 100;
ALTER FUNCTION pgq.next_batch_custom(text, text, interval, integer, interval)
  OWNER TO postgres;



-- откуда ровно 2 3 версии --

1. где-то в середине между {=pgq.get_next_batch()} и {pgq.finish_batch()} вы прибиваете (дропаете, или апдейтите ему id) этот sub_batch = x_batch_id; в pgq.subscripton-е. [маловероятно, но вчитайтесь в process()]

2. вы читаете незакоммиченный next_batch в одной транзакции (так и незакомиченной|или rollback-нутой), а финишируете - в другой (где тот-же батч не виден в subscription). [наиболее вероятно, но вы божитесь, что это не так]

3. вы, в прикладухе (см. ваш код), теряете значние x_batch_id, и передаете в финиш то, чего там нет и не было на момент get_.

т.ч. посмотрите на ВАШ код внимательнее -- кто-то пришибает вам батч [или портит его id] между get и finish.
есть фантазия 4. -- вы берете батч в одной базульке, а пытаетесь финишировать в другой, но у вас якобы нет второй бд
как-то так.

ах, да, -- версия 5.: у вас множественные обработчики очереди. И один из них успевает отработать батч (а pgq успевает от него избавиться в subscription) -- и только тут второй обработчик той же очереди подходит к финишу
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38869619
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
интересуюсь-- откуда ровно 2 3 версии --

1. где-то в середине между {=pgq.get_next_batch()} и {pgq.finish_batch()} вы прибиваете (дропаете, или апдейтите ему id) этот sub_batch = x_batch_id; в pgq.subscripton-е. [маловероятно, но вчитайтесь в process()]

2. вы читаете незакоммиченный next_batch в одной транзакции (так и незакомиченной|или rollback-нутой), а финишируете - в другой (где тот-же батч не виден в subscription). [наиболее вероятно, но вы божитесь, что это не так]

3. вы, в прикладухе (см. ваш код), теряете значние x_batch_id, и передаете в финиш то, чего там нет и не было на момент get_.

т.ч. посмотрите на ВАШ код внимательнее -- кто-то пришибает вам батч [или портит его id] между get и finish.
есть фантазия 4. -- вы берете батч в одной базульке, а пытаетесь финишировать в другой, но у вас якобы нет второй бд
как-то так.

ах, да, -- версия 5.: у вас множественные обработчики очереди. И один из них успевает отработать батч (а pgq успевает от него избавиться в subscription) -- и только тут второй обработчик той же очереди подходит к финишу

Исходники я читал. Ваши версии очевидны, они выше уже обсуждались и были проверены. Приложение-консумер точно не портит pgq.subscripton, мы уже копаем в сторону других клиентов базы(мне эта версия кажется наиболее вероятной) ну и идем в сторону "записать гору SQL логов с продакшна". Когда это получится, надеюсь что-то будет видно. Сообщу о результатах.
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38869640
КактузПриложение-консумер точно не портит pgq.subscripton<>.этого мало.
у вас есть защита от запуска 2-х (и более) экземляров консумера ?

//"они оба не портят", но второй экземпляр будет именно так ругаться -- если до него первый отфиниширует,и батч будет почикан из pgq.subscription
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38869751
Фотография Misha Tyurin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Кактуз,

Зачем гору логов ?? Сделайте логи только на сессии работы с pgq
...
Рейтинг: 0 / 0
Дублирование ивентов PgQ
    #38870468
Кактуз
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Misha TyurinКактуз,

Зачем гору логов ?? Сделайте логи только на сессии работы с pgq

Сессия работы с pgq - это и есть гора логов. Проблема редко возникает.
...
Рейтинг: 0 / 0
7 сообщений из 32, страница 2 из 2
Форумы / PostgreSQL [игнор отключен] [закрыт для гостей] / Дублирование ивентов PgQ
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]