powered by simpleCommunicator - 2.0.61     © 2026 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / Java [игнор отключен] [закрыт для гостей] / Любителям поломать голову над многопоточным кодом
53 сообщений из 53, показаны все 3 страниц
Любителям поломать голову над многопоточным кодом
    #38515241
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Столкнулся с интересной проблемой, нужна помощь:)

Задача с рабочего проекта(веб). Смысл вот в чем - имеется визард, на каждой странице пользователь заполняет поля, жмет Некст, в это время посылается запрос на веб-сервис на их сохранение и после этого рендерится страница. Вызов веб-сервиса долгий, не хочется мучать юзера, поэтому появилась мысль ложить таск по сохранению атрибутов в некое хранилище и выполнять его асинхронно.

Доп требования -

1) Каждый юзер имеет свой уникальный entityNumber
2) Нельзя дать выполняться двум таскам одновременно для одинакового entityNumber(более старый может перетереть более новый, если новый выполнится быстрее)
3) Допустим выполняется таск для entityNumber =1. Пока он выполняется, юзер успел нажать два раза Некст, имеет смысл посылать только последний таск для этого entityNumber(приложение устроено так, что Некст с 4 страницы содержит все атрибуты с 1 по 4 страницы). То есть если выполняется таск для 1 страницы, в это время пришли со 2 и с 3, надо сделать так чтобы 2 вообще не выполнялся, а сразу 3(когда закончится таки 1).
4) Вот тут косяк :) Предположим юзер по ошибке нажал Ф5. Я в контроллере хочу убедиться, что в хранилище нет выполняющихся для entityNumber юзера. Если выполняются - подождать, пока они выполнятся.

Моя идея была такова - Сохранять таски в ArrayBlockingQueue. Запустить поток, который лочит очередь, берет первый таск, если в очереди есть еще таск с данным entityNumber - реджектаем его, если нет - то добавляет в список выполняющихся entityNumber и посылает его в экзекьютор. Если уже выполняется таск с таким entityNumber, то помещаем таск в конец очереди.
Скажу сразу, я знаю что один лок - это потенциальное проседание производительности, но меня устраивает подождать 0.1с, вместо нынешних 15 на веб-сервис:) Все работало неплохо, пока не появилось требование 4) - моя имплементация покрывает 1-3, а 4) никак не пойму как сделать.

Вот мой код -

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
200.
201.
202.
203.
204.
205.
206.
207.
208.
209.
210.
211.
212.
213.
214.
215.
216.
217.
218.
219.
220.
221.
package test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Pool {

	static Pool pool;
 	ArrayBlockingQueue<Task> queue = new ArrayBlockingQueue<Task>(1000);
 	Lock lock = new ReentrantLock();
	volatile boolean stopped;
 	Watcher watcher = null;
	List<Long> runned = new ArrayList<Long>();

	private Pool() {

	}

	public static synchronized Pool getInstance() {
		if (pool == null) {
			pool = new Pool();
			pool.init();
 		}
 		return pool;
 	}
	
 	public void waitForMemberComplete(long entity){
		 for (;;) {
 			try {
				 lock.lock();
				 if (runned.contains(entity)) {
					Thread.sleep(100);
					System.out.println("Member task runned, sleep");
					continue;
 				}
				 boolean needSleep = false;
				 for (Task t : queue) {
 					if (t.getEntityNumber() == entity) {
						System.out.println("Member task in queue, sleep");
						 needSleep = true;
						 break;
					 }
				 }
				 if (needSleep) {
					 Thread.sleep(100);
					 continue;
				 }
				 System.out.println("Member has no pending tasks");
				 return;
			} catch (InterruptedException e) {
				 e.printStackTrace();
			} finally {
 				lock.unlock();
			 }
 		}
		
	 }

 	public void put(Task task) {
		 if (stopped) {
			 return;
		}
		try {
 			lock.lock();
			queue.add(task);
			System.out.println("Task added for " + task.getEntityNumber());
 		} finally {
			lock.unlock();
		}
	}

	 public void destroy() {
 		stopped = true;
		watcher.shutdown();
 	}

	 ArrayBlockingQueue<Task> getQueue() {
		 return queue;
	}

        private void init() {
 		watcher = new Watcher(pool, lock, runned);
		watcher.start();
	}

	boolean isStopped() {
		return stopped;
	 }
	
	public static void main(String[] args) throws InterruptedException {
		Task t = new Task(0);
		Task t2 = new Task(0);
		Task t3 = new Task(0);
		Task t4 = new Task(1);
		Task t5 = new Task(2);
		Pool.getInstance().put(t);
		Thread.sleep(100);
		Pool.getInstance().put(t2);
		Thread.sleep(100);
		Pool.getInstance().put(t3);
		Pool.getInstance().put(t4);
		Pool.getInstance().put(t5);
		Pool.getInstance().waitForMemberComplete(0);
	}
}

class Task implements Runnable {
	long entity;
	public Task (long entity) {
		this.entity = entity;
	}
	public long getEntityNumber() {
		return entity;
	}

	@Override
	public void run() {
		try {
			Thread.sleep(300);
			System.out.println("Task done");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

class Watcher extends Thread {

	Pool pool;
	Lock lock;
	ExecutorService executor;
	List<Long> runned;

	public Watcher(Pool pool, Lock lock, List<Long> runned) {
		this.pool = pool;
		this.lock = lock;
		this.runned = runned;
		executor = new CustomExecutor(runned, lock);
	}

	public void shutdown() {
		List<Runnable> notRunned = executor.shutdownNow();
		// Maybe save in logs?
	}

	public void run() {
		for (;;) {
			try {
				Thread.sleep(20);
			} catch (InterruptedException e1) {
				e1.printStackTrace();
			}
			if (pool.isStopped()) {
				return;
			}
			Task toRun = null;
			try {
				toRun = pool.getQueue().take();
			} catch (InterruptedException e) {
				return;
			}
			try {
				lock.lock();
				boolean hasNewest = false;
				for (Task t : pool.getQueue()) {
					if (toRun.getEntityNumber() == t.getEntityNumber()) {
						System.out.println("Member has newest task " + t.getEntityNumber());
						hasNewest = true;
						break;
					}
				}
				if (hasNewest) {
					continue;
				}
				if (runned.contains(toRun.getEntityNumber())) {
					pool.getQueue().add(toRun);
					System.out.println("Member task running, put in end queue " + toRun.getEntityNumber());
					continue;
				}
				runned.add(toRun.getEntityNumber());
				executor.execute(toRun);
				System.out.println("Task scheduled for member" + toRun.getEntityNumber());
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				lock.unlock();
			}
		}
	}
}

final class CustomExecutor extends ThreadPoolExecutor {

	List<Long> runned;
	Lock lock;

	public CustomExecutor(List<Long> runned, Lock lock) {
		super(5, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
		this.runned = runned;
		this.lock = lock;
	}

	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		super.afterExecute(r, t);
		try {
			lock.lock();
			Task runnable = (Task) r;
			runned.remove(runnable.getEntityNumber());
			System.out.println("Task completed, executor can accept new tasks for " + runnable.getEntityNumber());
		} finally {
			lock.unlock();
		}
	}
}



Собственно проблема понятна, она вот тут - toRun = pool.getQueue().take();
То есть может случиться так, что не выполняется ни одного таска с энтити, и его нет в очереди, но его мы только что вытащили из очереди, поэтому waitForMemberComplete() работает неправильно. Ложить его под Лок тоже нельзя по понятным причинам. Что посоветуете? Можно и абсолютно другие варианты, если знаете что может помочь. Ну и попутно гляньте, может есть еще ошибки, замечания?
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515271
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник,
Долгий не веб сервис а рендеринг. Поэтому задача imho надумана. AJAX быстрый.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515306
cdtyjv
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник ,
Так, ну код плохой от и до. Выкинуть его, и переписать с чистого листа.

Опишите свою задачу в более общем виде, без лишних деталей вроде HTTP, entityId, и прочего ненужного. Оно должно выглядеть примерно так:
1) Есть N клиентов, каждый из которых имеет уникальный идентификатор.
2) Существует M типов задач, который может стартовать каждый клиент. Эти задачи упорядочены. То есть, если взять две задачи M1 и M2, то между ними можно однозначно поставить знаки <, > или =.
3) Клиент может одновременно стартовать несколько задач.
4) ...

Тогда станет более понятно, в чем заключается ваша задача, и мы сможем вам помочь.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515327
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
cdtyjv,
Плюс учесть, что визард сам по себе Синхронный процесс.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515415
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Petro123забыл ник,
Долгий не веб сервис а рендеринг. Поэтому задача imho надумана. AJAX быстрый.
cdtyjv,
Плюс учесть, что визард сам по себе Синхронный процесс.


Спасибо Петро, хороший ты человек, но иногда пишешь ну совсем не в ту степь. Как у меня работают вебсервисы, и как работает ajax я уж разобрался) И задача не надуманна, она есть такая как есть.
cdtyjvзабыл ник,
Так, ну код плохой от и до. Выкинуть его, и переписать с чистого листа.

Ну еще бы, его же написал не свеном:) Ладно, оставим пассажи. Ужасный код - это код, который не выполняет задачу или выглядит непонятным. Мне не важен каждый выжанный такт процессора, мне нужна всего лишь корректность, потому решение с одним локом на все - меня устраивает, да хоть на чистых синхронайзед. Слипы тоже расставлены в основном чтобы тестить.
Если не сложно, разверните свою мысль чем ужасен.
cdtyjvОпишите свою задачу в более общем виде, без лишних деталей вроде HTTP, entityId, и прочего ненужного.

Ну ок, я старался так и делать, дело в том что она нетривиальна именно в подробностях, и лично мне проще иметь конкретные требования, чем абстрактные.
Если бы я мог сформулировать формальные требования, я бы уж структуру нагуглил. Но, хорошо, постараюсь объяснить задачу еще раз, моя вина, что много деталей наверное.

Итак,

1) Есть N клиентов, каждый из которых имеет уникальный идентификатор.
2) Тип тасков один и тот же, отличается только атрибутами. Таск выполняется долго(от 5 до 20 с)
3) Если выполняется задача для какого-то клиента, нельзя позволить старт таска с таким же уникальным идентификатором. Таким образом в одно и тоже время может выполняться один таск с данным айди, но сколько угодно, если айди разные.
4) Пока выполняется таск, могут придти несколько тасков с данным айди. Когда таск отработает, нужно взять самый последний таск(по времени) для этгого айди - остальные скипнуть.
5) Нужна возможность сделать блокирующий вызов, который вернется только если нету выполняющихся тасков с данным айди, и нет тасков в очереди ожидания с данным айди. Дополнительно, если вызван этот блокирующий метод, то пока он не закончится, новые таски с данным айди гарантированно не попадут в очередь(это обеспечивается в данный момент, детали не важны).
6) Таски могут приходить как им вздумается, но для данного айди они упорядочены во времени.
Ну как-то так, или опять непонятно?
cdtyjvТогда станет более понятно, в чем заключается ваша задача, и мы сможем вам помочь.

Собственно для этого и писал)
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515437
cdtyjv
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник ,
Ок, простейший блокирующий вариант. Отсутствие багов не гарантированно, код призван продемонстрировать идею.
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
public class TaskBlocking {
    /** Монитор. */
    private static final Object mux = new Object();
    
    /** Задачи. */
    private static final Map<Integer, TaskRunnable> tasks = new HashMap<>();

    /**
     * Добавить задачу.
     * 
     * @param id Идентификатор.
     * @param task Задача.
     */
    public static void addTask(Integer id, Task task) {
        boolean isNew = false;
        
        TaskRunnable taskRun;
        
        synchronized (mux) {
            taskRun = tasks.get(id);
            
            if (taskRun != null)
                taskRun.addTask(task);
            else {
                taskRun = new TaskRunnable(id, task);
                
                isNew = true;
            }
        }
        
        if (isNew)
            submit(taskRun);
    }

    /**
     * Подождать окончания выполнения задач для данного идентификатора.
     * 
     * @param id Идентификатор.
     * @throws InterruptedException В случае прерывания.
     */
    public static void awaitCompletion(Integer id) throws InterruptedException {
        TaskRunnable taskRun;
        
        synchronized (mux) {
            taskRun = tasks.get(id);
            
            if (taskRun == null)
                return; // Нет активных задач.
        }
        
        taskRun.awaitCompletion();
    }

    /**
     * Отдать TaskRunnable в Executor. 
     * 
     * @param taskRun TaskRunnable. 
     */
    private static void submit(TaskRunnable taskRun) {
        /*
            Тут кладете Runnable в Executor. Делаем это вне synchronized, так как в зависимости от конфигурации пула,
            задача может начать выполнение в этом же потоке. 
            Как быть с RejectedExecutionException, решайте сами.
        */
    }
    
    /**
     * Класс, выполняющий задачи.
     */
    private static class TaskRunnable implements Runnable {
        /** Идентификатор. */
        private final int id;

        /** Latch, означающий окончание выполнение задач для текушего id. */
        private final CountDownLatch completeLatch = new CountDownLatch(1);
        
        /** Текущая задача. */
        private Task task;

        /**
         * Конструктор.
         * 
         * @param id Идентификатор.
         * @param task Задача.
         */
        public TaskRunnable(int id, Task task) {
            assert Thread.holdsLock(mux); // Должны быть внутри synchronized для видимости.
            
            this.id = id;
            this.task = task;
        }

        /**
         * Добавить следующую задачу.
         * 
         * @param task Следующая задача.
         */
        public void addTask(Task task) {
            synchronized (mux) {
                this.task = task;
            }
        }
        
        /** {@inheritDoc} */
        @Override 
        public void run() {
            while (true) {
                Task task0;
                
                synchronized (mux) {
                    task0 = task;
                    
                    if (task0 == null) {
                        // Все, больше нет задач в очереди.
                        tasks.remove(id);
                        
                        completeLatch.countDown(); 
                        
                        return;
                    }
                    else
                        task = null; // Занулим следующую задачу.
                }
                
                try {
                    task0.run(); // Выполнить задачу.
                }
                catch (Exception e) {
                    // Обработка на ваше усмотрение.
                }
            }
        }

        /**
         * Подождать окончания выполнения задачи.
         *
         * @throws InterruptedException В случае прерывания.
         */
        public void awaitCompletion() throws InterruptedException {
            completeLatch.await();
        }
    }

    /**
     * Ваша задача.
     */
    private static class Task implements Runnable {
        /** {@inheritDoc} */
        @Override 
        public void run() {
            // Логика задачи.
        }
    }
}

...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515440
Лагман
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
У меня на слове "ложить" произошел эксепшн.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515441
Alex Kuznetsov
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник,

Есть предложение:
1. организовать один поток на одного клиента, который будет обрабатывать очередь хранящуюся в потоке,
2. поток обернуть в отдельный класс, который реализует механизм добавления задачи в очередь ( с отслеживанием типа и времени старта задачи) ,
3. для обеспечения уникальности очереди для клиента - запретить открытие второй сессии с одним и тем-же идентификатором клиента
4. контроллер в данном случае тупо отсылает идентификатор и параметры задачи в класс обёртку потока

Если нет желания организовывать для каждого клиента поток, то можно сделать один класс, который организует добавление задачи в список, который содержит в себе очереди для каждого клиента. Также организовать пул потоков, которые выбирают и обрабатывают задачи из очередей.

Как-то так, при первом приближении.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515445
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Alex Kuznetsov забыл ник,

Есть предложение:
1. организовать один поток на одного клиента, который будет обрабатывать очередь хранящуюся в потоке,
2. поток обернуть в отдельный класс, который реализует механизм добавления задачи в очередь ( с отслеживанием типа и времени старта задачи) ,
3. для обеспечения уникальности очереди для клиента - запретить открытие второй сессии с одним и тем-же идентификатором клиента
4. контроллер в данном случае тупо отсылает идентификатор и параметры задачи в класс обёртку потока


Думал об этом, но поток на каждого клиента - слишком жирно, этого как раз и хочу избежать, именно поэтому в решении будет экзекьютор, пока еще не знаю как, но потоки плодить точно не хочу.

Alex KuznetsovЕсли нет желания организовывать для каждого клиента поток, то можно сделать один класс, который организует добавление задачи в список, который содержит в себе очереди для каждого клиента. Также организовать пул потоков, которые выбирают и обрабатывают задачи из очередей.

Как-то так, при первом приближении


Да, с этого примерно и начинал, но слишком громоздко, очень большое пространство состояний и трудно написать корректно. Собственно вот и пришел к моему варианту, сейчас думаю допиливать его или все-таки попробовать такой вариант с нуля опять.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515447
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
ЛагманУ меня на слове "ложить" произошел эксепшн.

Пфф, try нельзя опускать...
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515449
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
cdtyjv забыл ник ,
Ок, простейший блокирующий вариант. Отсутствие багов не гарантированно, код призван продемонстрировать идею.


Сейчас смотрю, на первый взгляд не совсем то что мне нужно, сейчас попробую детально разобраться. Вроде как в вашем варианте могут выполняться два таска с одним айди, в любом случае спасибо за то что уделили время. Позже отпишусь
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515458
Alex Kuznetsov
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник... слишком громоздко, очень большое пространство состояний и трудно написать корректно... Вот здесь не понял... что сложно? добавить задачу в очередь? сделать пул потоков, которые будут выбирать задачи для исполнения? и к тому-же что означает фраза "очень большое пространство состояний" ? Чем определяется что оно большое? Сколько состояний? и состояний чего?

В общем как всегда, глаза боятся, а руки делают.

Тут вот ещё одна идея родилась: не плодить поток на каждого клиента, а сделать очередь для каждого клиента и пул потоков, которые выбирают задачи из очередей. просто помимо очереди для каждого клиента, должна быть ещё одна очередь, содержащая ссылки на задачи из очередей клиентов, ну и собственно тогда потоки выполняют задачи и после выполнения удаляют их из обеих очередей. Идея в том, что класс, который ведёт очередь задач клиентов сам следит за её заполнением и в случае успешного добавления задачи в очередь - просто дополняет ссылку на задачу в глобальную очередь. Ну и понятно, что поток из пула обращается к глобальной очереди за задачей и обрабатывает её...
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515470
cdtyjv
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никcdtyjv забыл ник ,
Ок, простейший блокирующий вариант. Отсутствие багов не гарантированно, код призван продемонстрировать идею.


Сейчас смотрю, на первый взгляд не совсем то что мне нужно, сейчас попробую детально разобраться. Вроде как в вашем варианте могут выполняться два таска с одним айди, в любом случае спасибо за то что уделили время. Позже отпишусьЭто бага. Сейчас:
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
    /**
     * Добавить задачу.
     *
     * @param id Идентификатор.
     * @param task Задача.
     */
    public static void addTask(Integer id, Task task) {
        boolean isNew = false;

        TaskRunnable taskRun;

        synchronized (mux) {
            taskRun = tasks.get(id);

            if (taskRun != null)
                taskRun.addTask(task);
            else {
                taskRun = new TaskRunnable(id, task);

                isNew = true;
            }
        }

        if (isNew)
            submit(taskRun);
    }

Должно быть:
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
    /**
     * Добавить задачу.
     *
     * @param id Идентификатор.
     * @param task Задача.
     */
    public static void addTask(Integer id, Task task) {
        boolean isNew = false;

        TaskRunnable taskRun;

        synchronized (mux) {
            taskRun = tasks.get(id);

            if (taskRun != null)
                taskRun.addTask(task);
            else {
                taskRun = new TaskRunnable(id, task);

                TaskRunnable oldTaskRun = tasks.put(id, taskRun);
                
                assert oldTaskRun == null;
                
                isNew = true;
            }
        }

        if (isNew)
            submit(taskRun);
    }

...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515474
cdtyjv
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Alex KuznetsovТут вот ещё одна идея родилась: не плодить поток на каждого клиента, а сделать очередь для каждого клиента и пул потоков, которые выбирают задачи из очередей. просто помимо очереди для каждого клиента, должна быть ещё одна очередь, содержащая ссылки на задачи из очередей клиентов, ну и собственно тогда потоки выполняют задачи и после выполнения удаляют их из обеих очередей. Идея в том, что класс, который ведёт очередь задач клиентов сам следит за её заполнением и в случае успешного добавления задачи в очередь - просто дополняет ссылку на задачу в глобальную очередь. Ну и понятно, что поток из пула обращается к глобальной очереди за задачей и обрабатывает её...См. мой код выше. Не надо никаких очередей. Одной мапки "id -> Runnable" достаточно.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515518
Alexey Tomin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никСтолкнулся с интересной проблемой, нужна помощь:)


Не могу предложить решение, но предлагаю разделить задачу на две чести.
1. Класс "выполни шаг хх для пользователя уу".
2. Собственно интерфейс пользователя.

Мне кажется, что http плохо подходит для длинных задач. Тут ajax нужен. Он поможет сделать п.2 так, чтобы пользователь не путался. Ну или выдавать страницы "ждите ответа". Но ajax красивее.

А если описать задачи 1 и 2 как независимые, с чётким контрактом на каждую операцию (а взаимодействие между ними- нечто вроде модного REST)- то решение, как мне кажется, придёт само.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515521
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
cdtyjvЭто бага.

Все равно есть проблема. Смотрите, в вашем варианте пришел таск с айди=1 например, стал выполняться (тупо спит 20 сек), через 2с приходит новый таск с айди =1, в вашем коде он реджектнется. А мне нужно чтобы он выполнился сразу после того как выполнится первый. Если пока выполняется первый пришли еще таски с айди =1, то должен выполниться только последний.
Я говорю, задача далеко не такая тривиальная как кажется. Вот Я буквально добавил пару сообщений логирования и метод мэйн -
Просто запустите

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TaskBlocking {
    /** Монитор. */
    private static final Object mux = new Object();
    
    /** Задачи. */
    private static final Map<Integer, TaskRunnable> tasks = new HashMap<Integer, TaskRunnable>();
    
    static ExecutorService e = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
    	Task t = new Task();
    	Task t2 = new Task();
    	addTask(1, t);
    	addTask(1, t2);
    }
    
    /**
     * Добавить задачу.
     *
     * @param id Идентификатор.
     * @param task Задача.
     */
    public static void addTask(Integer id, Task task) {
        boolean isNew = false;

        TaskRunnable taskRun;
        System.out.println("Trying add task");
        synchronized (mux) {
        	System.out.println("Get lock");
            taskRun = tasks.get(id);

            if (taskRun != null) {
            	System.out.println("Task already exist");
                taskRun.addTask(task);
            }
            else {
            	System.out.println("Put new task");
                taskRun = new TaskRunnable(id, task);

                TaskRunnable oldTaskRun = tasks.put(id, taskRun);
                
                assert oldTaskRun == null;
                
                isNew = true;
            }
        }
        System.out.println("Before end");
        if (isNew){
        	submit(taskRun);
        	System.out.println("Task submitted");
        }
        System.out.println("Add finished");
    }

    /**
     * Подождать окончания выполнения задач для данного идентификатора.
     * 
     * @param id Идентификатор.
     * @throws InterruptedException В случае прерывания.
     */
    public static void awaitCompletion(Integer id) throws InterruptedException {
        TaskRunnable taskRun;
        
        synchronized (mux) {
            taskRun = tasks.get(id);
            
            if (taskRun == null)
                return; // Нет активных задач.
        }
        
        taskRun.awaitCompletion();
    }

    /**
     * Отдать TaskRunnable в Executor. 
     * 
     * @param taskRun TaskRunnable. 
     */
    private static void submit(TaskRunnable taskRun) {
    	e.submit(taskRun);
        /*
            Тут кладете Runnable в Executor. Делаем это вне synchronized, так как в зависимости от конфигурации пула,
            задача может начать выполнение в этом же потоке. 
            Как быть с RejectedExecutionException, решайте сами.
        */
    }
    
    /**
     * Класс, выполняющий задачи.
     */
    private static class TaskRunnable implements Runnable {
        /** Идентификатор. */
        private final int id;

        /** Latch, означающий окончание выполнение задач для текушего id. */
        private final CountDownLatch completeLatch = new CountDownLatch(1);
        
        /** Текущая задача. */
        private Task task;

        /**
         * Конструктор.
         * 
         * @param id Идентификатор.
         * @param task Задача.
         */
        public TaskRunnable(int id, Task task) {
            assert Thread.holdsLock(mux); // Должны быть внутри synchronized для видимости.
            
            this.id = id;
            this.task = task;
        }

        /**
         * Добавить следующую задачу.
         * 
         * @param task Следующая задача.
         */
        public void addTask(Task task) {
            synchronized (mux) {
                this.task = task;
            }
        }
        
        /** {@inheritDoc} */
        @Override 
        public void run() {
            while (true) {
                Task task0;
                
                synchronized (mux) {
                    task0 = task;
                    
                    if (task0 == null) {
                        // Все, больше нет задач в очереди.
                        tasks.remove(id);
                        
                        completeLatch.countDown(); 
                        
                        return;
                    }
                    else
                        task = null; // Занулим следующую задачу.
                }
                
                try {
                    task0.run(); // Выполнить задачу.
                }
                catch (Exception e) {
                    // Обработка на ваше усмотрение.
                }
            }
        }

        /**
         * Подождать окончания выполнения задачи.
         *
         * @throws InterruptedException В случае прерывания.
         */
        public void awaitCompletion() throws InterruptedException {
            completeLatch.await();
        }
        

    }

    /**
     * Ваша задача.
     */
    private static class Task implements Runnable {
        /** {@inheritDoc} */
        @Override 
        public void run() {
            System.out.println("Task started");
            try {
				Thread.sleep(5000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
            System.out.println("Task stopped");
        }
    }
}


...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515523
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никЗадача с рабочего проекта(веб). Смысл вот в чем - имеется визард, на каждой странице пользователь заполняет поля, жмет Некст, в это время посылается запрос на веб-сервис на их сохранение и после этого рендерится страница. Вызов веб-сервиса долгий, не хочется мучать юзера, поэтому появилась мысль ложить таск по сохранению атрибутов в некое хранилище и выполнять его асинхронно.
и чем данная задача отличается от классики - "Набор товаров в корзину"? И потом, один Run - купить?
Если это классический визард, то почему таск идёт 20сек....и неужели не важно на 3-ем шаге визарда, куда повернул пользователь во втором шаге? (набрасываем задачи в потоке)

Alexey Tomin
он тут на веб сервис ссылается. Который писал не он, и он долгий...20сек.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515525
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Alex Kuznetsovзабыл ник... слишком громоздко, очень большое пространство состояний и трудно написать корректно... Вот здесь не понял... что сложно? добавить задачу в очередь? сделать пул потоков, которые будут выбирать задачи для исполнения? и к тому-же что означает фраза "очень большое пространство состояний" ? Чем определяется что оно большое? Сколько состояний? и состояний чего?

В общем как всегда, глаза боятся, а руки делают.

Тут вот ещё одна идея родилась: не плодить поток на каждого клиента, а сделать очередь для каждого клиента и пул потоков, которые выбирают задачи из очередей. просто помимо очереди для каждого клиента, должна быть ещё одна очередь, содержащая ссылки на задачи из очередей клиентов, ну и собственно тогда потоки выполняют задачи и после выполнения удаляют их из обеих очередей. Идея в том, что класс, который ведёт очередь задач клиентов сам следит за её заполнением и в случае успешного добавления задачи в очередь - просто дополняет ссылку на задачу в глобальную очередь. Ну и понятно, что поток из пула обращается к глобальной очереди за задачей и обрабатывает её...

Вот вы мыслите почти как я вначале. Я тоже думал об очереди для каждого айди и многих читателях. Потом я подумал - а нафиг мне много читателей, если можно создать один поток, так называемый распределятор, тогда и несколько очередей не нужно - распределятор просто получает лок, пробегает по очереди - выкидывает лишние таски, если таск уже выполняется с таким айди - кладет его обратно в хвост, и тд. Вот собственно мы и пришли к моему варианту.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515526
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никЯ говорю, задача далеко не такая тривиальная как кажется.
ты против одного потока на юзверя. Как тогда работает контейнер сервлетов при потоке на вызов сервлета (хотя и с настройками).
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515529
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Alexey Tominзабыл никСтолкнулся с интересной проблемой, нужна помощь:)


Не могу предложить решение, но предлагаю разделить задачу на две чести.
1. Класс "выполни шаг хх для пользователя уу".
2. Собственно интерфейс пользователя.

Мне кажется, что http плохо подходит для длинных задач. Тут ajax нужен. Он поможет сделать п.2 так, чтобы пользователь не путался. Ну или выдавать страницы "ждите ответа". Но ajax красивее.

А если описать задачи 1 и 2 как независимые, с чётким контрактом на каждую операцию (а взаимодействие между ними- нечто вроде модного REST)- то решение, как мне кажется, придёт само.

Не-не-не, вы все не так поняли. Поясняю.
Есть визард, юзер заполнил страницу, нажал некст - по бизнес-логике мне надо заслать данные в веб-сервис, но он калечный и время выполнения 5-20 сек, которые юзер вынужден ждать, прежде чем ему покажется новый скрин(оно и так на аяксе работает).
Так как страница 2 содержит в скрытых полях и данные с первой страницы, и так далее - то пришла идея - а зачем мучать пользователя, то есть мы ацептаем данные, ложим их в какую-то очередь, рендерим следующую страницу. Пока пользователь заполняет следующую страницу, таск выполняется асинхронно.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515530
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Petro123забыл никЗадача с рабочего проекта(веб). Смысл вот в чем - имеется визард, на каждой странице пользователь заполняет поля, жмет Некст, в это время посылается запрос на веб-сервис на их сохранение и после этого рендерится страница. Вызов веб-сервиса долгий, не хочется мучать юзера, поэтому появилась мысль ложить таск по сохранению атрибутов в некое хранилище и выполнять его асинхронно.
и чем данная задача отличается от классики - "Набор товаров в корзину"? И потом, один Run - купить?
Если это классический визард, то почему таск идёт 20сек....и неужели не важно на 3-ем шаге визарда, куда повернул пользователь во втором шаге? (набрасываем задачи в потоке)

Alexey Tomin
он тут на веб сервис ссылается. Который писал не он, и он долгий...20сек.

Все правильно, расписал в ответе Алексею
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515531
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Petro123забыл никЯ говорю, задача далеко не такая тривиальная как кажется.
ты против одного потока на юзверя. Как тогда работает контейнер сервлетов при потоке на вызов сервлета (хотя и с настройками).
Все правильно, я против потока на каждый нттр запрос + такое же количество на мой экзекьютор, логично же?
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515534
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник,
теперь понятно...почти)))
Alex Kuznetsov говорил про Состояния....их много...т.е. Таски влияют друг на друга?
Тогда нужно ждать каждый таск для след.таска.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515535
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никПока пользователь заполняет следующую страницу, таск выполняется асинхронно.
а если первый таск - "Удалить и выйти"? ))
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515542
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Petro123забыл ник,
теперь понятно...почти)))
Alex Kuznetsov говорил про Состояния....их много...т.е. Таски влияют друг на друга?
Тогда нужно ждать каждый таск для след.таска.

неа, ждать не надо, новый по времени таск, содержит в себе данные всех предыдущих. Выполняется таск для 1 страницы, пока он выполняется пришли таски со 2 и с 3. 3 содержит в себе 2, смысла выполнять 2 нет. сейчас распишу остальные требования

1) Есть N клиентов, каждый из которых имеет уникальный идентификатор.
Ну тут понятно, визард могут проходить несколько людей одновременно
2) Тип тасков один и тот же, отличается только атрибутами. Таск выполняется долго(от 5 до 20 с)
Все таски суть - возьми инпуты со страницы и пошли на веб-сервис, выполняется иногда быстро иногда весьма долго
3) Если выполняется задача для какого-то клиента, нельзя позволить старт таска с таким же уникальным идентификатором. Таким образом в одно и тоже время может выполняться один таск с данным айди, но сколько угодно, если айди разные.
Если пришел новый таск в идеале я бы должен кансельнуть выполняюшийся, но это нереализуемо. Нужно ждать потому что если сразу выполнить новый таск, он может выполниться быстрее первого, и по итогу первый таск перепишет последний
4) Пока выполняется таск, могут придти несколько тасков с данным айди. Когда таск отработает, нужно взять самый последний таск(по времени) для этгого айди - остальные скипнуть.
это уже обьяснял, нет смысла выполнять сначала 2 страницу потом третью, если можно сразу третью
5) Нужна возможность сделать блокирующий вызов, который вернется только если нету выполняющихся тасков с данным айди, и нет тасков в очереди ожидания с данным айди. Дополнительно, если вызван этот блокирующий метод, то пока он не закончится, новые таски с данным айди гарантированно не попадут в очередь(это обеспечивается в данный момент, детали не важны).
Самое интересное - юзер нажал f5 - по бизнес логике прежде чем рендерить страницу я вынужден ждать выполнения всех предыдущих тасков
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515544
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Petro123забыл никПока пользователь заполняет следующую страницу, таск выполняется асинхронно.
а если первый таск - "Удалить и выйти"? ))
такие таски тоже есть, они выполняются по-другому, именно для них и нужен метод - ПодождиПокаВсеТаскиВыполнятся
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515545
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Краткое резюме.

Сейчас у меня есть блокирующая очередь для тасков, есть распределятор, есть лист с номерами выполняющихся тасков, все

накрыто одним локом, кроме блокирующего метода take() у очереди, в принципе можно было бы уйти от блокирующей очереди и опрашивать очередь в цикле, и накрыть это опрашивание тем же локом, тогда все будет корректно. Но CPU будет жечься немеренно(состояние когда тасков нет достаточно обычно), пока в раздумьях
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515551
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
ну да и конечено есть экзекьютор, который выполняет таски и ему все фиолетово. По сути вся логика хитрая именно в распределяторе.
Проблема в методе ПодождиКомплитВсехТасков - я под локом проверяю - есть ли в очереди задачи для такого айди и выполняется ли что-нибудь для айди, пока эти условия соблюдаются - я жду, если нет - сразу выхожу. Проблема в том, что метод take() вне лока, то есть может случиться что ничего не выполняется для айди, распределятор выгреб таск из очереди, но не взял лок, чтобы положить айли в лист выполняющихся. Фейл
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515554
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никПо сути вся логика хитрая именно в распределяторе.
ну например,
ВИ №1
- шаг1 - Ввести адрес = проверка веб сервисом наличия города
- шаг2 - если города нет, то форма ввода города. Если есть, то возврат его ID.

Это классический визард. И как тут решать в потоках?
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515564
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Petro123забыл никПо сути вся логика хитрая именно в распределяторе.
ну например,
ВИ №1
- шаг1 - Ввести адрес = проверка веб сервисом наличия города
- шаг2 - если города нет, то форма ввода города. Если есть, то возврат его ID.

Это классический визард. И как тут решать в потоках?

Нет, не до конца понятно обьяснил наверное. То что на странице - никого не колышет, все разруливается javascriptом. Вызовов сторонних сервисов нет. Сервис вызывается только если форма отвалидирована и все ок, этот сервис просто сохраняет ответы юзера, но работает долго, поэтому нужна асинхронность. Про веб можешь вообще забыть, речь идет о бэкэнде
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515572
cdtyjv
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никcdtyjvЭто бага.

Все равно есть проблема. Смотрите, в вашем варианте пришел таск с айди=1 например, стал выполняться (тупо спит 20 сек), через 2с приходит новый таск с айди =1, в вашем коде он реджектнется. А мне нужно чтобы он выполнился сразу после того как выполнится первый. Если пока выполняется первый пришли еще таски с айди =1, то должен выполниться только последний.
Я говорю, задача далеко не такая тривиальная как кажется. Вот Я буквально добавил пару сообщений логирования и метод мэйн -
Просто запустите

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TaskBlocking {
    /** Монитор. */
    private static final Object mux = new Object();
    
    /** Задачи. */
    private static final Map<Integer, TaskRunnable> tasks = new HashMap<Integer, TaskRunnable>();
    
    static ExecutorService e = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
    	Task t = new Task();
    	Task t2 = new Task();
    	addTask(1, t);
    	addTask(1, t2);
    }
    
    /**
     * Добавить задачу.
     *
     * @param id Идентификатор.
     * @param task Задача.
     */
    public static void addTask(Integer id, Task task) {
        boolean isNew = false;

        TaskRunnable taskRun;
        System.out.println("Trying add task");
        synchronized (mux) {
        	System.out.println("Get lock");
            taskRun = tasks.get(id);

            if (taskRun != null) {
            	System.out.println("Task already exist");
                taskRun.addTask(task);
            }
            else {
            	System.out.println("Put new task");
                taskRun = new TaskRunnable(id, task);

                TaskRunnable oldTaskRun = tasks.put(id, taskRun);
                
                assert oldTaskRun == null;
                
                isNew = true;
            }
        }
        System.out.println("Before end");
        if (isNew){
        	submit(taskRun);
        	System.out.println("Task submitted");
        }
        System.out.println("Add finished");
    }

    /**
     * Подождать окончания выполнения задач для данного идентификатора.
     * 
     * @param id Идентификатор.
     * @throws InterruptedException В случае прерывания.
     */
    public static void awaitCompletion(Integer id) throws InterruptedException {
        TaskRunnable taskRun;
        
        synchronized (mux) {
            taskRun = tasks.get(id);
            
            if (taskRun == null)
                return; // Нет активных задач.
        }
        
        taskRun.awaitCompletion();
    }

    /**
     * Отдать TaskRunnable в Executor. 
     * 
     * @param taskRun TaskRunnable. 
     */
    private static void submit(TaskRunnable taskRun) {
    	e.submit(taskRun);
        /*
            Тут кладете Runnable в Executor. Делаем это вне synchronized, так как в зависимости от конфигурации пула,
            задача может начать выполнение в этом же потоке. 
            Как быть с RejectedExecutionException, решайте сами.
        */
    }
    
    /**
     * Класс, выполняющий задачи.
     */
    private static class TaskRunnable implements Runnable {
        /** Идентификатор. */
        private final int id;

        /** Latch, означающий окончание выполнение задач для текушего id. */
        private final CountDownLatch completeLatch = new CountDownLatch(1);
        
        /** Текущая задача. */
        private Task task;

        /**
         * Конструктор.
         * 
         * @param id Идентификатор.
         * @param task Задача.
         */
        public TaskRunnable(int id, Task task) {
            assert Thread.holdsLock(mux); // Должны быть внутри synchronized для видимости.
            
            this.id = id;
            this.task = task;
        }

        /**
         * Добавить следующую задачу.
         * 
         * @param task Следующая задача.
         */
        public void addTask(Task task) {
            synchronized (mux) {
                this.task = task;
            }
        }
        
        /** {@inheritDoc} */
        @Override 
        public void run() {
            while (true) {
                Task task0;
                
                synchronized (mux) {
                    task0 = task;
                    
                    if (task0 == null) {
                        // Все, больше нет задач в очереди.
                        tasks.remove(id);
                        
                        completeLatch.countDown(); 
                        
                        return;
                    }
                    else
                        task = null; // Занулим следующую задачу.
                }
                
                try {
                    task0.run(); // Выполнить задачу.
                }
                catch (Exception e) {
                    // Обработка на ваше усмотрение.
                }
            }
        }

        /**
         * Подождать окончания выполнения задачи.
         *
         * @throws InterruptedException В случае прерывания.
         */
        public void awaitCompletion() throws InterruptedException {
            completeLatch.await();
        }
        

    }

    /**
     * Ваша задача.
     */
    private static class Task implements Runnable {
        /** {@inheritDoc} */
        @Override 
        public void run() {
            System.out.println("Task started");
            try {
				Thread.sleep(5000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
            System.out.println("Task stopped");
        }
    }
}


Ну здрасти :-) Вы сами сказали, что более новая задача должна перезаписывать более старые, если они еще не начались. Именно это здесь и происходит, и никакого реджекта там нет. Запустите код ниже сначала как есть, а потом с раскомментированной строкой в методе main:
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
public class TaskBlocking {
    /** Монитор. */
    private static final Object mux = new Object();

    /** Задачи. */
    private static final Map<Integer, TaskRunnable> tasks = new HashMap<Integer, TaskRunnable>();

    static ExecutorService e = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws Exception {
        addTask(1, new Task(1));

        // Thread.sleep(200); // Расскомментируйте, и увидите, что выполнится и первый, и второй.

        addTask(1, new Task(2));
    }

    /**
     * Добавить задачу.
     *
     * @param id Идентификатор.
     * @param task Задача.
     */
    public static void addTask(Integer id, Task task) {
        boolean isNew = false;

        TaskRunnable taskRun;
        System.out.println("Trying add task");
        synchronized (mux) {
            System.out.println("Get lock");
            taskRun = tasks.get(id);

            if (taskRun != null) {
                System.out.println("Task already exist");
                taskRun.addTask(task);
            }
            else {
                System.out.println("Put new task");
                taskRun = new TaskRunnable(id, task);

                TaskRunnable oldTaskRun = tasks.put(id, taskRun);

                assert oldTaskRun == null;

                isNew = true;
            }
        }
        System.out.println("Before end");
        if (isNew){
            submit(taskRun);
            System.out.println("Task submitted");
        }
        System.out.println("Add finished");
    }

    /**
     * Подождать окончания выполнения задач для данного идентификатора.
     *
     * @param id Идентификатор.
     * @throws InterruptedException В случае прерывания.
     */
    public static void awaitCompletion(Integer id) throws InterruptedException {
        TaskRunnable taskRun;

        synchronized (mux) {
            taskRun = tasks.get(id);

            if (taskRun == null)
                return; // Нет активных задач.
        }

        taskRun.awaitCompletion();
    }

    /**
     * Отдать TaskRunnable в Executor.
     *
     * @param taskRun TaskRunnable.
     */
    private static void submit(TaskRunnable taskRun) {
        e.submit(taskRun);
        /*
            Тут кладете Runnable в Executor. Делаем это вне synchronized, так как в зависимости от конфигурации пула,
            задача может начать выполнение в этом же потоке.
            Как быть с RejectedExecutionException, решайте сами.
        */
    }

    /**
     * Класс, выполняющий задачи.
     */
    private static class TaskRunnable implements Runnable {
        /** Идентификатор. */
        private final int id;

        /** Latch, означающий окончание выполнение задач для текушего id. */
        private final CountDownLatch completeLatch = new CountDownLatch(1);

        /** Текущая задача. */
        private Task task;

        /**
         * Конструктор.
         *
         * @param id Идентификатор.
         * @param task Задача.
         */
        public TaskRunnable(int id, Task task) {
            assert Thread.holdsLock(mux); // Должны быть внутри synchronized для видимости.

            this.id = id;
            this.task = task;
        }

        /**
         * Добавить следующую задачу.
         *
         * @param task Следующая задача.
         */
        public void addTask(Task task) {
            synchronized (mux) {
                this.task = task;
            }
        }

        /** {@inheritDoc} */
        @Override
        public void run() {
            while (true) {
                Task task0;

                synchronized (mux) {
                    task0 = task;

                    if (task0 == null) {
                        // Все, больше нет задач в очереди.
                        tasks.remove(id);

                        completeLatch.countDown();

                        return;
                    }
                    else
                        task = null; // Занулим следующую задачу.
                }

                try {
                    task0.run(); // Выполнить задачу.
                }
                catch (Exception e) {
                    // Обработка на ваше усмотрение.
                }
            }
        }

        /**
         * Подождать окончания выполнения задачи.
         *
         * @throws InterruptedException В случае прерывания.
         */
        public void awaitCompletion() throws InterruptedException {
            completeLatch.await();
        }


    }

    /**
     * Ваша задача.
     */
    private static class Task implements Runnable {

        public final int id;

        private Task(int id) {
            this.id = id;
        }

        /** {@inheritDoc} */
        @Override
        public void run() {
            System.out.println("Task started [id=" + id + ']');
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println("Task stopped [id=" + id + ']');
        }
    }
}

...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515577
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никПро веб можешь вообще забыть, речь идет о бэкэнде
дело твоё. Я не понимаю, как можно писать код бэкенд без оглядки на код клиента. И наоборот.
Без разницы на чём он написан. ВИ или прецендент - он вообще без кода. Но потоковость работы очень сильно от него зависит.
Ты упорно выбрал себе "Таск" как метод решения.
То веб, то не веб....то визард, но ОДНА форма.
Может просто ты бэкенд-программист, но вероятно есть более простое решение (с участием клиента).
IMHO
Удачи!
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515583
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Petro123забыл никПро веб можешь вообще забыть, речь идет о бэкэнде
дело твоё. Я не понимаю, как можно писать код бэкенд без оглядки на код клиента. И наоборот.
Без разницы на чём он написан. ВИ или прецендент - он вообще без кода. Но потоковость работы очень сильно от него зависит.
Ты упорно выбрал себе "Таск" как метод решения.
То веб, то не веб....то визард, но ОДНА форма.
Может просто ты бэкенд-программист, но вероятно есть более простое решение (с участием клиента).
IMHO
Удачи!

Емае, клиент сайд тут вообще не при чем, приложение уже год в продакшене, все ок, и все разруливается. Проблема была - долгое ожидание пользователей, вот она и решается, для этого сохранение формы решено перенести в асинхронный режим, клиент сайд как работал так и продолжает работать.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515584
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
cdtyjvНу здрасти :-) Вы сами сказали, что более новая задача должна перезаписывать более старые, если они еще не начались. Именно это здесь и происходит, и никакого реджекта там нет. Запустите код ниже сначала как есть, а потом с раскомментированной строкой в методе main:

Да, убедили, я был не прав. Согласен, что решение гораздо лучше моего. Пока вижу одну проблему, но некритичную на данный момент, пока еще гоняю тесты, но вроде все требования соблюдены.

А проблема такая - Представим что одновременно визард проходят 11 юезров(на 1 больше чем в экзекьюторе). Проходят активно, то есть пока таск выполняется они успевают заполнить еще одну страницу. 10 юзеров будут в шоколаде, а 11 получит starvation. Ситуация малореальная с нашей текущей нагрузкой, но вдруг? Что скажете?
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515589
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
OFF
забыл никЕмае
Емае.
Напиши более конкретный ВИ.
Иначе по твоему тексту подойдёт AJAX вызов на JS асинхронно с клиента с доп.текстом "На сервер ВСЁ отправлено".....ожидать не надо.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515591
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Petro123OFF
забыл никЕмае
Емае.
Напиши более конкретный ВИ.
Иначе по твоему тексту подойдёт AJAX вызов на JS асинхронно с клиента с доп.текстом "На сервер ВСЁ отправлено".....ожидать не надо.
Блин я твоя не понимать, примерно так все и будет работать, как ты написал, только к чему это? Клиент готов и в него никаких правок не планируется, а вот логика перехода между страницами на бэкэнде поменяется, раньше сохранение было синхронным, а теперь асинхронное, как это сделать с некоторыми рестрикшенами и был разговор, а ты о чем?
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515593
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник,
я спрашивал выше про:
"и чем данная задача отличается от классики - "Набор товаров в корзину"? И потом, один Run - купить?"
Которая решена ещё при Иване Грозном?
Не увидел или не понял твой ответ.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515594
cdtyjv
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник ,
Ну вообще это достаточно серьезный косяк. Решений несколько, и все они разрливаются через мануальное создание пула.
1) Использовать ThreadPoolExecutor.CallerRunsPolicy. Это самый простой вариант - если не получается выполнить асинхронно, то выполнить синхронно. Но не факт, что он вас устроит по двум причинам:
- Долгий ответ клиенту, от которого хотелось избавиться.
- Starvation никуда не делся, а просто переместился в другое место - теперь у вас может застарвиться один клиент, если он в первом окне браузера начал синхронное выполнение, а во втором окне постоянно тыкает вперед-назад. В этом случае, вы рискуете в первом окне браузера так и не получить ответ :-)
В общем, сомнительное решение.

2) Более заморочный вариант:
2.1) Создать руками BlockingQueue.
2.2) Передать его в ThreadPoolExecutor.
2.3) Класть задачи не в ThreadPoolExecutor, а напрямую в BlockingQueue.
2.4) Если какой-то TaskRunnable слишком долго выполняется, то форсировать его завершение, и класть в конец этой очереди.
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
200.
201.
202.
203.
204.
205.
206.
207.
208.
209.
210.
211.
212.
213.
214.
215.
216.
217.
218.
219.
220.
221.
222.
223.
224.
225.
226.
227.
228.
229.
230.
public class TaskBlockingAndQueued {
    /** Монитор. */
    private static final Object mux = new Object();

    /** Задачи. */
    private static final Map<Integer, TaskRunnable> tasks = new HashMap<Integer, TaskRunnable>();

    /** Очередь задач. */
    private static final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

    /** Пул. */
    private static final ExecutorService e = new ThreadPoolExecutor(8, 8, 1000L, TimeUnit.MILLISECONDS, taskQueue);

    public static void main(String[] args) throws Exception {
        addTask(1, new Task(1));

        //Thread.sleep(200); // Расскомментируйте, и увидите, что выполнится и первый, и второй.

        addTask(1, new Task(2));

        e.shutdown();
    }

    /**
     * Добавить задачу.
     *
     * @param id Идентификатор.
     * @param task Задача.
     */
    public static void addTask(Integer id, Task task) {
        boolean isNew = false;

        TaskRunnable taskRun;

        synchronized (mux) {
            taskRun = tasks.get(id);

            if (taskRun != null)
                taskRun.addTask(task);
            else {
                System.out.println("Put new task");
                taskRun = new TaskRunnable(id, task);

                TaskRunnable oldTaskRun = tasks.put(id, taskRun);

                assert oldTaskRun == null;

                isNew = true;
            }
        }

        if (isNew)
            submit(taskRun);
    }

    /**
     * Подождать окончания выполнения задач для данного идентификатора.
     *
     * @param id Идентификатор.
     * @throws InterruptedException В случае прерывания.
     */
    public static void awaitCompletion(Integer id) throws InterruptedException {
        TaskRunnable taskRun;

        synchronized (mux) {
            taskRun = tasks.get(id);

            if (taskRun == null)
                return; // Нет активных задач.
        }

        taskRun.awaitCompletion();
    }

    /**
     * Отдать TaskRunnable в Executor.
     *
     * @param taskRun TaskRunnable.
     */
    private static void submit(TaskRunnable taskRun) {
        try {
            taskQueue.put(taskRun);
        }
        catch (InterruptedException e) {
            // Handle.
        }
    }

    /**
     * Класс, выполняющий задачи.
     */
    private static class TaskRunnable implements Runnable {
        /** Идентификатор. */
        private final int id;

        /** Latch, означающий окончание выполнение задач для текушего id. */
        private final CountDownLatch completeLatch = new CountDownLatch(1);

        /** Текущая задача. */
        private Task task;

        /**
         * Конструктор.
         *
         * @param id Идентификатор.
         * @param task Задача.
         */
        public TaskRunnable(int id, Task task) {
            assert Thread.holdsLock(mux); // Должны быть внутри synchronized для видимости.

            this.id = id;
            this.task = task;
        }

        /**
         * Добавить следующую задачу.
         *
         * @param task Следующая задача.
         */
        public void addTask(Task task) {
            synchronized (mux) {
                this.task = task;
            }
        }

        /** {@inheritDoc} */
        @Override
        public void run() {
            while (true) {
                boolean proceed = false;

                Task task0;

                synchronized (mux) {
                    task0 = task;

                    if (task0 == null) {
                        // Все, больше нет задач в очереди.
                        tasks.remove(id);

                        completeLatch.countDown();

                        return;
                    }
                    else {
                        if (canProceed()) {
                            proceed = true;

                            task = null; // Занулим следующую задачу.
                        }
                    }
                }

                if (proceed) {
                    try {
                        task0.run(); // Выполнить задачу.
                    }
                    catch (Exception e) {
                        // Обработка на ваше усмотрение.
                    }
                }
                else {
                    returnToQueue();
                    
                    return;
                }
            }
        }

        /**
         * Подождать окончания выполнения задачи.
         *
         * @throws InterruptedException В случае прерывания.
         */
        public void awaitCompletion() throws InterruptedException {
            completeLatch.await();
        }

        /**
         * Проверить, можно ли продолжать выполнение задач, или надо вернуть объект в конец очереди.
         *
         * @return {@code true}, если
         */
        private boolean canProceed() {
            // Ваша имплементация.
            return true;
        }

        /**
         * Вернуть задачу в очередь.
         */
        private void returnToQueue() {
            // Ваша исплементация, напр.:
            try {
                taskQueue.put(this);
            }
            catch (InterruptedException e) {
                // Handle.
            }
        }
        
    }

    /**
     * Ваша задача.
     */
    private static class Task implements Runnable {

        private final int id;

        private Task(int id) {
            this.id = id;
        }

        /** {@inheritDoc} */
        @Override
        public void run() {
            System.out.println("Task started [id=" + id + "]");

            try {
                Thread.sleep(2000);
            }
            catch (InterruptedException e) {
                // Handle.
            }

            System.out.println("Task finished [id=" + id + "]");
        }
    }
}



P.S.: Ну и помните дисклеймер про наличие багов. Это не production code, а демонстрация идеи.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515607
mvn3
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
забыл ник,

Вы знаете, у меня когда-то тоже была подобная проблема, унаследованная система, куча кода (вызов soap-сервиса из jsf-бина был довольно долгий после 6-ти шагов визарда). Правда, долго (около 1 минуты ) выполнялся только последний шаг (возможно это у Вас не так).

Так вот, в результате, в голове тоже начались появляться куча мыслей там, асинхронность...., пулы, оптимизации и прочие технические пакости. Все это естественно требовало время, тем более что код не мой. Так вот, ломая голову, ко мне подошел чел из UX и предложил очень простой вариант: давать фид-бек пользователю о состоянии выполнения запроса.

Т.е. юзер кликает финиш, и вместо того чтобы страница "зависала" на целую минуту, выводилось просто окно состояния выполнения операции (с апдейтом в каждые 7-8 сек.) т.к. "самое страшное для нас это не знание" (С) не помню кто)

Например:

1) Подготовка запроса
2) Передача данных
3) Обработка данных
ну и т.д.

Для кого-то это может показаться смешным, однако это работает, причем работает очень хорошо.
После этого использовал этот метод еще пару раз (Естественно главное не перегнуть палку ;) )
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515608
kagax
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
А использовать для этого quartz не получится?
тынц

Очередная кнопка Next пользователя в другой jvm случайно не может быть обработана по какой-нибудь причине?
А так все ж fail-over будет.

Правда в БД придется таблички под это создавать, но как вариант рассмотреть quartz?
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515628
Alex Kuznetsov
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
cdtyjv забыл ник ,
...
2) Более заморочный вариант
2.1) Создать руками BlockingQueue.
2.2) Передать его в ThreadPoolExecutor.
2.3) Класть задачи не в ThreadPoolExecutor, а напрямую в BlockingQueue.
2.4) Если какой-то TaskRunnable слишком долго выполняется, то форсировать его завершение, и класть в конец этой очереди.
...Практически тот вариант, который я предложил...

И ещё вопрос на понимание к забыл ник , а вообще-то, по определению, визард предполагает исполнение задач уже после того как все данные в ходе его прохождения будут получены... может рассмотреть вариант набора данных в ходе прохождения визарда и только по нажатию кнопки "Готово" запускать на выполнение задачи?
Ну и соответственно, исходя из условий задачи, (ну и здравой логики), не давать выполняться новым задачам от одного и того-же пользователя, пока предыдущие не будут выполнены, а то получается что он может сначала запустить что-то на создание информации в базе, а затем следом на изменение, при этом создание ещё не завершилось, а изменение не понятно чего уже началось => неконсистентность данных...
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515701
cdtyjv
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник ,
Я тут подумал - не катит мое решение. Не могу толком сказать почему, но прям вот нутром чую, что это засада какая-то. Более того, у нас тут есть еще один источник starvation - это метод awaitCompletion. Если юзер кликнул F5 в одном окне, а в другом постоянно плодит новые задачи, то мы никогда не выйдем из awaitCompletion, то есть рефреш по F5 никогда не завершится.
Я предлагаю поменять данное требование: ждать не окончание всех задач для данного id, а окончание той задачи, которая была последней на момент клика на F5. Тогда решение может выглядеть так (помним про баги):
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
public class TestBlocking {
    /** Монитор. */
    private static final Object mux = new Object();

    /** Задачи. */
    private static final Map<Integer, Task> taskMap = new HashMap<>();

    /**
     * Добавить задачу.
     *
     * @param id Идентификатор.
     * @param task Задача.
     */
    public static void addTask(int id, Task task) {
        synchronized (mux) {
            Task previousTask = taskMap.get(id);

            if (previousTask != null) {
                boolean cancelled = previousTask.tryCancel(); // Пробуем отменить последнюю задачу.

                if (cancelled)
                    // Получилось, сохраним референс на текущую активную задачу.
                    task.setPreviousTask(previousTask.getPreviousTask());
                else
                    // Не получилось, значит последняя задача является активной.
                    task.setPreviousTask(previousTask);
            }

            taskMap.put(id, task);
        }

        try {
            // Каждая новая задача идет в отдельном Runnable.
            submit(task);
        }
        catch (RejectedExecutionException e) {
            // Обработать.
        }
    }

    /**
     * Подождать завершения последней поступившей задачи.
     *
     * @param id Идентификатор.
     * @throws InterruptedException Если произошло прерывание.
     */
    public static void awaitCompletion(int id) throws InterruptedException {
        Task task;

        synchronized (mux) {
            task = taskMap.get(id);
        }

        // Вновь поступающие задачи не влияют на завершение данной задачи. Starvation отсутствует.
        if (task != null)
            task.awaitCompletion();
    }

    /**
     * Отдать задачу в Executor.
     *
     * @param task Задача.
     */
    private static void submit(Task task) {
        // ...
    }

    /**
     * Задача.
     */
    private class Task implements Runnable {
        /** Идентификатор. */
        private final int id;

        /** Latch, сигнализирующий окончание работы задачи. */
        private final CountDownLatch completeLatch = new CountDownLatch(1);

        /** Предыдущая задача, которая была активна в момент добавления данной задачи. */
        private Task previousTask;

        /** Флаг начала выполнения задачи. */
        private boolean started;

        /** Флаг отмены задачи. */
        private boolean cancelled;

        /**
         * @param id Идентификатор.
         */
        private Task(int id) {
            this.id = id;
        }

        /**
         * Получить предыдущую задачу.
         *
         * @return Предыдущая задача.
         */
        private Task getPreviousTask() {
            synchronized (mux) {
                return previousTask;
            }
        }

        /**
         * Установить предыдущую задачу.
         *
         * @param previousTask Предыдущая задача.
         */
        private void setPreviousTask(Task previousTask) {
            synchronized (mux) {
                assert this.previousTask == null;

                this.previousTask = previousTask;
            }
        }

        /**
         * Подождать окончания работы задачи.
         *
         * @throws InterruptedException В случае прерывания.
         */
        public void awaitCompletion() throws InterruptedException {
            Task previousTask0;

            synchronized (mux) {
                previousTask0 = previousTask;
            }

            if (previousTask0 != null)
                previousTask0.awaitCompletion();

            completeLatch.await();
        }

        /**
         * Попробовать отменить задачу.
         *
         * @return {@code true}, если задача была отменена; {@code false}, если задачу нельзя отменить, так как она
         *     уже начала выполняться.
         */
        public boolean tryCancel() {
            synchronized (mux) {
                if (started)
                    return false; // Уже нельзя отменить.
                else {
                    cancelled = true;

                    completeLatch.countDown();

                    return true; // Успели отменить.
                }
            }
        }

        /** {@inheritDoc} */
        @Override
        public void run() {
            Task previousTask;

            synchronized (mux) {
                if (cancelled)
                    return;

                started = true;

                previousTask = this.previousTask;

                this.previousTask = null; // Что бы избежать memory leak.
            }

            try {
                previousTask.awaitCompletion();

                run0();
            }
            catch (InterruptedException e) {
                // Обработать.
            }
            finally {
                synchronized (mux) {
                    Task task0 = taskMap.get(id);

                    if (task0 == this) // Что бы не удалить более новую задачу.
                        taskMap.remove(id);

                    completeLatch.countDown();
                }
            }
        }

        /**
         * Логика задачи.
         */
        private void run0() {
            // ...
        }
    }
}

...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516023
Alexey Tomin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никЕсть визард, юзер заполнил страницу, нажал некст - по бизнес-логике мне надо заслать данные в веб-сервис, но он калечный и время выполнения 5-20 сек, которые юзер вынужден ждать, прежде чем ему покажется новый скрин(оно и так на аяксе работает).
Так как страница 2 содержит в скрытых полях и данные с первой страницы, и так далее - то пришла идея - а зачем мучать пользователя, то есть мы ацептаем данные, ложим их в какую-то очередь, рендерим следующую страницу. Пока пользователь заполняет следующую страницу, таск выполняется асинхронно.

Т.е. есть должна быть hashmap<ID пользователя, состояние мастера>
Когда приходит submit с формы смотрим, что там с сохранённым состоянием.
Если пришёл следующий к сохранённому шаг- всё хорошо, дополняем данными. Если нет- что-то там говорим.
Вызов сервиса- наверное в отдельных сервисных потоках.
--------------
Хотя вообще интересно- первоначально было
" То есть если выполняется таск для 1 страницы, в это время пришли со 2 и с 3, надо сделать так чтобы 2 вообще не выполнялся, а сразу 3(когда закончится таки 1)."
А что если сервис звать только для шага 4, а до этого копить данные?
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516087
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
cdtyjv
2) Более заморочный вариант:
2.1) Создать руками BlockingQueue.
2.2) Передать его в ThreadPoolExecutor.
2.3) Класть задачи не в ThreadPoolExecutor, а напрямую в BlockingQueue.
2.4) Если какой-то TaskRunnable слишком долго выполняется, то форсировать его завершение, и класть в конец этой очереди.

Вот что-то такое я и пытался сделать первоначально
cdtyjvP.S.: Ну и помните дисклеймер про наличие багов. Это не production code, а демонстрация идеи.
угу, это все понятно.

В общем ладно, я видел ваш последний вариант, пока занят другими делами, думаю в течение недели дойдут руки посмотреть, потом отпишусь, на самом деле допилил свой вариант, но он выглядит коряво, если ваш работает корректно, покажу разработчикам, что им понятнее пусть и выбирают, им же поддерживать
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516090
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mvn3забыл ник,

Вы знаете, у меня когда-то тоже была подобная проблема, унаследованная система, куча кода (вызов soap-сервиса из jsf-бина был довольно долгий после 6-ти шагов визарда). Правда, долго (около 1 минуты ) выполнялся только последний шаг (возможно это у Вас не так).

Так вот, в результате, в голове тоже начались появляться куча мыслей там, асинхронность...., пулы, оптимизации и прочие технические пакости. Все это естественно требовало время, тем более что код не мой. Так вот, ломая голову, ко мне подошел чел из UX и предложил очень простой вариант: давать фид-бек пользователю о состоянии выполнения запроса.

Т.е. юзер кликает финиш, и вместо того чтобы страница "зависала" на целую минуту, выводилось просто окно состояния выполнения операции (с апдейтом в каждые 7-8 сек.) т.к. "самое страшное для нас это не знание" (С) не помню кто)

Например:

1) Подготовка запроса
2) Передача данных
3) Обработка данных
ну и т.д.

Для кого-то это может показаться смешным, однако это работает, причем работает очень хорошо.
После этого использовал этот метод еще пару раз (Естественно главное не перегнуть палку ;) )

Все правильно, только в момем случае юзерам наплевать на статус задачи, для них просто нужно чтобы было быстро и корректно, они понятия не имеют что там вызываются какие-то веб-сервисы и тп. Если бы нужно было следить за статусом, непременно делал бы что-то похожее на то что вы предложили.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516092
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
kagaxА использовать для этого quartz не получится?
тынц

Очередная кнопка Next пользователя в другой jvm случайно не может быть обработана по какой-нибудь причине?
А так все ж fail-over будет.

Правда в БД придется таблички под это создавать, но как вариант рассмотреть quartz?

а чем тут quartz помог бы? Разверните мысль, хотя в принципе скажу сразу - что у нас БД в принципе нет:) SOA мать ее
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516099
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Alex KuznetsovИ ещё вопрос на понимание к забыл ник , а вообще-то, по определению, визард предполагает исполнение задач уже после того как все данные в ходе его прохождения будут получены... может рассмотреть вариант набора данных в ходе прохождения визарда и только по нажатию кнопки "Готово" запускать на выполнение задачи?
Ну и соответственно, исходя из условий задачи, (ну и здравой логики), не давать выполняться новым задачам от одного и того-же пользователя, пока предыдущие не будут выполнены, а то получается что он может сначала запустить что-то на создание информации в базе, а затем следом на изменение, при этом создание ещё не завершилось, а изменение не понятно чего уже началось => неконсистентность данных...

Вопросы хорошие, но специфика задачи такова, что все эти места обдуманы, там все удет ок. А выполнять все по кнопке Готово нельзя - потому что юзер может нажать Ф5 на любой странице и тп.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516104
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Alexey TominА что если сервис звать только для шага 4, а до этого копить данные?
Нельзя, юзер может нажать ф5, плюс в любой момент нажать сейвЕкзит - и в этот момент опять надо ждать пока все таски выполнятся.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516136
kagax
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
забыл ника чем тут quartz помог бы? Разверните мысль, хотя в принципе скажу сразу - что у нас БД в принципе нет:) SOA мать ее


1) при нажатии на кнопку создаем новую job1 в quartz с идентификатором entityNumber (с указанием, что запускать нужно сразу)
2) при нажатии очередной раз кнопки "Next" создаем новую job2. После чего создаем ( JobChangedJobListener ), в котором указываем, что запускать job2 нужно после job1.

База данных не обязательна - она нужна только на тот случай, чтобы, если jvm остановится, то запланированные job-ы будут выполнены после нового запуска jvm. Ну, или их подхватит и довыполнит другая jvm из кластера.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516151
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
kagax
1) при нажатии на кнопку создаем новую job1 в quartz с идентификатором entityNumber (с указанием, что запускать нужно сразу)
2) при нажатии очередной раз кнопки "Next" создаем новую job2. После чего создаем ( JobChangedJobListener ), в котором указываем, что запускать job2 нужно после job1.


А что если пока выполняется таск 1 пришли таск2-4, мне нужно выполнить только 4. Ну и плюс нужна возможность ждать пока все джобы выполняться и тд. В общем теже яйца, вид сбоку, смысла тянуть либу нет никакого.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516168
kagax
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
забыл никА что если пока выполняется таск 1 пришли таск2-4, мне нужно выполнить только 4. Ну и плюс нужна возможность ждать пока все джобы выполняться и тд. В общем теже яйца, вид сбоку, смысла тянуть либу нет никакого.

Можно узнать статус job и отменить запуск ненужных job.

Плюсов тут только в использовании более менее стандартного решения для управления job-ами. Но можно и самостоятельно реализовать нужный функционал.

Если базы нет и требования failover нет - то можно и не тянуть quartz
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516206
Alexey Tomin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никAlexey TominА что если сервис звать только для шага 4, а до этого копить данные?
Нельзя, юзер может нажать ф5, плюс в любой момент нажать сейвЕкзит - и в этот момент опять надо ждать пока все таски выполнятся.

Тогда мапа "юзер-состояние", пул экзекьюторов и команда "задачу в этот статус", которая либо меняет цель, либо говорит, что так нельзя.

Всё одно- надо разнести класс исполнения (поверх старого интерфейса) и интерфейс пользователя. И сделать постановку задачи отдельно для каждого. Без этого счастья не будет.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516346
Alex Kuznetsov
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Alexey Tomin,

Вот мы плавно и приходим к очереди задач и пулу потоков
...
Рейтинг: 0 / 0
53 сообщений из 53, показаны все 3 страниц
Форумы / Java [игнор отключен] [закрыт для гостей] / Любителям поломать голову над многопоточным кодом
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]