powered by simpleCommunicator - 2.0.61     © 2026 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / Delphi [игнор отключен] [закрыт для гостей] / Cервер обработки сообщений
13 сообщений из 38, страница 2 из 2
Cервер обработки сообщений
    #38749894
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччД...

Бывает, что нужно по-другому. Сервер должен оповестить клиентов о каком-то событии.
Клиенты отправляют серверу заявку, что они готовы получать сообщения о наступившем событии ("подписываются" на событие).
Одних клиентов могут интересовать одни события, других - другие.
А сервера вообще не волнует, кому из клиентов что нужно. Сервер - это как бы радиоприемник, вещающий в эфир. Кто слушает - молодец. А кто не слушает - тот ССЗБ.

Такая схема называется "Издатель - Подписчик" (Publisher-Subscriber).

Пример.

Автоматическая метеостанция измеряет температуру, атмосферное давление и скорость ветра. Результаты измерений время от времени (например, после завершения цикла измрений) передаются "всем заинтересованным лицам".
Кого-то интересует всё, кому-то нужна температура, кого-то волнует только скорость ветра.

Пишем код метеостанции (), то есть, сервер-издатель :

Сервер.
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
program PS_Server;
{$APPTYPE CONSOLE}
uses
  SysUtils, ZMQ;

var
  fContext: Pointer;
  fSocket: Pointer;
  fDummy: string;
  fMsgStr: string;
  fMessage: zmq_msg_t;

begin
  fContext := zmq_ctx_new();
  fSocket := zmq_socket(fContext, ZMQ_PUB);
  zmq_bind(fSocket, 'tcp://*:4040');
  Writeln('Publisher started...');
  Randomize;

  while True do begin
    Sleep(100); // Типа измеряет что-то
    // Температура
    fMsgStr := Format('Temperature : %d C', [20 - Random(40)]);
    zmq_msg_init(fMessage); // Инициализация zmq_msg_t
    zmq_msg_init_size(fMessage, Length(fMsgStr) * SizeOf(Char)); // Резервирование памяти
    // Копирование данных из строки в буфер сообщения:
    move(PChar(fMsgStr)^, zmq_msg_data(fMessage)^, Length(fMsgStr) * SizeOf(Char));
    zmq_msg_send(fMessage, fSocket, 0); // Пересылка
    zmq_msg_close(fMessage); // Всё

    // Атм. давление
    fMsgStr := Format('Pressure : %d Pa', [101375 - Random(100)]);
    zmq_msg_init(fMessage);
    zmq_msg_init_size(fMessage, Length(fMsgStr) * SizeOf(Char));
    move(PChar(fMsgStr)^, zmq_msg_data(fMessage)^, Length(fMsgStr) * SizeOf(Char));
    zmq_sendmsg(fSocket, fMessage, 0);
    zmq_msg_close(fMessage);

    // Скорость ветра
    fMsgStr := Format('Wind : %d m/s', [Random(10)]);
    zmq_send(fSocket, PChar(fMsgStr)^, Length(fMsgStr) * SizeOf(Char), 0);
  end;
  zmq_ctx_destroy(fContext);
  Readln(fDummy);
end.



Инициализация: все то же самое, что и для работы по шаблону "Запрос - Ответ".
За исключением того сокет создается с опцией ZMQ_PUB (сокет - издатель):

Код: pascal
1.
zmq_socket(fContext, ZMQ_PUB);



Рабочий цикл: сервер только отправляет сообщения подписчикам (методы zmq_msg_send, zmq_sendmsg, zmq_send).
Попытка получить данные выбросит исключение.
....
В каждом рабочем цикле отправляется три сообщения (вернее, "публикуется"): о температуре, о давлении и о скорости ветра.
Для разнообразия показаны разные способы формирования сообщений.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38749895
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
При публикации температуры и давления используется структура типа zmq_msg_t:

Код: pascal
1.
fMessage: zmq_msg_t;


Служебная структура, ничего особенного в ней нет, просто массив из 32 байт.

Код: pascal
1.
2.
3.
4.
type
  zmq_msg_t = record
    _: Array[0..32-1] of Byte;
  end;



Чтобы переслать данные, нужно попросить библиотеку ZMQ инициализировать область памяти нужного размера. А затем заполнить этк область:

Код: pascal
1.
2.
3.
4.
    zmq_msg_init(fMessage); // Инициализация zmq_msg_t
    zmq_msg_init_size(fMessage, Length(fMsgStr) * SizeOf(Char)); // Резервирование памяти
    // Копирование данных из строки в буфер сообщения:
    move(PChar(fMsgStr)^, zmq_msg_data(fMessage)^, Length(fMsgStr) * SizeOf(Char));


А затем - отправить сообщение, используя метод zmq_msg_send или zmq_sendmsg:

Код: pascal
1.
    zmq_msg_send(fMessage, fSocket, 0); // Пересылка

или
Код: pascal
1.
    zmq_sendmsg(fSocket, fMessage, 0);


...
В нашем случае мы уже имеем готовый буфер с данными - строку fMsgStr:
Код: pascal
1.
2.
    // Атм. давление
    fMsgStr := Format('Pressure : %d Pa', [101375 - Random(100)]);


Можно не заморачиваться с zmq_msg_t и сразу вызвать метод zmq_send:

Код: pascal
1.
    zmq_send(fSocket, PChar(fMsgStr)^, Length(fMsgStr) * SizeOf(Char), 0);
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38749897
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
То есть, сервер - издатель получается еще короче:

Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
program PS_Server;
{$APPTYPE CONSOLE}
uses
  SysUtils, ZMQ;

var
  fContext: Pointer;
  fSocket: Pointer;
  fDummy: string;
  fMsgStr: string;

begin
  fContext := zmq_ctx_new();
  fSocket := zmq_socket(fContext, ZMQ_PUB);
  zmq_bind(fSocket, 'tcp://*:4040');
  Writeln('Publisher started...');
  Randomize;

  while True do begin
    Sleep(100); // Типа измеряет что-то
    // Температура
    fMsgStr := Format('Temperature : %d C', [20 - Random(40)]);
    zmq_send(fSocket, PChar(fMsgStr)^, Length(fMsgStr) * SizeOf(Char), 0);

    // Атм. давление
    fMsgStr := Format('Pressure : %d Pa', [101375 - Random(100)]);
    zmq_send(fSocket, PChar(fMsgStr)^, Length(fMsgStr) * SizeOf(Char), 0);

    // Скорость ветра
    fMsgStr := Format('Wind : %d m/s', [Random(10)]);
    zmq_send(fSocket, PChar(fMsgStr)^, Length(fMsgStr) * SizeOf(Char), 0);
  end;
  zmq_ctx_destroy(fContext);
  Readln(fDummy);
end.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38749898
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Еще раз: сервер - издатель публикует данные асинхронно, ему в общем случае начхать на клиентов - подписчиков.

То есть, если клиента запустить после запуска сервера, то все , что было опубликовано ранее, теряется.
Поэтому, вероятно, имеет смысл запускать сначала подписчиков, а потом уже сервер - издатель.
...
~~~~~~~~~~~~~~~~~~~~

Теперь разберёмся с клиентами (подписчиками).
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38749900
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Клиент.

Клиент-подписчик получает извещения, читает данные и показывает их с помощью Writeln().

Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
program PS_Client;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ;

var
  fContext: Pointer;
  fSocket: Pointer;
  fMessage: zmq_msg_t;
  fDummy: string;
  fMsgStr: string;
  fLen: Integer;

begin
  fContext := zmq_ctx_new(); // Инициализация
  fSocket := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocket, 'tcp://localhost:4040');
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nil, 0); // Настройка сокета

  Writeln('Subscriber started...');

  while True do begin
    zmq_msg_init(fMessage);
    fLen :=  zmq_msg_recv(fMessage, fSocket, 0); // Прием данных
    SetLength(fMsgStr, fLen div SizeOf(Char)); // Формирование буфера строки
    Move(zmq_msg_data(fMessage)^, PChar(fMsgStr)^, fLen); // Копирвоание данных
    Writeln(fMsgStr);
    zmq_msg_close(fMessage);
  end;
  zmq_ctx_destroy(fContext);
  Readln(fDummy);
end.



Инициализация: - все почти так же, как и для шаблона "Запрос - Ответ". Однако, сокет создается с опцией ZMQ_SUB, а после вызова zmq_connect() дополнительно вызывается метод zmq_setsockopt() с опцией ZMQ_SUBSCRIBE:

Код: pascal
1.
2.
3.
4.
  fContext := zmq_ctx_new();
  fSocket := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocket, 'tcp://localhost:4040');
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nil, 0);



Рабочий цикл. В цикле выполняется синхронное чтение сообщения из сокета и вывод его с помощью Writeln().
Так как сообщения могут быть разными и разной длины, мы используем метод zmq_msg_recv(), который работает со структурой zmq_msg_t.
При этом сообщение считывается полностью в зарезервированную системой область. Метод zmq_msg_recv() возвращает количество считанных байт. Кроме того, количество принятых байт можно узнать, используй метод
Код: pascal
1.
fLen := zmq_msg_size(fMessage)

.
Сами данные находятся в буфере, адрес которого можно узнать с помощью метода zmq_msg_data().

Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
  while True do begin
    zmq_msg_init(fMessage);
    fLen :=  zmq_msg_recv(fMessage, fSocket, 0); // Прием данных
    SetLength(fMsgStr, fLen div SizeOf(Char)); // Формирование буфера строки
    Move(zmq_msg_data(fMessage)^, PChar(fMsgStr)^, fLen); // Копирвоание данных
    Writeln(fMsgStr);
    zmq_msg_close(fMessage);
  end;



Все, клиент готов.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38749901
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Если запустить сервер - издатель и несколько клиентов - подписчиков, увидим, что все "гладко и сладко".
Вот они, красавчики:
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38749902
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Эту тупые клиенты-подписчики реагируют на все публикации.

А хорошо бы, одни чтобы реагировали на температуру, другие - на ветер, третьи - на что-нибудь еще.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38750505
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччД...

А хорошо бы, одни чтобы реагировали на температуру, другие - на ветер, третьи - на что-нибудь еще.

Как выяснилось, это сделать несложно. Клиент, настраивая сокет - подписчик, должен указать в параметрах строку фильтра.

Вместо:
Код: pascal
1.
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nil, 0); // Настройка сокета 



- следует указать, например 'Temperature':

Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
const
  cFilter1 = 'Temperature';
  cFilter2 = 'Pressure';
  cFilter3 = 'Wind';
begin
  fContext := zmq_ctx_new(); // Инициализация
  fSocket := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocket, 'tcp://localhost:4040');
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, PChar(cFilter1), SizeOf(cFilter1)); // Настройка сокета


Теперь этот подписчик получит только те сообщения, которые начинаются с 'Temperature'.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38750509
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Фильтров может быть добавлено несколько. В этом случае клиент получит только те сообщения, которые соответствуют любому из фильтров:
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
const
  cFilter1 = 'Temperature';
  cFilter2 = 'Pressure';
  cFilter3 = 'Wind';
begin
  fContext := zmq_ctx_new(); // Инициализация
  fSocket := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocket, 'tcp://localhost:4040');
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, PChar(cFilter1), SizeOf(cFilter1)); // Настройка сокета
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, PChar(cFilter2), SizeOf(cFilter2)); // Настройка сокета



Таким образом, получаем сообщения только о температура и о давлении:
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #38750566
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Замечания.

1. Сообщения фильтруются клиентом - подписчиком, не сервером-издателем. Так написано. То есть, клиент на транспортном уровне вроде бы получает все поступающие сообщения. Как мне кажется, это должно здорово снижать производительность системы в случае, когда сообщения длинные и поступают часто. Однако, тесты "на коленке" (много разных "длинных" сообщений + фильтр) показали, что это не так. Возможно, при включенной фильтрации клиент получает не сообщение целиком, а только его начальную часть, для сравнения с фильтром. Если сообщение "не годится", оставшаяся часть не принимается.
Возможно.

2. Сервер публикует сообщения когда нравится (т.е., асинхронно с клиентом), а клиент-подписчик читает их по готовности, начиная с момента коннекта к серверу-издателю. Таким образом, если клиент обрабатывает сообщения слишком медленно, очередь сообщений может стать слишком большой, и может случиться беда. ZeroMQ обрабатывает данную ситуацию в зависимости от того, как вы настроите сокет-подписчик и сокет-издатель.


Каждое соединение между исходящим сокетом (на сервере) и входящем (на клиенте) реализуется с помощью т.н. "труб" (pipes).
То есть, между сокетом - источником сообщения и сокетом - получателем создаются "трубы" (pipes), по которым "текут" сообщения.

Можно задать "емкость" трубы как на источнике, так и на получателе. То есть, "максимально допустимый уровень воды"(high-water mark - HWM).

Некоторые сокеты (типа PUB, PUSH) имеют только исходящие буферы, для них можно определить HWM на отправление.
Если сокеты принимающие (типа SUB, PULL, REQ, REP), то для них можно определить определить HWM на прием.
Есть сокеты, которые работают в обе стороны (DEALER, ROUTER, PAIR), для этих можно определить оба значения HWM.

Так вот, уровень HWM задается все тем же методом zmq_setsockopt() , с опциями:

ZMQ_SNDHWM : - задать high water mark для исходящих сообщений (для сокета - издателя)
ZMQ_RCVHWM : - задать high water mark для входящих сообщений (для сокета - подписчика)

По умолчанию HWM для обоих типов равен 1000 сообщений. Если буфер заполнен, то, в зависимости от типа сокета, оставшиеся сообщения либо игнорируются, либо выполняется блокировка процесса.
Пишут, что ZeroMQ не гарантирует, что сокет сможет принять столько сообщений, сколько указано при установке ZMQ_SNDHWM, реально граница может быть на уровне 60-70% от заданного HWM. Типа, "рекомендация" для системы.
...
Рейтинг: 0 / 0
Период между сообщениями больше года.
Cервер обработки сообщений
    #39352311
Товарищ младший сержант
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Кошмар, сколько ошибок. Например:
чччД
Код: pascal
1.
  zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, PChar(cFilter1), SizeOf(cFilter1)); // Настройка сокета


Теперь этот подписчик получит только те сообщения, которые начинаются с 'Temperature'.

Конечно, вместо SizeOf(cFilter1) следует передавать длину строки cFilter1 в байтах... не сглючило, так как все строки были длиной больше 4 байт, а работало правильно - так как строки отличались, начиная с первого символа.
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #39463705
Trester789
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Короче понятно, а как клиента отключить если, допустим в связке с ROUTER ? Например, не заплатил мне бабок за погоду...
...
Рейтинг: 0 / 0
Cервер обработки сообщений
    #39464180
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Trester789Короче понятно, а как клиента отключить если, допустим в связке с ROUTER ? Например, не заплатил мне бабок за погоду...

Библиотека ZMQ не контролирует подключение клиентов, это не ее уровень. Она обеспечивает коннект при наличии связи и автоматическое восстановление коннекта после сбоев связи.
Ну и занимается передачей/приемом сообщений.
В твоем случае сервер просто может (например) не отвечать на запросы клиентов, которые ему не "нравятся". Или, в соответствии с рекомендациями разработки протокола безопасности http://zmtp.org/ - отправлять "шум".

Вопрос идентификация клиентов можно реализовать самостоятельно, на уровне протокола (почитай рекомендации, ссылка выше).
...
Рейтинг: 0 / 0
13 сообщений из 38, страница 2 из 2
Форумы / Delphi [игнор отключен] [закрыт для гостей] / Cервер обработки сообщений
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]