powered by simpleCommunicator - 2.0.61     © 2026 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / Java [игнор отключен] [закрыт для гостей] / kafka - гарантированная обработка всех сообщений и только один раз
13 сообщений из 13, страница 1 из 1
kafka - гарантированная обработка всех сообщений и только один раз
    #39530699
questioner
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Читаю вот эту статью:

https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1

Тут решается проблема того, чтобы сохранять в базу результат обработки сообщений и оффсет для топика атомарно. Для этой цели решено хранить оффсет в базе и записывать в одной транзакции результат обработки сообщений и оффсет для топика.

В общем-то идея понятна, но код как-то не очень:

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
public class SaveOffsetsOnRebalance implements
  ConsumerRebalanceListener {

    public void onPartitionsRevoked(Collection<TopicPartition>
      partitions) {
                commitDBTransaction(); 1
        }

    public void onPartitionsAssigned(Collection<TopicPartition>
      partitions) {
        for(TopicPartition partition: partitions)
            consumer.seek(partition, getOffsetFromDB(partition)); 2
       }
   }
}


  consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
  consumer.poll(0);

  for (TopicPartition partition: consumer.assignment())
    consumer.seek(partition, getOffsetFromDB(partition));   3

  while (true) {
      ConsumerRecords<String, String> records =
        consumer.poll(100);
      for (ConsumerRecord<String, String> record : records)
      {
          processRecord(record);
          storeRecordInDB(record);
          storeOffsetInDB(record.topic(), record.partition(),
            record.offset()); 4
      }
      commitDBTransaction();
  }



https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1 1
We use an imaginary method here to commit the transaction in the database. The idea here is that the database records and offsets will be inserted to the database as we process the records, and we just need to commit the transactions when we are about to lose the partition to make sure this information is persisted.

2
We also have an imaginary method to fetch the offsets from the database, and then we seek() to those records when we get ownership of new partitions.

3
When the consumer first starts, after we subscribe to topics, we call poll() once to make sure we join a consumer group and get assigned partitions, and then we immediately seek() to the correct offset in the partitions we are assigned to. Keep in mind that seek() only updates the position we are consuming from, so the next poll() will fetch the right messages. If there was an error in seek() (e.g., the offset does not exist), the exception will be thrown by poll().

4
Another imaginary method: this time we update a table storing the offsets in our database. Here we assume that updating records is fast, so we do an update on every record, but commits are slow, so we only commit at the end of the batch. However, this can be optimized in different ways.

SaveOffsetsOnRebalance


я так понял onPartitionsRevoked вызывается когда один из партишенов этого листенера уходит к другому консумеру при ребалансе, ну или вообще его удалили

onPartitionsAssigned - вызывается когда этому консумеру назначили партишен


Ок commitDBTransaction() - коммитит в базу все результаты которые мы обработали а также оффсет.


consumer.seek выставляет консумеру оффсет для партишена.

Я правильно понял как это работает?
...
Рейтинг: 0 / 0
kafka - гарантированная обработка всех сообщений и только один раз
    #39530706
questioner
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
while (true) {
      ConsumerRecords<String, String> records =
        consumer.poll(100);
      for (ConsumerRecord<String, String> record : records)
      {
          processRecord(record);
          storeRecordInDB(record);
          storeOffsetInDB(record.topic(), record.partition(),
            record.offset()); 4
      }
      commitDBTransaction();
  }


Ну и считается, что если внутри for мы вывалились, то транзакция не закоммитится?
...
Рейтинг: 0 / 0
kafka - гарантированная обработка всех сообщений и только один раз
    #39530769
vimba
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
questioner,

в приведенном алгоритме дублируется функционал поциционирования консумеров, пункт 3 выглядит лишним(как и предшествующий ему consumer.poll(0);), потому что onPartitionsAssigned выполняется всегда в том числе при первоначальной подписке. Конечно к логическим ошибкам это дублирование не приводит, так как дублируется выполнение идемпотентной операции. Всегда расставляйте логи в ключевые места консумера, чтобы понимать что происходит.
...
Рейтинг: 0 / 0
kafka - гарантированная обработка всех сообщений и только один раз
    #39530786
questioner
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
vimba,

https://docs.confluent.io/2.0.1/clients/javadocs/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html
авторA callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the consumer changes.
This is applicable when the consumer is having Kafka auto-manage group membership. If the consumer's directly assign partitions, those partitions will never be reassigned and this callback is not applicable.
...
Рейтинг: 0 / 0
kafka - гарантированная обработка всех сообщений и только один раз
    #39530789
questioner
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
vimba,

А зачем делать poll а потом seek?

Особенно непонятно почему игнорится возвращаемое значение poll
...
Рейтинг: 0 / 0
kafka - гарантированная обработка всех сообщений и только один раз
    #39530827
vimba
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
questioner,

Если расшифрововать то как задумывал автор книги, то результат первого пола мы выбрасываем, потому что читали с неизвестного офсета, после того как первый пол закончился нам назначены партиции, и мы перемещаем консумера на сохраненные офсеты.

Кстате не воспринимайте то, что написано в книге как этолон, в напечатанном концепте еще много недосказанного, например что будет если консумер лежал так долго, что часть месседжей уже потерлось. Незаметил в главе ничего про свойство "auto.offset.reset", оно по дефолту равно "latest", что приведет в такой ситуации к тому что, при отсутсвии месседжей по требуему офсету консумер будет позиционирован на конец партиции, и продолбает вообще все месседжи которые еще остались в партиции. Я вам настоятельно рекомендую как минмимум прочитать официальную пользовательскую документацию , описание протокола , и опциоанльно другие статьи с девелоперской вики. Книги конечно хорошая штука, но они больше нужны для слабодокументированных вещей таких как springframework или hibernate, которые хрен поймешь, пока исходный код не прочитаешь.
...
Рейтинг: 0 / 0
kafka - гарантированная обработка всех сообщений и только один раз
    #39530857
vimba
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
questionervimba,

https://docs.confluent.io/2.0.1/clients/javadocs/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html
авторA callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the consumer changes.
This is applicable when the consumer is having Kafka auto-manage group membership. If the consumer's directly assign partitions, those partitions will never be reassigned and this callback is not applicable.

А в приведенном примере консумер не назначает себе партиций, он только позиционируется на тех партициях что ему уже выданны.
...
Рейтинг: 0 / 0
kafka - гарантированная обработка всех сообщений и только один раз
    #39530976
questioner
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
vimba,

Понял, спасибо.

Понимаю, что в книге может быть не всё, но я пока что общие принципы хочу понять. Реальной задачи нет, просто есть свободное время поизучать.
...
Рейтинг: 0 / 0
kafka - гарантированная обработка всех сообщений и только один раз
    #39531073
questioner
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
vimbaquestioner,

Если расшифрововать то как задумывал автор книги, то результат первого пола мы выбрасываем, потому что читали с неизвестного офсета, после того как первый пол закончился нам назначены партиции, и мы перемещаем консумера на сохраненные офсеты.


А зачем его вообще брать это poll если мы его выбрасываем?

Ну то есть если просто его убрать то что то будет работать не так?

Мы же сначала делаем seek и только потом poll ?
...
Рейтинг: 0 / 0
kafka - гарантированная обработка всех сообщений и только один раз
    #39531117
vimba
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
questioner,

Я о том же и написал в первом посте здесь, что делается ненужная работа.

Бинго, я кажется понял. Дело в том, что джавадоки к методу ConsumerRebalanceListener#onPartitionsAssigned противоречат джавадокам в шапке интерфейса ConsumerRebalanceListener, реальному поведению и имени метода. Реальное поведение соответсвует описанию в шапке и названию метода, то есть onPartitionsAssigned дергается при любом ассайне партиций на консумера.

А из джавадоков к методу onPartitionsAssigned любой разумный человек может сделать вывод, что onPartitionsAssigned дергается только после каких-то изменений в топологии
ConsumerRebalanceListener javadocsA callback method the user can implement to provide handling of customized offsets on completion of a successful partition re-assignment.
Но тогда метод должен был, бы называться onPartitions Re assigned, авторов Kafka: The Definitive Guide такие противоречия либо не смутили, либо они их просто не заметили.
...
Рейтинг: 0 / 0
kafka - гарантированная обработка всех сообщений и только один раз
    #39531121
vimba
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
С учетом того, что дублируется идемпотентная операция, которая при повторении ничего не ломает, то ошибку трудно обнаружить, без применения качественного логирования.
...
Рейтинг: 0 / 0
kafka - гарантированная обработка всех сообщений и только один раз
    #39531143
questioner
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
vimba,

Вроде система то уже популярная, а противоречия как в чем-то сыром
...
Рейтинг: 0 / 0
kafka - гарантированная обработка всех сообщений и только один раз
    #39531144
questioner
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
vimba,

Но всё же безотносительно всего вышесказанного


Код: java
1.
2.
3.
4.
5.
 consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
  consumer.poll(0);

  for (TopicPartition partition: consumer.assignment())
    consumer.seek(partition, getOffsetFromDB(partition));   3



poll тут бесполезен абсолютно?
...
Рейтинг: 0 / 0
13 сообщений из 13, страница 1 из 1
Форумы / Java [игнор отключен] [закрыт для гостей] / kafka - гарантированная обработка всех сообщений и только один раз
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]