powered by simpleCommunicator - 2.0.61     © 2026 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / Delphi [игнор отключен] [закрыт для гостей] / ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
25 сообщений из 260, страница 2 из 11
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38758247
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Таким образом, со статичным борокером сообщений задача решена.

Можно даже запустить общий брокер для всех типов сообщений, и строить сетевую архитектуру вокруг него.

Т.е., топология "Звезда" работает. Какое-то время.
...пока не возникнут вопросы вроде пропускной способности и расширения/усложнения логики брокера.

АДЪ неминуем.

ZMQ придет, порядок наведет!
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38758260
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Куда лучше реализовать брокер не в качестве транспорта сообщений, а в качестве поставщика адресной информации.

Для этого следует использовать сокеты ZMQ типа XPUB и XSUB, так как с ними ZMQ не пересылает сообщения от издателя к подписчику напрямую.
Сокеты XSUB и XPUB - точно такие же, как и сокеты типа SUB и PUB, за исключением того, что они обрабатывают подписки в форме специальных сообщений. А при подключении SUB и PUB сокетов к XPUB и XSUB сокетам первые связываются друг с другом уже по известным адресам.


То есть, основной поток данных идет, минуя брокер:
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38758261
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччД...
То есть, основной поток данных идет, минуя брокер:
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38758262
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
А ты еще спрашиваешь
defecator...а для чего ?
...
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38758398
vavan
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччДс винсокетами ты можешь послать всех, кто (например) неправильно отправил заголовок, еще "в процессе", на принимая сообщения целиком"послать" ты можешь только закрыв сокет. а так пока тебе в него будут вваливать придется читать
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38758427
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
vavanчччДс винсокетами ты можешь послать всех, кто (например) неправильно отправил заголовок, еще "в процессе", на принимая сообщения целиком"послать" ты можешь только закрыв сокет. а так пока тебе в него будут вваливать придется читать
Вот именно - c WinSockets ты можешь закрыть сокет в процессе чтения, не дочитав...

А в ZMQ - ты начинаешь читать, когда "все уже здеcь".
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38758443
vavan
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччДв ZMQ - ты начинаешь читать, когда "все уже здеcь"при условии что оно поместилось
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38758615
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
vavanчччДв ZMQ - ты начинаешь читать, когда "все уже здеcь"при условии что оно поместилось
При всех условиях.
Если ты начал читать, значит - сообщение здесь. Они либо приходит, либо нет.
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38759698
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Шаблон "Разделяемая очередь".

Потихоньку перейдем к сокетам DEALER и ROUTER .
...

В реальности может потребоваться, чтобы множество клиентов могли подключаться к разным сервисам (например, для распределения нагрузки по сервисам).

Для реализации коннектов "много:много" есть два пути:
- каждый клиентский сокет может коннектиться ко множеству сервисных конечных точек. То есть, один клиентский сокет типа (REQ) коннектится к сервисным сокетам с известными адресами. После этого запросы будут распределяться между сервисами.


Например, коннектим клинтский сокет к трем конечным точкам ("сервисам") :A, B, и C.
Вспомним этот ( 16562041 ) пример, поясняющий схему "Запрос - Ответ" (REQ - REP). Чтобы клиент ( 16562066 ) подключился к трем сервисам (например, по tpc на localhost, на трех разных портах), нужно

Код: pascal
1.
2.
3.
  zmq_connect(requester, 'tcp://localhost:5555');
  zmq_connect(requester, 'tcp://localhost:5556');
  zmq_connect(requester, 'tcp://localhost:5557');


Клиент последовательно выполняет запросы R1, R2, R3, R4. В результате запросы R1 и R4 отправляются к сервису A, R2 - к B, R3 - к C.
Циклически распределяя запросы (наш любимый roud-robin).

Такая конструкция позволяет добавлять без проблем добавлять сколько угодно клиентов.
Как показано выше, с помощью zmq_connect() можно добавлять сколько угодно сервисов. Беда в том, что клиент должен знать, где находится новый сервис. Если клиентов - 100, и в течении скажем, суток, добавляется всего три новых сервиса, то нужно в итоге триста раз переконфигурировать всех клиентов.
Что грустно.
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38759713
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
В идеале, мы должны быть в состоянии добавлять и удалять сервисы или клиентов в любое время, не касаясь любой другой части топологии.
...
авторДля реализации коннектов "много:много" есть два пути:
- каждый клиентский сокет может коннектиться ко множеству сервисных конечных точек. То есть, один клиентский сокет типа (REQ) коннектится к сервисным сокетам с известными адресами. После этого запросы будут распределяться между сервисами.

- второй путь - использование брокера запросов как промежуточного слоя.
...

Напишем крошечный брокер запросов, реализующий желаемую гибкость топологии.
Брокер соединит две конечные точки - фронтенд (сокет стороны клиентов) и бэкэнд (сокет стороны сервисов).
Затем брокер, используя zmq_poll(), будет отслеживать активность этих сокетов, и перебрасывать сообщения от одного сокета к другому. При чем, в ручном управлении очередности использования сервисов нет необходимости, т.к. ZeroMQ делает это автоматически для каждого сокета.
Когда мы построили приложение ( 16562041 ) по схеме "Запрос - Ответ (Req_Rep)", система получилась с синхронным диалогом обмена. Клиент шлет запрос. Сервис читает запрос и шлет ответ. Клиент читает ответ. Если клиент или сервис будут выполнять что-то другое (например, клиент пошлет два запроса подряд без ожидания ответа), система просто перестанет работать.

...

Конечно, раз теперь мы умеем пользоваться zmq_poll() ( 16619247 ), сделаем можно сделать брокер неблокирующим.
Но мы пойдем другим путем, и не станем использовать сокеты типа REP и REQ.

Так вот, есть схема использования пар сокетов, которые реализуют схему "Посредник - Маршрутизатор" , режимы сокетов соответственно называются DEALER и ROUTER . Они позволяют получить неблокирующий режим для схемы "Запрос - Ответ".
В нашей схеме "Запрос - Ответ" сокет REQ будет "говорить" с сокетом ROUTER, а сокет DEALER - с сокетом REP.
Сокеты DEALER ROUTER будет как раз размещаться на нашем брокере, передачу сообщений между ними мы обеспечим с помощью кода. Будем извлекать сообщение из одного и передавать его другому сокету.

Наш брокер схемы "Запрос - Ответ" привязывается к двум конечным точкам: одна для коннектов к ней клиентов(фронтэнд сокет), вторая - для коннектов сервисов(бэкэнд сокет).

Задача та же, что и в первом примере: возведение в квадрат целых чисел. Клиент отсылает запросы, сервис (сервер) - выполняет.
Запрос - беззнаковое x32 целое число (UInt32) , ответ - квадрат беззнаковое x64 целого (UInt64).

Вот что у нас было:
Клиент
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
program BrRR_Client;

{$APPTYPE CONSOLE}
// Клиент
// Коннектится сокетом REQ к tcp://localhost:5559
// Шлет целое число сервису (серверу), обратно получает квадрат числа

uses
  SysUtils, ZMQ;
const
  c_ReqCnt = 100;
var
  fContext: Pointer;
  fSocketRequester: Pointer;
  fSrcVal: Cardinal;
  fResultVal: UInt64;
  i: integer;
begin

  fContext := zmq_ctx_new(); // Инициализация
  fSocketRequester := zmq_socket(fContext, ZMQ_REQ);
  zmq_connect(fSocketRequester, 'tcp://localhost:5559'); // Коннект к сервису
  Randomize();
  for i := 0 to Pred(c_ReqCnt) do begin
    fSrcVal := Cardinal(Random(-1)); // Генерация случайного целого
    zmq_send(fSocketRequester, fSrcVal, SizeOf(fSrcVal), 0); // Запрос
    zmq_recv(fSocketRequester, fResultVal, SizeOf(fResultVal), 0); // Ответ
    Writeln('Iter: ', i, ' src=', fSrcVal, ' result=', fResultVal);
  end;
  zmq_close(fSocketRequester);
  zmq_ctx_destroy(fContext);
  Readln;

end.




Сервис (сервер)
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
program BrRR_Service;

{$APPTYPE CONSOLE}
// Сервис (сервер)
// Находится в известной клиентам конечной точке, связывает сокет REP с tcp:*:5560,
// Получает целое число, возводит в квадрат и отправляет обратно результат

uses
  SysUtils, ZMQ;
var
  fContext: Pointer;
  fSocketResponder: Pointer;
  fSrcVal: Cardinal;
  fResultVal: UInt64;
  i: integer;
begin

  fContext := zmq_ctx_new(); // Инициализация
  fSocketResponder := zmq_socket(fContext, ZMQ_REP);
  zmq_bind(fSocketResponder, 'tcp://*:5559'); // Привязка к конечной точке
  Writeln('Starting service...');
  i := 0;
  while True do begin
    zmq_recv(fSocketResponder, fSrcVal, SizeOf(fSrcVal), 0); // Запрос
    fResultVal := UInt64(fSrcVal) * UInt64(fSrcVal); // "Полезная работа"
    zmq_send(fSocketResponder, fResultVal, SizeOf(fResultVal), 0); // Ответ
    Writeln('Iter: ', i, ' src=', fSrcVal, ' result=', fResultVal);
    Inc(i);
  end;
  zmq_close(fSocketResponder);
  zmq_ctx_destroy(fContext);
  Readln;

end.



Пока ничего нового не видим: продублирована функциональность схемы "Вопрос - Ответ". Клиент - коннектится к конечной точке - к сервису, сервис находится в этой конечной точке и ждет запросов клиентов. "Конечная точка" - это известный адрес (Например, "tcp://localhost:5560").
То есть, реализована вот эта схема: 16630265 .

Будем улучшать мир. Воткнем между ними брокер.
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38759717
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Брокер.

Что-то новенькое: сокеты DIALER и ROUTER.

Брокер
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
program BrRR_Broker;

{$APPTYPE CONSOLE}
// Брокер
// Находится в известной клиентам и сервисам конечной точке,
// Находится в известной клиентам конечной точке,
// связывает сокет ZMQ_ROUTER с tcp:*:5559,
// связывает сокет ZMQ_DEALER с tcp:*:5560,
// Перекидывает сообщения между сокетами от FrontEnd к BackEnd
// и обратно

uses
  SysUtils, ZMQ, Math;
var
  fContext: Pointer;
  fSocketFrontEnd: Pointer;
  fSocketBackEnd: Pointer;
  fZMQPoll: array[0..1] of pollitem_t;
  fMsg: zmq_msg_t;
  fDoMore: Boolean;
begin

  fContext := zmq_ctx_new(); // Инициализация
  fSocketFrontEnd := zmq_socket(fContext, ZMQ_ROUTER);
  fSocketBackEnd := zmq_socket(fContext, ZMQ_DEALER);
  zmq_bind(fSocketFrontEnd, 'tcp://*:5559');
    // Конечная точка для клиентов
  zmq_bind(fSocketBackEnd, 'tcp://*:5560');
    // Кончная точка для сервисов
  fZMQPoll[0].socket := fSocketFrontEnd;
    // Инициализация пула сокетов
  fZMQPoll[0].fd := 0;
  fZMQPoll[0].events := ZMQ_POLLIN;
  fZMQPoll[0].revents := 0;
  fZMQPoll[1].socket := fSocketBackEnd;
  fZMQPoll[1].fd := 0;
  fZMQPoll[1].events := ZMQ_POLLIN;
  fZMQPoll[1].revents := 0;
  while true do begin
    zmq_poll(fZMQPoll[0], Length(fZMQPoll), -1);
      // Проверка состояния сокетов из пула
    if (fZMQPoll[0].revents and ZMQ_POLLIN) <> 0 then
      while True do
      begin // Трансляция сообщний от клиента к сервису
      // Обработка всх частей сообщения
        zmq_msg_init(fMsg);
        zmq_msg_recv(fMsg, fSocketFrontEnd, 0);
        fDoMore := zmq_msg_more(fMSG) <> 0;
        zmq_msg_send(fMsg, fSocketBackEnd, IfThen(fDoMore, ZMQ_SNDMORE, 0));
        zmq_msg_close(fMsg);
        if not fDoMore then
          Break; // Это была последняя часть сообщения
      end;
    if (fZMQPoll[1].revents and ZMQ_POLLIN) <> 0 then
      while True do
        // Трансляция сообщний от сервиса к клиенту
      begin // Обработка всх частей сообщения
        zmq_msg_init(fMsg);
        zmq_msg_recv(fMsg, fSocketBackEnd, 0);
        fDoMore := zmq_msg_more(fMSG) <> 0;
        zmq_msg_send(fMsg, fSocketFrontEnd, IfThen(fDoMore, ZMQ_SNDMORE, 0));
        zmq_msg_close(fMsg);
        if not fDoMore then
          Break; // Это была последняя часть сообщения
      end;
  end;
  zmq_close(fSocketFrontEnd);
  zmq_close(fSocketBackEnd);
  zmq_ctx_destroy(fContext);
  Readln;

end.




Чтобы все заработало, нужно у сервиса из 16630300 заменить биндинг на коннект:

Код: pascal
1.
2.
//  zmq_bind(fSocketResponder, 'tcp://*:5559'); // Привязка к конечной точке
  zmq_connect (fSocketResponder, 'tcp://localhost:5560'); // Коннект к конкретной точке



Брокер в асинхронном режиме слушает пул из сокетов с помощью zmq_poll(), затем читает (возможно, по частям) сообщение и транслирует его в выходной сокет. В обе стороны.

Чтение по частям реализовано "для общности". Чтобы работало с любыми сообщениями.
Интересно, что наши крошечные сообщения
Код: pascal
1.
2.
3.
     zmq_send(fSocketRequester, fSrcVal, SizeOf(fSrcVal), 0); // Запрос
...
     zmq_recv(fSocketRequester, fResultVal, SizeOf(fResultVal), 0); // Ответ

и
Код: pascal
1.
2.
3.
    zmq_recv(fSocketResponder, fSrcVal, SizeOf(fSrcVal), 0); // Запрос
...
    zmq_send(fSocketResponder, fResultVal, SizeOf(fResultVal), 0); // Ответ


порождают на стороне брокера каскад из нескольких составных сообщений из структур zmq_msg_t, что хорошо видно в отладчике.


Теперь можно запускать сколько хочешь сервисов и сколько хочешь клиентов - все они будут общаться через брокер.

Такой брокер для схемы "Запрос- Ответ" существенно облегчает обслуживание сети , так как клиентам нет нужды знать, где размещены сервисы, а сервисам нет нужны знать, где размещены клиенты. Единственной статической точкой является брокер.
Жизнь налаживается. :)
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38759718
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччД...
Теперь можно запускать сколько хочешь сервисов и сколько хочешь клиентов - все они будут общаться через брокер.

Такой брокер для схемы "Запрос- Ответ" существенно облегчает обслуживание сети , так как клиентам нет нужды знать, где размещены сервисы, а сервисам нет нужны знать, где размещены клиенты. Единственной статической точкой является брокер.
...
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38759719
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Приведенный выше код трансляции сообщений представляется очень полезным для многих случаев для схемы "Запрос - Ответ".

Так вот, ZeroMQ есть даже специальный метод, реализующий все, что мы нако'дли в брокере.
См. ниже: "Та-да-мммм!"


Код: pascal
1.
function zmq_proxy( frontend, backend, capture: Pointer ): Integer; 



frontend - сокет фронтэнд
backend - сокет бэкэнд
capture - сокет для перехвата сообщений (nil, если не используется)

Технически нет никакой разницы между фронтэнд и бэкэнд сокетами.


Та-да-мммм!
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
program BrRR_Broker;

{$APPTYPE CONSOLE}
// Брокер
// Находится в известной клиентам и сервисам конечной точке,
// Находится в известной клиентам конечной точке,
// связывает сокет ZMQ_ROUTER с tcp:*:5559,
// связывает сокет ZMQ_DEALER с tcp:*:5560,
// Перекидывает сообщения между сокетами от FrontEnd к BackEnd
// и обратно

uses
  SysUtils, ZMQ, Math;
var
  fContext: Pointer;
  fSocketFrontEnd: Pointer;
  fSocketBackEnd: Pointer;
begin

  fContext := zmq_ctx_new(); // Инициализация
  fSocketFrontEnd := zmq_socket(fContext, ZMQ_ROUTER);
  fSocketBackEnd := zmq_socket(fContext, ZMQ_DEALER);
  zmq_bind(fSocketFrontEnd, 'tcp://*:5559'); // Конечная точка для клиентов
  zmq_bind(fSocketBackEnd, 'tcp://*:5560'); // Кончная точка для сервисов

  zmq_proxy (fSocketFrontEnd, fSocketBackEnd, nil); // Старт прокси!

  zmq_close(fSocketFrontEnd);
  zmq_close(fSocketBackEnd);
  zmq_ctx_destroy(fContext);
  Readln;

end.




Дальше будет еще интереснее.

...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38759721
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Важно.
Видов сокетов, которые практически можно использовать в брокере:

Код: plaintext
1.
2.
ROUTER - DEALER
XSUB   - XPUB
PULL   - PUSH.
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38759954
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Мосты.

Вопрос: "как передать сообщения из одной подсети в другую?"
Или даже - из сети с протоколом tcp в сеть pgm .

Вариант решения - с помощью моста .

В качестве моста используем только что рассмотренный прокси (брокер сообщений).
То есть, "мост" - это маленькое приложение, которое общается одним сокетом по одному протоколу, а другим - по другому.
Ну и преобразовывает сообщения в подходящий для протокола вид, если нужно.

В качестве примера напишем крошечный прокси, который, находясь между издателем и множеством подписчиков, соединяет две разные сети.
Вернемся к примеру с метеостанцией. 16583825
Предположим, что сервер-издатель (которые измеряет температуру, давление и проч) работает во внутренней сети, часть подписчиков - тоже во внутренней. А еще часть - во внешней.
Создаем прокси, в которой фронтэнд сокет (SUB) будет общаться с внутренней сетью, а бэкэнд сокет (PUB) - с внешней.
Прокси будет подписываться фронтэнд-сокетом на события сервиса погоды и пере-публиковывать их на бэкэнд-сокете.
Прокси для метео: "мост в интернет"
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
program BrRR_BrokerMeteoInterNet;

{$APPTYPE CONSOLE}
// Брокер для проекта "Метео"
// Реализует мост между интрасаетью метеосервера
// и внешней сетью. Размещается во внутренней сети,
// для брокера открыт "наружу" tcp порт 8100.
// "Внешние" клиенты коннектятся к известной конечной точке tcp://10.1.1.0:8100
// Коннектит сокет ZMQ_XSUB с известной конечной точкой метеосервера tcp://192.168.55.210:5556
// Связывает сокет ZMQ_XPUB с tcp://10.1.1.0:8100,
// Перекидывает сообщения подписки между сокетами от FrontEnd к BackEnd
// и обратно

uses
  SysUtils, ZMQ;
var
  fContext: Pointer;
  fSocketFrontEnd: Pointer;
  fSocketBackEnd: Pointer;
begin

  fContext := zmq_ctx_new(); // Инициализация
  fSocketFrontEnd := zmq_socket(fContext, ZMQ_XSUB);
  fSocketBackEnd := zmq_socket(fContext, ZMQ_XPUB);
  zmq_connect(fSocketFrontEnd, 'tcp://192.168.55.210:5556'); // Конечная точка метеосервиса
  zmq_bind(fSocketBackEnd, 'tcp://10.1.1.0:8100'); // Конечная точка для "внешних" подписчиков
//  zmq_bind(fSocketBackEnd, 'tcp://*:8100'); // Можно и так, главное - чтобы внешние клиенты знали адрес

  zmq_proxy (fSocketFrontEnd, fSocketBackEnd, nil); // Старт прокси

  zmq_close(fSocketFrontEnd);
  zmq_close(fSocketBackEnd);
  zmq_ctx_destroy(fContext);
  Readln;

end.




Очень похоже на код предыдущего прокси, но используется для трансляции сообщений из одной подсети в другую. Точно так же подобный прокси - мост можно использовать, например, для подключения подписчиков в мультикаст PGM сети с издателем в TCP.
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38759956
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччД...прокси, который, находясь между издателем и множеством подписчиков, соединяет две разные сети...
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38759961
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38759965
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччДКино нашел, на русском:

http://wiki.4intra.net/0MQ_—_Сокеты_на_стероидах_(Сергей_Гулько,_OSDN-UA-2012)

...У ZeroMQ пропускная способность выше, чем у TCP/IP, хотя ZeroMQ работает over TCP/IP...

Поискал - "каким же образом?"
Нашел кое-какое описание, вроде специальной упаковки сообщений ZMQ в пакеты tcp.
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38760163
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Обработка ошибок. ETERM.

Обработка ошибок ZeroMQ основана на двух положениях:
- считается, что процессы уязвимы от внутренних ошибок
- считается, что внешние ошибки (и атаки) можно обработать (отразить атаки).

В качестве примера приводится функционирование живой клетки, которая самоуничтожается при внутреннем сбое и максимально долго борется с внешними угрозами.

Надежность кода должна обеспечиваться использованием Assert(). Срабатывание Assert() вызывает либо завершение приложения, либо исключительную ситуацию.
Когда ZeroMQ обнаруживает внешнюю проблему, она возвращает соответствующий код завершения. В редких случаях, ZMQ показывает сообщения "молча", если нет очевидной стратегии, позволяющей восстановиться после ошибки.

В рассмотренных ранее примерах обработки ошибок не было.
В реальном коде необходимо анализировать код завершения вызова каждого метода ZMQ.


Существует несколько простых правил, ноги которых растут еще с соглашений POSIX:

- Методы, которые создают объекты, в случае ошибки возвращают nil;
- Методы, которые обрабатывают данные, могут вернуть число обработанных байт, или -1 в случае ошибки;
- Другие методы возвращают 0, когда все ОК, и, в случае ошибки - ненулевой код ошибке;
- Код ошибки доступен через errno (для ОС POSIX) или через метод zmq_errno();
- Описание ошибки (например, для логирования) можно получить с помощью метода zmq_strerror().

Пример:
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
  fContext := zmq_ctx_new(); // Инициализация
  Assert(fContext <> nil);

  fSocketFrontEnd := zmq_socket(fContext, ZMQ_ROUTER);
  Assert(fSocketFrontEnd <> nil);

  fSocketBackEnd := zmq_socket(fContext, ZMQ_DEALER);
  Assert(fSocketBackEnd <> nil);

  fRC := zmq_bind(fSocketFrontEnd, 'tcp://*:5559'); // Конечная точка для клиентов
  Assert(fRC <> -1, 'Bind failed: tcp://*:5559');

  fRC := zmq_bind(fSocketBackEnd, 'tcp://*:5560'); // Кончная точка для сервисов
  Assert(fRC <> -1, 'Bind failed: tcp://*:5560');




Есть две исключительные ситуации, которые могут обрабатываться как некритические:
- когда ваш код принимает сообщение с ZMQ_NOWAIT ("асинхронно"), но данных не ожидается. ZMQ вернет -1 и установит код ошибка равным EAGAIN ;
- когда один поток вызывает zmq_ctx_destroy() , а второй все еще выполняет работу в блокирующем режиме, то вызов zmq_ctx_destroy() вызывает закрытие контекста и всех блокирующих вызовов с кодом завершения -1, а код ошибки устанавливается равным ETERM .

После того, как код будет отлажен и "вылизан", Asserts() могут быть отключены опциями компиляции. Однако, не стоит компилировать саму библиотеку ZMQ с отключенными assert() - запросто можно прозевать проблему в самом неожиданном месте.


Рассмотрим способы "чистого" завершения процессов. В качестве примера возьмем параллельный трубопровод из примера: 16607833 .

Мы возьмем пример параллельного газопровода из предыдущего раздела. Если мы в фоне стартовали множество рабочих процессов "Worker", то теперь мы хотим уничтожить их всех, когда все пакетное задание будет выполнено. Будем делать это, отправляя рабочим процессам сообщение "умри!". Этим будет заниматься процесс - сборщик "Synk" (так как он знает, когда завершается пакетное задание).

Каким же образом подключить сборщик "Synk" к рабочим процессам? Сокеты PUSH/PULL допускают передачу только в одну сторону. Можно использовать сокеты другого типа, или смешать несколько потоков. Попробуем следующее: используем схему "Издатель-Подписчик" (pub-sub) для отправки рабочим процессам сообщения "умри!".

Описание:
Сборщик ("Sink") создает сокет - издатель (PUB) в новой конечной точке.
Рабочие процессы ("Worker") связывают свои входные сокеты с этой конечной точкой.
Когда сборщик ("Sink") определяет, что задание выполнено, он посылает сигнал "умри!" в свой сокет - издатель (PUB).
Когда рабочий процесс ("Worker") обнаруживает сообщение "умри!", он завершается.

Сборщик почти не меняется, добавится еще один сокет и отправка сообщения "умри!":
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
var
...
  fSocketController: Pointer;
...
begin
...
  // Сокет для отправки сигнала "умри!"
  fSocketController := zmq_socket(fContext, ZMQ_PUB);
  zmq_bind(fSocketController, 'tcp://*:5559');
...
  // Отправка сигнала "умри!" рабочим процессам
  zmq_send(fSocketController, PChar('KILL')^, 4, 0);
...



Вот полный код сборщика:
Сборщик "Sink"
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
program PL_Sink;

{$APPTYPE CONSOLE}

uses
  SysUtils, Windows, ZMQ;

const
  c_task_count = 100;

var
  fContext: Pointer;
  fSocketReceiver: Pointer;
  fSocketController: Pointer;
  fStart: Cardinal;
  fTaskCount: Integer;
  fDummy: string;
begin
 // Инициализация
  fContext := zmq_ctx_new();

  // Сокет для приема сигналов
  fSocketReceiver := zmq_socket(fContext, ZMQ_PULL);
  zmq_bind(fSocketReceiver, 'tcp://*:5558');

  // Сокет для отправки сигнала "умри!"
  fSocketController := zmq_socket(fContext, ZMQ_PUB);
  zmq_bind(fSocketController, 'tcp://*:5559');

  // Ожидание первого сигнала о старте пакета задач
  zmq_recv(fSocketReceiver, PChar(nil)^, 0, 0);

  // Фиксация времени начала выполнения пакета
  fStart := GetTickCount();
  // Ждем подтверждения выполнения от 100 рабочих процессов
  for fTaskCount := 0 to Pred(c_task_count) do begin
    zmq_recv(fSocketReceiver, PChar(nil)^, 0, 0);

    if (fTaskCount mod 10 = 0) then // "Статус - бар" :)
      Write(':')
    else
      Write('.')
  end;
  // Вычисление и показ времени выполнения пакета задач
  Writeln('Total elapsed time: ', GetTickCount() - fStart, 'ms');

  // Отправка сигнала "умри!" рабочим процессам
  zmq_send(fSocketController, PChar('KILL')^, 4, 0);

  zmq_close(fSocketReceiver);
  zmq_ctx_destroy(fContext);
  Readln(fDummy);

end.



Естественно, рабочий процесс "Worker" тоже придется переделать.
Теперь "Worker" управляется двумя сокетами: PULL - для получения задачи, и SUB - для получения команд управления.
Не забываем, что SUB сокет должен быть настроен:
Код: pascal
1.
  zmq_setsockopt(fSocketController, ZMQ_SUBSCRIBE, nil, 0);


Используем технику с zmq_poll (), которую уже применяли ранее.

Код рабочего процесса "Worker"
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
program PL_Worker;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ;

var
  fContext: Pointer;
  fSocketReceiver: Pointer;
  fSocketSender: Pointer;
  fSocketController: Pointer;
  fStrMsg: string;
  fLen: Integer;
  fMsg: zmq_msg_t;
  fDummy: string;
  fPollItems: array[0..1] of pollitem_t;
begin
  fContext := zmq_ctx_new();

 // Сокет для приема сообщений
  fSocketReceiver := zmq_socket(fContext, ZMQ_PULL);
  zmq_connect(fSocketReceiver, 'tcp://localhost:5557');

// Сокет для отправки сообщений
  fSocketSender := zmq_socket(fContext, ZMQ_PUSH);
  zmq_connect(fSocketSender, 'tcp://localhost:5558');

// Сокет для приема управляющх сигналов
  fSocketController := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocketController, 'tcp://localhost:5559');
  zmq_setsockopt(fSocketController, ZMQ_SUBSCRIBE, nil, 0);

  fPollItems[0].socket := fSocketReceiver;
  fPollItems[0].fd := 0;
  fPollItems[0].events := ZMQ_POLLIN;
  fPollItems[0].revents := 0;
  fPollItems[1].socket := fSocketController;
  fPollItems[1].fd := 0;
  fPollItems[1].events := ZMQ_POLLIN;
  fPollItems[1].revents := 0;

// Бесконечный цикл выполнения заданий
  while True do begin
    zmq_poll(fPollItems[0], 2, -1);
    if (fPollItems[0].revents and ZMQ_POLLIN) <> 0 then begin
      zmq_msg_init(fMsg);
      fLen := zmq_msg_recv(fMsg, fSocketReceiver, 0); // Получение задания
      SetLength(fStrMsg, fLen div SizeOf(Char)); // Перевод буфера сообщения в строку
      Move(zmq_msg_data(fMsg)^, PChar(fStrMsg)^, fLen);
      Writeln(fStrMsg); // Отображение в консоли процесса "работы"
      Sleep(StrToInt(fStrMsg)); // "Полезная" работа...
      zmq_send(fSocketSender, PChar(fDummy)^, 0, 0);   // Отправка сигнала сборщику результата
    end;
   // Все, что получаем из контроллера, считаем командой 'KILL'
    if (fPollItems[1].revents and ZMQ_POLLIN) <> 0 then
      Break; // Выход из цикла обработки
  end;
  zmq_close(fSocketReceiver);
  zmq_close(fSocketSender);
  zmq_close(fSocketController);
  zmq_ctx_destroy(fContext);
//  Readln(fDummy);

end.




Теперь запускаем: процесс "Sink", один или несколько процессов "Worker", процесс "Ventilator". В консоли процесса "Ventilator" жмем Enter и наблюдаем примерно такую картину: 16607848
За исключением того, что процессы "Worker" по завершению пакетного задания будут завершены: по завершению работы процесс "Sink" посылает сигнал "умри!" всем подписчикам (процессам "Worker").
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38760164
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Топология только что описанной сетевой задачи:
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38761368
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Обработка Crtl+C для консольных приложений Windows.
Казалось бы, нажали и хрясь - приложение убито. Если бы.

Как было сказано выше - можно получить зависон, особенно если приложение многопоточное.
В общем, надо бы пройтись по всем тредам, сообщить им о своих намерениях, чтобы те завершили работу с соетами

В общем, "все не так однозначно".

Все, что написано в документации - касается всяческих Linux.

Нам, дельфистам, придется использовать специальные обрабочики: подключаем в uses модуль Windows и настраиваем:


Код: pascal
1.
2.
3.
4.
5.
6.
uses
  ...Windows, ...;
...
begin
...
  Windows.SetConsoleCtrlHandler(CtrlCHandler, True);



Шаблон для CtrlCHandler:

Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
function console_handler( dwCtrlType: DWORD ): BOOL;
var
  i: Integer;
begin
  if CTRL_C_EVENT = dwCtrlType then
  begin
    // Выполняем "мягкое" завершение
...
  end else 
    result := False;
end;


Как выполнить "мягкое" завершение - зависит от текущей задачи.
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38761403
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Мультипоточность с ZeroMQ.

ZeroMQ предоставляет простой способ создания многопоточных приложений.
При этом не потребуются ни мьютексы, ни блокировки, ни прочие дела для организации межпоточного взаимодействия, кроме сообщений, отправляемых через сокеты ZMQ.

Есть одно железное правило для успешного многопоточного кодинга - "не разделять между потоками изменяющиеся данные". Обычно, когда два потока в приложении пытаются между собой разделять данные, то это выглядит как будто два алкаша пытаются разделить пиво. И чем больше алкашей собирается за столом, тем хуже ситуация.
Большинство многопоточных приложений похоже сборище пьяных алоголиков, учинивших драку в баре.

Одна известная фирма, производящая то ли рамки, то ли форточки, даже опубликовала статью " Решение 11 вероятных проблем вашего многопоточного кода ", в которой упомянуты всяческие ужастики вроде забытой синхронизации, незаблокированной модификации общих данных, и проч.

Для беспроблемного написания многопоточного кода с помощью ZeroMQ следует руководствоваться следующими правилами:

- Изолируйте данные внутри потока и никогда не разделяйте их между потоками. Единственным исключением являются контексты ZeroMQ, которые являются threadsafe.
- Держитесь подальше от классических механизмов параллелизма, таких как мьютексы, критические секции, семафоры и т.д. Это анти-паттерны в приложениях ZeroMQ.
- Создайте один ZeroMQ контекст в начале вашего процесса и передавайте его всем создаваемым потокам, с которыми будете взаимодействовать через InProc сокеты ZMQ.
- Для создания структуры вашего приложения используйте подключаемые (attach) потоки, и соедините их с их родительскими потоками через сокеты PAIR по протоколу InProc. Порядок работы: привязываем (zmq_bind()) сокет, а затем создаем дочерний поток, который коннектится к сокету родительского потока.
- Используйте отключаемые (detach) для имитации работы самостоятельных задач, с учетом своих условий. Подключите их по протоколу TCP. Позже вы можете переместить их в автономные процессы без значительного изменения кода.
- Все взаимодействия между потоками происходит как ZeroMQ сообщения, которые вы можете определить более или менее формально.
- Не разделять сокеты ZeroMQ между потоками. Сокеты ZeroMQ не потокобезопасны. Технически есть возможность передачи сокета от одного потока к другому, но это требует навыка. Единственное место, где разумно и оправдано разделение сокетов между потоками - это удаление сокетов в деструкторах ваших классовых оберток над ZMQ.


Например.

Предположим, в вашем приложении нужно более одного прокси, и вы хотите, чтобы каждый из них выполнятся в своем потоке. Очень легко допустить ошибку, создав фронтэнд и бэкэнд сокеты такого прокси в одном потоке, а затем передавая сокеты в прокси (который живет в другом потоке). Возможно, на первый раз все заработает, но глюки - неминуемы, причем в совершенно произвольные моменты времени.
Запоминаем: не используем (и не закрываем) сокеты нигде, кроме как потоках, их создавших.

Если следовать перечисленным правилам, можно легко создавать надежные многопоточные приложения. Логика приложения может размещаться потоках, процессах, или узлах сети, в соответствии с текущими планами по захвату мира.

ZeroMQ использует нативные потоки ОС (Windows), а не виртуальные "зеленые" потоки. Для отладки можно использовать стандартные средства, вроде ThreadChecker от Intel, чтобы увидеть, что ваше приложение делает. К недостаткам использования нативных потоков можно отнести, что API-интерфейсы "родной" многопоточности конкретной ОС не всегда портируются, и, к примеру, если вы используете огромное количество потоков (тысячи), то некоторые ОС просто не потянут такой нагрузки.
...
Перейдем к практике. Превратим наш старый сервер 16562041 , в нечто более работоспособное.
Старый сервер был однопоточным. Если обслуживание каждого запроса было легким, то это нормально: один ZMQ поток может работать на полной скорости ядра процессора без ожидания и выполнять очень много работы.
Но серверы из реальной жизни на каждый запрос делают более сложную работу. Одноядерного сервера может оказаться недостаточно, когда по серверу жахнет сразу 10000 клиентов. Сервер из реальной жизни будет запускать несколько рабочих потоков. После чего он будет принимать запросы так быстро, как это возможно и раздавать их своим своих рабочим потокам.
Рабочие потоки будут "перемалывать цифирь" и отправлять результаты обратно.

Конечно, это можно сделать с помощью прокси-брокера и внешних рабочих процессов, но часто легче начать один процесс, который займет все шестнадцать ядер, чем шестнадцать процессов, каждый из которых жрет одно ядро. Кроме того, рабочие процессы будут занимать линии связи и поглощать сетевой трафик и просто тормозить.
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38761427
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Многопоточный сервис:
Сервис
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
program RR_Service;

{$APPTYPE CONSOLE}
// Многопоточный сервис (сервер)
// Находится в известной клиентам конечной точке, "слушает" tcp порт 5555,
// к которому привязан сокет ZMQ_ROUTER
// Получает целое число, возводит в квадрат и отправляет обратно результат

uses
  SysUtils, ZMQ;

procedure Worker_Routine(aContext: Pointer); // Процедура потока
var
  fSocketReceiver: Pointer;
  fSrcVal: Cardinal;
  fResultVal: UInt64;
  i: integer;
begin
// Сокет для общения с диспетчером
  fSocketReceiver := zmq_socket(aContext, ZMQ_REP);
  zmq_connect(fSocketReceiver, 'inproc://worker');
  i := 0;
  while True do
  begin
    zmq_recv(fSocketReceiver, fSrcVal, SizeOf(fSrcVal), 0); // Запрос
    fResultVal := UInt64(fSrcVal) * UInt64(fSrcVal); // "Полезная работа"
    zmq_send(fSocketReceiver, fResultVal, SizeOf(fResultVal), 0); // Ответ
    Writeln('In thread, Iter: ', i, ' src=', fSrcVal, ' result=', fResultVal);
    Inc(i);
  end;
  zmq_close(fSocketReceiver);
end;

var
  fContext: Pointer;
  fSocketClients: Pointer;
  fSocketWorkers: Pointer;
  i: integer;
  fThreadId : Cardinal;
  fRez : Integer;
begin

  fContext := zmq_ctx_new(); // Инициализация

  fSocketClients := zmq_socket(fContext, ZMQ_ROUTER);
  fRez := zmq_bind(fSocketClients, 'tcp://*:5555'); // Привязка к конечной точке

  fSocketWorkers := zmq_socket(fContext, ZMQ_DEALER);
  fRez := zmq_bind(fSocketWorkers, 'inproc://worker');// Привязка к конечной точке

  Writeln('Starting service...');
  for i := 0 to 2 do  // Запуск пула рабочих потоков
    BeginThread(nil, 0, @Worker_Routine, fContext, 0, fThreadId);

// Соединение рабочих потоков с потоками клиентов через очередь
  zmq_proxy(fSocketClients, fSocketWorkers, nil);

  zmq_close(fSocketClients);
  zmq_close(fSocketWorkers);
  zmq_ctx_destroy(fContext);
  Readln;

end.




Клиент - все тот же:
Клиент
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
program RR_Client;

{$APPTYPE CONSOLE}
// Клиент
// Коннектится сокетом REQ к tcp://localhost:5555
// Шлет целое число сервису (серверу), обратно получает квадрат числа

uses
  SysUtils, ZMQ;
const
  c_ReqCnt = 100;
var
  fContext: Pointer;
  fSocketRequester: Pointer;
  fSrcVal: Cardinal;
  fResultVal: UInt64;
  i: integer;
begin

  fContext := zmq_ctx_new(); // Инициализация
  fSocketRequester := zmq_socket(fContext, ZMQ_REQ);
  zmq_connect(fSocketRequester, 'tcp://localhost:5555');
    // Коннект к сервису
  Randomize();
  for i := 0 to Pred(c_ReqCnt) do begin
    fSrcVal := Cardinal(Random(-1));
      // Генерация случайного целого
    zmq_send(fSocketRequester, fSrcVal, SizeOf(fSrcVal), 0); // Запрос
    Write('Iter: ', i, ' src=', fSrcVal);
    zmq_recv(fSocketRequester, fResultVal, SizeOf(fResultVal), 0); // Ответ
    Writeln(' result=', fResultVal);
  end;
  zmq_close(fSocketRequester);
  zmq_ctx_destroy(fContext);
  Readln;

end.




Прежде всего интересен сервис ("сервер"). Основной поток приложения поочередно транслирует запросы от сокета fSocketClients (сокет типа ZMQ_ROUTER) к сокету fSocketWorkers (сокет типа ZMQ_DEALER) и обратно, используя знакомый метод zmq_proxy().
То есть, основной поток является брокером ("прокси") сообщений.
Для обработки данных запущено три потока. Процедура потока - Worker_Routine(). В процедуре потока создается сокет fSocketReceiver типа (ZMQ_REP). Сокет fSocketReceiver рабочего потока связан с сокетом fSocketWorkers основного потока по inproc протоколу.
Сокет рабочего потока:
Код: pascal
1.
2.
3.
// Сокет для общения с диспетчером
  fSocketReceiver := zmq_socket(aContext, ZMQ_REP);
  zmq_connect(fSocketReceiver, 'inproc://worker');



Сокет основного потока:
Код: pascal
1.
2.
  fSocketWorkers := zmq_socket(fContext, ZMQ_DEALER);
  fRez := zmq_bind(fSocketWorkers, 'inproc://worker');// Привязка к конечной точке


Естественно, вместо inproc - протокола можно было бы использовать tcp, но пришлось бы занять порт и, кроме того, inproc гораздо быстрее.

Еще раз: работа сервиса.

Сервис запускает несколько рабочих потоков. Каждый рабочих поток создает сокет типа REP и затем в цикле обрабатывает запросы к сокету. Рабочие потоки представляют собой обычные однопоточные сервисы, разница - в транспорте (inproc вместо tcp), и в направлении операции "привязать - подключить" (bind-connect).

Сервер создает сокет типа ROUTER, чтобы общаться с клиентами и связывает его (сокет) с внешним интерфейсом по tcp.

Сервис создает сокет типа DEALER, чтобы общаться с рабочими процессами и связывает этот сокет с внутренним интерфейсом (с помощью inproc).

Сервер зарускает прокси, который соединяет оба сокета. Прокси получает входящие запросы от всех клиентов и распределяет эти запросы между рабочими потоками. При этом он правильно выполняет маршрутизацию ответов так, чтобы они вернулись туда, откуда приходили запросы.

То есть, цепочка запрос - ответ выглядит так: REQ-ROUTER-очередь-DEALER-REP.
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38761452
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Получилась вот такая структура:
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #38761456
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
В выводе сервиса видно, что номера итераций повторяются по три раза - по числу запущенных потоков.
...
Рейтинг: 0 / 0
25 сообщений из 260, страница 2 из 11
Форумы / Delphi [игнор отключен] [закрыт для гостей] / ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]