Гость
Форумы / Java [игнор отключен] [закрыт для гостей] / Механизм обработки очереди / 25 сообщений из 32, страница 1 из 2
03.11.2019, 21:02
    #39884810
qi_ip
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
Приветствую!

Подскажите, пожалуйста, какие есть механизмы/приложения/фреймворки, чтобы реализовать параллельный механизм обработки сообщений из очереди.

Собственно сама задача: есть несколько сокетов, по которым постоянно приходят сообщения. Все сообщения сохраняются в таблицу А в БД.

Вскоре, из этой таблицы А строки забираются отсортированные по дате записи в БД и передаются парсеру, который обрабатывает их, модифицирует и складывает модифицированные записи в другую таблицу Б.

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
	
	   |  	 | 	          |--------|	|--------|
------> | 4	 |	          |	       |	| 1 MOD  |
------> | 3	 | 1-2-3-4-5 |	       |	| 2 MOD  |
------> | 1	 |---------->|PARSER |====>  3 MOD  |
------> | 2	 | THREAD1  |	       |	| 4 MOD  |
------> | 5	 |	          |	       |	| 5 MOD  |
	   |___|	          |______  |	|_______|



Есть необходимость разделить THREAD1 на несколько потоков, чтобы быстрее забирать данные на обработку. Но не совсем понимаю, в какую сторону копать, чтобы не нарушить логику обработки данных на выходе из парсера. В частности, каждая запись обрабатывается парсером разное время, но отдаваться на выходе должна строго в отсортированном порядке по дате изначальной записи.

Спасибо!
...
Рейтинг: 0 / 0
03.11.2019, 21:21
    #39884814
fixxer
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
qi_ip, Нужно партиционировать входной поток. Это можно сделать по-разному, например поделить отрезок времени за который запрашиваются данные или взять остаток от деления таймстампа или идентификатора на количество партиций. Вопрос, если вы все равно результат пишете в таблицу, которую можно как угодно сортировать, зачем сохранять порядок при записи?
...
Рейтинг: 0 / 0
03.11.2019, 21:48
    #39884821
PetroNotC Sharp
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
qi_ip,
1. Зачем промежуточно писать в бд?
2. Какая очередность при параллельной работе?
Шутите?
...
Рейтинг: 0 / 0
03.11.2019, 22:05
    #39884823
qi_ip
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
fixxerqi_ip, Нужно партиционировать входной поток. Это можно сделать по-разному, например поделить отрезок времени за который запрашиваются данные или взять остаток от деления таймстампа или идентификатора на количество партиций. Вопрос, если вы все равно результат пишете в таблицу, которую можно как угодно сортировать, зачем сохранять порядок при записи?
Потому что результат с таблицы Б сразу же забирается и отправляется дальше. И получается, что если, например, запись под номером 1, все еще в обработке, а запись под номером 2 уже обработалась, то нарушается последовательность команд.

PetroNotC Sharpqi_ip,
1. Зачем промежуточно писать в бд?

Изначально при получении сообщений есть минимальные проверки и отправка подтверждения получения сообщения. Плюс нужно сохранять данные на случай, если вдруг приложение упадет.

PetroNotC Sharpqi_ip,
2. Какая очередность при параллельной работе?
Шутите?
Поэтому и создал топик, так как хотел узнать, возможность реализации этого момента )))) Если бы шутил...хех
...
Рейтинг: 0 / 0
03.11.2019, 22:21
    #39884825
fixxer
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
qi_ip,

Возможность есть. Так называемый механизм watermark. Это значение максимального таймстампа на всех партициях. Партиции безопасно писать записи старше текущего значения watermark.
...
Рейтинг: 0 / 0
04.11.2019, 00:54
    #39884842
PetroNotC Sharp
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
qi_ipИзначально при получении сообщений есть минимальные проверки и отправка подтверждения получения сообщения. Плюс нужно сохранять данные на случай, если вдруг приложение упадет.это же логи. Или архивация. Получил фио из сокета, отдай копию в фоновый поток и пусть он в фоне сохраняет. А основной от сокета сразу это ФИО отдал на обработку функции
function бизнесЛогика(фио)
Так?

qi_ipPetroNotC Sharp2. Какая очередность при параллельной работе?
Шутите?
Поэтому и создал топик, так как хотел узнать, возможность реализации этого момента )))) Если бы шутил...хех
Вас еще раз спросить?
Какой дурак ставит задачу параллельной работы требуя очередности?
...
Рейтинг: 0 / 0
04.11.2019, 02:44
    #39884849
Sergunka
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
qi_ipПриветствую!

Подскажите, пожалуйста, какие есть механизмы/приложения/фреймворки, чтобы реализовать параллельный механизм обработки сообщений из очереди.

Собственно сама задача: есть несколько сокетов, по которым постоянно приходят сообщения. Все сообщения сохраняются в таблицу А в БД.

Вскоре, из этой таблицы А строки забираются отсортированные по дате записи в БД и передаются парсеру, который обрабатывает их, модифицирует и складывает модифицированные записи в другую таблицу Б.

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
	
	   |  	 | 	          |--------|	|--------|
------> | 4	 |	          |	       |	| 1 MOD  |
------> | 3	 | 1-2-3-4-5 |	       |	| 2 MOD  |
------> | 1	 |---------->|PARSER |====>  3 MOD  |
------> | 2	 | THREAD1  |	       |	| 4 MOD  |
------> | 5	 |	          |	       |	| 5 MOD  |
	   |___|	          |______  |	|_______|



Есть необходимость разделить THREAD1 на несколько потоков, чтобы быстрее забирать данные на обработку. Но не совсем понимаю, в какую сторону копать, чтобы не нарушить логику обработки данных на выходе из парсера. В частности, каждая запись обрабатывается парсером разное время, но отдаваться на выходе должна строго в отсортированном порядке по дате изначальной записи.

Спасибо!


Посмотрите на очередь с приоритетом - приоритет понятно по дате.
https://www.rabbitmq.com/priority.html

Но это все одно не даст Вам требуемого результата так как в вашем случае в любой момент может прийти данные за прошлую неделю

Вы можете буферизовать в очереди несколько тысяч записей и потом уже их раздать на обработку как вариант, но на мой взгляд сортировать данные проще когда уже они попали в БД если у вас там не стоит в постановки к примеру аномали детекшин перед тем как в базу писать ну и тд.

Поговорите с архитектором может он чего не договаривает
...
Рейтинг: 0 / 0
04.11.2019, 08:45
    #39884861
qi_ip
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
PetroNotC Sharpэто же логи. Или архивация. Получил фио из сокета, отдай копию в фоновый поток и пусть он в фоне сохраняет. А основной от сокета сразу это ФИО отдал на обработку функции
function бизнесЛогика(фио)
Так?

Буду посмотреть такой вариант

Какой дурак ставит задачу параллельной работы требуя очередности?
Никто не ставит такую задачу ))) Основная цель - ускорить обработку данных парсером, так как на входе накапливаются данные. Поэтому это и был вопрос :)

SergunkaНо это все одно не даст Вам требуемого результата так как в вашем случае в любой момент может прийти данные за прошлую неделю
Сотрировка идет по времени поступления запроса, то есть, если даже данные содержат информацию прошлой недели, все равно будет сортироваться по времени поступления запроса.
...
Рейтинг: 0 / 0
04.11.2019, 09:05
    #39884866
yvprod
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
qi_ip,

Расскажите подробнее, что за данные
...
Рейтинг: 0 / 0
04.11.2019, 09:17
    #39884870
PetroNotC Sharp
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
qi_ipНикто не ставит такую задачу ))) Основная цель - ускорить обработку данных парсером, так как на входе накапливаются данные. Поэтому это и был вопрос :)
Значит сортировку исключаем из вопроса. Нечего сортировать.
Парсер потокобезопасный?
Если да, то в поток его.
Если нет, то несколько экземпляров парсера.
Это узкое место по вашим словам.
Если ДВЕРЬ узкая, то либо строите рядом три двери, либо три дома (процесса) с такой узкой дверью.
Код: java
1.
2.
3.
fio = getSoket()
setLogDbAsync(fio
parserA.businessLayer(fio
...
Рейтинг: 0 / 0
04.11.2019, 11:34
    #39884895
fixxer
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
Потом, как всегда, окажется, что узким местом была база. И если предварительно не писать или писать асинхронно, то обработчик вытягивает.
...
Рейтинг: 0 / 0
04.11.2019, 11:49
    #39884899
qi_ip
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
yvprodqi_ip,

Расскажите подробнее, что за данные
Данные - обычные текстовые сообщения-команды, но их большой поток (минимум 30-50 команд в секунду) и разных размеров (от 100 байт до 65 кб).

PetroNotC SharpЗначит сортировку исключаем из вопроса. Нечего сортировать.

Исключить не получиться. Вот смотрите: допустим есть клиент, который постоянно посылает команды. Эти команды обрабатываются парсером, модифицируются и передаются/возвращаются другим клиентам. И если, например, пришли команды:

1. Включить
2. Выгрузить данные
3. Остановить выгрузку
4. Выключить

, то и на другой клиент после парсера они должны попасть в том же порядке.
В случае, если команды 2 и 3 обрабатываются в среднем 1 секунду, а 1 и 4 500 мс может получиться так, что сначала придут команды 1 и 4, а потом только 2 и 3.

Причем, комнда от клиента 1 пришла одна, а после обработки парсером она посылается сразу клиента 2-10, в зависимости, от того для скольких клиентов она предназначена.

fixxerПотом, как всегда, окажется, что узким местом была база. И если предварительно не писать или писать асинхронно, то обработчик вытягивает.
Сейчас добавляю в код тайминги, чтобы понять, где идет затык, на обработке в Java или же на insert/update в БД.
...
Рейтинг: 0 / 0
04.11.2019, 11:55
    #39884901
забыл ник
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
Еще никто не предлагал партиционировать по userId и пихать все в кафку?
...
Рейтинг: 0 / 0
04.11.2019, 12:05
    #39884906
yvprod
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
забыл ник,

я к этому и вел, партиционировать по какому нибудь бизнес-правилу и писать в разные очереди
...
Рейтинг: 0 / 0
04.11.2019, 12:06
    #39884907
qi_ip
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
Подскажите, может кто читал, где можно найти опенсорс коды или просто описание работы серверных приложений, например, таких как телеграм или ватцап или чем-то похожих мессенджеров.
...
Рейтинг: 0 / 0
04.11.2019, 12:09
    #39884908
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
qi_ipСобственно сама задача: есть несколько сокетов, по которым постоянно приходят сообщения. Все сообщения сохраняются в таблицу А в БД.

Вот с этого момента - фигня какая-то. Никто такую постановку не примет.

С сокетами никто не работает. Должен быть над-сокетный протокол. SOAP, например.
...
Рейтинг: 0 / 0
04.11.2019, 12:12
    #39884909
yvprod
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
qi_ip,

на joker Олег Анастасьев рассказывал про фреймворк мессенджера в одноклассниках, но в открытом доступе вроде еще нет
...
Рейтинг: 0 / 0
04.11.2019, 12:19
    #39884912
fixxer
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
yvprodзабыл ник,

я к этому и вел, партиционировать по какому нибудь бизнес-правилу и писать в разные очереди

+1
...
Рейтинг: 0 / 0
04.11.2019, 12:21
    #39884913
mad_nazgul
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
qi_ipПодскажите, может кто читал, где можно найти опенсорс коды или просто описание работы серверных приложений, например, таких как телеграм или ватцап или чем-то похожих мессенджеров.

Jabber?!
...
Рейтинг: 0 / 0
04.11.2019, 12:24
    #39884915
qi_ip
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
maytonqi_ipСобственно сама задача: есть несколько сокетов, по которым постоянно приходят сообщения. Все сообщения сохраняются в таблицу А в БД.

Вот с этого момента - фигня какая-то. Никто такую постановку не примет.

С сокетами никто не работает. Должен быть над-сокетный протокол. SOAP, например.
Обмен данных идет по вебсокету
...
Рейтинг: 0 / 0
04.11.2019, 12:56
    #39884921
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
Посмотри на это https://docs.oracle.com/javase/8/docs/api/java/util/PriorityQueue.html

Может поможет. И давай модульный тест или макет что ты уже написал.

Трудно говорить об ангелах на кончике иглы...
...
Рейтинг: 0 / 0
04.11.2019, 12:56
    #39884922
PetroNotC Sharp
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
qi_ipИ если, например, пришли команды:

1. Включить
2. Выгрузить данные
3. Остановить выгрузку
4. Выключить

, то и на другой клиент после парсера они должны попасть в том же порядке.mayton прав.
Ты изобретаешь ПРИКЛАДНОЙ ПРОТОКОЛ.
Нижнего уровня у тебя сокет. Но ты должен сохранить очередность пачки команд.
А если отправили вперемежку команды?
...
Рейтинг: 0 / 0
04.11.2019, 12:59
    #39884923
PetroNotC Sharp
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
PetroNotC Sharp
Код: java
1.
2.
3.
fio = getSoket()
setLogDbAsync(fio
parserA.businessLayer(fio

покажи тут как команда 4 придет перед командой 1?
1 и 4 идут в разные сокеты?
...
Рейтинг: 0 / 0
04.11.2019, 13:01
    #39884924
PetroNotC Sharp
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
qi_ipПодскажите, может кто читал, где можно найти опенсорс коды или просто описание работы серверных приложений, например, таких как телеграм или ватцап или чем-то похожих мессенджеров.
Сначала постановку вменяемую дай. Почему парсер тормозит, если его задача тупо передать строку клиенту?
...
Рейтинг: 0 / 0
04.11.2019, 13:01
    #39884925
PetroNotC Sharp
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Механизм обработки очереди
Вадя эту задачу давно решил)))
...
Рейтинг: 0 / 0
Форумы / Java [игнор отключен] [закрыт для гостей] / Механизм обработки очереди / 25 сообщений из 32, страница 1 из 2
Целевая тема:
Создать новую тему:
Автор:
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]