powered by simpleCommunicator - 2.0.59     © 2025 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / C++ [игнор отключен] [закрыт для гостей] / очередь задания для потока
26 сообщений из 26, показаны все 2 страниц
очередь задания для потока
    #39204341
alexy_black
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
привет всем.
я тут приболел.. решил сделать вот такую штуку: создаешь объект класса, у него есть функция add - добавляешь задание (std::function<void()>) и это задание исполняется в другом потоке. ну смысл в том, что тебе может быть нужно выполнить несколько тысяч таких заданий и создавать поток для каждого - не нужно, а вот поставить их ждать в одном потоке - вполне можно.

так вот, что-то пошло не так (видимо температура дает свое :) )
вот весь незамысловатый код
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
#include <list>
#include <atomic>
#include <thread>
#include <functional>
#include <condition_variable>

namespace auth {
class task_queue final {
public:
	task_queue(const task_queue&) =delete;
	task_queue& operator = (const task_queue&) =delete;

	task_queue();
	task_queue(task_queue&& other);
	task_queue& operator = (task_queue&& other);
	~task_queue() noexcept ;

	void add(const std::function<void ()>& task);
	std::size_t size() const ;
private:
	std::atomic<bool> stop_{false};
	std::thread thread_;

	mutable std::mutex tasks_mutex_;
	std::list<std::function<void ()>> tasks_;

	std::condition_variable tasks_condition_;
};
} // auth namespace

// cpp file
#include "task_queue.h"
#include <iostream>

auth::task_queue::task_queue(task_queue&& other) : thread_(std::move(other.thread_)), tasks_(std::move(other.tasks_)) {}
auth::task_queue& auth::task_queue::operator = (task_queue&& other)
{
	tasks_ = std::move(other.tasks_);
	thread_ = std::move(other.thread_);
	return *this;
}

auth::task_queue::task_queue() : thread_([this](){
			while(!stop_) {
				std::unique_lock<std::mutex> lock(tasks_mutex_);
				tasks_condition_.wait(lock,[this](){ return 0<tasks_.size() || stop_; });

				std::list<std::function<void ()>> tasks;
				tasks.swap(tasks_);
				lock.unlock();

				for(auto task:tasks) task();
			}
		}) {}

auth::task_queue::~task_queue() noexcept
{
	stop_ = true ;
	std::unique_lock<std::mutex> lock(tasks_mutex_);
	tasks_condition_.notify_all();
	thread_.join();
//	thread_.detach();
}

void auth::task_queue::add(const std::function<void ()>& task)
{
	std::unique_lock<std::mutex> lock(tasks_mutex_);
	tasks_.push_back(task);
	lock.unlock();
	tasks_condition_.notify_one();
}

std::size_t auth::task_queue::size() const
{
	std::unique_lock<std::mutex> lock(tasks_mutex_);
	return tasks_.size();
}




а вот тест
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
#define BOOST_TEST_DYN_LINK
#define BOOST_TEST_MODULE thread queue test

#include <mutex>
#include <boost/test/unit_test.hpp>
#include <iostream>

#include "cpphttpx/cms/auth/task_queue.h"

BOOST_AUTO_TEST_SUITE( task_queue )
BOOST_AUTO_TEST_CASE( add_and_size )
{
	auth::task_queue task;
	BOOST_CHECK_NO_THROW( task.add([](){ std::this_thread::yield();std::this_thread::yield(); }) );
	BOOST_CHECK_NO_THROW( task.add([](){ std::this_thread::yield();std::this_thread::yield(); }) );
	BOOST_CHECK_NO_THROW( task.add([](){ std::this_thread::yield();std::this_thread::yield(); }) );
	BOOST_CHECK( 0<task.size() );
}
BOOST_AUTO_TEST_CASE( tasks )
{
	auth::task_queue task;
	std::timed_mutex checker;
	BOOST_REQUIRE( checker.try_lock_for(std::chrono::milliseconds(10)) );

	BOOST_CHECK_NO_THROW( task.add([&checker](){
			BOOST_REQUIRE( !checker.try_lock() );
			std::cout << "hello from thread tasks" << std::endl;
			checker.unlock();
			}) );

	BOOST_CHECK( checker.try_lock_for(std::chrono::seconds(1)) );
}
BOOST_AUTO_TEST_SUITE_END() // task_queue



вобщем под отладкой это дело не работает вобще - sigabrt . не под отладно - оно падает иногда, если сделать detach (что понятно, потому что поток может продложить работать на унчтоженным объекте), и тупо подвисает если сделать join (но не всегда).

тест вроде проходится (ну то есть чеки выполняются).

что тут не так? и может знает кто готовую реализацию, чтобы свой велосипед не изобретать? а то я уже заколебался (чего-то гугл мне ничего не сказал).
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204350
Dima T
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Не помешает перепроверить останов, т.к. stop_ может поменяться между while(!stop_) и lock()
Код: plaintext
1.
2.
3.
4.
5.
auth::task_queue::task_queue() : thread_([this](){
			while(!stop_) {
				std::unique_lock<std::mutex> lock(tasks_mutex_);
				if(stop_) break;
				tasks_condition_.wait(lock,[this](){ return 0<tasks_.size() || stop_; });
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204359
alexy_black
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Dima T,

спасибо.. а если я вызываю wait, а условие уже true - то он все равно будет ждать по кто-нибудь не вызовет notify_all ?

я добавил эту строку, но зависон оставлся..

то есть у меня была такая идея: в деструкторе я жду пока очередь опустошится (tasks_mutex_ блокирую), потом жду пока поток завершится, обобработав все задания, но на следующий круг он уже не пойдет, так как стопер... но даже если убрать метекс в деструкторе проблема остается - то есть зависает поток, работающий с очередью, а гланвый поток висит в его ожидании..

но вот интересен вывод (добавил сразу после wait запись в std::cout)

Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
┬─[fleonis@winter:~/projects/tmp/cpphttpx_srv/build]─[17:00:56]                                                                                                                  
╰─>$ tests/cms/task_queue 
Running 2 test cases...
hello from task_queeu
hello from thread tasks

*** No errors detected
┬─[fleonis@winter:~/projects/tmp/cpphttpx_srv/build]─[17:00:56]                                                                                                                  
╰─>$ tests/cms/task_queue 
Running 2 test cases...
hello from task_queeu
hello from task_queeu
hello from thread tasks
hello from task_queeu

*** No errors detected
┬─[fleonis@winter:~/projects/tmp/cpphttpx_srv/build]─[17:00:57]                                                                                                                  
╰─>$ tests/cms/task_queue 
Running 2 test cases...
^[[A
^C⏎                                                                                                                                                                              ┬─[fleonis@winter:~/projects/tmp/cpphttpx_srv/build]─[17:01:36]
╰─>$ 

то есть в последнем случаи этот поток подвис даже не зайдя в кондишн вариабл
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204360
alexy_black
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
ой, что это у меня сообщением? его чего-то раздуло, теперь горизонтальный скрол появился.. не хотел, сори.
зы: модератора, можете поправить?
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204371
Dima T
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
alexy_blackспасибо.. а если я вызываю wait, а условие уже true - то он все равно будет ждать по кто-нибудь не вызовет notify_all ?

я добавил эту строку, но зависон оставлся..
notify_all() будит только тех кто стоит на wait(), т.е. если один поток вызвал notify_all(), а после второй встал на wait() то остановится.
Данная ситуация крайне редка, но теоретически возможна.

Вот еще косяк, ты мутекс не освободил, наверно из-за этого виснет
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
auth::task_queue::~task_queue() noexcept
{
  {
	stop_ = true ;
	std::unique_lock<std::mutex> lock(tasks_mutex_);
	tasks_condition_.notify_all();
  }
	thread_.join();
//	thread_.detach();
}
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204384
Dima T
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
И эта строчка лишняя
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
void auth::task_queue::add(const std::function<void ()>& task)
{
	std::unique_lock<std::mutex> lock(tasks_mutex_);
	tasks_.push_back(task);
	lock.unlock();
	tasks_condition_.notify_one();
}


Насколько я понимаю notify_one() надо вызывать при захваченном мутексе. В примерах так .
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204388
alexy_black
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Dima T,

о, стал гораздо реже зависать. если раньше я мог в консоле нажимать наверх и enter то теперь пришлось написать while...

но все равно подвис. я там вставил std::cout << __FILE__ << ":" << __LINE__ << std::endl; это чтобы узнать где, у обранужил, что на первом блокировании мутекса. то есть

Код: plaintext
1.
2.
3.
4.
5.
6.
7.
	std::cout << __FILE__ << ":" << __LINE__ << std::endl; // это напечаталось
	std::unique_lock<std::mutex> lock(tasks_mutex_);
	std::cout << __FILE__ << ":" << __LINE__ << std::endl; // это строка уже не появилась...
	if(stop_) break;
	std::cout << __FILE__ << ":" << __LINE__ << std::endl;
	if(tasks_.size()==0) tasks_condition_.wait(lock,[this](){ return 0<tasks_.size() || stop_; });
	std::cout << __FILE__ << ":" << __LINE__ << std::endl;



еще решил добавить такую проверку
Код: plaintext
1.
if(tasks_.size()==0)
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204394
alexy_black
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
о, вроде заработало!
Dima T спасибо

еще одна ошибка - нельзя похоже так поток инициализировать. то есть его надо было либо последним писать в заголовнике либо инициализировать уже внутри конструктора. может быть поэтому подвисал - поток начинал выполняться до того, как был инициализирован объект.. сейчс сделал в самом конструкторе swap - и все заработало :)

зы: ну, я сейчас уже пару минут тест гоню, вроде нет зависонов. но до полного "заработало" наверное еще далеко :)
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204403
Dima T
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
У тебя еще архитектурный косяк в коде потока: ты список задач забрал, пока выполнял - новые накопились, а ты уснул на ожидании очередной новой. Добавь проверку что список пуст перед тем как уснуть.
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204449
alexy_black
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Dima TУ тебя еще архитектурный косяк в коде потока: ты список задач забрал, пока выполнял - новые накопились, а ты уснул на ожидании очередной новой. Добавь проверку что список пуст перед тем как уснуть.

ага, я его заметил 18996966

сейчас пытаюсь сделать конструктор перемещения и опрератор присваивания-перемещение..
оказалось мутекс не может переместиться..
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204484
alexy_black
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
такс, вроде сделал.. вот конечная версия, вдруг кому понадобится :)
запустил тест, потом открыл вкладку в браузере, пишу сообщение, но вроде пока не провалился

header
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
#pragma once

/*************************************************************************
 * Copyright © 2016 Hudyaev Alexy <hudyaev.alexy@gmail.com>;
 * This file is part of cpphttpx-cms.
 * Distributed under the GNU Affero General Public License.
 * See accompanying file copying (at the root of this repository)
 * or <http://www.gnu.org/licenses/> for details
 *************************************************************************/


#include <list>
#include <atomic>
#include <thread>
#include <functional>
#include <condition_variable>

namespace auth {
class task_queue final {
public:
	task_queue(const task_queue&) =delete;
	task_queue& operator = (const task_queue&) =delete;

	task_queue();
	task_queue(task_queue&& other);
	task_queue& operator = (task_queue&& other) ;
	~task_queue() noexcept ;

	void add(const std::function<void ()>& task);
	std::size_t size() const ;
private:
	void worker() ;

	mutable std::mutex tasks_mutex_;
	std::list<std::function<void ()>> tasks_;

	std::condition_variable tasks_condition_;

	std::atomic<bool> stop_{false};
	std::thread thread_;
};
} // auth namespace

sources
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
/*******************************************************************************
 * Copyright © 2016 Hudyaev Alexy <hudyaev.alexy@gmail.com>;
 *
 * This file is part of cpphtpx-cms
 *
 * CPPHTTPX-CMS is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * CPPHTTPX-CMS is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with cpphttpx-cms (copying file in the root of this repository). If not,
 * see <http://www.gnu.org/licenses/>
 * *****************************************************************************/


#include "task_queue.h"
#include <cassert>

auth::task_queue::task_queue(task_queue&& other)
{
	operator = (static_cast<task_queue&&>(other)) ;
}

auth::task_queue& auth::task_queue::operator = (auth::task_queue&& other)
{
	assert( !other.stop_ );
	other.stop_=true;

	std::unique_lock<std::mutex> own_lock(tasks_mutex_);
	std::unique_lock<std::mutex> thr_lock(other.tasks_mutex_);
	tasks_ = std::move(other.tasks_);

	std::thread tmp(std::bind(&auth::task_queue::worker,this));
	thread_.swap(tmp);

	return *this;
}

auth::task_queue::task_queue()
{
	std::thread tmp(std::bind(&auth::task_queue::worker,this));
	thread_.swap(tmp);
}

auth::task_queue::~task_queue() noexcept
{
	stop_ = true ;

	{
		std::unique_lock<std::mutex> lock(tasks_mutex_);
		tasks_condition_.notify_one();
	}

	if(thread_.joinable()) thread_.join();
}

void auth::task_queue::add(const std::function<void ()>& task)
{
	std::unique_lock<std::mutex> lock(tasks_mutex_);
	tasks_.push_back(task);
	tasks_condition_.notify_one();
}

std::size_t auth::task_queue::size() const
{
	std::unique_lock<std::mutex> lock(tasks_mutex_);
	return tasks_.size();
}

void auth::task_queue::worker()
{
	while(!stop_) {
		std::list<std::function<void ()>> tasks;

		std::unique_lock<std::mutex> lock(tasks_mutex_);
		if(stop_) break;
		if(tasks_.size()==0) tasks_condition_.wait(lock,[this](){ return 0<tasks_.size() || stop_; });

		tasks.swap(tasks_);
		lock.unlock();

		for(auto task:tasks) task();
	}
}

test
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
#define BOOST_TEST_DYN_LINK
#define BOOST_TEST_MODULE thread queue test

#include <mutex>
#include <boost/test/unit_test.hpp>

#include "cpphttpx/cms/auth/task_queue.h"

void wait_fnc ()
{
	std::this_thread::yield();
	std::this_thread::yield();
}

BOOST_AUTO_TEST_SUITE( task_queue )
BOOST_AUTO_TEST_CASE( add_is_ok )
{
	auth::task_queue task;
	BOOST_CHECK_NO_THROW( task.add(wait_fnc) );
}
BOOST_AUTO_TEST_CASE( tasks )
{
	auth::task_queue task;
	std::timed_mutex checker;
	BOOST_REQUIRE( checker.try_lock_for(std::chrono::milliseconds(10)) );

	BOOST_CHECK_NO_THROW( task.add([&checker](){
			BOOST_REQUIRE( !checker.try_lock() );
			checker.unlock();
			}) );

	BOOST_CHECK( checker.try_lock_for(std::chrono::seconds(1)) );
}
BOOST_AUTO_TEST_CASE( move )
{
	std::timed_mutex checker;
	checker.lock();

	auth::task_queue task_first;

	BOOST_CHECK_NO_THROW( task_first.add([&checker](){wait_fnc();checker.unlock();}) );
	auth::task_queue task_second( std::move(task_first) );

	BOOST_CHECK( checker.try_lock_for(std::chrono::seconds(1)) );
}

BOOST_AUTO_TEST_SUITE_END() // task_queue



тест запускал вот так (у меня fish оболочка)
Код: fish
while tests/cms/task_queue; end;
долго крутился, но не запнулся.. потом еще напишу тест присваинвания..
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204518
Dima T
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Недавно делал подобное. В итоге родил аналог виндового Event , на этом и остановился. Книжку про С++14 прочитал, продолжить пока руки не доходят.

Задача подобная: быстро вставлять в очередь на обработку в рабочем потоке, затем там не спеша разгребать в другом. Остановился на "быстро вставлять в очередь", твоя идея со swap() списка интересная (спишу у тебя), но есть сомнения использовать list или vector. ИМХУ vector побыстрее будет, пока перевыделение памяти не потребуется.
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204525
alexy_black
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Dima T,

да, кстати, наверное действительно vector.. просто я сначала хотел сделать push_back и pop_front, но потом подумал о swap, а list остался..

но даже не знаю.. просто у меня может получится так, что резко появится огромная куча заданий для очереди, потом их может не быть долгое время. по сравнению со скоростью вставки в лист, решение поставленной в очередь задачи - очень долго, поэтому не думаю что это важно в моем случаи.. можно сделать шаблон - где храить параметр шаблона :)
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204530
Dima T
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
alexy_blackпо сравнению со скоростью вставки в лист, решение поставленной в очередь задачи - очень долго
В моем случае это несравниваемые величины. У меня рабочий поток должен максимально быстро вставлять задания в очередь, а с какой скоростью они обрабатываются - неважно.
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204536
Dimitry Sibiryakov
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
А тупо реализовать паттерн producer-consumer не судьба?..
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204544
Dima T
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Dimitry SibiryakovА тупо реализовать паттерн producer-consumer не судьба?..
Тут разве не он реализован?
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204575
Dimitry Sibiryakov
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Dima TТут разве не он реализован?
Не похоже.
Posted via ActualForum NNTP Server 1.5
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204595
alexy_black
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
блин.. все равно похоже придется реализовывать это в виде шаблона..
внезапно, если туда передать лямбду с переменной, которая может быть только перемещена (то есть в захвате лямбды переместить переменную), то этот код не будет компилится :(

патерн продюсер-консюмер расчитан на то, что один поток данные производит, а другой обрабатывает.. тут я хотел чтобы функции запускались в потоке. отличе в том, что этот класс не содержит логики работы и может обрабатывать данные как ему нужно, а если бы это был ресивер - то он бы должен был знать как обрабатывать данные... если я праильно понимаю.
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204612
Dimitry Sibiryakov
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
alexy_blackесли я праильно понимаю.
Неправильно понимаешь. Паттерну совершенно всё равно какие данные скармливаются консумеру
и что тот с ними делает. Он всего лишь описывает модель взаимодействия потоков и
организацию очереди.
Posted via ActualForum NNTP Server 1.5
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204617
alexy_black
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
гы, таки работать с температурой, не айс )))) https://habrahabr.ru/post/188234/ thread pull надо было искать )))

но мне мой код больше нравится. я расчитывал на то, чтобы я мог быстро добавить туда задание и занятся чем-нибудь другим. а в реализации на хабре при добавлении следующего задания ты будешь ждать...

я туда хочу boost::fiber::promise скормить, чтобы он нужню фибру стартовал когда закончит, но не получилось.. так что сейчас переписываю как шаблон.

Dimitry Sibiryakovalexy_blackесли я праильно понимаю.
Неправильно понимаешь. Паттерну совершенно всё равно какие данные скармливаются консумеру
и что тот с ними делает. Он всего лишь описывает модель взаимодействия потоков и
организацию очереди.


так в том-то и дело, что данные, а не задачи. а если задачи - то получается это и есть реализация указанного патерна.
а чтобы реализовать thread_pull нужно я так понимаю несколько таких объектов, и добавлять задание в тот, в котором их меньше.
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204621
Dimitry Sibiryakov
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
alexy_blackтак в том-то и дело, что данные, а не задачи.
Указатель на процедуру - тоже данные.
Posted via ActualForum NNTP Server 1.5
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204623
Фотография Изопропил
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
alexy_blackthread_pull
глаз режет

pool
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204629
alexy_black
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Изопропилalexy_blackthread_pull
глаз режет

poolэто такой стягиватель потоков :) тянет потоки с соседского компа :)
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204666
Dima T
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
alexy_blackнужно я так понимаю несколько таких объектов, и добавлять задание в тот, в котором их меньше.
Не самая лучшая реализация. Во-первых каждый раз искать менее загруженный. Во-вторых может поток загружен одним заданием, но оно долгое и последующие 10 успеет выполнить другой поток.

ИМХУ если тебе надо более сложное управление обработкой, то посмотри на готовые либы. Например ZeroMQ
...
Рейтинг: 0 / 0
очередь задания для потока
    #39204893
alexy_black
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Dima Talexy_blackнужно я так понимаю несколько таких объектов, и добавлять задание в тот, в котором их меньше.
Не самая лучшая реализация. Во-первых каждый раз искать менее загруженный. Во-вторых может поток загружен одним заданием, но оно долгое и последующие 10 успеет выполнить другой поток.

ИМХУ если тебе надо более сложное управление обработкой, то посмотри на готовые либы. Например ZeroMQ

это я просто предположил как бэ.. но мне кажется на счет thread pool - ты никак не сможешь узнать какая функция будет дольше выполняться.. там надо наверное делать как я хотел изначально - std::list и первый поток, который освободился, берет первый элемент от туда, а вставляешь элементы в конец. но у меня такой цели не было, нужен был просто один поток.

вот добавил тут функции
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
namespace details {
	template<typename T>
	struct fake_copyable {
		fake_copyable(T&& t) : target(std::forward<T>(t)) {}
		fake_copyable(fake_copyable&& )=default ;
		fake_copyable(const fake_copyable&) {throw std::logic_error("apptemt to copy a fake copyable object");}
		template<typename... Args>
		auto operator()(Args&&... a) { return target(std::forward<Args>(a)...); }

		T target ;
	};
}

// ....

	template< class Function, class ... Args >
	boost::fibers::future<
	    std::result_of_t< std::decay_t< Function >( std::decay_t< Args > ... ) >
	>
	async( Function && fn, Args && ... args) {
		using result_t = std::result_of_t< std::decay_t< Function >( std::decay_t< Args > ... )> ;
		using pt_t = boost::fibers::packaged_task< result_t( std::decay_t< Args > ... ) > ;

		pt_t pt{ /*std::allocator_arg, salloc, */std::forward< Function >( fn) };
		boost::fibers::future< result_t > f{ pt.get_future() };

		details::fake_copyable<pt_t> ca_pt(std::move(pt));
		tasks_.push_back(std::move(ca_pt));
		tasks_condition_.notify_one();

		return f;
	}




эта штука стартует в потоке функцию, а когда она сгенерирует значение, стратует фибру, которая ожидает нужного future. минус - исключение в runtime если забыл что этот copyable объект насамом деле никакой ни copyable. но поскольку это спрятано внутри реализации, то не должно такого произойти.
оригинальная boost::fibers::async эту функцию запускает не в потоке, а в другой фибре. что меня не устраивает, поскольку основной поток должен работать над другими задачами.
...
Рейтинг: 0 / 0
очередь задания для потока
    #39206315
Доктар123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Dimitry SibiryakovА тупо реализовать паттерн producer-consumer не судьба?..

+1

Что бы небыло проблем нужно работать с настоящими атомарными
наборами семафоров... .
А не с защелками...

Задача гарантированно решается атомарным набором из 2 семафоров без тычков пальцем в небо.
...
Рейтинг: 0 / 0
26 сообщений из 26, показаны все 2 страниц
Форумы / C++ [игнор отключен] [закрыт для гостей] / очередь задания для потока
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]