|
|
|
kafka - гарантированная обработка всех сообщений и только один раз
|
|||
|---|---|---|---|
|
#18+
Читаю вот эту статью: https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1 Тут решается проблема того, чтобы сохранять в базу результат обработки сообщений и оффсет для топика атомарно. Для этой цели решено хранить оффсет в базе и записывать в одной транзакции результат обработки сообщений и оффсет для топика. В общем-то идея понятна, но код как-то не очень: Код: java 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20. 21. 22. 23. 24. 25. 26. 27. 28. 29. 30. 31. 32. 33. 34. 35. https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1 1 We use an imaginary method here to commit the transaction in the database. The idea here is that the database records and offsets will be inserted to the database as we process the records, and we just need to commit the transactions when we are about to lose the partition to make sure this information is persisted. 2 We also have an imaginary method to fetch the offsets from the database, and then we seek() to those records when we get ownership of new partitions. 3 When the consumer first starts, after we subscribe to topics, we call poll() once to make sure we join a consumer group and get assigned partitions, and then we immediately seek() to the correct offset in the partitions we are assigned to. Keep in mind that seek() only updates the position we are consuming from, so the next poll() will fetch the right messages. If there was an error in seek() (e.g., the offset does not exist), the exception will be thrown by poll(). 4 Another imaginary method: this time we update a table storing the offsets in our database. Here we assume that updating records is fast, so we do an update on every record, but commits are slow, so we only commit at the end of the batch. However, this can be optimized in different ways. SaveOffsetsOnRebalance я так понял onPartitionsRevoked вызывается когда один из партишенов этого листенера уходит к другому консумеру при ребалансе, ну или вообще его удалили onPartitionsAssigned - вызывается когда этому консумеру назначили партишен Ок commitDBTransaction() - коммитит в базу все результаты которые мы обработали а также оффсет. consumer.seek выставляет консумеру оффсет для партишена. Я правильно понял как это работает? ... |
|||
|
:
Нравится:
Не нравится:
|
|||
| 04.10.2017, 12:41 |
|
||
|
kafka - гарантированная обработка всех сообщений и только один раз
|
|||
|---|---|---|---|
|
#18+
Код: java 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. Ну и считается, что если внутри for мы вывалились, то транзакция не закоммитится? ... |
|||
|
:
Нравится:
Не нравится:
|
|||
| 04.10.2017, 12:48 |
|
||
|
kafka - гарантированная обработка всех сообщений и только один раз
|
|||
|---|---|---|---|
|
#18+
questioner, в приведенном алгоритме дублируется функционал поциционирования консумеров, пункт 3 выглядит лишним(как и предшествующий ему consumer.poll(0);), потому что onPartitionsAssigned выполняется всегда в том числе при первоначальной подписке. Конечно к логическим ошибкам это дублирование не приводит, так как дублируется выполнение идемпотентной операции. Всегда расставляйте логи в ключевые места консумера, чтобы понимать что происходит. ... |
|||
|
:
Нравится:
Не нравится:
|
|||
| 04.10.2017, 14:24 |
|
||
|
kafka - гарантированная обработка всех сообщений и только один раз
|
|||
|---|---|---|---|
|
#18+
vimba, https://docs.confluent.io/2.0.1/clients/javadocs/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html авторA callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the consumer changes. This is applicable when the consumer is having Kafka auto-manage group membership. If the consumer's directly assign partitions, those partitions will never be reassigned and this callback is not applicable. ... |
|||
|
:
Нравится:
Не нравится:
|
|||
| 04.10.2017, 14:40 |
|
||
|
kafka - гарантированная обработка всех сообщений и только один раз
|
|||
|---|---|---|---|
|
#18+
vimba, А зачем делать poll а потом seek? Особенно непонятно почему игнорится возвращаемое значение poll ... |
|||
|
:
Нравится:
Не нравится:
|
|||
| 04.10.2017, 14:43 |
|
||
|
kafka - гарантированная обработка всех сообщений и только один раз
|
|||
|---|---|---|---|
|
#18+
questioner, Если расшифрововать то как задумывал автор книги, то результат первого пола мы выбрасываем, потому что читали с неизвестного офсета, после того как первый пол закончился нам назначены партиции, и мы перемещаем консумера на сохраненные офсеты. Кстате не воспринимайте то, что написано в книге как этолон, в напечатанном концепте еще много недосказанного, например что будет если консумер лежал так долго, что часть месседжей уже потерлось. Незаметил в главе ничего про свойство "auto.offset.reset", оно по дефолту равно "latest", что приведет в такой ситуации к тому что, при отсутсвии месседжей по требуему офсету консумер будет позиционирован на конец партиции, и продолбает вообще все месседжи которые еще остались в партиции. Я вам настоятельно рекомендую как минмимум прочитать официальную пользовательскую документацию , описание протокола , и опциоанльно другие статьи с девелоперской вики. Книги конечно хорошая штука, но они больше нужны для слабодокументированных вещей таких как springframework или hibernate, которые хрен поймешь, пока исходный код не прочитаешь. ... |
|||
|
:
Нравится:
Не нравится:
|
|||
| 04.10.2017, 15:14 |
|
||
|
kafka - гарантированная обработка всех сообщений и только один раз
|
|||
|---|---|---|---|
|
#18+
questionervimba, https://docs.confluent.io/2.0.1/clients/javadocs/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html авторA callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the consumer changes. This is applicable when the consumer is having Kafka auto-manage group membership. If the consumer's directly assign partitions, those partitions will never be reassigned and this callback is not applicable. А в приведенном примере консумер не назначает себе партиций, он только позиционируется на тех партициях что ему уже выданны. ... |
|||
|
:
Нравится:
Не нравится:
|
|||
| 04.10.2017, 15:40 |
|
||
|
kafka - гарантированная обработка всех сообщений и только один раз
|
|||
|---|---|---|---|
|
#18+
vimba, Понял, спасибо. Понимаю, что в книге может быть не всё, но я пока что общие принципы хочу понять. Реальной задачи нет, просто есть свободное время поизучать. ... |
|||
|
:
Нравится:
Не нравится:
|
|||
| 04.10.2017, 17:38 |
|
||
|
kafka - гарантированная обработка всех сообщений и только один раз
|
|||
|---|---|---|---|
|
#18+
vimbaquestioner, Если расшифрововать то как задумывал автор книги, то результат первого пола мы выбрасываем, потому что читали с неизвестного офсета, после того как первый пол закончился нам назначены партиции, и мы перемещаем консумера на сохраненные офсеты. А зачем его вообще брать это poll если мы его выбрасываем? Ну то есть если просто его убрать то что то будет работать не так? Мы же сначала делаем seek и только потом poll ? ... |
|||
|
:
Нравится:
Не нравится:
|
|||
| 04.10.2017, 19:46 |
|
||
|
kafka - гарантированная обработка всех сообщений и только один раз
|
|||
|---|---|---|---|
|
#18+
questioner, Я о том же и написал в первом посте здесь, что делается ненужная работа. Бинго, я кажется понял. Дело в том, что джавадоки к методу ConsumerRebalanceListener#onPartitionsAssigned противоречат джавадокам в шапке интерфейса ConsumerRebalanceListener, реальному поведению и имени метода. Реальное поведение соответсвует описанию в шапке и названию метода, то есть onPartitionsAssigned дергается при любом ассайне партиций на консумера. А из джавадоков к методу onPartitionsAssigned любой разумный человек может сделать вывод, что onPartitionsAssigned дергается только после каких-то изменений в топологии ConsumerRebalanceListener javadocsA callback method the user can implement to provide handling of customized offsets on completion of a successful partition re-assignment. Но тогда метод должен был, бы называться onPartitions Re assigned, авторов Kafka: The Definitive Guide такие противоречия либо не смутили, либо они их просто не заметили. ... |
|||
|
:
Нравится:
Не нравится:
|
|||
| 04.10.2017, 21:23 |
|
||
|
kafka - гарантированная обработка всех сообщений и только один раз
|
|||
|---|---|---|---|
|
#18+
С учетом того, что дублируется идемпотентная операция, которая при повторении ничего не ломает, то ошибку трудно обнаружить, без применения качественного логирования. ... |
|||
|
:
Нравится:
Не нравится:
|
|||
| 04.10.2017, 21:26 |
|
||
|
kafka - гарантированная обработка всех сообщений и только один раз
|
|||
|---|---|---|---|
|
#18+
vimba, Вроде система то уже популярная, а противоречия как в чем-то сыром ... |
|||
|
:
Нравится:
Не нравится:
|
|||
| 04.10.2017, 22:01 |
|
||
|
kafka - гарантированная обработка всех сообщений и только один раз
|
|||
|---|---|---|---|
|
#18+
vimba, Но всё же безотносительно всего вышесказанного Код: java 1. 2. 3. 4. 5. poll тут бесполезен абсолютно? ... |
|||
|
:
Нравится:
Не нравится:
|
|||
| 04.10.2017, 22:03 |
|
||
|
|

start [/forum/topic.php?fid=59&msg=39531117&tid=2122559]: |
0ms |
get settings: |
7ms |
get forum list: |
18ms |
check forum access: |
3ms |
check topic access: |
3ms |
track hit: |
168ms |
get topic data: |
11ms |
get forum data: |
3ms |
get page messages: |
51ms |
get tp. blocked users: |
1ms |
| others: | 233ms |
| total: | 498ms |

| 0 / 0 |
