powered by simpleCommunicator - 2.0.61     © 2026 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / Delphi [игнор отключен] [закрыт для гостей] / ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
25 сообщений из 260, страница 6 из 11
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39200580
Фотография defecator
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Модератор форума
В Windows 8.1 "искаропки" интерфейса DirectPlay не оказалось.
А в Windows 7 "искаропки" он ещё есть
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39200600
Фотография Feg16
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
defecatorА в Windows 7 "искаропки" он ещё есть
Код: plaintext
Warning: Microsoft DirectPlay has been deprecated. 
Этого достаточно чтобы сморщив нос закрыть страницу :(
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39200602
Фотография Feg16
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
А то будет потом как с RedAlert - поставьте тот компонент, настройте протокол IPX
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39200603
Фотография defecator
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Модератор форума
Feg16defecatorА в Windows 7 "искаропки" он ещё есть
Код: plaintext
Warning: Microsoft DirectPlay has been deprecated. 
Этого достаточно чтобы сморщив нос закрыть страницу :(
какие тонкие нюхательные ощущения, я поражаюсь.
А с древним кодом нет, не работал никогда ? Тошнило тебя ?
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39200795
Фотография Feg16
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
defecatorА с древним кодом нет, не работал никогда ? Тошнило тебя ?Одно дело пилить на дядю говнокод на практике, а второе заменять один движок на другой. Смысл мне ставить раком проект, если он у пользователей попросту не станет работать?
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39205141
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
ZeroMQ Особенности High-level API CZMQ :

Автоматизация обслуживания сокетов. К примеру, при закрытии контекста все его сокеты обработка будут также закрываться. При этом в некоторых случаях для сокетов можно назначить таймайт.
...

Интересно:

Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
//  Create an attached thread. An attached thread gets a ctx and a PAIR
//  pipe back to its parent. It must monitor its pipe, and exit if the
//  pipe becomes unreadable. Do not destroy the ctx, the thread does this
//  automatically when it ends.
type 
  p_zthread_attached_fn = ^zthread_attached_fn;
  zthread_attached_fn = procedure(args: Pointer; ctx: p_zctx_t; pipe: Pointer); cdecl;
...
interface

  function zthread_fork(ctx: p_zctx_t; thread_fn: p_zthread_attached_fn; args: Pointer):
    Pointer; cdecl; external cZMQ_DllName;



zthread_fork создает и запускает "прикрепленную нить", фактически запуская cdecl - функцию типа zthread_attached_fn и автоматически создавая пару сокетов для общения с нитью, как и сказано в описании выше.

Так вот, контекст ZMQ (параметр ctx: p_zctx_t) не просто передается, а создается его копия в блоке данных нити.
В рамках нового контекста в нити и будет создаваться новый пул сокетов и связанных с ними коннектов, очередей и т.п.
Т.е., по завершении нити произойдет корректная финализация копии контекста и связанных с ним сокетов и проч.
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39331834
Dima T
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччД, с аутентификацией не разбирался?

В тестах есть пример , но слишком простой.

С сервером понятно, непонятно как узнать клиенту что у него "Invalid username or password"?
На клиенте libzmq.dll получает "400", рвет соединение, больше его не пытается установить, но наружу никак это не сообщает. Снаружи мой код работает как будто все хорошо. zmq_send() отправляет, zmq_recv() висит в ожидании приема.

Нагуглил это , вроде тот же вопрос, но мало что там понял, у меня с английским не очень хорошо.
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39332932
Dima T
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Может кому пригодиться. Все что нарыл - это включить мониторинг ( zmq_socket_monitor () ) и анализировать что будет после ZMQ_EVENT_DISCONNECTED, если после попытки установить соединение прекратятся, то считаем что сервер прислал команду отключиться.
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39332970
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Dima T,

выстраивая свой протокол, я сделал так, что в момент handshake клиент передает серверу свои текстовые идентификационные данные (имя компа, юзернэйм windows и т.п.) в отдельном фрейме. А после handshake, обменявшись открытыми ключами для шифрования (алг. Диффи-Хеллмана), в целях экономии трафика использую собственный открытый Д-Х ключ заодно и как 8-байтный UID для identity коннекта.
Сервер кэширует UID'ы и хранит их в течении заданного времени (5 сек без активности -> "свободен!).
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39332975
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Т.е., аутентификацией на сервере занимаюсь я сам.
Что удобно и позволяет выстраивать различные, удобные для моей левой пятки схемы.
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39333002
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Схема примерно вот такая.

Клиент получает от сервера сообщения:
- хартбит - посылается периодически, чтобы клиент знал: сервер жив;
- запрос выполнен - успешный ответ сервера на любой запрос клиента (кроме запроса "гуд бай, сервер!").
- запрос об идентификации - ответ сервера на любой запрос, когда сервер не знает о данном identity ничего, клиент должен представиться;

Сервер получает запросы от клиента:
- хартбит - посылается периодически, чтобы сервер знал: клиент жив.
- запрос - посылается клиентом по мере надобности.
- запрос с блоком идентификации (высылается на запрос сервера).
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39333515
kealon(Ruslan)
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччД,

я вот одного не понял, если что бы получить сообщение нужно сперва выделить под него память где тогда асинхронность?
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39333546
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
kealon(Ruslan)чччД,

я вот одного не понял, если что бы получить сообщение нужно сперва выделить под него память где тогда асинхронность?
Нет, конечно.

Не обязательно. Используй zmq_msg_recv() :

длина_сообщения := zmq_msg_recv(сокет, сообщение, флаги);

Если длина_сообщения >= 0, значит, все ОК.
А данные после приема можно получить вот так: zmq_msg_data (fMessage).

А асинхронность - в поллинге: http://delphi-and-zeromq.blogspot.ru/2014/11/05-zeromq-zmqpoll.html
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39333622
Dima T
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччДТ.е., аутентификацией на сервере занимаюсь я сам.
Что удобно и позволяет выстраивать различные, удобные для моей левой пятки схемы.
Тоже склоняюсь к самодельной аутентификации.
Свой протокол это хорошо, возможно даже лучше для безопасности чем встроенный, т.к. можно всунуть перехват zmq_setsockopt() установки логина/пароля и готово. Или трафик послушать. В PLAIN режиме оно открытым текстом идет.

Единственный плюс встроенной что сервер рвет соединение если зацепились вовсе не не через ZMQ, если встроеная отключена - не рвет, просто не отвечает.
Еще встроенная показывает IP откуда коннект, без нее можно через монитор попробовать сопоставить. Я не определился, нужен ли вообще мне этот IP или без него проживу.

Хочу задействовать новые сокеты ZMQ_CLIENT, ZMQ_SERVER. Управление идентификацией гибче чем с DILLER-ROUTER. И побыстрее должно работать теоретически, т.к. маршрутизация по ID сессии (int32), а не по имени клиента (строка произвольной длины)
Смущает что они подключаются по
Код: plaintext
1.
#define ZMQ_BUILD_DRAFT_API


т.е. вроде как еще в варианте черновика. Есть опыт использования? Не глючат?
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39333648
чччД
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Dima T...
Хочу задействовать новые сокеты ZMQ_CLIENT, ZMQ_SERVER...
[/src]
т.е. вроде как еще в варианте черновика. Есть опыт использования? Не глючат?
Не пользовался пока, чуть-чуть почитал только.
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39333959
kealon(Ruslan)
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччД,

нда, не густо - нулевая обёртка над апи практически
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39333970
kealon(Ruslan)
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
чччД,
всё больше склоняюсь, что вот такой подход более практичен и в плане результата и в плане лёгкости разработки.
ещё бы для асинхронных операций чтения память выделять когда она требуется, а не заранее - тогда наверное можно и с классическим пулингом посостязаться по затратам ресурсов
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39333998
Товарищ младший сержант
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
kealon(Ruslan)чччД,

нда, не густо - нулевая обёртка над апи практически
Где нулевая обертка над апи?
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39334035
kealon(Ruslan)
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Товарищ младший сержантkealon(Ruslan)чччД,

нда, не густо - нулевая обёртка над апи практически
Где нулевая обертка над апи?

чччДА асинхронность - в поллинге: http://delphi-and-zeromq.blogspot.ru/2014/11/05-zeromq-zmqpoll.html
Код: pascal
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
// Процесс обработки для обоих сокетов
  while true do begin
 
    zmq_poll(@fItems[0], Length(fItems), -1);
    if ((fItems[0].revents and ZMQ_POLLIN) <> 0) then begin
      fSize := zmq_recv(fSocketReceiver, @fMsgBuff, SizeOf(fMsgBuff), 0);
      if fSize >= 0 then
        // Выполнение задания
      else
        break;
      if ((fItems[1].revents and ZMQ_POLLIN) <> 0) then begin
        fSize := zmq_recv(fSocketSubscriber, @fMsgBuff, SizeOf(fMsgBuff), 0);
        if fSize >= 0 then
    // Обработка данных о погоде
        else
          break;
      end;
    end;
  end;
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39334087
Dima T
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
kealon(Ruslan)чччД,
всё больше склоняюсь, что вот такой подход более практичен и в плане результата и в плане лёгкости разработки.
ХЗ что проще. Ниже код сервера с многопоточной обработкой, букав поменьше и нет необходимости синхронизации доступа к очереди на обработку.
kealon(Ruslan)ещё бы для асинхронных операций чтения память выделять когда она требуется, а не заранее - тогда наверное можно и с классическим пулингом посостязаться по затратам ресурсов
zmq_msg_recv() выделяет ровно столько сколько надо по приходу сообщения.

Многопоточный эхо-серверСообщения принимаются и раздаются трем потокам на обработку.
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
#define ZMQ_BUILD_DRAFT_API
#include "zmq.h"

#include <assert.h>
#include <stdlib.h>
#include <thread>

void *ctx = NULL;
void *sock = NULL;

// поток обработки сообщений
void worker(int num) {
	printf("worker %d start\n", num);
	void* queue = zmq_socket(ctx, ZMQ_PULL);	assert(queue);
	int rc = zmq_connect(queue, "inproc://echo-queue"); assert(rc == 0);
	while(1) {
		// получение указателя из очереди
		zmq_msg_t p;
		rc = zmq_msg_init(&p); assert(rc == 0);
		rc = zmq_msg_recv(&p, queue, 0); assert(rc == sizeof(zmq_msg_t*));
		zmq_msg_t* msg = NULL;
		memcpy(&msg, zmq_msg_data(&p), sizeof(zmq_msg_t*));
		rc = zmq_msg_close(&p); assert(rc == 0);
		// обработка сообщения
		//printf("worker %d recv %d bytes from %X\n", num, zmq_msg_size(msg), zmq_msg_routing_id(msg));
		rc = zmq_msg_send(msg, sock, 0); assert(rc != -1); // отправка msg и освобождение памяти
		free(msg);
	}
	zmq_close(queue);
}

int main (void) {
	ctx = zmq_ctx_new (); assert (ctx);
	// сокет для сообщений от клиентов
	sock = zmq_socket(ctx, ZMQ_SERVER);	assert(sock);
	int rc = zmq_bind(sock, "tcp://*:12345"); assert(rc == 0); 
	// сокет для очереди обработки
	void* queue = zmq_socket(ctx, ZMQ_PUSH);	assert(queue);
	rc = zmq_bind(queue, "inproc://echo-queue"); assert(rc == 0);
	// Запуск потоков обработки
	std::thread w1(worker, 1), w2(worker, 2), w3(worker, 3);
	// прием сообщений от клиентов
	while (1) {
		zmq_msg_t* msg = (zmq_msg_t*)malloc(sizeof(zmq_msg_t)); assert(msg); // инициализация пустого сообщения
		rc = zmq_msg_init(msg); assert(rc == 0);// инициализация пустого сообщения
		rc = zmq_msg_recv(msg, sock, 0); assert(rc > 0); // ожидание приема, прием с выделением памяти
		// Постановка в очередь на обработку
		zmq_msg_t p;
		rc = zmq_msg_init_size(&p, sizeof(zmq_msg_t*)); assert(rc == 0);
		memcpy(zmq_msg_data(&p), &msg, sizeof(zmq_msg_t*));
		rc = zmq_msg_send(&p, queue, 0); assert(rc == sizeof(zmq_msg_t*));
	}
	zmq_close(sock);
	zmq_ctx_destroy(ctx);
	return 0;
}


Для переделки в полноценный сервер просто дописать поток обработки worker().

Провел небольшой тест на скорость. Клиент открывает 1000 соединений к серверу и в каждое шлет 100 запросов.
Соединение через 127.0.0.1. Проц i7 4 ядра. Win10
СоединенийВремя мсСкорость запрос/секПамять сервера Мб10004 82820 70038 Мб300023 57012 700122 Мбв состоянии простоя (без соединений) память, используемая сервером, 5-6 Мб.
код клиента
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
#define COUNT 1000 // предел 1023, для 3000 запускал три client.exe
int main (void)
{
	printf("Connecting to ECHO server\n");
	void *ctx = zmq_ctx_new(); assert(ctx);

	void *sock[COUNT];
	int rc;
	for(int i = 0; i < COUNT; i++) {
		sock[i] = zmq_socket(ctx, ZMQ_CLIENT); assert(sock[i]);
		rc = zmq_connect(sock[i], "tcp://127.0.0.1:12345"); assert(rc == 0);
	}


	int t = GetTickCount();

	for (int i = 0; i < 100; i++) {
		for(int j = 0; j < COUNT; j++) {
			zmq_msg_t msg;
			rc = zmq_msg_init_size(&msg, 5); assert(rc == 0);
			memcpy(zmq_msg_data(&msg), "Hello", 5);
			rc = zmq_msg_send(&msg, sock[j], 0); assert(rc == 5);
		}
		for (int j = 0; j < COUNT; j++) {
			zmq_msg_t msg;
			rc = zmq_msg_init(&msg); assert(rc == 0);
			rc = zmq_msg_recv(&msg, sock[j], 0); assert(rc == 5);
		}
	}
	printf("Time: %d\n", GetTickCount() - t);
	zmq_close(sock);
	zmq_ctx_destroy(ctx);
	return 0;
}

...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39334138
Dima T
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Перемудрил я в коде сервера с указателями и похоже утечка памяти есть.
Надо передавать содержимое zmq_msg_t, это как понимаю структура-заголовок (64 байта) где все ссылки на данные сообщения.
Переделал, немного побыстрее стало. Сервер при простое занимает 2-3 Мб памяти.
исходник
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
#define ZMQ_BUILD_DRAFT_API
#include "zmq.h"

#include <assert.h>
#include <stdlib.h>
#include <thread>

void *ctx = NULL;
void *sock = NULL;

// рабочий поток обработки сообщений
void worker(int num) {
	printf("worker %d start\n", num);
	void* queue = zmq_socket(ctx, ZMQ_PULL);	assert(queue);
	int rc = zmq_connect(queue, "inproc://echo-queue"); assert(rc == 0);
	while(1) {
		// получение указателя из очереди
		zmq_msg_t p;
		rc = zmq_msg_init(&p); assert(rc == 0);
		rc = zmq_msg_recv(&p, queue, 0); assert(rc == sizeof(zmq_msg_t));
		zmq_msg_t* msg = (zmq_msg_t*)zmq_msg_data(&p);
		// обработка сообщения
		//printf("worker %d recv %d bytes from %X\n", num, zmq_msg_size(msg), zmq_msg_routing_id(msg));
		rc = zmq_msg_send(msg, sock, 0); assert(rc != -1); // отправка msg и освобождение памяти
		rc = zmq_msg_close(&p); assert(rc == 0); // освобождение памяти сообщения с указателем
	}
	zmq_close(queue);
}

int main (void) {
	ctx = zmq_ctx_new (); assert (ctx);
	// сокет для сообщений от клиентов
	sock = zmq_socket(ctx, ZMQ_SERVER);	assert(sock);
	int rc = zmq_bind(sock, "tcp://*:12345"); assert(rc == 0); 
	// сокет для очереди обработки
	void* queue = zmq_socket(ctx, ZMQ_PUSH);	assert(queue);
	rc = zmq_bind(queue, "inproc://echo-queue"); assert(rc == 0);
	// Запуск потоков обработки
	std::thread w1(worker, 1), w2(worker, 2), w3(worker, 3);
	// прием сообщений от клиентов
	while (1) {
		zmq_msg_t msg;
		rc = zmq_msg_init(&msg); assert(rc == 0);// инициализация пустого сообщения
		rc = zmq_msg_recv(&msg, sock, 0); assert(rc > 0); // ожидание приема, прием с выделением памяти
		// Постановка в очередь на обработку
		zmq_msg_t p;
		rc = zmq_msg_init_size(&p, sizeof(zmq_msg_t)); assert(rc == 0);
		memcpy(zmq_msg_data(&p), &msg, sizeof(zmq_msg_t));
		rc = zmq_msg_send(&p, queue, 0); assert(rc == sizeof(zmq_msg_t));
	}
	zmq_close(sock);
	zmq_ctx_destroy(ctx);
	return 0;
}

...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39334777
Dima T
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
kealon(Ruslan)я вот одного не понял, если что бы получить сообщение нужно сперва выделить под него память где тогда асинхронность?
ИМХУ ты тут теплое с мягким в кучу замешал.
Асинхронность это алгоритмический подход, несинхронно, т.е. не ждать выполнения каждого шага, т.е. дал команду сделать то-то до тогда-то и забыл. Получил ответ "то-то сделано", запустил следующее, истек таймаут "тогда-то наступило и ничего не сделано", повторил команду "сделать то-то" или еще какие действия. А когда выделилась память - пофиг. Мысли ширше или ширее )))
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39334842
kealon(Ruslan)
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Dima Tkealon(Ruslan)я вот одного не понял, если что бы получить сообщение нужно сперва выделить под него память где тогда асинхронность?
ИМХУ ты тут теплое с мягким в кучу замешал.
Асинхронность это алгоритмический подход, несинхронно, т.е. не ждать выполнения каждого шага, т.е. дал команду сделать то-то до тогда-то и забыл. Получил ответ "то-то сделано", запустил следующее, истек таймаут "тогда-то наступило и ничего не сделано", повторил команду "сделать то-то" или еще какие действия. А когда выделилась память - пофиг. Мысли ширше или ширее )))
ага, коревато, но ты же понял, что я хотел спросить :-)

Dima Tkealon(Ruslan)чччД,
всё больше склоняюсь, что вот такой подход более практичен и в плане результата и в плане лёгкости разработки.
ХЗ что проще. Ниже код сервера с многопоточной обработкой, букав поменьше и нет необходимости синхронизации доступа к очереди на обработку.
kealon(Ruslan)ещё бы для асинхронных операций чтения память выделять когда она требуется, а не заранее - тогда наверное можно и с классическим пулингом посостязаться по затратам ресурсов
zmq_msg_recv() выделяет ровно столько сколько надо по приходу сообщения.

пример зачётный, именно то, что хотел
к твоему примеру очень неплохо должно ложиться, то что описано в статье
PS: мда, что-то слишком много вокруг этих корутин последнее время крутится, случайности неслучайны...
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39334926
Dima T
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
kealon(Ruslan)PS: мда, что-то слишком много вокруг этих корутин последнее время крутится, случайности неслучайны...
ИМХУ это просто удобный синтаксис. Адаптированный к асинхронным алгоритмам.
Нынче повышение производительности железа де факто остановилось для синхронных алгоритмов, т.е. производительность одного ядра не меняется, а дополнительные ядра пользы не дают. Поэтому все направились на смену подходов, чтобы хоть как-то эти ядра хоть чем-то загрузить. Плюс сетевого взаимодействия стало больше.

А дальше возникает проблема удобства разработки и отладки, т.к. асинхронные алгоритмы в разы сложнее чем синхронные аналоги.
Например в C# много в эту сторону наработано: await/async, встроенный пул потоков, асинхронная модель на основе задач (TAP) и т.д. Все это создано в т.ч. чтобы убрать асинхронные сложности от глаз разработчика.
Там где подобного нет: один из вариантов - очереди сообщений, ZeroMQ тут очень даже в тему, особенно если требуется работа по сети.

PS Во втором варианте с копированием zmq_msg_t я тоже накосячил. Разработчики не советуют туда лезть напрямую.
Во всех описаниях функций zmq_msg_*() пишутNever access zmq_msg_t members directly, instead always use the zmq_msg family of functions.

Поправил так
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
#define ZMQ_BUILD_DRAFT_API
#include "zmq.h"

#include <assert.h>
#include <stdlib.h>
#include <thread>

void *ctx = NULL;
void *sock = NULL;

// рабочий поток обработки сообщений
void worker(int num) {
	printf("worker %d start\n", num);
	void* queue = zmq_socket(ctx, ZMQ_PULL);	assert(queue);
	int rc = zmq_connect(queue, "inproc://echo-queue"); assert(rc == 0);
	while(1) {
		// получение указателя из очереди
		zmq_msg_t p;
		rc = zmq_msg_init(&p); assert(rc == 0);
		rc = zmq_msg_recv(&p, queue, 0); assert(rc == sizeof(zmq_msg_t));
		zmq_msg_t* msg = (zmq_msg_t*)zmq_msg_data(&p);
		// обработка сообщения
		//printf("worker %d recv %d bytes from %X\n", num, zmq_msg_size(msg), zmq_msg_routing_id(msg));
		rc = zmq_msg_send(msg, sock, 0); assert(rc != -1); // отправка msg и освобождение памяти
		rc = zmq_msg_close(&p); assert(rc == 0); // освобождение памяти сообщения с указателем
	}
	zmq_close(queue);
}

int main (void) {
	ctx = zmq_ctx_new (); assert (ctx);
	// сокет для сообщений от клиентов
	sock = zmq_socket(ctx, ZMQ_SERVER);	assert(sock);
	int rc = zmq_bind(sock, "tcp://*:12345"); assert(rc == 0); 
	// сокет для очереди обработки
	void* queue = zmq_socket(ctx, ZMQ_PUSH);	assert(queue);
	rc = zmq_bind(queue, "inproc://echo-queue"); assert(rc == 0);
	// Запуск потоков обработки
	std::thread w1(worker, 1), w2(worker, 2), w3(worker, 3);
	// прием сообщений от клиентов
	while (1) {
		// Сообщение для отправки в очередь
		zmq_msg_t p;
		rc = zmq_msg_init_size(&p, sizeof(zmq_msg_t)); assert(rc == 0);
		zmq_msg_t* msg = (zmq_msg_t*)zmq_msg_data(&p);

		// Ожидание сообщений от клиентов
		rc = zmq_msg_init(msg); assert(rc == 0);// инициализация пустого сообщения
		rc = zmq_msg_recv(msg, sock, 0); assert(rc > 0); // ожидание приема, прием с выделением памяти

		// Постановка в очередь на обработку
		rc = zmq_msg_send(&p, queue, 0); assert(rc == sizeof(zmq_msg_t));
	}
	zmq_close(sock);
	zmq_ctx_destroy(ctx);
	return 0;
}
...
Рейтинг: 0 / 0
ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
    #39335878
Dima T
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Поизучал что реально происходит в inproc - пришел к выводу что не надо никаких указателей для inproc соединений, там и так указатели передаются.
Точнее если сообщение маленькое (5 байт например), то само сообщение, если большое (100 байт пробовал), то указатель, а содержимое сообщения остается в памяти там же где и было. Т.е. ZMQ сама отлично решает как быстрее доставить.

Итого: сообщение можно принимать с TCP соединения и сразу отправлять в inproc очередь на обработку без какого-либо шаманства.
Код сервера стал меньше, понятнее и быстрее на 5%
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
#define ZMQ_BUILD_DRAFT_API
#include "zmq.h"

#include <assert.h>
#include <stdlib.h>
#include <thread>

void *ctx = NULL;
void *sock = NULL;

// поток обработки сообщений
void worker(int num) {
	printf("worker %d start\n", num);
	void* queue = zmq_socket(ctx, ZMQ_PULL); assert(queue);
	int rc = zmq_connect(queue, "inproc://echo-queue"); assert(rc == 0);

	while (1) {
		// получение сообщения из очереди
		zmq_msg_t msg;
		rc = zmq_msg_init(&msg); assert(rc == 0);
		rc = zmq_msg_recv(&msg, queue, 0); assert(rc > 0);
		// обработка сообщения
		printf("worker %d recv %d bytes at %X from %X\n", num, zmq_msg_size(&msg), zmq_msg_data(&msg), zmq_msg_routing_id(&msg));
		rc = zmq_msg_send(&msg, sock, 0); assert(rc != -1); // отправка msg и освобождение памяти
	}
	zmq_close(queue);
}

int main (void) {
	ctx = zmq_ctx_new (); assert (ctx);
	// сокет для сообщений от клиентов
	sock = zmq_socket(ctx, ZMQ_SERVER); assert(sock);
	int rc = zmq_bind(sock, "tcp://*:12345"); assert(rc == 0); 
	// сокет для очереди обработки
	void* queue = zmq_socket(ctx, ZMQ_PUSH);	assert(queue);
	rc = zmq_bind(queue, "inproc://echo-queue"); assert(rc == 0);
	// запуск потоков обработки
	std::thread w1(worker, 1), w2(worker, 2), w3(worker, 3);
	// прием сообщений от клиентов
	while (1) {
		// Ожидание сообщений от клиентов
		zmq_msg_t msg;
		rc = zmq_msg_init(&msg); assert(rc == 0);// инициализация пустого сообщения
		rc = zmq_msg_recv(&msg, sock, 0); assert(rc > 0); // ожидание приема, прием с выделением памяти
		printf("recv %d bytes at %X from %X\n", zmq_msg_size(&msg), zmq_msg_data(&msg), zmq_msg_routing_id(&msg));
		// Постановка в очередь на обработку
		rc = zmq_msg_send(&msg, queue, 0); assert(rc > 0);
	}
	zmq_close(sock);
	zmq_ctx_destroy(ctx);
	return 0;
}

...
Рейтинг: 0 / 0
25 сообщений из 260, страница 6 из 11
Форумы / Delphi [игнор отключен] [закрыт для гостей] / ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]