Гость
Целевая тема:
Создать новую тему:
Автор:
Форумы / Java [игнор отключен] [закрыт для гостей] / kafka - гарантированная обработка всех сообщений и только один раз / 13 сообщений из 13, страница 1 из 1
04.10.2017, 12:41
    #39530699
questioner
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
kafka - гарантированная обработка всех сообщений и только один раз
Читаю вот эту статью:

https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1

Тут решается проблема того, чтобы сохранять в базу результат обработки сообщений и оффсет для топика атомарно. Для этой цели решено хранить оффсет в базе и записывать в одной транзакции результат обработки сообщений и оффсет для топика.

В общем-то идея понятна, но код как-то не очень:

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
public class SaveOffsetsOnRebalance implements
  ConsumerRebalanceListener {

    public void onPartitionsRevoked(Collection<TopicPartition>
      partitions) {
                commitDBTransaction(); 1
        }

    public void onPartitionsAssigned(Collection<TopicPartition>
      partitions) {
        for(TopicPartition partition: partitions)
            consumer.seek(partition, getOffsetFromDB(partition)); 2
       }
   }
}


  consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
  consumer.poll(0);

  for (TopicPartition partition: consumer.assignment())
    consumer.seek(partition, getOffsetFromDB(partition));   3

  while (true) {
      ConsumerRecords<String, String> records =
        consumer.poll(100);
      for (ConsumerRecord<String, String> record : records)
      {
          processRecord(record);
          storeRecordInDB(record);
          storeOffsetInDB(record.topic(), record.partition(),
            record.offset()); 4
      }
      commitDBTransaction();
  }



https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1 1
We use an imaginary method here to commit the transaction in the database. The idea here is that the database records and offsets will be inserted to the database as we process the records, and we just need to commit the transactions when we are about to lose the partition to make sure this information is persisted.

2
We also have an imaginary method to fetch the offsets from the database, and then we seek() to those records when we get ownership of new partitions.

3
When the consumer first starts, after we subscribe to topics, we call poll() once to make sure we join a consumer group and get assigned partitions, and then we immediately seek() to the correct offset in the partitions we are assigned to. Keep in mind that seek() only updates the position we are consuming from, so the next poll() will fetch the right messages. If there was an error in seek() (e.g., the offset does not exist), the exception will be thrown by poll().

4
Another imaginary method: this time we update a table storing the offsets in our database. Here we assume that updating records is fast, so we do an update on every record, but commits are slow, so we only commit at the end of the batch. However, this can be optimized in different ways.

SaveOffsetsOnRebalance


я так понял onPartitionsRevoked вызывается когда один из партишенов этого листенера уходит к другому консумеру при ребалансе, ну или вообще его удалили

onPartitionsAssigned - вызывается когда этому консумеру назначили партишен


Ок commitDBTransaction() - коммитит в базу все результаты которые мы обработали а также оффсет.


consumer.seek выставляет консумеру оффсет для партишена.

Я правильно понял как это работает?
...
Рейтинг: 0 / 0
04.10.2017, 12:48
    #39530706
questioner
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
kafka - гарантированная обработка всех сообщений и только один раз
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
while (true) {
      ConsumerRecords<String, String> records =
        consumer.poll(100);
      for (ConsumerRecord<String, String> record : records)
      {
          processRecord(record);
          storeRecordInDB(record);
          storeOffsetInDB(record.topic(), record.partition(),
            record.offset()); 4
      }
      commitDBTransaction();
  }


Ну и считается, что если внутри for мы вывалились, то транзакция не закоммитится?
...
Рейтинг: 0 / 0
04.10.2017, 14:24
    #39530769
vimba
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
kafka - гарантированная обработка всех сообщений и только один раз
questioner,

в приведенном алгоритме дублируется функционал поциционирования консумеров, пункт 3 выглядит лишним(как и предшествующий ему consumer.poll(0);), потому что onPartitionsAssigned выполняется всегда в том числе при первоначальной подписке. Конечно к логическим ошибкам это дублирование не приводит, так как дублируется выполнение идемпотентной операции. Всегда расставляйте логи в ключевые места консумера, чтобы понимать что происходит.
...
Рейтинг: 0 / 0
04.10.2017, 14:40
    #39530786
questioner
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
kafka - гарантированная обработка всех сообщений и только один раз
vimba,

https://docs.confluent.io/2.0.1/clients/javadocs/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html
авторA callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the consumer changes.
This is applicable when the consumer is having Kafka auto-manage group membership. If the consumer's directly assign partitions, those partitions will never be reassigned and this callback is not applicable.
...
Рейтинг: 0 / 0
04.10.2017, 14:43
    #39530789
questioner
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
kafka - гарантированная обработка всех сообщений и только один раз
vimba,

А зачем делать poll а потом seek?

Особенно непонятно почему игнорится возвращаемое значение poll
...
Рейтинг: 0 / 0
04.10.2017, 15:14
    #39530827
vimba
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
kafka - гарантированная обработка всех сообщений и только один раз
questioner,

Если расшифрововать то как задумывал автор книги, то результат первого пола мы выбрасываем, потому что читали с неизвестного офсета, после того как первый пол закончился нам назначены партиции, и мы перемещаем консумера на сохраненные офсеты.

Кстате не воспринимайте то, что написано в книге как этолон, в напечатанном концепте еще много недосказанного, например что будет если консумер лежал так долго, что часть месседжей уже потерлось. Незаметил в главе ничего про свойство "auto.offset.reset", оно по дефолту равно "latest", что приведет в такой ситуации к тому что, при отсутсвии месседжей по требуему офсету консумер будет позиционирован на конец партиции, и продолбает вообще все месседжи которые еще остались в партиции. Я вам настоятельно рекомендую как минмимум прочитать официальную пользовательскую документацию , описание протокола , и опциоанльно другие статьи с девелоперской вики. Книги конечно хорошая штука, но они больше нужны для слабодокументированных вещей таких как springframework или hibernate, которые хрен поймешь, пока исходный код не прочитаешь.
...
Рейтинг: 0 / 0
04.10.2017, 15:40
    #39530857
vimba
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
kafka - гарантированная обработка всех сообщений и только один раз
questionervimba,

https://docs.confluent.io/2.0.1/clients/javadocs/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html
авторA callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the consumer changes.
This is applicable when the consumer is having Kafka auto-manage group membership. If the consumer's directly assign partitions, those partitions will never be reassigned and this callback is not applicable.

А в приведенном примере консумер не назначает себе партиций, он только позиционируется на тех партициях что ему уже выданны.
...
Рейтинг: 0 / 0
04.10.2017, 17:38
    #39530976
questioner
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
kafka - гарантированная обработка всех сообщений и только один раз
vimba,

Понял, спасибо.

Понимаю, что в книге может быть не всё, но я пока что общие принципы хочу понять. Реальной задачи нет, просто есть свободное время поизучать.
...
Рейтинг: 0 / 0
04.10.2017, 19:46
    #39531073
questioner
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
kafka - гарантированная обработка всех сообщений и только один раз
vimbaquestioner,

Если расшифрововать то как задумывал автор книги, то результат первого пола мы выбрасываем, потому что читали с неизвестного офсета, после того как первый пол закончился нам назначены партиции, и мы перемещаем консумера на сохраненные офсеты.


А зачем его вообще брать это poll если мы его выбрасываем?

Ну то есть если просто его убрать то что то будет работать не так?

Мы же сначала делаем seek и только потом poll ?
...
Рейтинг: 0 / 0
04.10.2017, 21:23
    #39531117
vimba
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
kafka - гарантированная обработка всех сообщений и только один раз
questioner,

Я о том же и написал в первом посте здесь, что делается ненужная работа.

Бинго, я кажется понял. Дело в том, что джавадоки к методу ConsumerRebalanceListener#onPartitionsAssigned противоречат джавадокам в шапке интерфейса ConsumerRebalanceListener, реальному поведению и имени метода. Реальное поведение соответсвует описанию в шапке и названию метода, то есть onPartitionsAssigned дергается при любом ассайне партиций на консумера.

А из джавадоков к методу onPartitionsAssigned любой разумный человек может сделать вывод, что onPartitionsAssigned дергается только после каких-то изменений в топологии
ConsumerRebalanceListener javadocsA callback method the user can implement to provide handling of customized offsets on completion of a successful partition re-assignment.
Но тогда метод должен был, бы называться onPartitions Re assigned, авторов Kafka: The Definitive Guide такие противоречия либо не смутили, либо они их просто не заметили.
...
Рейтинг: 0 / 0
04.10.2017, 21:26
    #39531121
vimba
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
kafka - гарантированная обработка всех сообщений и только один раз
С учетом того, что дублируется идемпотентная операция, которая при повторении ничего не ломает, то ошибку трудно обнаружить, без применения качественного логирования.
...
Рейтинг: 0 / 0
04.10.2017, 22:01
    #39531143
questioner
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
kafka - гарантированная обработка всех сообщений и только один раз
vimba,

Вроде система то уже популярная, а противоречия как в чем-то сыром
...
Рейтинг: 0 / 0
04.10.2017, 22:03
    #39531144
questioner
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
kafka - гарантированная обработка всех сообщений и только один раз
vimba,

Но всё же безотносительно всего вышесказанного


Код: java
1.
2.
3.
4.
5.
 consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
  consumer.poll(0);

  for (TopicPartition partition: consumer.assignment())
    consumer.seek(partition, getOffsetFromDB(partition));   3



poll тут бесполезен абсолютно?
...
Рейтинг: 0 / 0
Форумы / Java [игнор отключен] [закрыт для гостей] / kafka - гарантированная обработка всех сообщений и только один раз / 13 сообщений из 13, страница 1 из 1
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]