powered by simpleCommunicator - 2.0.61     © 2026 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / Java [игнор отключен] [закрыт для гостей] / Любителям поломать голову над многопоточным кодом
25 сообщений из 53, страница 1 из 3
Любителям поломать голову над многопоточным кодом
    #38515241
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Столкнулся с интересной проблемой, нужна помощь:)

Задача с рабочего проекта(веб). Смысл вот в чем - имеется визард, на каждой странице пользователь заполняет поля, жмет Некст, в это время посылается запрос на веб-сервис на их сохранение и после этого рендерится страница. Вызов веб-сервиса долгий, не хочется мучать юзера, поэтому появилась мысль ложить таск по сохранению атрибутов в некое хранилище и выполнять его асинхронно.

Доп требования -

1) Каждый юзер имеет свой уникальный entityNumber
2) Нельзя дать выполняться двум таскам одновременно для одинакового entityNumber(более старый может перетереть более новый, если новый выполнится быстрее)
3) Допустим выполняется таск для entityNumber =1. Пока он выполняется, юзер успел нажать два раза Некст, имеет смысл посылать только последний таск для этого entityNumber(приложение устроено так, что Некст с 4 страницы содержит все атрибуты с 1 по 4 страницы). То есть если выполняется таск для 1 страницы, в это время пришли со 2 и с 3, надо сделать так чтобы 2 вообще не выполнялся, а сразу 3(когда закончится таки 1).
4) Вот тут косяк :) Предположим юзер по ошибке нажал Ф5. Я в контроллере хочу убедиться, что в хранилище нет выполняющихся для entityNumber юзера. Если выполняются - подождать, пока они выполнятся.

Моя идея была такова - Сохранять таски в ArrayBlockingQueue. Запустить поток, который лочит очередь, берет первый таск, если в очереди есть еще таск с данным entityNumber - реджектаем его, если нет - то добавляет в список выполняющихся entityNumber и посылает его в экзекьютор. Если уже выполняется таск с таким entityNumber, то помещаем таск в конец очереди.
Скажу сразу, я знаю что один лок - это потенциальное проседание производительности, но меня устраивает подождать 0.1с, вместо нынешних 15 на веб-сервис:) Все работало неплохо, пока не появилось требование 4) - моя имплементация покрывает 1-3, а 4) никак не пойму как сделать.

Вот мой код -

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
200.
201.
202.
203.
204.
205.
206.
207.
208.
209.
210.
211.
212.
213.
214.
215.
216.
217.
218.
219.
220.
221.
package test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Pool {

	static Pool pool;
 	ArrayBlockingQueue<Task> queue = new ArrayBlockingQueue<Task>(1000);
 	Lock lock = new ReentrantLock();
	volatile boolean stopped;
 	Watcher watcher = null;
	List<Long> runned = new ArrayList<Long>();

	private Pool() {

	}

	public static synchronized Pool getInstance() {
		if (pool == null) {
			pool = new Pool();
			pool.init();
 		}
 		return pool;
 	}
	
 	public void waitForMemberComplete(long entity){
		 for (;;) {
 			try {
				 lock.lock();
				 if (runned.contains(entity)) {
					Thread.sleep(100);
					System.out.println("Member task runned, sleep");
					continue;
 				}
				 boolean needSleep = false;
				 for (Task t : queue) {
 					if (t.getEntityNumber() == entity) {
						System.out.println("Member task in queue, sleep");
						 needSleep = true;
						 break;
					 }
				 }
				 if (needSleep) {
					 Thread.sleep(100);
					 continue;
				 }
				 System.out.println("Member has no pending tasks");
				 return;
			} catch (InterruptedException e) {
				 e.printStackTrace();
			} finally {
 				lock.unlock();
			 }
 		}
		
	 }

 	public void put(Task task) {
		 if (stopped) {
			 return;
		}
		try {
 			lock.lock();
			queue.add(task);
			System.out.println("Task added for " + task.getEntityNumber());
 		} finally {
			lock.unlock();
		}
	}

	 public void destroy() {
 		stopped = true;
		watcher.shutdown();
 	}

	 ArrayBlockingQueue<Task> getQueue() {
		 return queue;
	}

        private void init() {
 		watcher = new Watcher(pool, lock, runned);
		watcher.start();
	}

	boolean isStopped() {
		return stopped;
	 }
	
	public static void main(String[] args) throws InterruptedException {
		Task t = new Task(0);
		Task t2 = new Task(0);
		Task t3 = new Task(0);
		Task t4 = new Task(1);
		Task t5 = new Task(2);
		Pool.getInstance().put(t);
		Thread.sleep(100);
		Pool.getInstance().put(t2);
		Thread.sleep(100);
		Pool.getInstance().put(t3);
		Pool.getInstance().put(t4);
		Pool.getInstance().put(t5);
		Pool.getInstance().waitForMemberComplete(0);
	}
}

class Task implements Runnable {
	long entity;
	public Task (long entity) {
		this.entity = entity;
	}
	public long getEntityNumber() {
		return entity;
	}

	@Override
	public void run() {
		try {
			Thread.sleep(300);
			System.out.println("Task done");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

class Watcher extends Thread {

	Pool pool;
	Lock lock;
	ExecutorService executor;
	List<Long> runned;

	public Watcher(Pool pool, Lock lock, List<Long> runned) {
		this.pool = pool;
		this.lock = lock;
		this.runned = runned;
		executor = new CustomExecutor(runned, lock);
	}

	public void shutdown() {
		List<Runnable> notRunned = executor.shutdownNow();
		// Maybe save in logs?
	}

	public void run() {
		for (;;) {
			try {
				Thread.sleep(20);
			} catch (InterruptedException e1) {
				e1.printStackTrace();
			}
			if (pool.isStopped()) {
				return;
			}
			Task toRun = null;
			try {
				toRun = pool.getQueue().take();
			} catch (InterruptedException e) {
				return;
			}
			try {
				lock.lock();
				boolean hasNewest = false;
				for (Task t : pool.getQueue()) {
					if (toRun.getEntityNumber() == t.getEntityNumber()) {
						System.out.println("Member has newest task " + t.getEntityNumber());
						hasNewest = true;
						break;
					}
				}
				if (hasNewest) {
					continue;
				}
				if (runned.contains(toRun.getEntityNumber())) {
					pool.getQueue().add(toRun);
					System.out.println("Member task running, put in end queue " + toRun.getEntityNumber());
					continue;
				}
				runned.add(toRun.getEntityNumber());
				executor.execute(toRun);
				System.out.println("Task scheduled for member" + toRun.getEntityNumber());
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				lock.unlock();
			}
		}
	}
}

final class CustomExecutor extends ThreadPoolExecutor {

	List<Long> runned;
	Lock lock;

	public CustomExecutor(List<Long> runned, Lock lock) {
		super(5, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
		this.runned = runned;
		this.lock = lock;
	}

	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		super.afterExecute(r, t);
		try {
			lock.lock();
			Task runnable = (Task) r;
			runned.remove(runnable.getEntityNumber());
			System.out.println("Task completed, executor can accept new tasks for " + runnable.getEntityNumber());
		} finally {
			lock.unlock();
		}
	}
}



Собственно проблема понятна, она вот тут - toRun = pool.getQueue().take();
То есть может случиться так, что не выполняется ни одного таска с энтити, и его нет в очереди, но его мы только что вытащили из очереди, поэтому waitForMemberComplete() работает неправильно. Ложить его под Лок тоже нельзя по понятным причинам. Что посоветуете? Можно и абсолютно другие варианты, если знаете что может помочь. Ну и попутно гляньте, может есть еще ошибки, замечания?
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515271
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник,
Долгий не веб сервис а рендеринг. Поэтому задача imho надумана. AJAX быстрый.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515306
cdtyjv
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник ,
Так, ну код плохой от и до. Выкинуть его, и переписать с чистого листа.

Опишите свою задачу в более общем виде, без лишних деталей вроде HTTP, entityId, и прочего ненужного. Оно должно выглядеть примерно так:
1) Есть N клиентов, каждый из которых имеет уникальный идентификатор.
2) Существует M типов задач, который может стартовать каждый клиент. Эти задачи упорядочены. То есть, если взять две задачи M1 и M2, то между ними можно однозначно поставить знаки <, > или =.
3) Клиент может одновременно стартовать несколько задач.
4) ...

Тогда станет более понятно, в чем заключается ваша задача, и мы сможем вам помочь.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515327
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
cdtyjv,
Плюс учесть, что визард сам по себе Синхронный процесс.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515415
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Petro123забыл ник,
Долгий не веб сервис а рендеринг. Поэтому задача imho надумана. AJAX быстрый.
cdtyjv,
Плюс учесть, что визард сам по себе Синхронный процесс.


Спасибо Петро, хороший ты человек, но иногда пишешь ну совсем не в ту степь. Как у меня работают вебсервисы, и как работает ajax я уж разобрался) И задача не надуманна, она есть такая как есть.
cdtyjvзабыл ник,
Так, ну код плохой от и до. Выкинуть его, и переписать с чистого листа.

Ну еще бы, его же написал не свеном:) Ладно, оставим пассажи. Ужасный код - это код, который не выполняет задачу или выглядит непонятным. Мне не важен каждый выжанный такт процессора, мне нужна всего лишь корректность, потому решение с одним локом на все - меня устраивает, да хоть на чистых синхронайзед. Слипы тоже расставлены в основном чтобы тестить.
Если не сложно, разверните свою мысль чем ужасен.
cdtyjvОпишите свою задачу в более общем виде, без лишних деталей вроде HTTP, entityId, и прочего ненужного.

Ну ок, я старался так и делать, дело в том что она нетривиальна именно в подробностях, и лично мне проще иметь конкретные требования, чем абстрактные.
Если бы я мог сформулировать формальные требования, я бы уж структуру нагуглил. Но, хорошо, постараюсь объяснить задачу еще раз, моя вина, что много деталей наверное.

Итак,

1) Есть N клиентов, каждый из которых имеет уникальный идентификатор.
2) Тип тасков один и тот же, отличается только атрибутами. Таск выполняется долго(от 5 до 20 с)
3) Если выполняется задача для какого-то клиента, нельзя позволить старт таска с таким же уникальным идентификатором. Таким образом в одно и тоже время может выполняться один таск с данным айди, но сколько угодно, если айди разные.
4) Пока выполняется таск, могут придти несколько тасков с данным айди. Когда таск отработает, нужно взять самый последний таск(по времени) для этгого айди - остальные скипнуть.
5) Нужна возможность сделать блокирующий вызов, который вернется только если нету выполняющихся тасков с данным айди, и нет тасков в очереди ожидания с данным айди. Дополнительно, если вызван этот блокирующий метод, то пока он не закончится, новые таски с данным айди гарантированно не попадут в очередь(это обеспечивается в данный момент, детали не важны).
6) Таски могут приходить как им вздумается, но для данного айди они упорядочены во времени.
Ну как-то так, или опять непонятно?
cdtyjvТогда станет более понятно, в чем заключается ваша задача, и мы сможем вам помочь.

Собственно для этого и писал)
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515437
cdtyjv
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник ,
Ок, простейший блокирующий вариант. Отсутствие багов не гарантированно, код призван продемонстрировать идею.
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
public class TaskBlocking {
    /** Монитор. */
    private static final Object mux = new Object();
    
    /** Задачи. */
    private static final Map<Integer, TaskRunnable> tasks = new HashMap<>();

    /**
     * Добавить задачу.
     * 
     * @param id Идентификатор.
     * @param task Задача.
     */
    public static void addTask(Integer id, Task task) {
        boolean isNew = false;
        
        TaskRunnable taskRun;
        
        synchronized (mux) {
            taskRun = tasks.get(id);
            
            if (taskRun != null)
                taskRun.addTask(task);
            else {
                taskRun = new TaskRunnable(id, task);
                
                isNew = true;
            }
        }
        
        if (isNew)
            submit(taskRun);
    }

    /**
     * Подождать окончания выполнения задач для данного идентификатора.
     * 
     * @param id Идентификатор.
     * @throws InterruptedException В случае прерывания.
     */
    public static void awaitCompletion(Integer id) throws InterruptedException {
        TaskRunnable taskRun;
        
        synchronized (mux) {
            taskRun = tasks.get(id);
            
            if (taskRun == null)
                return; // Нет активных задач.
        }
        
        taskRun.awaitCompletion();
    }

    /**
     * Отдать TaskRunnable в Executor. 
     * 
     * @param taskRun TaskRunnable. 
     */
    private static void submit(TaskRunnable taskRun) {
        /*
            Тут кладете Runnable в Executor. Делаем это вне synchronized, так как в зависимости от конфигурации пула,
            задача может начать выполнение в этом же потоке. 
            Как быть с RejectedExecutionException, решайте сами.
        */
    }
    
    /**
     * Класс, выполняющий задачи.
     */
    private static class TaskRunnable implements Runnable {
        /** Идентификатор. */
        private final int id;

        /** Latch, означающий окончание выполнение задач для текушего id. */
        private final CountDownLatch completeLatch = new CountDownLatch(1);
        
        /** Текущая задача. */
        private Task task;

        /**
         * Конструктор.
         * 
         * @param id Идентификатор.
         * @param task Задача.
         */
        public TaskRunnable(int id, Task task) {
            assert Thread.holdsLock(mux); // Должны быть внутри synchronized для видимости.
            
            this.id = id;
            this.task = task;
        }

        /**
         * Добавить следующую задачу.
         * 
         * @param task Следующая задача.
         */
        public void addTask(Task task) {
            synchronized (mux) {
                this.task = task;
            }
        }
        
        /** {@inheritDoc} */
        @Override 
        public void run() {
            while (true) {
                Task task0;
                
                synchronized (mux) {
                    task0 = task;
                    
                    if (task0 == null) {
                        // Все, больше нет задач в очереди.
                        tasks.remove(id);
                        
                        completeLatch.countDown(); 
                        
                        return;
                    }
                    else
                        task = null; // Занулим следующую задачу.
                }
                
                try {
                    task0.run(); // Выполнить задачу.
                }
                catch (Exception e) {
                    // Обработка на ваше усмотрение.
                }
            }
        }

        /**
         * Подождать окончания выполнения задачи.
         *
         * @throws InterruptedException В случае прерывания.
         */
        public void awaitCompletion() throws InterruptedException {
            completeLatch.await();
        }
    }

    /**
     * Ваша задача.
     */
    private static class Task implements Runnable {
        /** {@inheritDoc} */
        @Override 
        public void run() {
            // Логика задачи.
        }
    }
}

...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515440
Лагман
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
У меня на слове "ложить" произошел эксепшн.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515441
Alex Kuznetsov
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник,

Есть предложение:
1. организовать один поток на одного клиента, который будет обрабатывать очередь хранящуюся в потоке,
2. поток обернуть в отдельный класс, который реализует механизм добавления задачи в очередь ( с отслеживанием типа и времени старта задачи) ,
3. для обеспечения уникальности очереди для клиента - запретить открытие второй сессии с одним и тем-же идентификатором клиента
4. контроллер в данном случае тупо отсылает идентификатор и параметры задачи в класс обёртку потока

Если нет желания организовывать для каждого клиента поток, то можно сделать один класс, который организует добавление задачи в список, который содержит в себе очереди для каждого клиента. Также организовать пул потоков, которые выбирают и обрабатывают задачи из очередей.

Как-то так, при первом приближении.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515445
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Alex Kuznetsov забыл ник,

Есть предложение:
1. организовать один поток на одного клиента, который будет обрабатывать очередь хранящуюся в потоке,
2. поток обернуть в отдельный класс, который реализует механизм добавления задачи в очередь ( с отслеживанием типа и времени старта задачи) ,
3. для обеспечения уникальности очереди для клиента - запретить открытие второй сессии с одним и тем-же идентификатором клиента
4. контроллер в данном случае тупо отсылает идентификатор и параметры задачи в класс обёртку потока


Думал об этом, но поток на каждого клиента - слишком жирно, этого как раз и хочу избежать, именно поэтому в решении будет экзекьютор, пока еще не знаю как, но потоки плодить точно не хочу.

Alex KuznetsovЕсли нет желания организовывать для каждого клиента поток, то можно сделать один класс, который организует добавление задачи в список, который содержит в себе очереди для каждого клиента. Также организовать пул потоков, которые выбирают и обрабатывают задачи из очередей.

Как-то так, при первом приближении


Да, с этого примерно и начинал, но слишком громоздко, очень большое пространство состояний и трудно написать корректно. Собственно вот и пришел к моему варианту, сейчас думаю допиливать его или все-таки попробовать такой вариант с нуля опять.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515447
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
ЛагманУ меня на слове "ложить" произошел эксепшн.

Пфф, try нельзя опускать...
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515449
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
cdtyjv забыл ник ,
Ок, простейший блокирующий вариант. Отсутствие багов не гарантированно, код призван продемонстрировать идею.


Сейчас смотрю, на первый взгляд не совсем то что мне нужно, сейчас попробую детально разобраться. Вроде как в вашем варианте могут выполняться два таска с одним айди, в любом случае спасибо за то что уделили время. Позже отпишусь
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515458
Alex Kuznetsov
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник... слишком громоздко, очень большое пространство состояний и трудно написать корректно... Вот здесь не понял... что сложно? добавить задачу в очередь? сделать пул потоков, которые будут выбирать задачи для исполнения? и к тому-же что означает фраза "очень большое пространство состояний" ? Чем определяется что оно большое? Сколько состояний? и состояний чего?

В общем как всегда, глаза боятся, а руки делают.

Тут вот ещё одна идея родилась: не плодить поток на каждого клиента, а сделать очередь для каждого клиента и пул потоков, которые выбирают задачи из очередей. просто помимо очереди для каждого клиента, должна быть ещё одна очередь, содержащая ссылки на задачи из очередей клиентов, ну и собственно тогда потоки выполняют задачи и после выполнения удаляют их из обеих очередей. Идея в том, что класс, который ведёт очередь задач клиентов сам следит за её заполнением и в случае успешного добавления задачи в очередь - просто дополняет ссылку на задачу в глобальную очередь. Ну и понятно, что поток из пула обращается к глобальной очереди за задачей и обрабатывает её...
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515470
cdtyjv
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никcdtyjv забыл ник ,
Ок, простейший блокирующий вариант. Отсутствие багов не гарантированно, код призван продемонстрировать идею.


Сейчас смотрю, на первый взгляд не совсем то что мне нужно, сейчас попробую детально разобраться. Вроде как в вашем варианте могут выполняться два таска с одним айди, в любом случае спасибо за то что уделили время. Позже отпишусьЭто бага. Сейчас:
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
    /**
     * Добавить задачу.
     *
     * @param id Идентификатор.
     * @param task Задача.
     */
    public static void addTask(Integer id, Task task) {
        boolean isNew = false;

        TaskRunnable taskRun;

        synchronized (mux) {
            taskRun = tasks.get(id);

            if (taskRun != null)
                taskRun.addTask(task);
            else {
                taskRun = new TaskRunnable(id, task);

                isNew = true;
            }
        }

        if (isNew)
            submit(taskRun);
    }

Должно быть:
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
    /**
     * Добавить задачу.
     *
     * @param id Идентификатор.
     * @param task Задача.
     */
    public static void addTask(Integer id, Task task) {
        boolean isNew = false;

        TaskRunnable taskRun;

        synchronized (mux) {
            taskRun = tasks.get(id);

            if (taskRun != null)
                taskRun.addTask(task);
            else {
                taskRun = new TaskRunnable(id, task);

                TaskRunnable oldTaskRun = tasks.put(id, taskRun);
                
                assert oldTaskRun == null;
                
                isNew = true;
            }
        }

        if (isNew)
            submit(taskRun);
    }

...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515474
cdtyjv
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Alex KuznetsovТут вот ещё одна идея родилась: не плодить поток на каждого клиента, а сделать очередь для каждого клиента и пул потоков, которые выбирают задачи из очередей. просто помимо очереди для каждого клиента, должна быть ещё одна очередь, содержащая ссылки на задачи из очередей клиентов, ну и собственно тогда потоки выполняют задачи и после выполнения удаляют их из обеих очередей. Идея в том, что класс, который ведёт очередь задач клиентов сам следит за её заполнением и в случае успешного добавления задачи в очередь - просто дополняет ссылку на задачу в глобальную очередь. Ну и понятно, что поток из пула обращается к глобальной очереди за задачей и обрабатывает её...См. мой код выше. Не надо никаких очередей. Одной мапки "id -> Runnable" достаточно.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515518
Alexey Tomin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никСтолкнулся с интересной проблемой, нужна помощь:)


Не могу предложить решение, но предлагаю разделить задачу на две чести.
1. Класс "выполни шаг хх для пользователя уу".
2. Собственно интерфейс пользователя.

Мне кажется, что http плохо подходит для длинных задач. Тут ajax нужен. Он поможет сделать п.2 так, чтобы пользователь не путался. Ну или выдавать страницы "ждите ответа". Но ajax красивее.

А если описать задачи 1 и 2 как независимые, с чётким контрактом на каждую операцию (а взаимодействие между ними- нечто вроде модного REST)- то решение, как мне кажется, придёт само.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515521
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
cdtyjvЭто бага.

Все равно есть проблема. Смотрите, в вашем варианте пришел таск с айди=1 например, стал выполняться (тупо спит 20 сек), через 2с приходит новый таск с айди =1, в вашем коде он реджектнется. А мне нужно чтобы он выполнился сразу после того как выполнится первый. Если пока выполняется первый пришли еще таски с айди =1, то должен выполниться только последний.
Я говорю, задача далеко не такая тривиальная как кажется. Вот Я буквально добавил пару сообщений логирования и метод мэйн -
Просто запустите

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TaskBlocking {
    /** Монитор. */
    private static final Object mux = new Object();
    
    /** Задачи. */
    private static final Map<Integer, TaskRunnable> tasks = new HashMap<Integer, TaskRunnable>();
    
    static ExecutorService e = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
    	Task t = new Task();
    	Task t2 = new Task();
    	addTask(1, t);
    	addTask(1, t2);
    }
    
    /**
     * Добавить задачу.
     *
     * @param id Идентификатор.
     * @param task Задача.
     */
    public static void addTask(Integer id, Task task) {
        boolean isNew = false;

        TaskRunnable taskRun;
        System.out.println("Trying add task");
        synchronized (mux) {
        	System.out.println("Get lock");
            taskRun = tasks.get(id);

            if (taskRun != null) {
            	System.out.println("Task already exist");
                taskRun.addTask(task);
            }
            else {
            	System.out.println("Put new task");
                taskRun = new TaskRunnable(id, task);

                TaskRunnable oldTaskRun = tasks.put(id, taskRun);
                
                assert oldTaskRun == null;
                
                isNew = true;
            }
        }
        System.out.println("Before end");
        if (isNew){
        	submit(taskRun);
        	System.out.println("Task submitted");
        }
        System.out.println("Add finished");
    }

    /**
     * Подождать окончания выполнения задач для данного идентификатора.
     * 
     * @param id Идентификатор.
     * @throws InterruptedException В случае прерывания.
     */
    public static void awaitCompletion(Integer id) throws InterruptedException {
        TaskRunnable taskRun;
        
        synchronized (mux) {
            taskRun = tasks.get(id);
            
            if (taskRun == null)
                return; // Нет активных задач.
        }
        
        taskRun.awaitCompletion();
    }

    /**
     * Отдать TaskRunnable в Executor. 
     * 
     * @param taskRun TaskRunnable. 
     */
    private static void submit(TaskRunnable taskRun) {
    	e.submit(taskRun);
        /*
            Тут кладете Runnable в Executor. Делаем это вне synchronized, так как в зависимости от конфигурации пула,
            задача может начать выполнение в этом же потоке. 
            Как быть с RejectedExecutionException, решайте сами.
        */
    }
    
    /**
     * Класс, выполняющий задачи.
     */
    private static class TaskRunnable implements Runnable {
        /** Идентификатор. */
        private final int id;

        /** Latch, означающий окончание выполнение задач для текушего id. */
        private final CountDownLatch completeLatch = new CountDownLatch(1);
        
        /** Текущая задача. */
        private Task task;

        /**
         * Конструктор.
         * 
         * @param id Идентификатор.
         * @param task Задача.
         */
        public TaskRunnable(int id, Task task) {
            assert Thread.holdsLock(mux); // Должны быть внутри synchronized для видимости.
            
            this.id = id;
            this.task = task;
        }

        /**
         * Добавить следующую задачу.
         * 
         * @param task Следующая задача.
         */
        public void addTask(Task task) {
            synchronized (mux) {
                this.task = task;
            }
        }
        
        /** {@inheritDoc} */
        @Override 
        public void run() {
            while (true) {
                Task task0;
                
                synchronized (mux) {
                    task0 = task;
                    
                    if (task0 == null) {
                        // Все, больше нет задач в очереди.
                        tasks.remove(id);
                        
                        completeLatch.countDown(); 
                        
                        return;
                    }
                    else
                        task = null; // Занулим следующую задачу.
                }
                
                try {
                    task0.run(); // Выполнить задачу.
                }
                catch (Exception e) {
                    // Обработка на ваше усмотрение.
                }
            }
        }

        /**
         * Подождать окончания выполнения задачи.
         *
         * @throws InterruptedException В случае прерывания.
         */
        public void awaitCompletion() throws InterruptedException {
            completeLatch.await();
        }
        

    }

    /**
     * Ваша задача.
     */
    private static class Task implements Runnable {
        /** {@inheritDoc} */
        @Override 
        public void run() {
            System.out.println("Task started");
            try {
				Thread.sleep(5000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
            System.out.println("Task stopped");
        }
    }
}


...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515523
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никЗадача с рабочего проекта(веб). Смысл вот в чем - имеется визард, на каждой странице пользователь заполняет поля, жмет Некст, в это время посылается запрос на веб-сервис на их сохранение и после этого рендерится страница. Вызов веб-сервиса долгий, не хочется мучать юзера, поэтому появилась мысль ложить таск по сохранению атрибутов в некое хранилище и выполнять его асинхронно.
и чем данная задача отличается от классики - "Набор товаров в корзину"? И потом, один Run - купить?
Если это классический визард, то почему таск идёт 20сек....и неужели не важно на 3-ем шаге визарда, куда повернул пользователь во втором шаге? (набрасываем задачи в потоке)

Alexey Tomin
он тут на веб сервис ссылается. Который писал не он, и он долгий...20сек.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515525
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Alex Kuznetsovзабыл ник... слишком громоздко, очень большое пространство состояний и трудно написать корректно... Вот здесь не понял... что сложно? добавить задачу в очередь? сделать пул потоков, которые будут выбирать задачи для исполнения? и к тому-же что означает фраза "очень большое пространство состояний" ? Чем определяется что оно большое? Сколько состояний? и состояний чего?

В общем как всегда, глаза боятся, а руки делают.

Тут вот ещё одна идея родилась: не плодить поток на каждого клиента, а сделать очередь для каждого клиента и пул потоков, которые выбирают задачи из очередей. просто помимо очереди для каждого клиента, должна быть ещё одна очередь, содержащая ссылки на задачи из очередей клиентов, ну и собственно тогда потоки выполняют задачи и после выполнения удаляют их из обеих очередей. Идея в том, что класс, который ведёт очередь задач клиентов сам следит за её заполнением и в случае успешного добавления задачи в очередь - просто дополняет ссылку на задачу в глобальную очередь. Ну и понятно, что поток из пула обращается к глобальной очереди за задачей и обрабатывает её...

Вот вы мыслите почти как я вначале. Я тоже думал об очереди для каждого айди и многих читателях. Потом я подумал - а нафиг мне много читателей, если можно создать один поток, так называемый распределятор, тогда и несколько очередей не нужно - распределятор просто получает лок, пробегает по очереди - выкидывает лишние таски, если таск уже выполняется с таким айди - кладет его обратно в хвост, и тд. Вот собственно мы и пришли к моему варианту.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515526
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никЯ говорю, задача далеко не такая тривиальная как кажется.
ты против одного потока на юзверя. Как тогда работает контейнер сервлетов при потоке на вызов сервлета (хотя и с настройками).
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515529
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Alexey Tominзабыл никСтолкнулся с интересной проблемой, нужна помощь:)


Не могу предложить решение, но предлагаю разделить задачу на две чести.
1. Класс "выполни шаг хх для пользователя уу".
2. Собственно интерфейс пользователя.

Мне кажется, что http плохо подходит для длинных задач. Тут ajax нужен. Он поможет сделать п.2 так, чтобы пользователь не путался. Ну или выдавать страницы "ждите ответа". Но ajax красивее.

А если описать задачи 1 и 2 как независимые, с чётким контрактом на каждую операцию (а взаимодействие между ними- нечто вроде модного REST)- то решение, как мне кажется, придёт само.

Не-не-не, вы все не так поняли. Поясняю.
Есть визард, юзер заполнил страницу, нажал некст - по бизнес-логике мне надо заслать данные в веб-сервис, но он калечный и время выполнения 5-20 сек, которые юзер вынужден ждать, прежде чем ему покажется новый скрин(оно и так на аяксе работает).
Так как страница 2 содержит в скрытых полях и данные с первой страницы, и так далее - то пришла идея - а зачем мучать пользователя, то есть мы ацептаем данные, ложим их в какую-то очередь, рендерим следующую страницу. Пока пользователь заполняет следующую страницу, таск выполняется асинхронно.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515530
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Petro123забыл никЗадача с рабочего проекта(веб). Смысл вот в чем - имеется визард, на каждой странице пользователь заполняет поля, жмет Некст, в это время посылается запрос на веб-сервис на их сохранение и после этого рендерится страница. Вызов веб-сервиса долгий, не хочется мучать юзера, поэтому появилась мысль ложить таск по сохранению атрибутов в некое хранилище и выполнять его асинхронно.
и чем данная задача отличается от классики - "Набор товаров в корзину"? И потом, один Run - купить?
Если это классический визард, то почему таск идёт 20сек....и неужели не важно на 3-ем шаге визарда, куда повернул пользователь во втором шаге? (набрасываем задачи в потоке)

Alexey Tomin
он тут на веб сервис ссылается. Который писал не он, и он долгий...20сек.

Все правильно, расписал в ответе Алексею
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515531
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Petro123забыл никЯ говорю, задача далеко не такая тривиальная как кажется.
ты против одного потока на юзверя. Как тогда работает контейнер сервлетов при потоке на вызов сервлета (хотя и с настройками).
Все правильно, я против потока на каждый нттр запрос + такое же количество на мой экзекьютор, логично же?
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515534
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник,
теперь понятно...почти)))
Alex Kuznetsov говорил про Состояния....их много...т.е. Таски влияют друг на друга?
Тогда нужно ждать каждый таск для след.таска.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515535
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никПока пользователь заполняет следующую страницу, таск выполняется асинхронно.
а если первый таск - "Удалить и выйти"? ))
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515542
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Petro123забыл ник,
теперь понятно...почти)))
Alex Kuznetsov говорил про Состояния....их много...т.е. Таски влияют друг на друга?
Тогда нужно ждать каждый таск для след.таска.

неа, ждать не надо, новый по времени таск, содержит в себе данные всех предыдущих. Выполняется таск для 1 страницы, пока он выполняется пришли таски со 2 и с 3. 3 содержит в себе 2, смысла выполнять 2 нет. сейчас распишу остальные требования

1) Есть N клиентов, каждый из которых имеет уникальный идентификатор.
Ну тут понятно, визард могут проходить несколько людей одновременно
2) Тип тасков один и тот же, отличается только атрибутами. Таск выполняется долго(от 5 до 20 с)
Все таски суть - возьми инпуты со страницы и пошли на веб-сервис, выполняется иногда быстро иногда весьма долго
3) Если выполняется задача для какого-то клиента, нельзя позволить старт таска с таким же уникальным идентификатором. Таким образом в одно и тоже время может выполняться один таск с данным айди, но сколько угодно, если айди разные.
Если пришел новый таск в идеале я бы должен кансельнуть выполняюшийся, но это нереализуемо. Нужно ждать потому что если сразу выполнить новый таск, он может выполниться быстрее первого, и по итогу первый таск перепишет последний
4) Пока выполняется таск, могут придти несколько тасков с данным айди. Когда таск отработает, нужно взять самый последний таск(по времени) для этгого айди - остальные скипнуть.
это уже обьяснял, нет смысла выполнять сначала 2 страницу потом третью, если можно сразу третью
5) Нужна возможность сделать блокирующий вызов, который вернется только если нету выполняющихся тасков с данным айди, и нет тасков в очереди ожидания с данным айди. Дополнительно, если вызван этот блокирующий метод, то пока он не закончится, новые таски с данным айди гарантированно не попадут в очередь(это обеспечивается в данный момент, детали не важны).
Самое интересное - юзер нажал f5 - по бизнес логике прежде чем рендерить страницу я вынужден ждать выполнения всех предыдущих тасков
...
Рейтинг: 0 / 0
25 сообщений из 53, страница 1 из 3
Форумы / Java [игнор отключен] [закрыт для гостей] / Любителям поломать голову над многопоточным кодом
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]