powered by simpleCommunicator - 2.0.59     © 2025 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / C++ [игнор отключен] [закрыт для гостей] / std::conditional_variable.notify_one() "будит" один и тот же поток
12 сообщений из 12, страница 1 из 1
std::conditional_variable.notify_one() "будит" один и тот же поток
    #38459617
HellFighter
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
На выходных я решил по изучать возможности стандартной библиотеки С++11. Завёл MSVC2012 и начал писать класс пула потоков с очередью заданий. В результате, в пока далеко не законченном варианте, получился примерно такой код:

ThreadPool.h:
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
#pragma once

#include <vector>
#include <map>
#include <thread>
#include <atomic>
#include <memory>
#include <random>
#include <deque>
#include <future>
#include <mutex>             
#include <condition_variable>

#include <iostream>

typedef std::tuple<std::packaged_task<int(void*,void*)>,void*,void*> ThreadPoolTask;

class ThreadPool
{
private:
	std::vector< std::pair< std::thread,std::unique_ptr< std::atomic<bool> > > > threads;
	static void threadProcess(ThreadPool *self, const std::atomic<bool> *const active);

	std::mutex cvLockMutex;
	std::condition_variable cvThreadAwait;

	std::deque<ThreadPoolTask> tasks;

	const bool getHeadTask(ThreadPoolTask& task);

public:
	ThreadPool(const uint32_t& threadsNumber = 0);
	virtual ~ThreadPool(void);

	void startNew(const uint32_t& threadsNumber = 1);
	void terminateAll();
	const uint32_t getCount() const;

	void addTask(ThreadPoolTask& task, const bool& queueBegin = false);
};


ThreadPool.cpp:
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
#include "ThreadPool.h"

//-------------------------------------------------------------------------------------------------
ThreadPool::ThreadPool(const uint32_t& threadsNumber)
	: threads(0)
{
	if(threadsNumber)
		this->startNew(threadsNumber);
}
//-------------------------------------------------------------------------------------------------
ThreadPool::~ThreadPool(void)
{
	this->terminateAll();
}
//-------------------------------------------------------------------------------------------------
void ThreadPool::threadProcess(ThreadPool *self,  const std::atomic<bool> *const active)
{
	std::cout << "START - Thread ID # " << std::this_thread::get_id() << std::endl;

	std::random_device randomDevice;

	std::unique_lock<std::mutex> cvLock(self->cvLockMutex);

	ThreadPoolTask task;

	while(active->load())
	{
		if(self->getHeadTask(task))
		{
			std::cout << "Thread ID # " << std::this_thread::get_id() << std::endl;

			std::get<0>(task).operator()(std::get<1>(task),std::get<2>(task));

			std::this_thread::sleep_for(std::chrono::milliseconds(randomDevice()%1000));
		}
		else
		{
			// Перейти в ожидание
			self->cvThreadAwait.wait(cvLock);
		}

	}

	std::cout << "END - Thread ID # " << std::this_thread::get_id() << std::endl;
}
//-------------------------------------------------------------------------------------------------
void ThreadPool::startNew(const uint32_t& threadsNumber)
{
	for(uint32_t i = 0; i < threadsNumber; ++i)
	{
		std::atomic<bool> *activitiFlag = new std::atomic<bool>(true);
		this->threads.push_back( std::pair<std::thread,std::unique_ptr<std::atomic<bool>>>( std::thread(ThreadPool::threadProcess, this, activitiFlag) ,std::unique_ptr<std::atomic<bool>>(activitiFlag) ) );
	}
}
//-------------------------------------------------------------------------------------------------
void ThreadPool::terminateAll()
{
	for(auto& thread : this->threads)
		thread.second->store(false);

	this->cvThreadAwait.notify_all();

	for(auto& thread : this->threads)
		thread.first.join();

	this->threads.clear();
}
//-------------------------------------------------------------------------------------------------
const uint32_t ThreadPool::getCount() const
{
	return this->threads.size();
}
//-------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------
void ThreadPool::addTask(ThreadPoolTask& task, const bool& queueBegin)
{
	(queueBegin ? this->tasks.push_front(std::move(task)) : this->tasks.push_back(std::move(task)));

	this->cvThreadAwait.notify_one();
}
//-------------------------------------------------------------------------------------------------
const bool ThreadPool::getHeadTask(ThreadPoolTask& task)
{
	if(this->tasks.empty())
		return false;

	task = std::move(this->tasks.front());
	this->tasks.pop_front();

	return true;
}
//-------------------------------------------------------------------------------------------------



Для тестов использовалось консольное приложение:
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
int funk(void*,void*)
{
	wcout << L"\t\t\tРабота в потоке!" << endl;
	return 0;
}

int main(/**/)
{
	setlocale(LC_ALL,"RUS");

	wcout << L"Старт программы" << endl;

	ThreadPool tp;

	tp.addTask(ThreadPoolTask(std::packaged_task<int(void*,void*)>(funk),nullptr,nullptr));

	tp.startNew(5);

	_getch();

	tp.addTask(ThreadPoolTask(std::packaged_task<int(void*,void*)>(funk),nullptr,nullptr));
	tp.addTask(ThreadPoolTask(std::packaged_task<int(void*,void*)>(funk),nullptr,nullptr));

	_getch();

	tp.addTask(ThreadPoolTask(std::packaged_task<int(void*,void*)>(funk),nullptr,nullptr));
	tp.addTask(ThreadPoolTask(std::packaged_task<int(void*,void*)>(funk),nullptr,nullptr));
	tp.addTask(ThreadPoolTask(std::packaged_task<int(void*,void*)>(funk),nullptr,nullptr));

	_getch();

	tp.addTask(ThreadPoolTask(std::packaged_task<int(void*,void*)>(funk),nullptr,nullptr));
	tp.addTask(ThreadPoolTask(std::packaged_task<int(void*,void*)>(funk),nullptr,nullptr));

	_getch();

	tp.terminateAll();

	_getch();

	wcout << L"Завершение программы" << endl;

}



Всё работает отлично (не считая отсутствия синхронизации, в том числе при выводе в консоль, но это не законченный вариант).

Но вот только при вызове каждой группы tp.addTask из консольной программы, вызов this->cvThreadAwait.notify_one(); в методе void ThreadPool::addTask(ThreadPoolTask& task, const bool& queueBegin) будит один и тот же поток! (привёл в файле картинку вывода для наглядности) Если вместо notify_one() использовать notify_all() , то, как и задумано, вызываются случайные потоки на каждое задание.

Что я делаю не верно, что вызывается один и тот же поток? При этом при загрузке следующей группы заданий возможно будет использоваться другой поток, но точно так же только он один. Или может я не правильно понимаю нормальное поведение notify_one() ?
...
Рейтинг: 0 / 0
std::conditional_variable.notify_one() "будит" один и тот же поток
    #38459733
Фотография Anatoly Moskovsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
HellFighter,

Потому что, глядя на функцию потока, как только какой нибудь поток получает сообщение, он уже никогда не вызывает wait() снова, таким образом не разблокируя мьютекс и не давая шанса другим потокам получить следующие сообщения.
...
Рейтинг: 0 / 0
std::conditional_variable.notify_one() "будит" один и тот же поток
    #38459755
locked
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
HellFighter,

твои задания исполняются с залоченным мютексом.
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
void ThreadPool::threadProcess(ThreadPool *self,  const std::atomic<bool> *const active)
{
	std::cout << "START - Thread ID # " << std::this_thread::get_id() << std::endl;

	std::random_device randomDevice;

	std::unique_lock<std::mutex> cvLock(self->cvLockMutex);

	ThreadPoolTask task;

	while(active->load())
	{
		if(self->getHeadTask(task))
		{
                        cvLock.unlock();
			std::cout << "Thread ID # " << std::this_thread::get_id() << std::endl;

			std::get<0>(task).operator()(std::get<1>(task),std::get<2>(task));

			std::this_thread::sleep_for(std::chrono::milliseconds(randomDevice()%1000));
                        cvLock.lock();
		}
		else
		{
			// Перейти в ожидание
			self->cvThreadAwait.wait(cvLock);
		}

	}

	std::cout << "END - Thread ID # " << std::this_thread::get_id() << std::endl;
}



Добавление в tasks нужно защитить мютексом
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
void ThreadPool::addTask(ThreadPoolTask& task, const bool& queueBegin)
{
	std::unique_lock<std::mutex> cvLock(cvLockMutex);
	(queueBegin ? this->tasks.push_front(std::move(task)) : this->tasks.push_back(std::move(task)));

	this->cvThreadAwait.notify_one();
}
...
Рейтинг: 0 / 0
std::conditional_variable.notify_one() "будит" один и тот же поток
    #38460544
HellFighter
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Спасибо за ответы. Видимо я не совсем понимаю принцип действия conditional_variable, раз ошибка действительно в этом.
Сейчас код мне не доступен, дома проверю как будет с исправлениями.
А кто-нибудь может мне коротко и доступно объяснить как работает conditional_variable, чтобы больше не возникало таких непониманий? Я читал на cplusplus.com, но видимо что-то не до конца понял..... (Я вообще впервые работаю с условными переменными в принципе)

В моём понимании при вызове notify() поток должен автоматически разблокировать mutex захваченный conditional_variable , видимо нет?
...
Рейтинг: 0 / 0
std::conditional_variable.notify_one() "будит" один и тот же поток
    #38460581
locked
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
HellFighterСпасибо за ответы. Видимо я не совсем понимаю принцип действия conditional_variable, раз ошибка действительно в этом.
Сейчас код мне не доступен, дома проверю как будет с исправлениями.
А кто-нибудь может мне коротко и доступно объяснить как работает conditional_variable, чтобы больше не возникало таких непониманий? Я читал на cplusplus.com, но видимо что-то не до конца понял..... (Я вообще впервые работаю с условными переменными в принципе)

В моём понимании при вызове notify() поток должен автоматически разблокировать mutex захваченный conditional_variable , видимо нет?
Наоборот. При входе в wait нить блокируется и мютекс освобождается а по notify_... одна или все нити разблокируются и конкурируют за мютекс, захватившая его нить выходит из wait.
...
Рейтинг: 0 / 0
std::conditional_variable.notify_one() "будит" один и тот же поток
    #38460993
HellFighter
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Проверил код дома - само собой помогло =)
А так-же большое спасибо за объяснение, кажется теперь я понимаю как это работает =)

А подскажите, не слишком ли накладно получается тут работа с мьютексом?
В потоке для выполнения одного задания происходят:
1. Захват мьютекса
2. Освобождение мьютекса при ожидании на условной переменной
3. Взятие мьютекса при пробуждении на условной переменной
4. Освобождение мьютекса после взятия задания из очереди
5. Обратный захват мьютекса после выполнения задания (повторение п.1 ?)

Ведь на сколько я знаю работа с мьютексами очень затратна по ресурсам (хотя в данном случае наверное избавление от цикла ожидания задания даёт больший выигрыш).

Может можете подсказать, есть более "правильный" вариант организации работы пула потоков с пополняющейся очередью заданий?
...
Рейтинг: 0 / 0
std::conditional_variable.notify_one() "будит" один и тот же поток
    #38461219
locked
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
HellFighterПроверил код дома - само собой помогло =)
А так-же большое спасибо за объяснение, кажется теперь я понимаю как это работает =)

А подскажите, не слишком ли накладно получается тут работа с мьютексом?
В потоке для выполнения одного задания происходят:
1. Захват мьютекса
2. Освобождение мьютекса при ожидании на условной переменной
3. Взятие мьютекса при пробуждении на условной переменной
4. Освобождение мьютекса после взятия задания из очереди
5. Обратный захват мьютекса после выполнения задания (повторение п.1 ?)

Ведь на сколько я знаю работа с мьютексами очень затратна по ресурсам (хотя в данном случае наверное избавление от цикла ожидания задания даёт больший выигрыш).

Может можете подсказать, есть более "правильный" вариант организации работы пула потоков с пополняющейся очередью заданий?
На самом деле не все так страшно. Один lock/unlock на круг. Как ты себе представляешь доступ к очереди задач без блокировок? Другое дело что есть много места для оптимизации. Например STL контейнеры не самое лучшее решение для очереди.
...
Рейтинг: 0 / 0
std::conditional_variable.notify_one() "будит" один и тот же поток
    #38461628
HellFighter
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
locked,

Так вроде 2 lock/unlock на круг о_О Те что в коде и те что происходят внутри wait , нет?

Было бы интересно выслушать все замечания по поводу возможных оптимизаций =)

По поводу STL - конечно изначально именно в нём и пытался потренироваться) Но почему не лучшее решение использовать STL? Что вместо, статические массивы? Тогда либо очередь фиксированной длины, либо код по управлению размером массива, аналогичный STL-евскому.
+ При использовании STL можно вообще шаблонное задание сделать (правда я не стал тк решил что с помошью 2х указателей переданных в обработчик можно решить любую задачу, например pThis и pMyDataStructure)
...
Рейтинг: 0 / 0
std::conditional_variable.notify_one() "будит" один и тот же поток
    #38461660
locked
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
HellFighterlocked,
Так вроде 2 lock/unlock на круг о_О


Если очередь пуста - зачем тогда нужен пул?
...
Рейтинг: 0 / 0
std::conditional_variable.notify_one() "будит" один и тот же поток
    #38462565
HellFighter
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
lockedHellFighterlocked,
Так вроде 2 lock/unlock на круг о_О


Если очередь пуста - зачем тогда нужен пул?

А как это относится к цитате? о_О Не понял вопроса...
...
Рейтинг: 0 / 0
std::conditional_variable.notify_one() "будит" один и тот же поток
    #38462576
Фотография Anatoly Moskovsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
HellFighter,

Он имеет в виду - если вы не будете отпускать мьютекс пока обрабатываете сообщение, то только один поток будет обрабатывать сообщения, а остальные будут ждать, что делает пул потоков бесполезным.
...
Рейтинг: 0 / 0
std::conditional_variable.notify_one() "будит" один и тот же поток
    #38464026
HellFighter
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Всем спасибо, с вопросом разобрался =)
...
Рейтинг: 0 / 0
12 сообщений из 12, страница 1 из 1
Форумы / C++ [игнор отключен] [закрыт для гостей] / std::conditional_variable.notify_one() "будит" один и тот же поток
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]