powered by simpleCommunicator - 2.0.59     © 2025 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / C++ [игнор отключен] [закрыт для гостей] / Распараллеливание без покобезопасности. Реальное использование теории акторов.
1 сообщений из 1, страница 1 из 1
Распараллеливание без покобезопасности. Реальное использование теории акторов.
    #39420456
Dima T
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Написал микро библиотеку lite_thread.h для распараллеливания. Внутри все: документация и исходники. Написана на С++11, кроссплатформенная.

Основной принцип: создание сообщения и отправка его обработчику. Дальше библиотека сама создает поток и запускает обработчик.

Пример простейшего использования
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
#include "../lite_thread.h"

void actor1(lite_msg_t* msg, void* env) {// Обработчик сообщения
	uint32_t* x = lite_msg_data<uint32_t>(msg); // Указатель на содержимое сообщения
	assert(x != NULL); // NULL при sizeof(uint32_t) != msg->size()
	work(*x); // Обработка содержимого сообщения
}

void main() {
	// Отправка 10 сообщений
	for (int i = 100; i < 110; i++) {
		lite_msg_t* msg = lite_msg_create<uint32_t>(); // Создание сообщения
		uint32_t* x = lite_msg_data<uint32_t>(msg); // Указатель на содержимое сообщения
		*x = i; // Заполнение сообщения
		lite_thread_run(msg, actor1); // Отправка msg в actor1()
	}
	// Ожидание завершения работы
	lite_thread_end();
}



actor1() запускается в отдельном потоке. Думаю первая мысль возникнет: "зачем переизобретать std::thread ?"
Главное тут в том что actor1() не должен быть потокобезопасным в отличии от std::thread, т.е. дополнительный поток будет всего 1 и там последовательно будут обработаны все сообщения. Т.е. один поток вызовет 10 раз actor1().
Поэтому не надо никаких синхронизаций внутри actor1(), ни надо threadsafe контейнеров, которые в разы тормознее потоконебезопасных аналогов (std::vector, std::map и т.д.)

Если actor1() потокобезопасен то ему можно глубину распараллеливания указать, т.е. в скольки потоках одновременно его можно запускать:
Код: plaintext
1.
lite_actor_parallel(3, actor1); // Глубина распараллеливания 3 потока


Можно задать ограничение группе акторов на использование конкретного ресурса, например сколько потоков можно занимать одновременно.

Сборкой мусора занимается библиотека, т.е. сообщения явно удалять не надо. Память выделяется и освобождается библиотекой.
Внутри передаются указатели, т.е. само сообщение никуда не копируется. Библиотека гарантирует что в один момент времени сообщение обрабатывается только в одном потоке.
Отсюда ограничения по использованию сообщений:
- нельзя отправлять одно сообщение дважды;
- нельзя читать/писать сообщение после отправки, т.к. оно может быть уже в обработке или удалено.

Немного пописав функции понял что очень не удобно окружение запихивать внутрь, поэтому добавил базовый класс обертку lite_worker_t, надо просто унаследоваться от него и прописать свой метод recv()
Код: plaintext
1.
2.
3.
4.
class my_worker_t : public lite_worker_t {
	void recv(lite_msg_t* msg) override {
	}
}


Потом выяснилось что акторы-объекты надо где-то хранить и как-то их находить, поэтому объекты хранит библиотека, удаляет в конце работы, объектам присваиваются имена и есть поиск по имени.
Так же в lite_worker_t добавлено полноценное ограничение обрабатываемых типов сообщений, проверка по typeid(T).hash_code(). В lite_msg_data<T>() это не проверяется, т.к. typeid() заметно подтормаживает.

По скорости работы результаты такие: strеss_test (тест для отладки, написал для отлова ошибок) передает 35+ млн. сообшений в секунду при ограничении 8 потоков на i7-3770К 3.5 ГГц (4 ядра + HT)

Собственно этот простейший пример главное ради чего я ее написал. Чтобы именно так и использовать. Давно хотел что-то подобное написать, спасибо mayton`у за топик про модель акторов . Натолкнул на мысль. Подобрал теоретическую базу под мой велосипед.

Модель акторов интересная штука, простая в понимании, но сложная в применении. Чтобы что-то сделать надо сначала научиться строить алгоритмы акторами.

Как я понял теорию: актор это атомарная часть бизнес-логики, т.е. разбиваем общий алгоритм на атомарные части и запускаем параллельно то что можно делать параллельно.

Пример распараллеливания реального алгоритма на основе местного теста скорости ЯП . Для тех кто тот топик не читал суть задачи: сгенерить картинку img.ppm размером 512*512.
Целиком исходник тут card_raytracer.cpp

Исходный однопоточный алгоритм был такой
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
void original() {
	FILE *out = fopen(FILE_NAME, "w");
	assert(out != NULL);
	fprintf(out, "P6 %d %d 255 ", WIDTH, HEIGHT);
	Vector g = !Vector(-6, -16, 0);
	Vector a = !(Vector(0, 0, 1) ^ g) * .002;
	Vector b = !(g ^ a) * .002;
	Vector c = (a + b) * -256 + g;
	for (int y = HEIGHT; y--;) {
		for (int x = WIDTH; x--;) {
			Vector p(13, 13, 13);
			for (int r = 64; r--;) {
				Vector t = a * (Random() - .5) * 99 + b * (Random() - .5) * 99;
				p = sampler(Vector(17, 16, 8) + t, !(t * -1 + (a * (Random() + x) + b * (y + Random()) + c) * 16)) * 3.5 + p;
			}
			p.print(out);
		}
	}
	fclose(out);
}


Т.е. создает файл, в двух вложенных циклах (высота, ширина) последовательно обсчитывает каждый пиксель изображения и сохраняет в файл.

По модели акторов создается три актора:
1. Считатель. Обсчитывает один пиксель. Код Считателя потокобезопасный, т.к. не имеет меняющегося окружения, поэтому его можно запускать параллельно.
2. Упорядочиватель. Однопоточный. Восстанавливает порядок следования сообщений.
3. Писатель. Однопоточный. Пишет результат в файл.

В начале работы (actor_start(int threads)) создается 260 тыс. сообщений, заданий на обсчет каждой точки и отправляются Считателю. По окончанию обсчета точки Считатель отправляет результат Упорядочивателю, который кэширует пришедшие не по порядку сообщения и отправляет Писателю сообщения в соответствии с изначальным порядком.
Код на акторах
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
struct msg_t {
	size_t idx;
	int x;
	int y;
	Vector result;
};

// Писатель (однопоточный)
class alignas(64) writer_t : public lite_worker_t {
	FILE *out;
public:
	writer_t() {
		type_add(lite_msg_type<msg_t>()); // Разрешение принимать сообщение типа msg_t
		out = fopen(FILE_NAME, "w");
		assert(out != NULL);
		fprintf(out, "P6 %d %d 255 ", WIDTH, HEIGHT);
	}

	~writer_t() {
		fclose(out);
	}

	// Обработка сообщения
	void recv(lite_msg_t* msg) {
		msg_t* d = lite_msg_data<msg_t>(msg);
		assert(d != NULL);
		d->result.print(out);
	}
};


// Считатель (потокобезопасный)
class alignas(64) worker_t : public lite_worker_t {
	Vector g = !Vector(-6, -16, 0);
	Vector a = !(Vector(0, 0, 1) ^ g) * .002;
	Vector b = !(g ^ a) * .002;
	Vector c = (a + b) * -256 + g;
	lite_actor_t* order; // Упорядочиватель

public:
	worker_t() {
		order = lite_actor_get("order"); // Получение упорядочивателя по имени
		assert(order != NULL);
		type_add(lite_msg_type<msg_t>()); // Разрешение принимать сообщение типа msg_t
	}

	// Расчет одного пикселя
	void calc(int x, int y, Vector& p) {
		p.init(13, 13, 13);
		for (int r = 64; r--;) {
			Vector t = a * (Random() - .5) * 99 + b * (Random() - .5) * 99;
			p = sampler(Vector(17, 16, 8) + t, !(t * -1 + (a * (Random() + x) + b * (y + Random()) + c) * 16)) * 3.5 + p;
		}
	}

	// Прием сообщения
	void recv(lite_msg_t* msg) override {
		msg_t* d = lite_msg_data<msg_t>(msg); // Указатель на содержимое
		assert(d != NULL);
		calc(d->x, d->y, d->result); // Расчет
		lite_thread_run(msg, order); // Отправка
	}

};

// Запуск расчета
void actor_start(int threads) {
	// Создание акторов
	// Писатель
	lite_actor_t* writer = lite_actor_create<writer_t>("writer");
	// Упорядочиватель (из lite_thread_util.h)
	lite_order_create<msg_t>("order", writer);
	// Считатель 
	lite_actor_t* worker = lite_actor_create<worker_t>("worker");	
	lite_actor_parallel(threads, worker); // Глубина распараллеливания
	// Ограничение количества потоков
	lite_thread_max(threads);
	// Создание сообщений
	size_t idx = 0; // номер сообщения
	for (int y = HEIGHT; y--;) {
		for (int x = WIDTH; x--;) {
			// Создание сообщения
			lite_msg_t* msg = lite_msg_create<msg_t>();
			// Заполнение
			msg_t* d = lite_msg_data<msg_t>(msg);
			d->idx = idx++;
			d->x = x;
			d->y = y;
			// Отправка
			lite_thread_run(msg, worker);
		}
	}

	lite_thread_end(); // Ожидание окончания расчета
}


Букав поболее чем в исходном, но это понятный код и главное распараллеленый. Представьте как это параллелить в классическом стиле, гораздо больше букав, отладка синхронизации и т.д. и т.п. Я за час на акторы переписал.
Распараллеливание идеальноеi7-6700K (4 ГГц, 4 ядра без HT) Win10
ТестВремя секОригинальный вариант11.61 поток11.62 потока5.83 потока3.954 потока3.016 потоков3.008 потоков2.99


Виртуалка Debian8 там же 1 ядро
ТестВремя секОригинальный вариант10.21 поток10.72 потока10.84 потока10.8

i7-3770K (3.5 ГГц 4 ядра + HT) Win10
ТестВремя секОригинальный вариант15.91 поток15.92 потока8.13 потока5.574 потока4.506 потоков3.828 потоков3.41
А теперь о недостатках теории, точнее о моем неумении ее применять: если кто заметил Упорядочивателя в коде нет. Я его в lite_thread_util.h вынес в виде шаблона, как универсальную подзадачу. Только она не очень универсальная получилась, начни обсчитывать две картинки и он их в кучу смешает. Надеюсь осознание теории будет приходить по мере ее использования.

Что не доделано:
Не знаю как сделать передачу полноценных объектов в сообщении. Сейчас передаются только структуры, т.е. конструктор/деструктор не вызываются. Память выделяется malloc()
Конструктор вызвать не проблема, проблема с деструктором из-за сборки мусора, библиотека при удалении сообщения видит только void* и знает его размер.
Надо как-то при создании объекта сообщить как его уничтожать, т.е. надо как-то поместить в сообщение не только содержимое объекта, но и какую-то универсальную обертку void delete_object(void* obj), вызов которой корректно уничтожит объект. Как это сделать я не знаю, настолько глубоко в С++ не силен.

PS Кто будет пользоваться - пишите, советы по доделке принимаются. Реальных применений в продакшене пока нет, но stress_test достаточно жестокий, отлично отлавливает ошибки в коде, запускал его многократно, он работает стабильно.
...
Рейтинг: 0 / 0
1 сообщений из 1, страница 1 из 1
Форумы / C++ [игнор отключен] [закрыт для гостей] / Распараллеливание без покобезопасности. Реальное использование теории акторов.
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]