powered by simpleCommunicator - 2.0.61     © 2026 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / Java [игнор отключен] [закрыт для гостей] / Любителям поломать голову над многопоточным кодом
25 сообщений из 53, страница 2 из 3
Любителям поломать голову над многопоточным кодом
    #38515544
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Petro123забыл никПока пользователь заполняет следующую страницу, таск выполняется асинхронно.
а если первый таск - "Удалить и выйти"? ))
такие таски тоже есть, они выполняются по-другому, именно для них и нужен метод - ПодождиПокаВсеТаскиВыполнятся
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515545
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Краткое резюме.

Сейчас у меня есть блокирующая очередь для тасков, есть распределятор, есть лист с номерами выполняющихся тасков, все

накрыто одним локом, кроме блокирующего метода take() у очереди, в принципе можно было бы уйти от блокирующей очереди и опрашивать очередь в цикле, и накрыть это опрашивание тем же локом, тогда все будет корректно. Но CPU будет жечься немеренно(состояние когда тасков нет достаточно обычно), пока в раздумьях
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515551
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
ну да и конечено есть экзекьютор, который выполняет таски и ему все фиолетово. По сути вся логика хитрая именно в распределяторе.
Проблема в методе ПодождиКомплитВсехТасков - я под локом проверяю - есть ли в очереди задачи для такого айди и выполняется ли что-нибудь для айди, пока эти условия соблюдаются - я жду, если нет - сразу выхожу. Проблема в том, что метод take() вне лока, то есть может случиться что ничего не выполняется для айди, распределятор выгреб таск из очереди, но не взял лок, чтобы положить айли в лист выполняющихся. Фейл
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515554
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никПо сути вся логика хитрая именно в распределяторе.
ну например,
ВИ №1
- шаг1 - Ввести адрес = проверка веб сервисом наличия города
- шаг2 - если города нет, то форма ввода города. Если есть, то возврат его ID.

Это классический визард. И как тут решать в потоках?
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515564
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Petro123забыл никПо сути вся логика хитрая именно в распределяторе.
ну например,
ВИ №1
- шаг1 - Ввести адрес = проверка веб сервисом наличия города
- шаг2 - если города нет, то форма ввода города. Если есть, то возврат его ID.

Это классический визард. И как тут решать в потоках?

Нет, не до конца понятно обьяснил наверное. То что на странице - никого не колышет, все разруливается javascriptом. Вызовов сторонних сервисов нет. Сервис вызывается только если форма отвалидирована и все ок, этот сервис просто сохраняет ответы юзера, но работает долго, поэтому нужна асинхронность. Про веб можешь вообще забыть, речь идет о бэкэнде
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515572
cdtyjv
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никcdtyjvЭто бага.

Все равно есть проблема. Смотрите, в вашем варианте пришел таск с айди=1 например, стал выполняться (тупо спит 20 сек), через 2с приходит новый таск с айди =1, в вашем коде он реджектнется. А мне нужно чтобы он выполнился сразу после того как выполнится первый. Если пока выполняется первый пришли еще таски с айди =1, то должен выполниться только последний.
Я говорю, задача далеко не такая тривиальная как кажется. Вот Я буквально добавил пару сообщений логирования и метод мэйн -
Просто запустите

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TaskBlocking {
    /** Монитор. */
    private static final Object mux = new Object();
    
    /** Задачи. */
    private static final Map<Integer, TaskRunnable> tasks = new HashMap<Integer, TaskRunnable>();
    
    static ExecutorService e = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
    	Task t = new Task();
    	Task t2 = new Task();
    	addTask(1, t);
    	addTask(1, t2);
    }
    
    /**
     * Добавить задачу.
     *
     * @param id Идентификатор.
     * @param task Задача.
     */
    public static void addTask(Integer id, Task task) {
        boolean isNew = false;

        TaskRunnable taskRun;
        System.out.println("Trying add task");
        synchronized (mux) {
        	System.out.println("Get lock");
            taskRun = tasks.get(id);

            if (taskRun != null) {
            	System.out.println("Task already exist");
                taskRun.addTask(task);
            }
            else {
            	System.out.println("Put new task");
                taskRun = new TaskRunnable(id, task);

                TaskRunnable oldTaskRun = tasks.put(id, taskRun);
                
                assert oldTaskRun == null;
                
                isNew = true;
            }
        }
        System.out.println("Before end");
        if (isNew){
        	submit(taskRun);
        	System.out.println("Task submitted");
        }
        System.out.println("Add finished");
    }

    /**
     * Подождать окончания выполнения задач для данного идентификатора.
     * 
     * @param id Идентификатор.
     * @throws InterruptedException В случае прерывания.
     */
    public static void awaitCompletion(Integer id) throws InterruptedException {
        TaskRunnable taskRun;
        
        synchronized (mux) {
            taskRun = tasks.get(id);
            
            if (taskRun == null)
                return; // Нет активных задач.
        }
        
        taskRun.awaitCompletion();
    }

    /**
     * Отдать TaskRunnable в Executor. 
     * 
     * @param taskRun TaskRunnable. 
     */
    private static void submit(TaskRunnable taskRun) {
    	e.submit(taskRun);
        /*
            Тут кладете Runnable в Executor. Делаем это вне synchronized, так как в зависимости от конфигурации пула,
            задача может начать выполнение в этом же потоке. 
            Как быть с RejectedExecutionException, решайте сами.
        */
    }
    
    /**
     * Класс, выполняющий задачи.
     */
    private static class TaskRunnable implements Runnable {
        /** Идентификатор. */
        private final int id;

        /** Latch, означающий окончание выполнение задач для текушего id. */
        private final CountDownLatch completeLatch = new CountDownLatch(1);
        
        /** Текущая задача. */
        private Task task;

        /**
         * Конструктор.
         * 
         * @param id Идентификатор.
         * @param task Задача.
         */
        public TaskRunnable(int id, Task task) {
            assert Thread.holdsLock(mux); // Должны быть внутри synchronized для видимости.
            
            this.id = id;
            this.task = task;
        }

        /**
         * Добавить следующую задачу.
         * 
         * @param task Следующая задача.
         */
        public void addTask(Task task) {
            synchronized (mux) {
                this.task = task;
            }
        }
        
        /** {@inheritDoc} */
        @Override 
        public void run() {
            while (true) {
                Task task0;
                
                synchronized (mux) {
                    task0 = task;
                    
                    if (task0 == null) {
                        // Все, больше нет задач в очереди.
                        tasks.remove(id);
                        
                        completeLatch.countDown(); 
                        
                        return;
                    }
                    else
                        task = null; // Занулим следующую задачу.
                }
                
                try {
                    task0.run(); // Выполнить задачу.
                }
                catch (Exception e) {
                    // Обработка на ваше усмотрение.
                }
            }
        }

        /**
         * Подождать окончания выполнения задачи.
         *
         * @throws InterruptedException В случае прерывания.
         */
        public void awaitCompletion() throws InterruptedException {
            completeLatch.await();
        }
        

    }

    /**
     * Ваша задача.
     */
    private static class Task implements Runnable {
        /** {@inheritDoc} */
        @Override 
        public void run() {
            System.out.println("Task started");
            try {
				Thread.sleep(5000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
            System.out.println("Task stopped");
        }
    }
}


Ну здрасти :-) Вы сами сказали, что более новая задача должна перезаписывать более старые, если они еще не начались. Именно это здесь и происходит, и никакого реджекта там нет. Запустите код ниже сначала как есть, а потом с раскомментированной строкой в методе main:
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
public class TaskBlocking {
    /** Монитор. */
    private static final Object mux = new Object();

    /** Задачи. */
    private static final Map<Integer, TaskRunnable> tasks = new HashMap<Integer, TaskRunnable>();

    static ExecutorService e = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws Exception {
        addTask(1, new Task(1));

        // Thread.sleep(200); // Расскомментируйте, и увидите, что выполнится и первый, и второй.

        addTask(1, new Task(2));
    }

    /**
     * Добавить задачу.
     *
     * @param id Идентификатор.
     * @param task Задача.
     */
    public static void addTask(Integer id, Task task) {
        boolean isNew = false;

        TaskRunnable taskRun;
        System.out.println("Trying add task");
        synchronized (mux) {
            System.out.println("Get lock");
            taskRun = tasks.get(id);

            if (taskRun != null) {
                System.out.println("Task already exist");
                taskRun.addTask(task);
            }
            else {
                System.out.println("Put new task");
                taskRun = new TaskRunnable(id, task);

                TaskRunnable oldTaskRun = tasks.put(id, taskRun);

                assert oldTaskRun == null;

                isNew = true;
            }
        }
        System.out.println("Before end");
        if (isNew){
            submit(taskRun);
            System.out.println("Task submitted");
        }
        System.out.println("Add finished");
    }

    /**
     * Подождать окончания выполнения задач для данного идентификатора.
     *
     * @param id Идентификатор.
     * @throws InterruptedException В случае прерывания.
     */
    public static void awaitCompletion(Integer id) throws InterruptedException {
        TaskRunnable taskRun;

        synchronized (mux) {
            taskRun = tasks.get(id);

            if (taskRun == null)
                return; // Нет активных задач.
        }

        taskRun.awaitCompletion();
    }

    /**
     * Отдать TaskRunnable в Executor.
     *
     * @param taskRun TaskRunnable.
     */
    private static void submit(TaskRunnable taskRun) {
        e.submit(taskRun);
        /*
            Тут кладете Runnable в Executor. Делаем это вне synchronized, так как в зависимости от конфигурации пула,
            задача может начать выполнение в этом же потоке.
            Как быть с RejectedExecutionException, решайте сами.
        */
    }

    /**
     * Класс, выполняющий задачи.
     */
    private static class TaskRunnable implements Runnable {
        /** Идентификатор. */
        private final int id;

        /** Latch, означающий окончание выполнение задач для текушего id. */
        private final CountDownLatch completeLatch = new CountDownLatch(1);

        /** Текущая задача. */
        private Task task;

        /**
         * Конструктор.
         *
         * @param id Идентификатор.
         * @param task Задача.
         */
        public TaskRunnable(int id, Task task) {
            assert Thread.holdsLock(mux); // Должны быть внутри synchronized для видимости.

            this.id = id;
            this.task = task;
        }

        /**
         * Добавить следующую задачу.
         *
         * @param task Следующая задача.
         */
        public void addTask(Task task) {
            synchronized (mux) {
                this.task = task;
            }
        }

        /** {@inheritDoc} */
        @Override
        public void run() {
            while (true) {
                Task task0;

                synchronized (mux) {
                    task0 = task;

                    if (task0 == null) {
                        // Все, больше нет задач в очереди.
                        tasks.remove(id);

                        completeLatch.countDown();

                        return;
                    }
                    else
                        task = null; // Занулим следующую задачу.
                }

                try {
                    task0.run(); // Выполнить задачу.
                }
                catch (Exception e) {
                    // Обработка на ваше усмотрение.
                }
            }
        }

        /**
         * Подождать окончания выполнения задачи.
         *
         * @throws InterruptedException В случае прерывания.
         */
        public void awaitCompletion() throws InterruptedException {
            completeLatch.await();
        }


    }

    /**
     * Ваша задача.
     */
    private static class Task implements Runnable {

        public final int id;

        private Task(int id) {
            this.id = id;
        }

        /** {@inheritDoc} */
        @Override
        public void run() {
            System.out.println("Task started [id=" + id + ']');
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println("Task stopped [id=" + id + ']');
        }
    }
}

...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515577
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никПро веб можешь вообще забыть, речь идет о бэкэнде
дело твоё. Я не понимаю, как можно писать код бэкенд без оглядки на код клиента. И наоборот.
Без разницы на чём он написан. ВИ или прецендент - он вообще без кода. Но потоковость работы очень сильно от него зависит.
Ты упорно выбрал себе "Таск" как метод решения.
То веб, то не веб....то визард, но ОДНА форма.
Может просто ты бэкенд-программист, но вероятно есть более простое решение (с участием клиента).
IMHO
Удачи!
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515583
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Petro123забыл никПро веб можешь вообще забыть, речь идет о бэкэнде
дело твоё. Я не понимаю, как можно писать код бэкенд без оглядки на код клиента. И наоборот.
Без разницы на чём он написан. ВИ или прецендент - он вообще без кода. Но потоковость работы очень сильно от него зависит.
Ты упорно выбрал себе "Таск" как метод решения.
То веб, то не веб....то визард, но ОДНА форма.
Может просто ты бэкенд-программист, но вероятно есть более простое решение (с участием клиента).
IMHO
Удачи!

Емае, клиент сайд тут вообще не при чем, приложение уже год в продакшене, все ок, и все разруливается. Проблема была - долгое ожидание пользователей, вот она и решается, для этого сохранение формы решено перенести в асинхронный режим, клиент сайд как работал так и продолжает работать.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515584
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
cdtyjvНу здрасти :-) Вы сами сказали, что более новая задача должна перезаписывать более старые, если они еще не начались. Именно это здесь и происходит, и никакого реджекта там нет. Запустите код ниже сначала как есть, а потом с раскомментированной строкой в методе main:

Да, убедили, я был не прав. Согласен, что решение гораздо лучше моего. Пока вижу одну проблему, но некритичную на данный момент, пока еще гоняю тесты, но вроде все требования соблюдены.

А проблема такая - Представим что одновременно визард проходят 11 юезров(на 1 больше чем в экзекьюторе). Проходят активно, то есть пока таск выполняется они успевают заполнить еще одну страницу. 10 юзеров будут в шоколаде, а 11 получит starvation. Ситуация малореальная с нашей текущей нагрузкой, но вдруг? Что скажете?
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515589
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
OFF
забыл никЕмае
Емае.
Напиши более конкретный ВИ.
Иначе по твоему тексту подойдёт AJAX вызов на JS асинхронно с клиента с доп.текстом "На сервер ВСЁ отправлено".....ожидать не надо.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515591
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Petro123OFF
забыл никЕмае
Емае.
Напиши более конкретный ВИ.
Иначе по твоему тексту подойдёт AJAX вызов на JS асинхронно с клиента с доп.текстом "На сервер ВСЁ отправлено".....ожидать не надо.
Блин я твоя не понимать, примерно так все и будет работать, как ты написал, только к чему это? Клиент готов и в него никаких правок не планируется, а вот логика перехода между страницами на бэкэнде поменяется, раньше сохранение было синхронным, а теперь асинхронное, как это сделать с некоторыми рестрикшенами и был разговор, а ты о чем?
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515593
Фотография Petro123
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник,
я спрашивал выше про:
"и чем данная задача отличается от классики - "Набор товаров в корзину"? И потом, один Run - купить?"
Которая решена ещё при Иване Грозном?
Не увидел или не понял твой ответ.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515594
cdtyjv
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник ,
Ну вообще это достаточно серьезный косяк. Решений несколько, и все они разрливаются через мануальное создание пула.
1) Использовать ThreadPoolExecutor.CallerRunsPolicy. Это самый простой вариант - если не получается выполнить асинхронно, то выполнить синхронно. Но не факт, что он вас устроит по двум причинам:
- Долгий ответ клиенту, от которого хотелось избавиться.
- Starvation никуда не делся, а просто переместился в другое место - теперь у вас может застарвиться один клиент, если он в первом окне браузера начал синхронное выполнение, а во втором окне постоянно тыкает вперед-назад. В этом случае, вы рискуете в первом окне браузера так и не получить ответ :-)
В общем, сомнительное решение.

2) Более заморочный вариант:
2.1) Создать руками BlockingQueue.
2.2) Передать его в ThreadPoolExecutor.
2.3) Класть задачи не в ThreadPoolExecutor, а напрямую в BlockingQueue.
2.4) Если какой-то TaskRunnable слишком долго выполняется, то форсировать его завершение, и класть в конец этой очереди.
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
200.
201.
202.
203.
204.
205.
206.
207.
208.
209.
210.
211.
212.
213.
214.
215.
216.
217.
218.
219.
220.
221.
222.
223.
224.
225.
226.
227.
228.
229.
230.
public class TaskBlockingAndQueued {
    /** Монитор. */
    private static final Object mux = new Object();

    /** Задачи. */
    private static final Map<Integer, TaskRunnable> tasks = new HashMap<Integer, TaskRunnable>();

    /** Очередь задач. */
    private static final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

    /** Пул. */
    private static final ExecutorService e = new ThreadPoolExecutor(8, 8, 1000L, TimeUnit.MILLISECONDS, taskQueue);

    public static void main(String[] args) throws Exception {
        addTask(1, new Task(1));

        //Thread.sleep(200); // Расскомментируйте, и увидите, что выполнится и первый, и второй.

        addTask(1, new Task(2));

        e.shutdown();
    }

    /**
     * Добавить задачу.
     *
     * @param id Идентификатор.
     * @param task Задача.
     */
    public static void addTask(Integer id, Task task) {
        boolean isNew = false;

        TaskRunnable taskRun;

        synchronized (mux) {
            taskRun = tasks.get(id);

            if (taskRun != null)
                taskRun.addTask(task);
            else {
                System.out.println("Put new task");
                taskRun = new TaskRunnable(id, task);

                TaskRunnable oldTaskRun = tasks.put(id, taskRun);

                assert oldTaskRun == null;

                isNew = true;
            }
        }

        if (isNew)
            submit(taskRun);
    }

    /**
     * Подождать окончания выполнения задач для данного идентификатора.
     *
     * @param id Идентификатор.
     * @throws InterruptedException В случае прерывания.
     */
    public static void awaitCompletion(Integer id) throws InterruptedException {
        TaskRunnable taskRun;

        synchronized (mux) {
            taskRun = tasks.get(id);

            if (taskRun == null)
                return; // Нет активных задач.
        }

        taskRun.awaitCompletion();
    }

    /**
     * Отдать TaskRunnable в Executor.
     *
     * @param taskRun TaskRunnable.
     */
    private static void submit(TaskRunnable taskRun) {
        try {
            taskQueue.put(taskRun);
        }
        catch (InterruptedException e) {
            // Handle.
        }
    }

    /**
     * Класс, выполняющий задачи.
     */
    private static class TaskRunnable implements Runnable {
        /** Идентификатор. */
        private final int id;

        /** Latch, означающий окончание выполнение задач для текушего id. */
        private final CountDownLatch completeLatch = new CountDownLatch(1);

        /** Текущая задача. */
        private Task task;

        /**
         * Конструктор.
         *
         * @param id Идентификатор.
         * @param task Задача.
         */
        public TaskRunnable(int id, Task task) {
            assert Thread.holdsLock(mux); // Должны быть внутри synchronized для видимости.

            this.id = id;
            this.task = task;
        }

        /**
         * Добавить следующую задачу.
         *
         * @param task Следующая задача.
         */
        public void addTask(Task task) {
            synchronized (mux) {
                this.task = task;
            }
        }

        /** {@inheritDoc} */
        @Override
        public void run() {
            while (true) {
                boolean proceed = false;

                Task task0;

                synchronized (mux) {
                    task0 = task;

                    if (task0 == null) {
                        // Все, больше нет задач в очереди.
                        tasks.remove(id);

                        completeLatch.countDown();

                        return;
                    }
                    else {
                        if (canProceed()) {
                            proceed = true;

                            task = null; // Занулим следующую задачу.
                        }
                    }
                }

                if (proceed) {
                    try {
                        task0.run(); // Выполнить задачу.
                    }
                    catch (Exception e) {
                        // Обработка на ваше усмотрение.
                    }
                }
                else {
                    returnToQueue();
                    
                    return;
                }
            }
        }

        /**
         * Подождать окончания выполнения задачи.
         *
         * @throws InterruptedException В случае прерывания.
         */
        public void awaitCompletion() throws InterruptedException {
            completeLatch.await();
        }

        /**
         * Проверить, можно ли продолжать выполнение задач, или надо вернуть объект в конец очереди.
         *
         * @return {@code true}, если
         */
        private boolean canProceed() {
            // Ваша имплементация.
            return true;
        }

        /**
         * Вернуть задачу в очередь.
         */
        private void returnToQueue() {
            // Ваша исплементация, напр.:
            try {
                taskQueue.put(this);
            }
            catch (InterruptedException e) {
                // Handle.
            }
        }
        
    }

    /**
     * Ваша задача.
     */
    private static class Task implements Runnable {

        private final int id;

        private Task(int id) {
            this.id = id;
        }

        /** {@inheritDoc} */
        @Override
        public void run() {
            System.out.println("Task started [id=" + id + "]");

            try {
                Thread.sleep(2000);
            }
            catch (InterruptedException e) {
                // Handle.
            }

            System.out.println("Task finished [id=" + id + "]");
        }
    }
}



P.S.: Ну и помните дисклеймер про наличие багов. Это не production code, а демонстрация идеи.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515607
mvn3
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
забыл ник,

Вы знаете, у меня когда-то тоже была подобная проблема, унаследованная система, куча кода (вызов soap-сервиса из jsf-бина был довольно долгий после 6-ти шагов визарда). Правда, долго (около 1 минуты ) выполнялся только последний шаг (возможно это у Вас не так).

Так вот, в результате, в голове тоже начались появляться куча мыслей там, асинхронность...., пулы, оптимизации и прочие технические пакости. Все это естественно требовало время, тем более что код не мой. Так вот, ломая голову, ко мне подошел чел из UX и предложил очень простой вариант: давать фид-бек пользователю о состоянии выполнения запроса.

Т.е. юзер кликает финиш, и вместо того чтобы страница "зависала" на целую минуту, выводилось просто окно состояния выполнения операции (с апдейтом в каждые 7-8 сек.) т.к. "самое страшное для нас это не знание" (С) не помню кто)

Например:

1) Подготовка запроса
2) Передача данных
3) Обработка данных
ну и т.д.

Для кого-то это может показаться смешным, однако это работает, причем работает очень хорошо.
После этого использовал этот метод еще пару раз (Естественно главное не перегнуть палку ;) )
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515608
kagax
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
А использовать для этого quartz не получится?
тынц

Очередная кнопка Next пользователя в другой jvm случайно не может быть обработана по какой-нибудь причине?
А так все ж fail-over будет.

Правда в БД придется таблички под это создавать, но как вариант рассмотреть quartz?
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515628
Alex Kuznetsov
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
cdtyjv забыл ник ,
...
2) Более заморочный вариант
2.1) Создать руками BlockingQueue.
2.2) Передать его в ThreadPoolExecutor.
2.3) Класть задачи не в ThreadPoolExecutor, а напрямую в BlockingQueue.
2.4) Если какой-то TaskRunnable слишком долго выполняется, то форсировать его завершение, и класть в конец этой очереди.
...Практически тот вариант, который я предложил...

И ещё вопрос на понимание к забыл ник , а вообще-то, по определению, визард предполагает исполнение задач уже после того как все данные в ходе его прохождения будут получены... может рассмотреть вариант набора данных в ходе прохождения визарда и только по нажатию кнопки "Готово" запускать на выполнение задачи?
Ну и соответственно, исходя из условий задачи, (ну и здравой логики), не давать выполняться новым задачам от одного и того-же пользователя, пока предыдущие не будут выполнены, а то получается что он может сначала запустить что-то на создание информации в базе, а затем следом на изменение, при этом создание ещё не завершилось, а изменение не понятно чего уже началось => неконсистентность данных...
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38515701
cdtyjv
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник ,
Я тут подумал - не катит мое решение. Не могу толком сказать почему, но прям вот нутром чую, что это засада какая-то. Более того, у нас тут есть еще один источник starvation - это метод awaitCompletion. Если юзер кликнул F5 в одном окне, а в другом постоянно плодит новые задачи, то мы никогда не выйдем из awaitCompletion, то есть рефреш по F5 никогда не завершится.
Я предлагаю поменять данное требование: ждать не окончание всех задач для данного id, а окончание той задачи, которая была последней на момент клика на F5. Тогда решение может выглядеть так (помним про баги):
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
public class TestBlocking {
    /** Монитор. */
    private static final Object mux = new Object();

    /** Задачи. */
    private static final Map<Integer, Task> taskMap = new HashMap<>();

    /**
     * Добавить задачу.
     *
     * @param id Идентификатор.
     * @param task Задача.
     */
    public static void addTask(int id, Task task) {
        synchronized (mux) {
            Task previousTask = taskMap.get(id);

            if (previousTask != null) {
                boolean cancelled = previousTask.tryCancel(); // Пробуем отменить последнюю задачу.

                if (cancelled)
                    // Получилось, сохраним референс на текущую активную задачу.
                    task.setPreviousTask(previousTask.getPreviousTask());
                else
                    // Не получилось, значит последняя задача является активной.
                    task.setPreviousTask(previousTask);
            }

            taskMap.put(id, task);
        }

        try {
            // Каждая новая задача идет в отдельном Runnable.
            submit(task);
        }
        catch (RejectedExecutionException e) {
            // Обработать.
        }
    }

    /**
     * Подождать завершения последней поступившей задачи.
     *
     * @param id Идентификатор.
     * @throws InterruptedException Если произошло прерывание.
     */
    public static void awaitCompletion(int id) throws InterruptedException {
        Task task;

        synchronized (mux) {
            task = taskMap.get(id);
        }

        // Вновь поступающие задачи не влияют на завершение данной задачи. Starvation отсутствует.
        if (task != null)
            task.awaitCompletion();
    }

    /**
     * Отдать задачу в Executor.
     *
     * @param task Задача.
     */
    private static void submit(Task task) {
        // ...
    }

    /**
     * Задача.
     */
    private class Task implements Runnable {
        /** Идентификатор. */
        private final int id;

        /** Latch, сигнализирующий окончание работы задачи. */
        private final CountDownLatch completeLatch = new CountDownLatch(1);

        /** Предыдущая задача, которая была активна в момент добавления данной задачи. */
        private Task previousTask;

        /** Флаг начала выполнения задачи. */
        private boolean started;

        /** Флаг отмены задачи. */
        private boolean cancelled;

        /**
         * @param id Идентификатор.
         */
        private Task(int id) {
            this.id = id;
        }

        /**
         * Получить предыдущую задачу.
         *
         * @return Предыдущая задача.
         */
        private Task getPreviousTask() {
            synchronized (mux) {
                return previousTask;
            }
        }

        /**
         * Установить предыдущую задачу.
         *
         * @param previousTask Предыдущая задача.
         */
        private void setPreviousTask(Task previousTask) {
            synchronized (mux) {
                assert this.previousTask == null;

                this.previousTask = previousTask;
            }
        }

        /**
         * Подождать окончания работы задачи.
         *
         * @throws InterruptedException В случае прерывания.
         */
        public void awaitCompletion() throws InterruptedException {
            Task previousTask0;

            synchronized (mux) {
                previousTask0 = previousTask;
            }

            if (previousTask0 != null)
                previousTask0.awaitCompletion();

            completeLatch.await();
        }

        /**
         * Попробовать отменить задачу.
         *
         * @return {@code true}, если задача была отменена; {@code false}, если задачу нельзя отменить, так как она
         *     уже начала выполняться.
         */
        public boolean tryCancel() {
            synchronized (mux) {
                if (started)
                    return false; // Уже нельзя отменить.
                else {
                    cancelled = true;

                    completeLatch.countDown();

                    return true; // Успели отменить.
                }
            }
        }

        /** {@inheritDoc} */
        @Override
        public void run() {
            Task previousTask;

            synchronized (mux) {
                if (cancelled)
                    return;

                started = true;

                previousTask = this.previousTask;

                this.previousTask = null; // Что бы избежать memory leak.
            }

            try {
                previousTask.awaitCompletion();

                run0();
            }
            catch (InterruptedException e) {
                // Обработать.
            }
            finally {
                synchronized (mux) {
                    Task task0 = taskMap.get(id);

                    if (task0 == this) // Что бы не удалить более новую задачу.
                        taskMap.remove(id);

                    completeLatch.countDown();
                }
            }
        }

        /**
         * Логика задачи.
         */
        private void run0() {
            // ...
        }
    }
}

...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516023
Alexey Tomin
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл никЕсть визард, юзер заполнил страницу, нажал некст - по бизнес-логике мне надо заслать данные в веб-сервис, но он калечный и время выполнения 5-20 сек, которые юзер вынужден ждать, прежде чем ему покажется новый скрин(оно и так на аяксе работает).
Так как страница 2 содержит в скрытых полях и данные с первой страницы, и так далее - то пришла идея - а зачем мучать пользователя, то есть мы ацептаем данные, ложим их в какую-то очередь, рендерим следующую страницу. Пока пользователь заполняет следующую страницу, таск выполняется асинхронно.

Т.е. есть должна быть hashmap<ID пользователя, состояние мастера>
Когда приходит submit с формы смотрим, что там с сохранённым состоянием.
Если пришёл следующий к сохранённому шаг- всё хорошо, дополняем данными. Если нет- что-то там говорим.
Вызов сервиса- наверное в отдельных сервисных потоках.
--------------
Хотя вообще интересно- первоначально было
" То есть если выполняется таск для 1 страницы, в это время пришли со 2 и с 3, надо сделать так чтобы 2 вообще не выполнялся, а сразу 3(когда закончится таки 1)."
А что если сервис звать только для шага 4, а до этого копить данные?
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516087
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
cdtyjv
2) Более заморочный вариант:
2.1) Создать руками BlockingQueue.
2.2) Передать его в ThreadPoolExecutor.
2.3) Класть задачи не в ThreadPoolExecutor, а напрямую в BlockingQueue.
2.4) Если какой-то TaskRunnable слишком долго выполняется, то форсировать его завершение, и класть в конец этой очереди.

Вот что-то такое я и пытался сделать первоначально
cdtyjvP.S.: Ну и помните дисклеймер про наличие багов. Это не production code, а демонстрация идеи.
угу, это все понятно.

В общем ладно, я видел ваш последний вариант, пока занят другими делами, думаю в течение недели дойдут руки посмотреть, потом отпишусь, на самом деле допилил свой вариант, но он выглядит коряво, если ваш работает корректно, покажу разработчикам, что им понятнее пусть и выбирают, им же поддерживать
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516090
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mvn3забыл ник,

Вы знаете, у меня когда-то тоже была подобная проблема, унаследованная система, куча кода (вызов soap-сервиса из jsf-бина был довольно долгий после 6-ти шагов визарда). Правда, долго (около 1 минуты ) выполнялся только последний шаг (возможно это у Вас не так).

Так вот, в результате, в голове тоже начались появляться куча мыслей там, асинхронность...., пулы, оптимизации и прочие технические пакости. Все это естественно требовало время, тем более что код не мой. Так вот, ломая голову, ко мне подошел чел из UX и предложил очень простой вариант: давать фид-бек пользователю о состоянии выполнения запроса.

Т.е. юзер кликает финиш, и вместо того чтобы страница "зависала" на целую минуту, выводилось просто окно состояния выполнения операции (с апдейтом в каждые 7-8 сек.) т.к. "самое страшное для нас это не знание" (С) не помню кто)

Например:

1) Подготовка запроса
2) Передача данных
3) Обработка данных
ну и т.д.

Для кого-то это может показаться смешным, однако это работает, причем работает очень хорошо.
После этого использовал этот метод еще пару раз (Естественно главное не перегнуть палку ;) )

Все правильно, только в момем случае юзерам наплевать на статус задачи, для них просто нужно чтобы было быстро и корректно, они понятия не имеют что там вызываются какие-то веб-сервисы и тп. Если бы нужно было следить за статусом, непременно делал бы что-то похожее на то что вы предложили.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516092
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
kagaxА использовать для этого quartz не получится?
тынц

Очередная кнопка Next пользователя в другой jvm случайно не может быть обработана по какой-нибудь причине?
А так все ж fail-over будет.

Правда в БД придется таблички под это создавать, но как вариант рассмотреть quartz?

а чем тут quartz помог бы? Разверните мысль, хотя в принципе скажу сразу - что у нас БД в принципе нет:) SOA мать ее
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516099
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Alex KuznetsovИ ещё вопрос на понимание к забыл ник , а вообще-то, по определению, визард предполагает исполнение задач уже после того как все данные в ходе его прохождения будут получены... может рассмотреть вариант набора данных в ходе прохождения визарда и только по нажатию кнопки "Готово" запускать на выполнение задачи?
Ну и соответственно, исходя из условий задачи, (ну и здравой логики), не давать выполняться новым задачам от одного и того-же пользователя, пока предыдущие не будут выполнены, а то получается что он может сначала запустить что-то на создание информации в базе, а затем следом на изменение, при этом создание ещё не завершилось, а изменение не понятно чего уже началось => неконсистентность данных...

Вопросы хорошие, но специфика задачи такова, что все эти места обдуманы, там все удет ок. А выполнять все по кнопке Готово нельзя - потому что юзер может нажать Ф5 на любой странице и тп.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516104
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Alexey TominА что если сервис звать только для шага 4, а до этого копить данные?
Нельзя, юзер может нажать ф5, плюс в любой момент нажать сейвЕкзит - и в этот момент опять надо ждать пока все таски выполнятся.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516136
kagax
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
забыл ника чем тут quartz помог бы? Разверните мысль, хотя в принципе скажу сразу - что у нас БД в принципе нет:) SOA мать ее


1) при нажатии на кнопку создаем новую job1 в quartz с идентификатором entityNumber (с указанием, что запускать нужно сразу)
2) при нажатии очередной раз кнопки "Next" создаем новую job2. После чего создаем ( JobChangedJobListener ), в котором указываем, что запускать job2 нужно после job1.

База данных не обязательна - она нужна только на тот случай, чтобы, если jvm остановится, то запланированные job-ы будут выполнены после нового запуска jvm. Ну, или их подхватит и довыполнит другая jvm из кластера.
...
Рейтинг: 0 / 0
Любителям поломать голову над многопоточным кодом
    #38516151
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
kagax
1) при нажатии на кнопку создаем новую job1 в quartz с идентификатором entityNumber (с указанием, что запускать нужно сразу)
2) при нажатии очередной раз кнопки "Next" создаем новую job2. После чего создаем ( JobChangedJobListener ), в котором указываем, что запускать job2 нужно после job1.


А что если пока выполняется таск 1 пришли таск2-4, мне нужно выполнить только 4. Ну и плюс нужна возможность ждать пока все джобы выполняться и тд. В общем теже яйца, вид сбоку, смысла тянуть либу нет никакого.
...
Рейтинг: 0 / 0
25 сообщений из 53, страница 2 из 3
Форумы / Java [игнор отключен] [закрыт для гостей] / Любителям поломать голову над многопоточным кодом
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]