powered by simpleCommunicator - 2.0.30     © 2024 Programmizd 02
Map
Форумы / Java [игнор отключен] [закрыт для гостей] / Механизм обработки очереди
25 сообщений из 32, страница 1 из 2
Механизм обработки очереди
    #39884810
qi_ip
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Приветствую!

Подскажите, пожалуйста, какие есть механизмы/приложения/фреймворки, чтобы реализовать параллельный механизм обработки сообщений из очереди.

Собственно сама задача: есть несколько сокетов, по которым постоянно приходят сообщения. Все сообщения сохраняются в таблицу А в БД.

Вскоре, из этой таблицы А строки забираются отсортированные по дате записи в БД и передаются парсеру, который обрабатывает их, модифицирует и складывает модифицированные записи в другую таблицу Б.

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
	
	   |  	 | 	          |--------|	|--------|
------> | 4	 |	          |	       |	| 1 MOD  |
------> | 3	 | 1-2-3-4-5 |	       |	| 2 MOD  |
------> | 1	 |---------->|PARSER |====>  3 MOD  |
------> | 2	 | THREAD1  |	       |	| 4 MOD  |
------> | 5	 |	          |	       |	| 5 MOD  |
	   |___|	          |______  |	|_______|



Есть необходимость разделить THREAD1 на несколько потоков, чтобы быстрее забирать данные на обработку. Но не совсем понимаю, в какую сторону копать, чтобы не нарушить логику обработки данных на выходе из парсера. В частности, каждая запись обрабатывается парсером разное время, но отдаваться на выходе должна строго в отсортированном порядке по дате изначальной записи.

Спасибо!
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884814
Фотография fixxer
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
qi_ip, Нужно партиционировать входной поток. Это можно сделать по-разному, например поделить отрезок времени за который запрашиваются данные или взять остаток от деления таймстампа или идентификатора на количество партиций. Вопрос, если вы все равно результат пишете в таблицу, которую можно как угодно сортировать, зачем сохранять порядок при записи?
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884821
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
qi_ip,
1. Зачем промежуточно писать в бд?
2. Какая очередность при параллельной работе?
Шутите?
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884823
qi_ip
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
fixxerqi_ip, Нужно партиционировать входной поток. Это можно сделать по-разному, например поделить отрезок времени за который запрашиваются данные или взять остаток от деления таймстампа или идентификатора на количество партиций. Вопрос, если вы все равно результат пишете в таблицу, которую можно как угодно сортировать, зачем сохранять порядок при записи?
Потому что результат с таблицы Б сразу же забирается и отправляется дальше. И получается, что если, например, запись под номером 1, все еще в обработке, а запись под номером 2 уже обработалась, то нарушается последовательность команд.

PetroNotC Sharpqi_ip,
1. Зачем промежуточно писать в бд?

Изначально при получении сообщений есть минимальные проверки и отправка подтверждения получения сообщения. Плюс нужно сохранять данные на случай, если вдруг приложение упадет.

PetroNotC Sharpqi_ip,
2. Какая очередность при параллельной работе?
Шутите?
Поэтому и создал топик, так как хотел узнать, возможность реализации этого момента )))) Если бы шутил...хех
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884825
Фотография fixxer
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
qi_ip,

Возможность есть. Так называемый механизм watermark. Это значение максимального таймстампа на всех партициях. Партиции безопасно писать записи старше текущего значения watermark.
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884842
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
qi_ipИзначально при получении сообщений есть минимальные проверки и отправка подтверждения получения сообщения. Плюс нужно сохранять данные на случай, если вдруг приложение упадет.это же логи. Или архивация. Получил фио из сокета, отдай копию в фоновый поток и пусть он в фоне сохраняет. А основной от сокета сразу это ФИО отдал на обработку функции
function бизнесЛогика(фио)
Так?

qi_ipPetroNotC Sharp2. Какая очередность при параллельной работе?
Шутите?
Поэтому и создал топик, так как хотел узнать, возможность реализации этого момента )))) Если бы шутил...хех
Вас еще раз спросить?
Какой дурак ставит задачу параллельной работы требуя очередности?
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884849
Sergunka
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
qi_ipПриветствую!

Подскажите, пожалуйста, какие есть механизмы/приложения/фреймворки, чтобы реализовать параллельный механизм обработки сообщений из очереди.

Собственно сама задача: есть несколько сокетов, по которым постоянно приходят сообщения. Все сообщения сохраняются в таблицу А в БД.

Вскоре, из этой таблицы А строки забираются отсортированные по дате записи в БД и передаются парсеру, который обрабатывает их, модифицирует и складывает модифицированные записи в другую таблицу Б.

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
	
	   |  	 | 	          |--------|	|--------|
------> | 4	 |	          |	       |	| 1 MOD  |
------> | 3	 | 1-2-3-4-5 |	       |	| 2 MOD  |
------> | 1	 |---------->|PARSER |====>  3 MOD  |
------> | 2	 | THREAD1  |	       |	| 4 MOD  |
------> | 5	 |	          |	       |	| 5 MOD  |
	   |___|	          |______  |	|_______|



Есть необходимость разделить THREAD1 на несколько потоков, чтобы быстрее забирать данные на обработку. Но не совсем понимаю, в какую сторону копать, чтобы не нарушить логику обработки данных на выходе из парсера. В частности, каждая запись обрабатывается парсером разное время, но отдаваться на выходе должна строго в отсортированном порядке по дате изначальной записи.

Спасибо!


Посмотрите на очередь с приоритетом - приоритет понятно по дате.
https://www.rabbitmq.com/priority.html

Но это все одно не даст Вам требуемого результата так как в вашем случае в любой момент может прийти данные за прошлую неделю

Вы можете буферизовать в очереди несколько тысяч записей и потом уже их раздать на обработку как вариант, но на мой взгляд сортировать данные проще когда уже они попали в БД если у вас там не стоит в постановки к примеру аномали детекшин перед тем как в базу писать ну и тд.

Поговорите с архитектором может он чего не договаривает
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884861
qi_ip
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
PetroNotC Sharpэто же логи. Или архивация. Получил фио из сокета, отдай копию в фоновый поток и пусть он в фоне сохраняет. А основной от сокета сразу это ФИО отдал на обработку функции
function бизнесЛогика(фио)
Так?

Буду посмотреть такой вариант

Какой дурак ставит задачу параллельной работы требуя очередности?
Никто не ставит такую задачу ))) Основная цель - ускорить обработку данных парсером, так как на входе накапливаются данные. Поэтому это и был вопрос :)

SergunkaНо это все одно не даст Вам требуемого результата так как в вашем случае в любой момент может прийти данные за прошлую неделю
Сотрировка идет по времени поступления запроса, то есть, если даже данные содержат информацию прошлой недели, все равно будет сортироваться по времени поступления запроса.
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884866
yvprod
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
qi_ip,

Расскажите подробнее, что за данные
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884870
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
qi_ipНикто не ставит такую задачу ))) Основная цель - ускорить обработку данных парсером, так как на входе накапливаются данные. Поэтому это и был вопрос :)
Значит сортировку исключаем из вопроса. Нечего сортировать.
Парсер потокобезопасный?
Если да, то в поток его.
Если нет, то несколько экземпляров парсера.
Это узкое место по вашим словам.
Если ДВЕРЬ узкая, то либо строите рядом три двери, либо три дома (процесса) с такой узкой дверью.
Код: java
1.
2.
3.
fio = getSoket()
setLogDbAsync(fio
parserA.businessLayer(fio
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884895
Фотография fixxer
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Потом, как всегда, окажется, что узким местом была база. И если предварительно не писать или писать асинхронно, то обработчик вытягивает.
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884899
qi_ip
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
yvprodqi_ip,

Расскажите подробнее, что за данные
Данные - обычные текстовые сообщения-команды, но их большой поток (минимум 30-50 команд в секунду) и разных размеров (от 100 байт до 65 кб).

PetroNotC SharpЗначит сортировку исключаем из вопроса. Нечего сортировать.

Исключить не получиться. Вот смотрите: допустим есть клиент, который постоянно посылает команды. Эти команды обрабатываются парсером, модифицируются и передаются/возвращаются другим клиентам. И если, например, пришли команды:

1. Включить
2. Выгрузить данные
3. Остановить выгрузку
4. Выключить

, то и на другой клиент после парсера они должны попасть в том же порядке.
В случае, если команды 2 и 3 обрабатываются в среднем 1 секунду, а 1 и 4 500 мс может получиться так, что сначала придут команды 1 и 4, а потом только 2 и 3.

Причем, комнда от клиента 1 пришла одна, а после обработки парсером она посылается сразу клиента 2-10, в зависимости, от того для скольких клиентов она предназначена.

fixxerПотом, как всегда, окажется, что узким местом была база. И если предварительно не писать или писать асинхронно, то обработчик вытягивает.
Сейчас добавляю в код тайминги, чтобы понять, где идет затык, на обработке в Java или же на insert/update в БД.
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884901
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Еще никто не предлагал партиционировать по userId и пихать все в кафку?
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884906
yvprod
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
забыл ник,

я к этому и вел, партиционировать по какому нибудь бизнес-правилу и писать в разные очереди
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884907
qi_ip
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Подскажите, может кто читал, где можно найти опенсорс коды или просто описание работы серверных приложений, например, таких как телеграм или ватцап или чем-то похожих мессенджеров.
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884908
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
qi_ipСобственно сама задача: есть несколько сокетов, по которым постоянно приходят сообщения. Все сообщения сохраняются в таблицу А в БД.

Вот с этого момента - фигня какая-то. Никто такую постановку не примет.

С сокетами никто не работает. Должен быть над-сокетный протокол. SOAP, например.
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884909
yvprod
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
qi_ip,

на joker Олег Анастасьев рассказывал про фреймворк мессенджера в одноклассниках, но в открытом доступе вроде еще нет
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884912
Фотография fixxer
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
yvprodзабыл ник,

я к этому и вел, партиционировать по какому нибудь бизнес-правилу и писать в разные очереди

+1
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884913
mad_nazgul
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
qi_ipПодскажите, может кто читал, где можно найти опенсорс коды или просто описание работы серверных приложений, например, таких как телеграм или ватцап или чем-то похожих мессенджеров.

Jabber?!
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884915
qi_ip
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
maytonqi_ipСобственно сама задача: есть несколько сокетов, по которым постоянно приходят сообщения. Все сообщения сохраняются в таблицу А в БД.

Вот с этого момента - фигня какая-то. Никто такую постановку не примет.

С сокетами никто не работает. Должен быть над-сокетный протокол. SOAP, например.
Обмен данных идет по вебсокету
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884921
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Посмотри на это https://docs.oracle.com/javase/8/docs/api/java/util/PriorityQueue.html

Может поможет. И давай модульный тест или макет что ты уже написал.

Трудно говорить об ангелах на кончике иглы...
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884922
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
qi_ipИ если, например, пришли команды:

1. Включить
2. Выгрузить данные
3. Остановить выгрузку
4. Выключить

, то и на другой клиент после парсера они должны попасть в том же порядке.mayton прав.
Ты изобретаешь ПРИКЛАДНОЙ ПРОТОКОЛ.
Нижнего уровня у тебя сокет. Но ты должен сохранить очередность пачки команд.
А если отправили вперемежку команды?
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884923
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
PetroNotC Sharp
Код: java
1.
2.
3.
fio = getSoket()
setLogDbAsync(fio
parserA.businessLayer(fio

покажи тут как команда 4 придет перед командой 1?
1 и 4 идут в разные сокеты?
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884924
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
qi_ipПодскажите, может кто читал, где можно найти опенсорс коды или просто описание работы серверных приложений, например, таких как телеграм или ватцап или чем-то похожих мессенджеров.
Сначала постановку вменяемую дай. Почему парсер тормозит, если его задача тупо передать строку клиенту?
...
Рейтинг: 0 / 0
Механизм обработки очереди
    #39884925
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Вадя эту задачу давно решил)))
...
Рейтинг: 0 / 0
25 сообщений из 32, страница 1 из 2
Форумы / Java [игнор отключен] [закрыт для гостей] / Механизм обработки очереди
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]