powered by simpleCommunicator - 2.0.59     © 2026 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / Caché, Ensemble, DeepSee, MiniM, IRIS, GT.M [игнор отключен] [закрыт для гостей] / Пул потоков
9 сообщений из 9, страница 1 из 1
Пул потоков
    #35935211
Leron
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Написал простенький пул, и оказалось что работает он медленнее, чем если бы напрямую запустить выполнение задачи. Просмотрев результаты MONLBL обнаружил, что блокировка глобала (заменяет мютекс, т.к. иного не обнаружил в cache) - самая ресурсоемкая операция. Чем можно заменить виндовый мютекс в cache?
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
40        10010   0.060316   0.060316 WaitTask(Task) 
41        10010   0.029907   0.029907  Set $ZT="HandleWaitTaskError"
42        10010   0.034258   0.034258  New TaskNum Set (TaskNum,Task)=""
43        10010   0.019723   0.019723  For {
44      2765081  12.882675  12.882680  	QUIT:($G(^CacheTempUsrPoolEthalon("poolinfo", "Terminated"),0)=1)		
45      2765075 407.020361 407.020361  	L +^CacheTempUsrPoolEthalon("tasks")
46      2765077  14.034835  14.034835  	Set:($p($G(^CacheTempUsrPoolEthalon("tasks"),0),"#", 2)>0) TaskNum=$O(^CacheTempUsrPoolEthalon("tasks",""),1,Task)	
47      2765079   4.771903   4.771903  	if (Task'="")&&(TaskNum'="") { 			
48        10000   0.208864   0.208864  		Kill ^CacheTempUsrPoolEthalon("tasks", TaskNum)
49        10000   0.092081   0.092081  		Set:$p($G(^CacheTempUsrPoolEthalon("tasks"),0),"#", 2)>=1 $p(^CacheTempUsrPoolEthalon("tasks"),"#", 2)=$p($G(^CacheTempUsrPoolEthalon("tasks"),0),"#", 2)-1
50            0   0          0         	}		
51      2765086  20.894297  20.894306  	L -^CacheTempUsrPoolEthalon("tasks")		
52      2765089   5.084789   5.084789  	Quit:Task'=""
53      2755091   7.510073   7.510077  	h 0.001
54      2755093   4.934930   4.934932  }
55        10000   0.045289   0.045289  QUIT Task

Исходник:
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
200.
201.
202.
203.
204.
205.
206.
207.
208.
209.
210.
211.
212.
213.
214.
215.
216.
217.
218.
219.
220.
221.
222.
223.
224.
225.
226.
227.
#;#Include Defines

#; глобал для временных переменных пула
#define jobpool ^CacheTempUsrPoolEthalon

#define InitUserData(%threadarg)	
#define FreeUserData 

#define OnStartThread ##continue
	Do HIGH^%PRIO							##continue	
	$$$SetPoolThreadInfo($j,"Busy", 0 )		##continue
	$$$SetPoolThreadInfo($j,"Started", 1 )	##continue	
	L +$$$jobpool("poolinfo")				##continue
	$$$SetPoolInfo("RunThreadCount", $$$GetPoolInfo("RunThreadCount", 0 ) +  1 ) ##continue
	L -$$$jobpool("poolinfo")				##continue
	$$$TRACETEXT($$$datetime_"[ThreadProc]: поток "_$j_" стартовал.")

#define OnStopThread ##continue	
	$$$SetPoolThreadInfo($j,"Started", 0 )	##continue	
	K $$$jobpool("threads",$j)				##continue	
	L +$$$jobpool("poolinfo")				##continue
	$$$SetPoolInfo("RunThreadCount", $$$GetPoolInfo("RunThreadCount", 0 ) -  1 ) ##continue
	L -$$$jobpool("poolinfo")				##continue
	$$$TRACETEXT($$$datetime_"[ThreadProc]: поток "_$j_" остановлен.")


#Define GetPoolThreadInfo(%thread,%key,%default)		$G($$$jobpool("threads",%thread,%key),%default)
#Define SetPoolThreadInfo(%thread,%key,%value)			Set $$$jobpool("threads",%thread,%key)=%value
#Define GetPoolInfo(%key,%default)						$G($$$jobpool("poolinfo", %key),%default)
#Define SetPoolInfo(%key,%value)						Set $$$jobpool("poolinfo", %key)=%value
#Define datetime $ZDT($ZTIMESTAMP, 4 , 1 , 3 )

#Define TRACETEXT(%text)
#define AddLog(%level,%text)
#define SOBAKA(%text)

#;==================================================================
#;
#; Обработать данные
#;
#;==================================================================	
HandleTask(Task)
	Set $ZT="HandleHandleTaskError"	
	$$$SOBAKA("ThreadId="_$j_"  Task="_Task)
	n i
	f i= 1 : 1 : 1200  s $bit(^a((Task*i)# 32000 ),i)=''(Task# 1 )
	QUIT  1 
HandleHandleTaskError
	Set $zt = ""
	$$$AddLog($$$LogLvlError, "HandleTask: Error ze="_$ze)
	QUIT  0 

#;==================================================================
#;
#; Добавить задачу в очередь
#;
#;==================================================================
AddTask(Task)
	Set $ZT="HandleAddTaskError"
	#; флаг блокировки очереди. на случай ошибки проверить его и разблокировать очередь если флаг= 1 
	New Locked Set Locked= 0 	
	$$$TRACETEXT($$$datetime_"[AddTask]: Добавление задачи "_Task_" ...")	
	G:($$$GetPoolInfo("RunThreadCount", 0 )= 0 ) HandleAddTaskErrorEmptyPoolSize		
	For {
		QUIT:($$$GetPoolInfo("Terminated", 0 )= 1 )
		L +$$$jobpool("tasks")
		Set Locked= 1 				
		#; если очередь задач не превышена
		if ($p($G($$$jobpool("tasks"), 0 ),"#",  2 )<$$$GetPoolInfo("MaxTaskSize", 0 )) {
			#; ^task("tasks")=A#B
			#; A - сколько всего было добавлено сообщений в очередь
			Set $p($$$jobpool("tasks"),"#",  1 )=$p($G($$$jobpool("tasks"), 0 ),"#",  1 )+ 1 
			#; B - количество задач в очереди
			Set $p($$$jobpool("tasks"),"#",  2 )=$p($G($$$jobpool("tasks"), 0 ),"#",  2 )+ 1 
			#; ^task("tasks",TaskNum)=Task
			Set $$$jobpool("tasks", $p($$$jobpool("tasks"),"#",  1 ))=Task								
			$$$TRACETEXT($$$datetime_"[AddTask]: Задача "_Task_" добавлена!")
			L -$$$jobpool("tasks")
			Set Locked= 0 
			QUIT	
		}
		L -$$$jobpool("tasks")
		Set Locked= 0 
	}
	QUIT  1 
HandleAddTaskErrorEmptyPoolSize
	Set $ZT = ""
	$$$AddLog($$$LogLvlError, "AddTask : Empty Pool Size!")	
	QUIT  0 
HandleAddTaskError
	Set $ZT = ""
	if (Locked= 1 ) { L -$$$jobpool("tasks") }
	$$$AddLog($$$LogLvlError, "AddTask : Error ze="_$ze)
	QUIT  0 
	
#;==================================================================
#;
#; ждать задачи для потока, вернуть ее ID и удалить задачу из очереди
#; 
#;==================================================================
WaitTask(Task)	
	Set $ZT="HandleWaitTaskError"
	New TaskNum Set (TaskNum,Task)=""
	For {
		QUIT:($$$GetPoolInfo("Terminated", 0 )= 1 )		
		L +$$$jobpool("tasks")
		#; получить ID задачи если очередь не пуста
		Set:($p($G($$$jobpool("tasks"), 0 ),"#",  2 )> 0 ) TaskNum=$O($$$jobpool("tasks",""), 1 ,Task)	
		if (Task'="")&&(TaskNum'="") { 			
			#; удаляем задачу из очереди
			Kill $$$jobpool("tasks", TaskNum)
			Set:$p($G($$$jobpool("tasks"), 0 ),"#",  2 )>= 1  $p($$$jobpool("tasks"),"#",  2 )=$p($G($$$jobpool("tasks"), 0 ),"#",  2 )- 1 
		}		
		L -$$$jobpool("tasks")		
		Quit:Task'=""
		h  0 . 001 
	}
	QUIT Task
HandleWaitTaskError
	Set $ZT=""
	$$$AddLog($$$LogLvlError, "WaitTask : Error $ZE="_$ZE)		
	QUIT ""
	
#;==================================================================
#;
#; Потоковая функция
#;
#;==================================================================			
ThreadProc(arg1)	
	Set $ZT="HandleThreadProcError"
	New Task Set Task=""			
	$$$OnStartThread	
	$$$InitUserData(arg1)	
	For {				
		Do WaitTask(.Task)
		QUIT:Task=""
	
		#; установить флаг, что поток занят
		$$$SetPoolThreadInfo($j,"Busy", 1 )
		$$$SetPoolThreadInfo($j,"Task",Task)		
		
		#;^------ обработать задачу ------
		$$$TRACETEXT($$$datetime_"[ThreadProc]: поток "_$j_" принял задачу "_Task)
		Do HandleTask(Task)		
		#;$------ обработать задачу ------
					
		$$$SetPoolThreadInfo($j,"Busy", 0 )
		$$$SetPoolThreadInfo($j,"Task","")	
	}
	$$$FreeUserData	
	$$$OnStopThread	
	QUIT  1 
HandleThreadProcError
	Set $ZT=""
	$$$FreeUserData
	$$$OnStopThread
	$$$AddLog($$$LogLvlError, "ThreadProc : Error $ZE="_$ZE)	
	QUIT  0 
	
#;==================================================================
#;
#; инициализация пула потоков. 
#; ThreadCount - максимальное количество потоков
#; MaxTaskSize - максимальный размер очереди задач
#; ThreadFuncArg - аргумент потоковой функции
#;==================================================================
StartPool(ThreadCount= 0 ,MaxTaskSize= 0 ,ThreadFuncArg="")
	Set $ZT="HandleStartPoolError"
	QUIT:(ThreadCount= 0 )||(MaxTaskSize= 0 )  0 
	#; проверка, возможно пул уже стартовал
	QUIT:($$$GetPoolInfo("Initialized", 0 )= 1 )  0 	
	Do StopPool	
	$$$TRACETEXT($$$datetime_"[StartPool]: Старт пула потоков...")	
	L +$$$jobpool	
	K $$$jobpool
	L -$$$jobpool
	$$$SetPoolInfo("MaxPoolSize",ThreadCount)
	$$$SetPoolInfo("MaxTaskSize",MaxTaskSize)
	$$$SetPoolInfo("Terminated", 0 )
	$$$SetPoolInfo("StartTime",$$$datetime)			
	For i= 1 : 1 :ThreadCount { Do CreateThread(ThreadFuncArg) }	
	$$$SetPoolInfo("Initialized", 1 )		
	$$$TRACETEXT($$$datetime_"[StartPool]: Пул стартовал!")
	QUIT  1 
HandleStartPoolError
	Set $ZT=""
	$$$SetPoolInfo("Initialized", 0 )
	$$$AddLog($$$LogLvlError, "StartPool : Error $ZE="_$ZE)	
	QUIT  0 
	
		
#; запуск одного потока	
CreateThread(arg)
	QUIT:($$$GetPoolInfo("Terminated", 0 )= 1 )||($$$GetPoolInfo("RunThreadCount", 0 )>=$$$GetPoolInfo("MaxPoolSize", 0 ))
	J ThreadProc(arg)
	QUIT  1 
	
#; остановить все потоки
StopPool
	$$$TRACETEXT($$$datetime_"[StopPool]: Остановка пула...")
	$$$SetPoolInfo("Initialized", 0 )	
	$$$SetPoolInfo("Terminated", 1 )	
	$$$SetPoolInfo("StopTime",$$$datetime)
	h  0 . 2 
	$$$TRACETEXT($$$datetime_"[StopPool]: Пул остановлен! Потоков:"_$$$GetPoolInfo("RunThreadCount", 0 ))	
	QUIT

test
	n i
	d StopPool
	h  1 
	d StartPool( 10 , 100 )
	s start=$ZTIMESTAMP
	#;w "time start with thread:"_$$$datetime,!
	f i= 1 : 1 : 10000  { d AddTask(i) }
	#;w "time end with thread:"_$$$datetime,!
	s stop=$ZTIMESTAMP
	d SubDate^DateFunc(stop,start,.time)
	w "time end with thread:"_$ZT($PIECE(time,",", 2 ), 1 , 3 ),!
	
	s start=$ZTIMESTAMP
	#;w "time start without thread:"_$$$datetime,!
	f i= 1 : 1 : 10000  { d HandleTask(i) }
	#;w "time end without thread:"_$$$datetime,!
	s stop=$ZTIMESTAMP
	d SubDate^DateFunc(stop,start,.time)
	w "time end without thread:"_$ZT($PIECE(time,",", 2 ), 1 , 3 ),!
	q
...
Рейтинг: 0 / 0
Пул потоков
    #35937744
MaWr
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
А если сделать так:
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
WaitTask(Task)	
	Set $ZT="HandleWaitTaskError"
	New TaskNum,Quit Set (TaskNum,Task)=""
	Set TaskNum=""
	Set Quit= 0 
	For {
		if $$$GetPoolInfo("Terminated", 0 )= 1  Set Quit= 1  Quit
		#; получить ID задачи если очередь не пуста
		Set TaskNum=$O($$$jobpool("tasks",TaskNum), 1 ,Task)
		if TaskNum'="" L +$$$jobpool("tasks",TaskNum): 0  Quit:$T
		h  0 . 01 
	}
	QUIT:Quit ""
	Kill $$$jobpool("tasks",TaskNum)
	If $I($$$jobpool("taskCount"),- 1 )
	L -$$$jobpool("tasks",TaskNum)
	QUIT Task
HandleWaitTaskError
	Set $ZT=""
	$$$AddLog($$$LogLvlError, "WaitTask : Error $ZE="_$ZE)		
	QUIT ""
Ну и соответственно в
AddTask(Task)
вместо
Код: plaintext
1.
Set $p($$$jobpool("tasks"),"#",  2 )=$p($G($$$jobpool("tasks"), 0 ),"#",  2 )+ 1 
написать
Код: plaintext
1.
If $I($$$jobpool("taskCount"))
...
Рейтинг: 0 / 0
Пул потоков
    #35939061
Turk
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Код: plaintext
Do HIGH^%PRIO
Это так обязательно? В этом случае потоки обработки заданий будут обращаться к $$$jobpool("tasks") (в WaitTask) намного чаще, чем туда будут попадать задания (в AddTask). Очередь обращений к $$$jobpool("tasks") будет примерно следующей: T1-...-T10-M-T1-...-T10-M-... (T1...T10 - потоки обработки, M - главный поток, добавляющий задания). И это в лучшем случае, на деле главный потом будет получать доступ к $$$jobpool("tasks") намного реже. Как следствие, очень значительное время работы будет уходить на холостую проверку $$$jobpool("tasks").
Попробуйте изменять это значение на LOW или NORMAL. Либо можете динамически менять приоритет, например:
Код: plaintext
1.
2.
3.
4.
5.
Do LOW^%PRIO  //или NORMAL, но не HIGH
Do WaitTask(.Task)
#; LOW -> NORMAL = "Ниже среднего"; HIGH -> NORMAL = "Средний"
Do HIGH^%PRIO
Do NORMAL^%PRIO  // если нужно HIGH, то закомментировать эту строку
...
Рейтинг: 0 / 0
Пул потоков
    #35939619
Leron
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
MaWr
Ваш пример, к сожалению, некорректно,на мой взгляд, работает в месте:
Код: plaintext
1.
Set TaskNum=$O($$$jobpool("tasks",TaskNum), 1 ,Task)
if TaskNum'="" L +$$$jobpool("tasks",TaskNum): 0  Quit:$T 

В то время, когда TaskNum получен, очередь не заблокирована, и другой поток может в этот момент принять ту же самую задачу
кстати что и показало тестирование, в логе есть сообщения типа
ThreadId=2344 Task=1
ThreadId=244 Task=1
ThreadId=8456 Task=1
Задача обрабатывается только один раз.

Turk
Пробовал как Вы сказали, результат даже хуже (по времени обработки).

Может отказаться от очереди задач? т.е. для каждого потока будет свой узел глобала и при старте потоков блокировать этот узел. в WaitTask поставить ожидание разблокировки узла (для каждого потока свой узел).
получится что то типа

Код: plaintext
1.
2.
3.
4.
5.
6.
WaitTask(TaskNum)
  ...
   L +$$$jobpool("tasks",TaskNum)
   считать номер задачи   
    L -$$$jobpool("tasks",TaskNum)
   обработка задачи
   ...

Код: plaintext
1.
2.
3.
4.
AddTask
   найти узел $$$jobpool("tasks",TaskNum), который заблокирован
   записать туда задачу
   L -$$$jobpool("tasks",TaskNum)
   L +$$$jobpool("tasks",TaskNum)
...
Рейтинг: 0 / 0
Пул потоков
    #35942111
MaWr
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Leron MaWr
Ваш пример, к сожалению, некорректно,на мой взгляд, работает в месте:
Код: plaintext
1.
Set TaskNum=$O($$$jobpool("tasks",TaskNum), 1 ,Task)
if TaskNum'="" L +$$$jobpool("tasks",TaskNum): 0  Quit:$T 

В то время, когда TaskNum получен, очередь не заблокирована, и другой поток может в этот момент принять ту же самую задачу
кстати что и показало тестирование, в логе есть сообщения типа
ThreadId=2344 Task=1
ThreadId=244 Task=1
ThreadId=8456 Task=1
Задача обрабатывается только один раз.


После получения TaskNum в моем варианте происходит блокирование
Код: plaintext
L +$$$jobpool("tasks",TaskNum): 0 
И если данный узел уже кем-то заблокирован, то берется следующий и т.д.
Идея моего подхода заключается в использовании блокировки со временем ожидания 0, в отличие от Вашей бесконечной.
...
Рейтинг: 0 / 0
Пул потоков
    #35942618
Turk
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Leron
Turk
Пробовал как Вы сказали, результат даже хуже (по времени обработки).
Странно. Так как снижение нагрузки с 10 высокоприоритетных потоков обработки с повышение нагрузки одного потока формирования очереди должно было как-то позитивно отразиться.
Пробовали все возможные варианты? Динамическое изменение приоритета тоже требует определенных временных затрат, поэтому скорее вариант со статическим приоритетом будет эффективнее.
Большое число потоков отработки тоже может играть отрицательную роль (если только у вас не 10+ ядер).
...
Рейтинг: 0 / 0
Пул потоков
    #35947273
Leron
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
MaWr , извеняюсь, я поторопился, когда протестировал то что Вы предложили и не особо разбираясь, почему неправильно работает пул. Ваша идея ожидания с таймаутом правильная, но ошибка в другом -
допустим в очереди 10 задач (максимум тоже 10) с номерами 1..10,
значение $$$jobpool("taskCount") тоже 10,
добавляем новую задачу(AddTask), в AddTask ждем пока размер очереди станет <10,
в этот момент задача под №1 ушла из очереди, установив счетчик $$$jobpool("taskCount") в 9,
в AddTask: установим счетчик $$$jobpool("taskCount") в 10, добавляем новую задачу на позицию 10 (в Вашем варианте) поверх старой задачи, а ведь под №10 уже была задача, и на обработку она никогда не поступит.
Вот так будут теряться задачи, ну а почему потоки получали одинаковые задачи, я так и не понял...


Turk , попробуйте протестируйте, исходник вполне компилится (если убрать вызовы Date^DateFunc)

вот Исходник:

Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
200.
201.
202.
203.
204.
205.
206.
207.
208.
209.
210.
211.
212.
213.
214.
215.
216.
217.
218.
219.
220.
221.
222.
223.
224.
225.
226.
227.
228.
#;#Include Defines

#; глобал для временных переменных пула
#define jobpool ^CacheempUsrPoolEthalon

#define InitUserData(%threadarg)	
#define FreeUserData 

#define OnStartThread ##continue
	#;Do HIGH^%PRIO							##continue	
	$$$SetPoolThreadInfo($j,"Busy", 0 )		##continue
	$$$SetPoolThreadInfo($j,"Started", 1 )	##continue	
	L +$$$jobpool("poolinfo")				##continue
	$$$SetPoolInfo("RunThreadCount", $$$GetPoolInfo("RunThreadCount", 0 ) +  1 ) ##continue
	L -$$$jobpool("poolinfo")				##continue
	$$$TRACETEXT($$$datetime_"[ThreadProc]: поток "_$j_" стартовал.")

#define OnStopThread ##continue	
	$$$SetPoolThreadInfo($j,"Started", 0 )	##continue	
	K $$$jobpool("threads",$j)				##continue	
	L +$$$jobpool("poolinfo")				##continue
	$$$SetPoolInfo("RunThreadCount", $$$GetPoolInfo("RunThreadCount", 0 ) -  1 ) ##continue
	L -$$$jobpool("poolinfo")				##continue
	$$$TRACETEXT($$$datetime_"[ThreadProc]: поток "_$j_" остановлен.")


#Define GetPoolThreadInfo(%thread,%key,%default)		$G($$$jobpool("threads",%thread,%key),%default)
#Define SetPoolThreadInfo(%thread,%key,%value)			Set $$$jobpool("threads",%thread,%key)=%value
#Define GetPoolInfo(%key,%default)						$G($$$jobpool("poolinfo", %key),%default)
#Define SetPoolInfo(%key,%value)						Set $$$jobpool("poolinfo", %key)=%value
#Define datetime $ZDT($ZTIMESTAMP, 4 , 1 , 3 )

#define LogLvlError  1 
#define AddLog(%level,%text) s ^LogErrors($i(^LogErrors))="["_%level_"] : "_%text
#define SOBAKA(%x) s ^sobaka($i(^sobaka))=%x
#Define TRACETEXT(%text)
#;==================================================================
#;
#; Обработать данные
#;
#;==================================================================	
HandleTask(Task)
	Set $ZT="HandleHandleTaskError"	
	$$$SOBAKA("ThreadId="_$j_"  Task="_Task)
	n i f i= 1 : 1 : 100  s $bit(^a((Task*i)# 32000 ),i)=''(Task# 1 )
	QUIT  1 
HandleHandleTaskError
	Set $zt = ""
	$$$AddLog($$$LogLvlError, "HandleTask: Error ze="_$ze)
	QUIT  0 

#;==================================================================
#;
#; Добавить задачу в очередь
#;
#;==================================================================
AddTask(Task)
	Set $ZT="HandleAddTaskError"
	#; флаг блокировки очереди. на случай ошибки проверить его и разблокировать очередь если флаг= 1 
	New Locked Set Locked= 0 	
	$$$TRACETEXT($$$datetime_"[AddTask]: Добавление задачи "_Task_" ...")	
	G:($$$GetPoolInfo("RunThreadCount", 0 )= 0 ) HandleAddTaskErrorEmptyPoolSize		
	For {
		QUIT:($$$GetPoolInfo("Terminated", 0 )= 1 )
		L +$$$jobpool("tasks")
		Set Locked= 1 				
		#; если очередь задач не превышена
		if ($$$jobpool("TaskCount")<$$$GetPoolInfo("MaxTaskCount", 0 )) {			
			#; ^task("tasks",TaskNum)=Task
			Set $$$jobpool("tasks", $I($$$jobpool("TaskCountAll")))=Task
			#; количество задач в очереди +
			if $I($$$jobpool("TaskCount")) {}								
			$$$TRACETEXT($$$datetime_"[AddTask]: Задача "_Task_" добавлена!")
			L -$$$jobpool("tasks")
			Set Locked= 0 
			QUIT	
		}
		L -$$$jobpool("tasks")
		Set Locked= 0 
	}
	QUIT  1 
HandleAddTaskErrorEmptyPoolSize
	Set $ZT = ""
	$$$AddLog($$$LogLvlError, "AddTask : Empty Pool Size!")	
	QUIT  0 
HandleAddTaskError
	Set $ZT = ""
	if (Locked= 1 ) { L -$$$jobpool("tasks") }
	$$$AddLog($$$LogLvlError, "AddTask : Error ze="_$ze)
	QUIT  0 
	
#;==================================================================
#;
#; ждать задачи для потока, вернуть ее ID и удалить задачу из очереди
#; 
#;==================================================================
WaitTask(Task)	
	Set $ZT="HandleWaitTaskError"
	New TaskNum,Quit Set (TaskNum,Task)=""
	Set TaskNum=""
	Set Quit= 0 
	For {
		if $$$GetPoolInfo("Terminated", 0 )= 1  Set Quit= 1  Quit
		#; получить ID задачи если очередь не пуста
		Set TaskNum=$O($$$jobpool("tasks",TaskNum), 1 ,Task)
		if TaskNum'="" L +$$$jobpool("tasks",TaskNum): 0  Quit:$T
		h  0 . 001 
	}
	QUIT:Quit ""
	Kill $$$jobpool("tasks",TaskNum)
	If $I($$$jobpool("TaskCount"),- 1 )
	L -$$$jobpool("tasks",TaskNum)
	QUIT Task
HandleWaitTaskError
	Set $ZT=""
	$$$AddLog($$$LogLvlError, "WaitTask : Error $ZE="_$ZE)		
	QUIT ""

#;==================================================================
#;
#; Потоковая функция
#;
#;==================================================================			
ThreadProc(arg1)	
	Set $ZT="HandleThreadProcError"
	New Task Set Task=""			
	$$$OnStartThread	
	$$$InitUserData(arg1)	
	For {				
		Do LOW^%PRIO
		Do WaitTask(.Task) QUIT:Task=""
		Do HIGH^%PRIO
	
		#; установить флаг, что поток занят
		$$$SetPoolThreadInfo($j,"Busy", 1 )
		$$$SetPoolThreadInfo($j,"Task",Task)		
		
		#;^------ обработать задачу ------
		$$$TRACETEXT($$$datetime_"[ThreadProc]: поток "_$j_" принял задачу "_Task)
		Do HandleTask(Task)		
		#;$------ обработать задачу ------
					
		$$$SetPoolThreadInfo($j,"Busy", 0 )
		$$$SetPoolThreadInfo($j,"Task","")	
	}
	$$$FreeUserData	
	$$$OnStopThread	
	QUIT  1 
HandleThreadProcError
	Set $ZT=""
	$$$FreeUserData
	$$$OnStopThread
	$$$AddLog($$$LogLvlError, "ThreadProc : Error $ZE="_$ZE)	
	QUIT  0 
	
#;==================================================================
#;
#; инициализация пула потоков. 
#; ThreadCount - максимальное количество потоков
#; MaxTaskCount - максимальный размер очереди задач
#; ThreadFuncArg - аргумент потоковой функции
#;==================================================================
StartPool(ThreadCount= 0 ,MaxTaskCount= 0 ,ThreadFuncArg="")
	Set $ZT="HandleStartPoolError"
	QUIT:(ThreadCount= 0 )||(MaxTaskCount= 0 )  0 
	#; проверка, возможно пул уже стартовал
	QUIT:($$$GetPoolInfo("Initialized", 0 )= 1 )  0 	
	Do StopPool	
	$$$TRACETEXT($$$datetime_"[StartPool]: Старт пула потоков...")	
	L +$$$jobpool	
	K $$$jobpool
	L -$$$jobpool
	Set $$$jobpool("TaskCount")= 0 
	Set $$$jobpool("TaskCountAll")= 0 
	$$$SetPoolInfo("MaxPoolSize",ThreadCount)
	$$$SetPoolInfo("MaxTaskCount",MaxTaskCount)
	$$$SetPoolInfo("Terminated", 0 )
	$$$SetPoolInfo("StartTime",$$$datetime)			
	For i= 1 : 1 :ThreadCount { Do CreateThread(ThreadFuncArg) }	
	$$$SetPoolInfo("Initialized", 1 )		
	$$$TRACETEXT($$$datetime_"[StartPool]: Пул стартовал!")
	QUIT  1 
HandleStartPoolError
	Set $ZT=""
	$$$SetPoolInfo("Initialized", 0 )
	$$$AddLog($$$LogLvlError, "StartPool : Error $ZE="_$ZE)	
	QUIT  0 
	
		
#; запуск одного потока	
CreateThread(arg)
	QUIT:($$$GetPoolInfo("Terminated", 0 )= 1 )||($$$GetPoolInfo("RunThreadCount", 0 )>=$$$GetPoolInfo("MaxPoolSize", 0 ))
	J ThreadProc(arg)
	QUIT  1 
	
#; остановить все потоки
StopPool
	$$$TRACETEXT($$$datetime_"[StopPool]: Остановка пула...")
	$$$SetPoolInfo("Initialized", 0 )	
	$$$SetPoolInfo("Terminated", 1 )	
	$$$SetPoolInfo("StopTime",$$$datetime)
	h  0 . 2 
	$$$TRACETEXT($$$datetime_"[StopPool]: Пул остановлен! Потоков:"_$$$GetPoolInfo("RunThreadCount", 0 ))	
	QUIT

test
	k ^sobaka,^a,^b
	n i
	d StopPool
	h  1 
	d StartPool( 8 , 30 )
	s start=$ZTIMESTAMP
	#;w "time start with thread:"_$$$datetime,!
	f i= 1 : 1 : 20000  { d AddTask(i) }
	#;w "time end with thread:"_$$$datetime,!
	s stop=$ZTIMESTAMP
	d SubDate^DateFunc(stop,start,.time)
	w "time end with thread:"_$ZT($PIECE(time,",", 2 ), 1 , 3 ),!
	
	s start=$ZTIMESTAMP
	#;w "time start without thread:"_$$$datetime,!
	f i= 1 : 1 : 20000  { d HandleTask(i) }
	#;w "time end without thread:"_$$$datetime,!
	s stop=$ZTIMESTAMP
	d SubDate^DateFunc(stop,start,.time)
	w "time end without thread:"_$ZT($PIECE(time,",", 2 ), 1 , 3 ),!
	h  2 
	d StopPool
	q
...
Рейтинг: 0 / 0
Пул потоков
    #35953314
servit
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
To Leron

Возможно, Вам пригодится параметр JobServers
...
Рейтинг: 0 / 0
Пул потоков
    #35956190
Turk
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
AddTask():
Убираем блокировку на очереди, т.к. здесь мы не работает с существующими заданиями, а только добавляем новое (по уникальному индексу TaskCountAll).
Блокировка на TaskCount нужна только в том случае, если AddTask() может вызываться из различных потоков. При одном потоке (наш пример) блокировку можно закомментировать.
Получаем след. код:
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
#;==================================================================
#;
#; Добавить задачу в очередь
#;
#;==================================================================
AddTask(Task)
	Set $ZT="HandleAddTaskError"
	New (Task)
	#; флаг блокировки TaskCount. на случай ошибки проверить его и разблокировать TaskCount, если флаг= 1 
	//Set Locked= 0 
	#; флаг выхода из цикла
	Set DoStop= 0 
	$$$TRACETEXT($$$datetime_"[AddTask]: Добавление задачи "_Task_" ...")
	G:($$$GetPoolInfo("RunThreadCount", 0 )= 0 ) HandleAddTaskErrorEmptyPoolSize
	while (DoStop= 0 ) {
		QUIT:($$$GetPoolInfo("Terminated", 0 )= 1 )
		#; блокировка TaskCount
		//L +$$$jobpool("TaskCount")
		//Set Locked= 1 
		#; если очередь задач не превышена
		if ($$$jobpool("TaskCount")<$$$GetPoolInfo("MaxTaskCount", 0 )) {
			#; ^task("tasks",TaskNum)=Task
			Set $$$jobpool("tasks", $I($$$jobpool("TaskCountAll")))=Task
			#; количество задач в очереди +
			if $I($$$jobpool("TaskCount")) {}	
			$$$TRACETEXT($$$datetime_"[AddTask]: Задача "_Task_" добавлена!")
			Set DoStop= 1 
		}
		#; разблокировка TaskCount
		//L -$$$jobpool("TaskCount")
		//Set Locked= 0 
		h  0 . 001 
	}
	QUIT  1 
HandleAddTaskErrorEmptyPoolSize
	Set $ZT = ""
	$$$AddLog($$$LogLvlError, "AddTask : Empty Pool Size!")
	QUIT  0 
HandleAddTaskError
	Set $ZT = ""
	//L:($g(Locked, 0 )= 1 ) -$$$jobpool("TaskCount")
	$$$AddLog($$$LogLvlError, "AddTask : Error ze="_$ze)
	QUIT  0 

WaitTask():
В предложенном варианте с нулевой блокировкой проблема была в том, что одно задание могло попасть на обработку нескольким потокам, т.к. $order проходили все и после блокировки текущий элемент очереди не проверялся на существование. (Например, 2 потока прошли $order, потом 1й поток заблокировал элемент, удалил его из очереди и разблокировал его. Потом 2й поток успешно повторяет действия 1го, хотя на самом деле этого элемента уже нет в очереди.)
Блокировка на TaskCount здесь не нужна, т.к. декремент ничего плохо не сделает для проверки в AddTask().
Получаем след. код:
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
#;==================================================================
#;
#; ждать задачи для потока, вернуть ее ID и удалить задачу из очереди
#; 
#;==================================================================
WaitTask(Task)
	Set $ZT="HandleWaitTaskError"
	New (Task)
	Set (TaskNum,Task)=""
	Set Quit= 0 
	For {
		if $$$GetPoolInfo("Terminated", 0 )= 1  Set Quit= 1  Quit
		#; получить ID задачи если очередь не пуста
		Set TaskNum=$O($$$jobpool("tasks",TaskNum))
		if TaskNum'=""
		{
			L +$$$jobpool("tasks",TaskNum):0
			if ($T)
			{
				Set Task=$g($$$jobpool("tasks",TaskNum))
				if (Task'="")
				{
					Kill $$$jobpool("tasks",TaskNum)
					If $I($$$jobpool("TaskCount"),- 1 )
				}
				L -$$$jobpool("tasks",TaskNum)
				Quit:(Task'="")
			}
		}
		h  0 . 001 
	}
	QUIT:Quit ""
	QUIT Task
HandleWaitTaskError
	Set $ZT=""
	$$$AddLog($$$LogLvlError, "WaitTask : Error $ZE="_$ZE)
	QUIT ""

Про динамическое изменение приоритета для потоков обработки я уже писал выше. Лучше оставлять все потоки на среднем приоритете, а производительности добиваться за счет оптимизации алгоритма.
...
Рейтинг: 0 / 0
9 сообщений из 9, страница 1 из 1
Форумы / Caché, Ensemble, DeepSee, MiniM, IRIS, GT.M [игнор отключен] [закрыт для гостей] / Пул потоков
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]