powered by simpleCommunicator - 2.0.61     © 2026 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / Delphi [игнор отключен] [закрыт для гостей] / Cервер обработки сообщений
38 сообщений из 38, показаны все 2 страниц
Cервер обработки сообщений
    #38743676
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Есть сотня клиентов, каждый из которых ~ сто раз в секунду что-то хочет от сервера.
Это "что-то" (чего хотят клиенты от сервера) небольшое и уже (почти) готовое - "только спроси, сразу отдам".

Протокол tcp.

Возможно, клиентов будет не сто, а тыща.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743679
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Давно хотел ZeroMQ потрогать.

Кроме чтения описания и небольших тестов - не пробовал.

Только другим советы давал...


Для Delphi есть библиотека - оболочка: https://github.com/bvarga/delphizmq
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743681
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Засада:

The package contains a wrapper (zmq.pas), and a higher level api (zmqapi.pas). It should work with ZMQ 2.2.x, and with 3.2.x.

А последний Stable Release на сегодня - 4.0.4.

Берем предыдущую 3.2.4, от греха подальше: http://miru.hk/archive/ZeroMQ-3.2.4~miru1.0-x86.exe

Из всей инсталляции нужна только библиотека libzmq-v90-mt-3_2_4.dll (ядро системы обмена сообщения, скомпилированная MS VS 2008, "релизный" вариант) -> переименовываем в libzmq.dll и помещаем в каталог с будущими исполняемыми файлами.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743682
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Не забываем, что для приложений, созданных в MS VS 2008, требуется "редистрибутабле пацк": http://www.microsoft.com/ru-ru/download/details.aspx?id=5582
Скорее всего, он у всех уже установлен, но мало ли.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743683
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Инсталлер можно не грузить, а построить библиотечку самостоятельно, из исходников: http://download.zeromq.org/zeromq-3.2.4.zip

Для построения нужна MS VS 2008 Pro для С++. Можно обойтись и бесплатной Express версией MS VS, но тогда пакет библиотек для построения в Windows придется грузить отдельно.

В инструкции пишут, что можно обойтись и MinGW: http://zeromq.org/build:mingw Однако, лично мне этого сделать так и не удалось.

В общем, MS VS 2008. Можно и более новую студию но, пишут, что приложения не будет работать на Win2K и WinXP.

Распаковываем скаченные исходники, открываем каталог builds\msvc, открываем из MS VS "решение" msvc.sln
Выбираем конфигурация "Release" и даем команду "построить решение" (F7).
Идём пить чай, это не дельфи, это минут на десять.
...
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743685
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Готово.
В подкаталоге \lib появилась нужная библиотека: libzmq.dll. Какая-то она подозрительно маленькая (всего 186кБ), по сравнению с теми вариантами, что в инсталляторе были, но, надеюсь, это не беда :
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743687
Фотография Judo
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччД - а зачем ты все это написал ?
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743688
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Кроме "любезно приложенной мной" () dll - ки, для дельфикодинга нужен фал, описывающих функции в этой библиотеке.
Это есть вот здесь: https://codeload.github.com/bvarga/delphizmq/zip/master

Из всего, что скачано, для работы потребуется совсем чуть-чуть. Нужны два файлика:

1. zmq.pas
2. zmq.inc

Если хочется работать не с функциями dll, а использовать дельфийские классы - обертки, то можно использоват еще два файлика:

3. zhelpers.pas
4. zmqapi.pas

Как работать - примерно описано в файле README.md.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743690
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Всё! Будем писать сверхскоростной сервер обработки сообщений. :)

...пошел варить кофе.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743692
Фотография Judo
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
READMEYou should use the higher level api, which'll save you a lot of time, and incidentally
the code'll be easier to read.

Очередная реинкарнация )
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743693
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Дополнение к ТЗ ( 16562010 ): сервер получает беззнаковое 32 - разрядное целое и возвращает квадрат этого числа в беззнаковом 64м целом.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743698
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
В общем, все вышеупомянутые .pas и inc файлы складываем в одно место и указываем Delphi, чтобы она это место знало (например, добавляем директорию в library path).

И так, сервер, готов:
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
program ConsoleServer;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ;
var
  fContext: Pointer;
  fResponder: Pointer;
  fStatus: Cardinal;
  fInValue: Cardinal;
  fOutValue: UInt64;

begin
  fContext := zmq_ctx_new();
  fResponder := zmq_socket(fContext, ZMQ_REP);
  fStatus := zmq_bind(fResponder, 'tcp://*:5555');
  assert(fStatus = 0);
  Writeln('Starting...');
  while (True) do begin
    zmq_recv(fResponder, fInValue, SizeOf(fInValue), 0);
    Writeln('Received: ', fInValue);
    fOutValue := fInValue * fInValue;
    zmq_send(fResponder, fOutValue, SizeOf(fOutValue), 0);
  end;

end.


Сервер в цикле слушает порт №5555, читает из порта число и пишет значение квадрата числа.

Надеюсь, он будет работать безупречно. А пока он только компилируется, стартует и выводит Starting...
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743706
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Клиент:

Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
program ConsoleClient;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ;

const
  c_iter = 999999;
var
  context: Pointer;
  requester: Pointer;
  i: Integer;
  f_ScrValue: Cardinal;
  f_qValue: UInt64;

begin
  Writeln('Client starting...');
  Randomize();

  context := zmq_ctx_new();
  requester := zmq_socket(context, ZMQ_REQ);
  zmq_connect(requester, 'tcp://localhost:5555');

  for i := 0 to c_iter do begin
    f_ScrValue := Random(-1);
    Writeln('Sending ', f_ScrValue);
    zmq_send(requester, f_ScrValue, SizeOf(f_ScrValue), 0);
    zmq_recv(requester, f_qValue, SizeOf(f_qValue), 0);
    Writeln('Received ', f_qValue);
  end;

  zmq_close(requester);
  zmq_ctx_destroy(context);
end.



Судя по выводимым сообщениям, все работает правильно.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743707
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Нужно убрать из циклов тормозящие Writeln(), запустить сервер на удаленном компе и оценить скорость работы с несколькими клиентами...
...
...завтра.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743728
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Меряю.
Разное количество клиентов. Клиенты запускаются и ждут, пока запустится сервер.
Сервер выполняет 1 млн запросов и выводит сообщение о времени выполнения.
...
При попытке выполнить 1024 коннекта получил сообщение "assertion failed fds.size() <= fd_setsize"...
Посмотрел в исходниках dll на этот самый fd_setsize - он там действительно FD_SETSIZE равен 1024.
Просто уменьшил число клиентов до 1000.
...
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743729
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
..
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743732
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
То есть, после двух сотен клиентов - на пределе, еле справляется.
Нужно попробовать разнести серверную части в несколько потоков.

...
Вообще, клиенты могут сами регулировать нагрузку.
Все, что нужно - выполнить привязку сокета к двум верверам:


Код: pascal
1.
2.
 zmq_connect(requester, 'tcp://Host1:5555');
 zmq_connect(requester, 'tcp://Host2:5555');


- и все. Запросы будут поочередно отсылаться разным серверам.
Но хочется выжать все из одного процесса сервера.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743742
prog123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
чччДТо есть, после двух сотен клиентов - на пределе, еле справляется.
Нужно попробовать разнести серверную части в несколько потоков.

...
Вообще, клиенты могут сами регулировать нагрузку.
Все, что нужно - выполнить привязку сокета к двум верверам:


Код: pascal
1.
2.
 zmq_connect(requester, 'tcp://Host1:5555');
 zmq_connect(requester, 'tcp://Host2:5555');


- и все. Запросы будут поочередно отсылаться разным серверам.
Но хочется выжать все из одного процесса сервера.


Профилировщиком шмальни, где он супостат чешетца..
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38743821
s62
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччД,
А написать сервер на портах завершения ввода/вывода (IO completion ports)? Пишут, что это чуть не лучший вариант под Windows для обработки многих запросов.
http://msdn.microsoft.com/ru-ru/library/windows/desktop/aa365198(v=vs.85).aspx
http://habrahabr.ru/post/145140/
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38744151
fd00ch
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччД, не пробовал в тот же сценарий запрячь что-либо из этого набора , чтобы потом можно было как-то сравнить результаты?
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38748646
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Интересно, что длина сообщений может быть любой (почти: Integer), просто короткие сообщения доставляются прямо в теле сообщения, а для длинных специально автоматически выделяется память и в сообщении - адрес начала блока.
После чтения следующего сообщения предыдущее "пропадает".
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38748653
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
...в общем, выяснилось, что клиентов не будет больше сотни, и скорость обработки вполне приемлима.

...
~~~~~~~~~~~~~~~~~~~~~~~~

Таким образом, был рассмотрен режим обмена сообщениями по схеме "Запрос-Ответ" (Request-Reply).
Схема работы "Запрос-Ответ": сервер слушает сокет, принимает сообщения от всех желающих, и отвечает на запросы.

...
~~~~~~~~~~~~~~~~~~~~~~~~

Бывает, что нужно по-другому. Сервер должен оповестить клиентов о каком-то событии.
Клиенты отправляют серверу заявку, что они готовы получать сообщения о наступившем событии ("подписываются" на событие).
Одних клиентов могут интересовать одни события, других - другие.
А сервера вообще не волнует, кому из клиентов что нужно. Сервер - это как бы радиоприемник, вещающий в эфир. Кто слушает - молодец. А кто не слушает - тот ССЗБ.

Такая схема называется "Издатель - Подписчик" (Publisher-Subscriber).

Пример.

Автоматическая метеостанция измеряет температуру, атмосферное давление и скорость ветра. Результаты измерений время от времени (например, после завершения цикла измрений) передаются "всем заинтересованным лицам".
Кого-то интересует всё, кому-то нужна температура, кого-то волнует только скорость ветра.

Пишем код метеостанции (), то есть, сервер-издатель :
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38748801
dred2k
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Через сокеты не тестил ? Synapse, к примеру.
Интересно, сильно уступит по скорости.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38748966
Dimitry Sibiryakov
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччДА сервера вообще не волнует, кому из клиентов что нужно. Сервер - это как бы
радиоприемник, вещающий в эфир. Кто слушает - молодец. А кто не слушает - тот ССЗБ.
В смысле, используешь UDP broadcast или TCP multicast?
Posted via ActualForum NNTP Server 1.5
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38749337
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Dimitry SibiryakovчччДА сервера вообще не волнует, кому из клиентов что нужно. Сервер - это как бы
радиоприемник, вещающий в эфир. Кто слушает - молодец. А кто не слушает - тот ССЗБ.
В смысле, используешь UDP broadcast или TCP multicast?

ZeroMQ вроде точно не использует UDP.
Как реализовано - можно посмотреть в исходниках libzmq.dll.

Прикладной программист выбирает транспортный уровень для сокета с помощью zmq_bind:

Код: pascal
1.
  zmq_connect(requester, 'tcp://localhost:5555');


...

Заявлено, что ZeroMQ позволяет обмениваться между разными системами "в сети", между процессами и между потоками в рамках процесса.

Я ничего, кроме tcp пока не пробовал.

В документации пишут, что в данны момент поддерживается работа на базе TCP, IPC (на POSIX), inproc, TIPC ("Transparent IPC" от Ericsson), SCTP, PGM, NORM и SOCKS5 (насчет последнего упоминают особо - типа, теперь ZeroMQ может и через сеть Tor ходить)
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38749894
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччД...

Бывает, что нужно по-другому. Сервер должен оповестить клиентов о каком-то событии.
Клиенты отправляют серверу заявку, что они готовы получать сообщения о наступившем событии ("подписываются" на событие).
Одних клиентов могут интересовать одни события, других - другие.
А сервера вообще не волнует, кому из клиентов что нужно. Сервер - это как бы радиоприемник, вещающий в эфир. Кто слушает - молодец. А кто не слушает - тот ССЗБ.

Такая схема называется "Издатель - Подписчик" (Publisher-Subscriber).

Пример.

Автоматическая метеостанция измеряет температуру, атмосферное давление и скорость ветра. Результаты измерений время от времени (например, после завершения цикла измрений) передаются "всем заинтересованным лицам".
Кого-то интересует всё, кому-то нужна температура, кого-то волнует только скорость ветра.

Пишем код метеостанции (), то есть, сервер-издатель :

Сервер.
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
program PS_Server;
{$APPTYPE CONSOLE}
uses
  SysUtils, ZMQ;

var
  fContext: Pointer;
  fSocket: Pointer;
  fDummy: string;
  fMsgStr: string;
  fMessage: zmq_msg_t;

begin
  fContext := zmq_ctx_new();
  fSocket := zmq_socket(fContext, ZMQ_PUB);
  zmq_bind(fSocket, 'tcp://*:4040');
  Writeln('Publisher started...');
  Randomize;

  while True do begin
    Sleep(100); // Типа измеряет что-то
    // Температура
    fMsgStr := Format('Temperature : %d C', [20 - Random(40)]);
    zmq_msg_init(fMessage); // Инициализация zmq_msg_t
    zmq_msg_init_size(fMessage, Length(fMsgStr) * SizeOf(Char)); // Резервирование памяти
    // Копирование данных из строки в буфер сообщения:
    move(PChar(fMsgStr)^, zmq_msg_data(fMessage)^, Length(fMsgStr) * SizeOf(Char));
    zmq_msg_send(fMessage, fSocket, 0); // Пересылка
    zmq_msg_close(fMessage); // Всё

    // Атм. давление
    fMsgStr := Format('Pressure : %d Pa', [101375 - Random(100)]);
    zmq_msg_init(fMessage);
    zmq_msg_init_size(fMessage, Length(fMsgStr) * SizeOf(Char));
    move(PChar(fMsgStr)^, zmq_msg_data(fMessage)^, Length(fMsgStr) * SizeOf(Char));
    zmq_sendmsg(fSocket, fMessage, 0);
    zmq_msg_close(fMessage);

    // Скорость ветра
    fMsgStr := Format('Wind : %d m/s', [Random(10)]);
    zmq_send(fSocket, PChar(fMsgStr)^, Length(fMsgStr) * SizeOf(Char), 0);
  end;
  zmq_ctx_destroy(fContext);
  Readln(fDummy);
end.



Инициализация: все то же самое, что и для работы по шаблону "Запрос - Ответ".
За исключением того сокет создается с опцией ZMQ_PUB (сокет - издатель):

Код: pascal
1.
zmq_socket(fContext, ZMQ_PUB);



Рабочий цикл: сервер только отправляет сообщения подписчикам (методы zmq_msg_send, zmq_sendmsg, zmq_send).
Попытка получить данные выбросит исключение.
....
В каждом рабочем цикле отправляется три сообщения (вернее, "публикуется"): о температуре, о давлении и о скорости ветра.
Для разнообразия показаны разные способы формирования сообщений.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38749895
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
При публикации температуры и давления используется структура типа zmq_msg_t:

Код: pascal
1.
fMessage: zmq_msg_t;


Служебная структура, ничего особенного в ней нет, просто массив из 32 байт.

Код: pascal
1.
2.
3.
4.
type
  zmq_msg_t = record
    _: Array[0..32-1] of Byte;
  end;



Чтобы переслать данные, нужно попросить библиотеку ZMQ инициализировать область памяти нужного размера. А затем заполнить этк область:

Код: pascal
1.
2.
3.
4.
    zmq_msg_init(fMessage); // Инициализация zmq_msg_t
    zmq_msg_init_size(fMessage, Length(fMsgStr) * SizeOf(Char)); // Резервирование памяти
    // Копирование данных из строки в буфер сообщения:
    move(PChar(fMsgStr)^, zmq_msg_data(fMessage)^, Length(fMsgStr) * SizeOf(Char));


А затем - отправить сообщение, используя метод zmq_msg_send или zmq_sendmsg:

Код: pascal
1.
    zmq_msg_send(fMessage, fSocket, 0); // Пересылка

или
Код: pascal
1.
    zmq_sendmsg(fSocket, fMessage, 0);


...
В нашем случае мы уже имеем готовый буфер с данными - строку fMsgStr:
Код: pascal
1.
2.
    // Атм. давление
    fMsgStr := Format('Pressure : %d Pa', [101375 - Random(100)]);


Можно не заморачиваться с zmq_msg_t и сразу вызвать метод zmq_send:

Код: pascal
1.
    zmq_send(fSocket, PChar(fMsgStr)^, Length(fMsgStr) * SizeOf(Char), 0);
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38749897
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
То есть, сервер - издатель получается еще короче:

Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
program PS_Server;
{$APPTYPE CONSOLE}
uses
  SysUtils, ZMQ;

var
  fContext: Pointer;
  fSocket: Pointer;
  fDummy: string;
  fMsgStr: string;

begin
  fContext := zmq_ctx_new();
  fSocket := zmq_socket(fContext, ZMQ_PUB);
  zmq_bind(fSocket, 'tcp://*:4040');
  Writeln('Publisher started...');
  Randomize;

  while True do begin
    Sleep(100); // Типа измеряет что-то
    // Температура
    fMsgStr := Format('Temperature : %d C', [20 - Random(40)]);
    zmq_send(fSocket, PChar(fMsgStr)^, Length(fMsgStr) * SizeOf(Char), 0);

    // Атм. давление
    fMsgStr := Format('Pressure : %d Pa', [101375 - Random(100)]);
    zmq_send(fSocket, PChar(fMsgStr)^, Length(fMsgStr) * SizeOf(Char), 0);

    // Скорость ветра
    fMsgStr := Format('Wind : %d m/s', [Random(10)]);
    zmq_send(fSocket, PChar(fMsgStr)^, Length(fMsgStr) * SizeOf(Char), 0);
  end;
  zmq_ctx_destroy(fContext);
  Readln(fDummy);
end.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38749898
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Еще раз: сервер - издатель публикует данные асинхронно, ему в общем случае начхать на клиентов - подписчиков.

То есть, если клиента запустить после запуска сервера, то все , что было опубликовано ранее, теряется.
Поэтому, вероятно, имеет смысл запускать сначала подписчиков, а потом уже сервер - издатель.
...
~~~~~~~~~~~~~~~~~~~~

Теперь разберёмся с клиентами (подписчиками).
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38749900
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Клиент.

Клиент-подписчик получает извещения, читает данные и показывает их с помощью Writeln().

Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
program PS_Client;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ;

var
  fContext: Pointer;
  fSocket: Pointer;
  fMessage: zmq_msg_t;
  fDummy: string;
  fMsgStr: string;
  fLen: Integer;

begin
  fContext := zmq_ctx_new(); // Инициализация
  fSocket := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocket, 'tcp://localhost:4040');
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nil, 0); // Настройка сокета

  Writeln('Subscriber started...');

  while True do begin
    zmq_msg_init(fMessage);
    fLen :=  zmq_msg_recv(fMessage, fSocket, 0); // Прием данных
    SetLength(fMsgStr, fLen div SizeOf(Char)); // Формирование буфера строки
    Move(zmq_msg_data(fMessage)^, PChar(fMsgStr)^, fLen); // Копирвоание данных
    Writeln(fMsgStr);
    zmq_msg_close(fMessage);
  end;
  zmq_ctx_destroy(fContext);
  Readln(fDummy);
end.



Инициализация: - все почти так же, как и для шаблона "Запрос - Ответ". Однако, сокет создается с опцией ZMQ_SUB, а после вызова zmq_connect() дополнительно вызывается метод zmq_setsockopt() с опцией ZMQ_SUBSCRIBE:

Код: pascal
1.
2.
3.
4.
  fContext := zmq_ctx_new();
  fSocket := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocket, 'tcp://localhost:4040');
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nil, 0);



Рабочий цикл. В цикле выполняется синхронное чтение сообщения из сокета и вывод его с помощью Writeln().
Так как сообщения могут быть разными и разной длины, мы используем метод zmq_msg_recv(), который работает со структурой zmq_msg_t.
При этом сообщение считывается полностью в зарезервированную системой область. Метод zmq_msg_recv() возвращает количество считанных байт. Кроме того, количество принятых байт можно узнать, используй метод
Код: pascal
1.
fLen := zmq_msg_size(fMessage)

.
Сами данные находятся в буфере, адрес которого можно узнать с помощью метода zmq_msg_data().

Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
  while True do begin
    zmq_msg_init(fMessage);
    fLen :=  zmq_msg_recv(fMessage, fSocket, 0); // Прием данных
    SetLength(fMsgStr, fLen div SizeOf(Char)); // Формирование буфера строки
    Move(zmq_msg_data(fMessage)^, PChar(fMsgStr)^, fLen); // Копирвоание данных
    Writeln(fMsgStr);
    zmq_msg_close(fMessage);
  end;



Все, клиент готов.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38749901
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Если запустить сервер - издатель и несколько клиентов - подписчиков, увидим, что все "гладко и сладко".
Вот они, красавчики:
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38749902
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Эту тупые клиенты-подписчики реагируют на все публикации.

А хорошо бы, одни чтобы реагировали на температуру, другие - на ветер, третьи - на что-нибудь еще.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38750505
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччД...

А хорошо бы, одни чтобы реагировали на температуру, другие - на ветер, третьи - на что-нибудь еще.

Как выяснилось, это сделать несложно. Клиент, настраивая сокет - подписчик, должен указать в параметрах строку фильтра.

Вместо:
Код: pascal
1.
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nil, 0); // Настройка сокета 



- следует указать, например 'Temperature':

Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
const
  cFilter1 = 'Temperature';
  cFilter2 = 'Pressure';
  cFilter3 = 'Wind';
begin
  fContext := zmq_ctx_new(); // Инициализация
  fSocket := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocket, 'tcp://localhost:4040');
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, PChar(cFilter1), SizeOf(cFilter1)); // Настройка сокета


Теперь этот подписчик получит только те сообщения, которые начинаются с 'Temperature'.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38750509
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Фильтров может быть добавлено несколько. В этом случае клиент получит только те сообщения, которые соответствуют любому из фильтров:
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
const
  cFilter1 = 'Temperature';
  cFilter2 = 'Pressure';
  cFilter3 = 'Wind';
begin
  fContext := zmq_ctx_new(); // Инициализация
  fSocket := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocket, 'tcp://localhost:4040');
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, PChar(cFilter1), SizeOf(cFilter1)); // Настройка сокета
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, PChar(cFilter2), SizeOf(cFilter2)); // Настройка сокета



Таким образом, получаем сообщения только о температура и о давлении:
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38750566
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Замечания.

1. Сообщения фильтруются клиентом - подписчиком, не сервером-издателем. Так написано. То есть, клиент на транспортном уровне вроде бы получает все поступающие сообщения. Как мне кажется, это должно здорово снижать производительность системы в случае, когда сообщения длинные и поступают часто. Однако, тесты "на коленке" (много разных "длинных" сообщений + фильтр) показали, что это не так. Возможно, при включенной фильтрации клиент получает не сообщение целиком, а только его начальную часть, для сравнения с фильтром. Если сообщение "не годится", оставшаяся часть не принимается.
Возможно.

2. Сервер публикует сообщения когда нравится (т.е., асинхронно с клиентом), а клиент-подписчик читает их по готовности, начиная с момента коннекта к серверу-издателю. Таким образом, если клиент обрабатывает сообщения слишком медленно, очередь сообщений может стать слишком большой, и может случиться беда. ZeroMQ обрабатывает данную ситуацию в зависимости от того, как вы настроите сокет-подписчик и сокет-издатель.


Каждое соединение между исходящим сокетом (на сервере) и входящем (на клиенте) реализуется с помощью т.н. "труб" (pipes).
То есть, между сокетом - источником сообщения и сокетом - получателем создаются "трубы" (pipes), по которым "текут" сообщения.

Можно задать "емкость" трубы как на источнике, так и на получателе. То есть, "максимально допустимый уровень воды"(high-water mark - HWM).

Некоторые сокеты (типа PUB, PUSH) имеют только исходящие буферы, для них можно определить HWM на отправление.
Если сокеты принимающие (типа SUB, PULL, REQ, REP), то для них можно определить определить HWM на прием.
Есть сокеты, которые работают в обе стороны (DEALER, ROUTER, PAIR), для этих можно определить оба значения HWM.

Так вот, уровень HWM задается все тем же методом zmq_setsockopt() , с опциями:

ZMQ_SNDHWM : - задать high water mark для исходящих сообщений (для сокета - издателя)
ZMQ_RCVHWM : - задать high water mark для входящих сообщений (для сокета - подписчика)

По умолчанию HWM для обоих типов равен 1000 сообщений. Если буфер заполнен, то, в зависимости от типа сокета, оставшиеся сообщения либо игнорируются, либо выполняется блокировка процесса.
Пишут, что ZeroMQ не гарантирует, что сокет сможет принять столько сообщений, сколько указано при установке ZMQ_SNDHWM, реально граница может быть на уровне 60-70% от заданного HWM. Типа, "рекомендация" для системы.
...
Рейтинг: 0 / 0
Период между сообщениями больше года.
Cервер обработки сообщений
    #39352311
Товарищ младший сержант
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Кошмар, сколько ошибок. Например:
чччД
Код: pascal
1.
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, PChar(cFilter1), SizeOf(cFilter1)); // Настройка сокета


Теперь этот подписчик получит только те сообщения, которые начинаются с 'Temperature'.

Конечно, вместо SizeOf(cFilter1) следует передавать длину строки cFilter1 в байтах... не сглючило, так как все строки были длиной больше 4 байт, а работало правильно - так как строки отличались, начиная с первого символа.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #39463705
Trester789
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Короче понятно, а как клиента отключить если, допустим в связке с ROUTER ? Например, не заплатил мне бабок за погоду...
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #39464180
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Trester789Короче понятно, а как клиента отключить если, допустим в связке с ROUTER ? Например, не заплатил мне бабок за погоду...

Библиотека ZMQ не контролирует подключение клиентов, это не ее уровень. Она обеспечивает коннект при наличии связи и автоматическое восстановление коннекта после сбоев связи.
Ну и занимается передачей/приемом сообщений.
В твоем случае сервер просто может (например) не отвечать на запросы клиентов, которые ему не "нравятся". Или, в соответствии с рекомендациями разработки протокола безопасности http://zmtp.org/ - отправлять "шум".

Вопрос идентификация клиентов можно реализовать самостоятельно, на уровне протокола (почитай рекомендации, ссылка выше).
...
Рейтинг: 0 / 0
38 сообщений из 38, показаны все 2 страниц
Форумы / Delphi [игнор отключен] [закрыт для гостей] / Cервер обработки сообщений
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]