powered by simpleCommunicator - 2.0.61     © 2026 Programmizd 02
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Форумы / Java [игнор отключен] [закрыт для гостей] / В неск. потоков
19 сообщений из 19, страница 1 из 1
В неск. потоков
    #39052002
JDS
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Есть задача, которую желательно распараллелить, т.е. нужно запускать в неск. потоков, сделал тестовый пример:
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
package threadstest;

class TestTask extends Thread {
    public TestTask() {
        super();
        ThreadsTest.curr_working_threads++;
    }

    @Override
    public void run()
    {  // чота наш поток как бы делает
        for (int i = 0; i < 11; i++) {
            System.out.println(this.getName() + ": step "+i);
            try {Thread.sleep(500);
            } catch (InterruptedException ex) {}
        }
        ThreadsTest.curr_working_threads--;
    }
}

public class ThreadsTest {
    public static volatile int curr_working_threads = 0; 

    public static void main(String[] args) throws InterruptedException {
       int max_working_threads = 10; // макс. кол-во потоков
       
       // условно есть задачи, которые надо запустить в обработку, но не по одной, 
       // а в указанное число потоков (некий пул потоков, ограниченный max_working_threads)
       String[] tasks = new String[20];
       for (int i = 0; i < 20; i++)
          tasks[i] = "Task " + i; // условно задача
       
       // собственно берем эти задачи и ставим на обработку
       for (String s: tasks) {
           // ставим сразу до max_working_threads
           if (curr_working_threads < max_working_threads) {
              TestTask tt = new TestTask();
              tt.setName(s);
              tt.start();
           }
           // прежде чем поставить следующую задачу, ждем пока освободится место в типа "пуле"
           while (curr_working_threads >= max_working_threads)
               try {Thread.sleep(1000);
               } catch (InterruptedException ex) {}
       }
    }
}


Все вроде нормально работает ), но т.к. нахожусь уровне ребенка, который из кубиков с буквами учится складывать слова, а то и вообще не понимает, что на кубиках буквы ) на вс. случай может кто подскажет, как правильно это реализуется (может стоит использовать какой-то готовый класс пула потоков, очереди и т.п.? По-моему как-то ненадежно выглядит "синхронизация" работы с счетчиком работающих потоков или volatile в данном случае достаточно (может лучше создать отдельный метод для инкре/декримента счетчика с синхронайзд, а то может вообще этот счетчик не нужен)?) В общем как? )
...
Рейтинг: 0 / 0
В неск. потоков
    #39052015
xifr
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
JDS,

1) Прочить про стандартные Executors и ThreadPoolExecutor.

2) Прочитать про то как нужно именовать свойства, имхо "curr_working_threads" - ужас!

3) Так делать нельзя
Код: java
1.
2.
ThreadsTest.curr_working_threads++;
ThreadsTest.curr_working_threads--;


И это "public static volatile int" - вам не поможет.
Почитай как нужно делать счётчик (и как это можно делать средствами и пакета java.util.concurrent.**)
И что делает и зачем нужен "volatile"
...
Рейтинг: 0 / 0
В неск. потоков
    #39052029
JDS
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
xifr , спасибо, направление понял ушел искать, читать.
По поводу имен переменных, соглсен, длинновато, но вроде не противоречит соглашению
"Except for variables, all instance, class, and class constants are in mixed case with a lowercase first letter. Internal words start with capital letters. Variable names should not start with underscore _ or dollar sign $ characters, even though both are allowed.
Variable names should be short yet meaningful. The choice of a variable name should be mnemonic- that is, designed to indicate to the casual observer the intent of its use. One-character variable names should be avoided except for temporary "throwaway" variables. Common names for temporary variables are i, j, k, m, and n for integers; c, d, and e for characters."

Ну может кроме Variable names should be short yet meaningful )
А народ пишет по-разному ):
30% number_favorite_news 12% NumberFavoriteNews 46% numberFavoriteNews 2% numberfavoritenews 2% nfn (мне _сейчас_ так удобнее) 8% Иной вариант. Мне диктует его язык разработки
...
Рейтинг: 0 / 0
В неск. потоков
    #39052040
xifr
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
JDSПо поводу имен переменных, соглсен, длинновато, но вроде не противоречит соглашению


Из ссылки:
переменные называем, как nameNameName
константы называем, как NAME_NAME_NAME (причем константы по смыслу, а не static объекты)

Про статистику, это скорее не про JAVA, а вообще в целом.
Например у PL/SQL программистов действительно, как правило переменные идут в стиле name_name_name
...
Рейтинг: 0 / 0
В неск. потоков
    #39052055
Фотография Blazkowicz
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Никогда не наследуйтесь от класса Thread кроме случаев острой необходимости. До появления java.util.concurrency (j.u.c) рекомендовали использовать Runnable.
http://docs.oracle.com/javase/tutorial/essential/concurrency/

Никогда не оставляйте пустых catch блоков.
Прочитайте определение volatile и атомарных операций. На том же вашем любимом хабре полно статей по этой теме.

Задачу в зависимости от требований можно решить тремя методами
1) Создать пул через Executors и кидать туда чего душе угодно
2) ForkJoinPool, если нужен Fork/Join
http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
3) ParallelStream, если задачу удобно решить через Stream API
https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html
...
Рейтинг: 0 / 0
В неск. потоков
    #39052063
Фотография Blazkowicz
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
JDS но вроде не противоречит соглашению
Противоречит.

JDS А народ пишет по-разному ):
[/quot]
Где там про Java?
...
Рейтинг: 0 / 0
В неск. потоков
    #39052086
JDS
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Blazkowicz , тоже спасибо, по части замечаний в курсе, а по большей части спасибо за инфу куда копать.
Про именование точно, это про веб похоже.
...
Рейтинг: 0 / 0
В неск. потоков
    #39052212
Atum1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
JDSЕсть задача, которую желательно распараллелить, т.е. нужно запускать в неск. потоков, сделал тестовый пример:


как вариант :

где

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

накидывает по одному заданию раз в секудну в очередь Queue<String> queue = new ConcurrentLinkedQueue<>();

(Задания можно брать из базы итд)

ExecutorService executorService = Executors.newFixedThreadPool(3);

в три потока разгребает нашу очередь ... в коде есть разные задержки , так же из потока возвращается Callable в котором можно передать какие то данные ...

while (true) - можно заменить на scheduler.scheduleWithFixedDelay - в котором выполнять задания в n потоков асинхронно.



Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class WorkingThreads {

    private final static Queue<String> queue = new ConcurrentLinkedQueue<>();

    private final static ExecutorService executorService = Executors.newFixedThreadPool(3);

    private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public static void main(String[] args) throws InterruptedException {

        //кладем задния в очередь 1 раз в секунду
        Runnable runnable = () -> {
            queue.add(LocalDateTime.now().format(DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss", Locale.getDefault())));
            System.out.println(queue);
        };

        scheduler.scheduleWithFixedDelay(runnable, 3000, 1000, TimeUnit.MILLISECONDS);

        //разгребаем очередь задний в три потока 
        while (true) {

            //Ничего не делаем 3.5  секунды
            
            Thread.sleep(3500 + ThreadLocalRandom.current().nextInt(2000));

            // асинхронно в 3 потока разгребам очередь 
            for (final String task : queue) {

                executorService.submit(new Callable<Boolean>() {

                    @Override
                    public Boolean call() throws Exception {
                        Thread.sleep(500 + ThreadLocalRandom.current().nextInt(1000));
                        System.out.println(String.format("Успешно обработали задние %s в %s потоке ", task, Thread.currentThread().getName() ) );
                        // удалили его из очереди
                        Boolean isOk = queue.remove(task);
                        // вернули результат обрбаотка
                        return isOk;
                    }
                });

            }
            System.out.println(String.format("осталось  queue.size() %s", queue.size()));

        }

        //executorService.awaitTermination(10L, TimeUnit.SECONDS);
        //executorService.shutdown();
    }

}



Вывод :


Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
[15.09.2015 18:56:16]
queue.size() 1
[15.09.2015 18:56:16, 15.09.2015 18:56:17]
Успешно обработали задние 15.09.2015 18:56:16 в pool-1-thread-1 потоке 
[15.09.2015 18:56:17, 15.09.2015 18:56:18]
[15.09.2015 18:56:17, 15.09.2015 18:56:18, 15.09.2015 18:56:19]
[15.09.2015 18:56:17, 15.09.2015 18:56:18, 15.09.2015 18:56:19, 15.09.2015 18:56:20]
[15.09.2015 18:56:17, 15.09.2015 18:56:18, 15.09.2015 18:56:19, 15.09.2015 18:56:20, 15.09.2015 18:56:21]
queue.size() 5
[15.09.2015 18:56:17, 15.09.2015 18:56:18, 15.09.2015 18:56:19, 15.09.2015 18:56:20, 15.09.2015 18:56:21, 15.09.2015 18:56:22]
Успешно обработали задние 15.09.2015 18:56:19 в pool-1-thread-1 потоке 
Успешно обработали задние 15.09.2015 18:56:18 в pool-1-thread-3 потоке 
Успешно обработали задние 15.09.2015 18:56:17 в pool-1-thread-2 потоке 
[15.09.2015 18:56:20, 15.09.2015 18:56:21, 15.09.2015 18:56:22, 15.09.2015 18:56:23]
Успешно обработали задние 15.09.2015 18:56:20 в pool-1-thread-1 потоке 
Успешно обработали задние 15.09.2015 18:56:21 в pool-1-thread-3 потоке 
[15.09.2015 18:56:22, 15.09.2015 18:56:23, 15.09.2015 18:56:24]
[15.09.2015 18:56:22, 15.09.2015 18:56:23, 15.09.2015 18:56:24, 15.09.2015 18:56:25]
[15.09.2015 18:56:22, 15.09.2015 18:56:23, 15.09.2015 18:56:24, 15.09.2015 18:56:25, 15.09.2015 18:56:26]
queue.size() 5
[15.09.2015 18:56:22, 15.09.2015 18:56:23, 15.09.2015 18:56:24, 15.09.2015 18:56:25, 15.09.2015 18:56:26, 15.09.2015 18:56:27]
Успешно обработали задние 15.09.2015 18:56:24 в pool-1-thread-3 потоке 
Успешно обработали задние 15.09.2015 18:56:22 в pool-1-thread-2 потоке 
[15.09.2015 18:56:23, 15.09.2015 18:56:25, 15.09.2015 18:56:26, 15.09.2015 18:56:27, 15.09.2015 18:56:28]
Успешно обработали задние 15.09.2015 18:56:23 в pool-1-thread-1 потоке 
Успешно обработали задние 15.09.2015 18:56:26 в pool-1-thread-2 потоке 
Успешно обработали задние 15.09.2015 18:56:25 в pool-1-thread-3 потоке 
[15.09.2015 18:56:27, 15.09.2015 18:56:28, 15.09.2015 18:56:29]
[15.09.2015 18:56:27, 15.09.2015 18:56:28, 15.09.2015 18:56:29, 15.09.2015 18:56:30]
queue.size() 4
[15.09.2015 18:56:27, 15.09.2015 18:56:28, 15.09.2015 18:56:29, 15.09.2015 18:56:30, 15.09.2015 18:56:31]
Успешно обработали задние 15.09.2015 18:56:28 в pool-1-thread-2 потоке 
Успешно обработали задние 15.09.2015 18:56:29 в pool-1-thread-3 потоке 
Успешно обработали задние 15.09.2015 18:56:27 в pool-1-thread-1 потоке 
[15.09.2015 18:56:30, 15.09.2015 18:56:31, 15.09.2015 18:56:32]
Успешно обработали задние 15.09.2015 18:56:30 в pool-1-thread-2 потоке 
[15.09.2015 18:56:31, 15.09.2015 18:56:32, 15.09.2015 18:56:33]
[15.09.2015 18:56:31, 15.09.2015 18:56:32, 15.09.2015 18:56:33, 15.09.2015 18:56:34]
queue.size() 4
[15.09.2015 18:56:31, 15.09.2015 18:56:32, 15.09.2015 18:56:33, 15.09.2015 18:56:34, 15.09.2015 18:56:35]
Успешно обработали задние 15.09.2015 18:56:32 в pool-1-thread-1 потоке 
Успешно обработали задние 15.09.2015 18:56:33 в pool-1-thread-2 потоке 
[15.09.2015 18:56:31, 15.09.2015 18:56:34, 15.09.2015 18:56:35, 15.09.2015 18:56:36]
Успешно обработали задние 15.09.2015 18:56:31 в pool-1-thread-3 потоке 
Успешно обработали задние 15.09.2015 18:56:34 в pool-1-thread-1 потоке 
[15.09.2015 18:56:35, 15.09.2015 18:56:36, 15.09.2015 18:56:37]
[15.09.2015 18:56:35, 15.09.2015 18:56:36, 15.09.2015 18:56:37, 15.09.2015 18:56:38]
[15.09.2015 18:56:35, 15.09.2015 18:56:36, 15.09.2015 18:56:37, 15.09.2015 18:56:38, 15.09.2015 18:56:39]
queue.size() 5
[15.09.2015 18:56:35, 15.09.2015 18:56:36, 15.09.2015 18:56:37, 15.09.2015 18:56:38, 15.09.2015 18:56:39, 15.09.2015 18:56:40]
Успешно обработали задние 15.09.2015 18:56:36 в pool-1-thread-3 потоке 
Успешно обработали задние 15.09.2015 18:56:37 в pool-1-thread-1 потоке 
Успешно обработали задние 15.09.2015 18:56:35 в pool-1-thread-2 потоке 
Успешно обработали задние 15.09.2015 18:56:38 в pool-1-thread-3 потоке 
[15.09.2015 18:56:39, 15.09.2015 18:56:40, 15.09.2015 18:56:41]
Успешно обработали задние 15.09.2015 18:56:39 в pool-1-thread-1 потоке 
[15.09.2015 18:56:40, 15.09.2015 18:56:41, 15.09.2015 18:56:42]
[15.09.2015 18:56:40, 15.09.2015 18:56:41, 15.09.2015 18:56:42, 15.09.2015 18:56:43]
[15.09.2015 18:56:40, 15.09.2015 18:56:41, 15.09.2015 18:56:42, 15.09.2015 18:56:43, 15.09.2015 18:56:44]
queue.size() 5
Успешно обработали задние 15.09.2015 18:56:41 в pool-1-thread-3 потоке 
[15.09.2015 18:56:40, 15.09.2015 18:56:42, 15.09.2015 18:56:43, 15.09.2015 18:56:44, 15.09.2015 18:56:45]
Успешно обработали задние 15.09.2015 18:56:40 в pool-1-thread-2 потоке 
Успешно обработали задние 15.09.2015 18:56:42 в pool-1-thread-1 потоке 
Успешно обработали задние 15.09.2015 18:56:43 в pool-1-thread-3 потоке 
[15.09.2015 18:56:44, 15.09.2015 18:56:45, 15.09.2015 18:56:46]
Успешно обработали задние 15.09.2015 18:56:44 в pool-1-thread-2 потоке 
[15.09.2015 18:56:45, 15.09.2015 18:56:46, 15.09.2015 18:56:47]
[15.09.2015 18:56:45, 15.09.2015 18:56:46, 15.09.2015 18:56:47, 15.09.2015 18:56:48]
[15.09.2015 18:56:45, 15.09.2015 18:56:46, 15.09.2015 18:56:47, 15.09.2015 18:56:48, 15.09.2015 18:56:49]
queue.size() 5
Успешно обработали задние 15.09.2015 18:56:45 в pool-1-thread-1 потоке 
Успешно обработали задние 15.09.2015 18:56:46 в pool-1-thread-3 потоке 
[15.09.2015 18:56:47, 15.09.2015 18:56:48, 15.09.2015 18:56:49, 15.09.2015 18:56:50]
Успешно обработали задние 15.09.2015 18:56:47 в pool-1-thread-2 потоке 
Успешно обработали задние 15.09.2015 18:56:48 в pool-1-thread-1 потоке 
[15.09.2015 18:56:49, 15.09.2015 18:56:50, 15.09.2015 18:56:51]
Успешно обработали задние 15.09.2015 18:56:49 в pool-1-thread-3 потоке 
[15.09.2015 18:56:50, 15.09.2015 18:56:51, 15.09.2015 18:56:52]
[15.09.2015 18:56:50, 15.09.2015 18:56:51, 15.09.2015 18:56:52, 15.09.2015 18:56:53]
[15.09.2015 18:56:50, 15.09.2015 18:56:51, 15.09.2015 18:56:52, 15.09.2015 18:56:53, 15.09.2015 18:56:54]
queue.size() 5
[15.09.2015 18:56:50, 15.09.2015 18:56:51, 15.09.2015 18:56:52, 15.09.2015 18:56:53, 15.09.2015 18:56:54, 15.09.2015 18:56:55]
Успешно обработали задние 15.09.2015 18:56:50 в pool-1-thread-2 потоке 
Успешно обработали задние 15.09.2015 18:56:52 в pool-1-thread-3 потоке 
Успешно обработали задние 15.09.2015 18:56:51 в pool-1-thread-1 потоке 
[15.09.2015 18:56:53, 15.09.2015 18:56:54, 15.09.2015 18:56:55, 15.09.2015 18:56:56]
Успешно обработали задние 15.09.2015 18:56:53 в pool-1-thread-2 потоке 
Успешно обработали задние 15.09.2015 18:56:54 в pool-1-thread-3 потоке 
[15.09.2015 18:56:55, 15.09.2015 18:56:56, 15.09.2015 18:56:57]
[15.09.2015 18:56:55, 15.09.2015 18:56:56, 15.09.2015 18:56:57, 15.09.2015 18:56:58]
queue.size() 4
[15.09.2015 18:56:55, 15.09.2015 18:56:56, 15.09.2015 18:56:57, 15.09.2015 18:56:58, 15.09.2015 18:56:59]
Успешно обработали задние 15.09.2015 18:56:55 в pool-1-thread-1 потоке 
Успешно обработали задние 15.09.2015 18:56:56 в pool-1-thread-2 потоке 



...
Рейтинг: 0 / 0
В неск. потоков
    #39052265
Фотография Blazkowicz
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Atum1,

Внутри ExecutorService есть своя очередь. Зачем ещё одну заводить? Так же стоит обратить внимание на Future/FutureTask.
...
Рейтинг: 0 / 0
В неск. потоков
    #39052291
Atum1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
BlazkowiczAtum1,

Внутри ExecutorService есть своя очередь. Зачем ещё одну заводить? Так же стоит обратить внимание на Future/FutureTask.

Спешил. Хотел показать разные варианты. Да нужно переписать через futuretask
...
Рейтинг: 0 / 0
В неск. потоков
    #39052413
Atum1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Blazkowicz

вопрос а как этот шедулер переиспользует лямбду?

в этом месте?

Код: java
1.
2.
3.
4.
5.
6.
7.
//кладем задния в очередь 1 раз в секунду
        Runnable runnable = () -> {
            queue.add(LocalDateTime.now().format(DateTimeFormatter.ofPattern("dd.MM.yyyy HH:mm:ss", Locale.getDefault())));
            System.out.println(queue);
        };

        scheduler.scheduleWithFixedDelay(runnable, 3000, 1000, TimeUnit.MILLISECONDS);



каждый раз создает новый экземпляр runnable ? сам ?
...
Рейтинг: 0 / 0
В неск. потоков
    #39052420
Фотография Blazkowicz
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Atum1каждый раз создает новый экземпляр runnable ? сам ?
Вроде как нет, каждый раз не создаётся. Лямбды имеют поведение аналогичное анонимным классам, но реализация совершенно иная.
...
Рейтинг: 0 / 0
В неск. потоков
    #39052493
Atum1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
BlazkowiczAtum1каждый раз создает новый экземпляр runnable ? сам ?
Вроде как нет, каждый раз не создаётся. Лямбды имеют поведение аналогичное анонимным классам, но реализация совершенно иная.

Два, но это тред какой-то, мы же явно создали экземпляр runnable - и по всем старым канонам его можно использовать только один раз в отдельном потоке ... один раз сказать ему старт и все ...?!


и еще вопрос в тему про обработку исключений

Код: java
1.
2.
3.
      executorService.submit(() -> {
            System.out.println(1 / 0);
        });



как обрабатывать и перехватывать и возвращать в главный поток ? ведь если там целый стек таких вызовов асинхронный - то мы ничего не отловим и не увидим ...?
...
Рейтинг: 0 / 0
В неск. потоков
    #39052514
Фотография Blazkowicz
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Atum1Два, но это тред какой-то, мы же явно создали экземпляр runnable - и по всем старым канонам его можно использовать только один раз в отдельном потоке ... один раз сказать ему старт и все ...?!

Вопроса не понял. Каким ещё канонам и почему только один раз? Создавай сколько угодно потоков на одном Runnable. Что мешает?


Atum1как обрабатывать и перехватывать и возвращать в главный поток ? ведь если там целый стек таких вызовов асинхронный - то мы ничего не отловим и не увидим ...?
По-хорошему через Future можно получить исключение выполнения. Если без Future, когда они не нужны, то нужно создать свою обертку, которая перехватывает и логирует исключения. И запускать через неё.
...
Рейтинг: 0 / 0
В неск. потоков
    #39052671
JDS
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Почитал про Executors, ThreadPoolExecutor и ForkJoinPool.
Понял одно - сомнительно, что мне нужен именно пул потоков.
Сам по себе пул потоков нужен именно для снижения издержек на создание самих этих потоков (аналогично как например и с пулом соединений с БД).
То есть, если я в пул поставлю например сто задач, ограничив пул 5-ю потоками, то, как понял, все эти сто задач будут выполняться "одновременно", используя доступные потоки выполнения в пуле, что не совсем то, что мне нужно по-моему.
Мне нужно просто, чтобы например постоянно выполнялось пять задач в пяти потоках, завершилась одна задача, добавился поток с новой задачей.

Про ForkJoinPool пока не понял его глобальные отличия, декларируется, дескать он в полной мере использует многопроцессорность, упоминается в контексте не многопоточного программирования, а некоего "параллельного", а ThreadPoolExecutor не использует многопроцессорность? Или треды, которые реализуются через тот же Runnable-интерфейс все железно запускаются только на одном процессоре/ядре? Понял так же то, что реализаций может быть множество, но пока склонен остаться на немного поправленном примитивном решении:
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
package threadstest;

import java.util.concurrent.atomic.AtomicInteger;

class ThreadCounter {
    private AtomicInteger c = new AtomicInteger(0);

    public void increment() {
        c.incrementAndGet();
    }

    public void decrement() {
        c.decrementAndGet();
    }

    public int value() {
        return c.get();
    }

}

class TestTask implements Runnable {
    String thName;
    ThreadCounter cntWT;
    
    public TestTask(ThreadCounter c, String s) {
        cntWT  = c;
        thName = s;        
        new Thread(this).start();        
    }

    @Override
    public void run()
    {                        
        for (int i = 0; i < 11; i++) {
            System.out.println(thName + ": step "+i);
            try {Thread.sleep(500);
            } catch (InterruptedException ex) {}
        }

        cntWT.decrement(); // задание отработало
    }
}

public class ThreadsTest {    
    public static void main(String[] args) throws InterruptedException {
       ThreadCounter cntWorkThreads = new ThreadCounter(); // счетчик работающих потоков
       int maxWorkingThreads = 10; // макс. кол-во потоков
       
       // условно, задачи, которые надо запустить в обработку
       String[] tasks = new String[20];
       for (int i = 0; i < 20; i++)
          tasks[i] = "Task " + i;
              
       // ставим задачи на обработку
       for (String s: tasks) {
           // ставим сразу до max_working_threads
           if (cntWorkThreads.value() < maxWorkingThreads) {
              cntWorkThreads.increment();
              TestTask tt = new TestTask(cntWorkThreads, s);
           }
           // прежде чем поставить следующую задачу, 
           // ждем пока хотя бы кто-то отработает из уже работающих
           while (cntWorkThreads.value() >= maxWorkingThreads)
               try {Thread.sleep(500);
               } catch (InterruptedException ex) {
                   ex.printStackTrace();
                   break;
               }
       }
    } 
}

...
Рейтинг: 0 / 0
В неск. потоков
    #39052716
Фотография Blazkowicz
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
JDSПонял одно - сомнительно, что мне нужен именно пул потоков.
Сам по себе пул потоков нужен именно для снижения издержек на создание самих этих потоков (аналогично как например и с пулом соединений с БД).

ExecutorService это не пул потоков. Это именно ExecutorService. ThreadPoolExecutor это ExecutorService внутри которого используется пул потоков.

JDSТо есть, если я в пул поставлю например сто задач, ограничив пул 5-ю потоками, то, как понял, все эти сто задач будут выполняться "одновременно", используя доступные потоки выполнения в пуле, что не совсем то, что мне нужно по-моему.

Вы очень глубоко ошибаетесь. ExecutorService никак не может менять задачи в процессе выполнения, даже если они блокируются. Внутри ThreadPoolExecutor помимо пула потоков находится ещё и очередь. Все задачи, которые не выполняются, находятся в этой очереди. По мере завершения выполнения задач, новые поступают из этой очереди.

JDSМне нужно просто, чтобы например постоянно выполнялось пять задач в пяти потоках, завершилась одна задача, добавился поток с новой задачей.

ThreadPoolExecutor именно это и делает, с одной лишь разницей, что вместо создания нового потока он пере-использует освободившийся.

JDSПро ForkJoinPool пока не понял его глобальные отличия, декларируется, дескать он в полной мере использует многопроцессорность, упоминается в контексте не многопоточного программирования, а некоего "параллельного", а ThreadPoolExecutor не использует многопроцессорность?

Это просто удобный инструмент для реализации Fork/Join. Но с вашим букетом заблуждений, наверное, в него вникать рано.

JDSИли треды, которые реализуются через тот же Runnable-интерфейс все железно запускаются только на одном процессоре/ядре?

"Треды" не реализуются через Runnable-интерфейс. Потоки вообще не реализуются. Это такая сущность, которая исполняет код. Runnable это всего лишь часть кода.
Про ядра в контексте Java можно для начала забыть.

Ваше основное заблуждение в том что вы не можете отделить "задачу" от "потока".

JDSПонял так же то, что реализаций может быть множество, но пока склонен остаться на немного поправленном примитивном решении:
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
class ThreadCounter {
    private AtomicInteger c = new AtomicInteger(0);

    public void increment() {
        c.incrementAndGet();
    }

    public void decrement() {
        c.decrementAndGet();
    }

    public int value() {
        return c.get();
    }
}



Гениальный код. Ну, нафига такое писать???

JDS
Код: java
1.
2.
    String thName;
    ThreadCounter cntWT;



Чем больше код похож на английский язык, тем проще его читать. Не бойтесь полноименных идентификаторов.

Код: java
1.
2.
    String threadName;
    ThreadCounter workerThreadCounter;



JDS
Код: java
1.
2.
3.
4.
5.
    public TestTask(ThreadCounter c, String s) {
        cntWT  = c;
        thName = s;        
        new Thread(this).start();        
    }



Запускать потоки из конструктора не безопасно, так как конструктор ещё не завершил исполнение, а поток уже имеет ссылку на объект и может с ним работать. Это чревато трудно-повторимыми косяками.


JDS
Код: java
1.
2.
            try {Thread.sleep(500);
            } catch (InterruptedException ex) {}



InterruptedException очень важный элемент при работе с задачами вроде вашей. Не стоит его игнорировать.
У нас лет десять назад шеф обещал за пустые catch-и увольнять.

JDS
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
public class ThreadsTest {    
    public static void main(String[] args) throws InterruptedException {
       ThreadCounter cntWorkThreads = new ThreadCounter(); // счетчик работающих потоков
       int maxWorkingThreads = 10; // макс. кол-во потоков
       
       // условно, задачи, которые надо запустить в обработку
       String[] tasks = new String[20];
       for (int i = 0; i < 20; i++)
          tasks[i] = "Task " + i;
              
       // ставим задачи на обработку
       for (String s: tasks) {
           // ставим сразу до max_working_threads
           if (cntWorkThreads.value() < maxWorkingThreads) {
              cntWorkThreads.increment();
              TestTask tt = new TestTask(cntWorkThreads, s);
           }
           // прежде чем поставить следующую задачу, 
           // ждем пока хотя бы кто-то отработает из уже работающих
           while (cntWorkThreads.value() >= maxWorkingThreads)
               try {Thread.sleep(500);
               } catch (InterruptedException ex) {
                   ex.printStackTrace();
                   break;
               }
       }
    } 
}



Очень рекомендую прочесть Clean Code by Robert Cecil Martin.
У вас имена переменных во многих случаях совершенно не отражают своего предназначения.
...
Рейтинг: 0 / 0
В неск. потоков
    #39052962
Atum1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
[quot JDS]

JDSМне нужно просто, чтобы например постоянно выполнялось пять задач в пяти потоках, завершилась одна задача, добавился поток с новой задачей.

ThreadPoolExecutor именно это и делает, с одной лишь разницей, что вместо создания нового потока он пере-использует освободившийся.

Про ForkJoinPool пока не понял его глобальные отличия, декларируется, дескать он в полной мере использует многопроцессорность, упоминается в контексте не многопоточного программирования, а некоего "параллельного", а ThreadPoolExecutor не использует многопроцессорность?



как один из примеров ForkJoinPool
17690278

когда у вас простая вычислительная задача - вы разбиваете интервал на части ,а потом в конце их нужно склеить

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
List<Future<BigInteger>> results

--->
future.get()

BigInteger result = BigInteger.ONE;
            for (Future<BigInteger> future : results) {
                result = result.multiply(future.get());
            }
...
Рейтинг: 0 / 0
В неск. потоков
    #39052976
JDS
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
BlazkowiczExecutorService это не пул потоков. Это именно ExecutorService. ThreadPoolExecutor это ExecutorService внутри которого используется пул потоков.
Спасибо за пояснение, понимаю конечно, да и не писал, что ExecutorService - это пул потоков.
BlazkowiczВы очень глубоко ошибаетесь. ExecutorService никак не может менять задачи в процессе выполнения, даже если они блокируются.
Внутри ThreadPoolExecutor помимо пула потоков находится ещё и очередь. Все задачи, которые не выполняются, находятся в этой очереди. По мере завершения выполнения задач, новые поступают из этой очереди.
...
ThreadPoolExecutor именно это и делает, с одной лишь разницей, что вместо создания нового потока он пере-использует освободившийся.
Действительно, к своей радости, оказался неправ (да и изначально надеялся, что так и должно быть, но неверно трактовал одно из предложений в чтиве)
BlazkowiczЭто просто удобный инструмент для реализации Fork/Join. Но с вашим букетом заблуждений, наверное, в него вникать рано.
Ок
Blazkowicz"Треды" не реализуются через Runnable-интерфейс. Потоки вообще не реализуются. Это такая сущность, которая исполняет код.
Runnable это всего лишь часть кода. Про ядра в контексте Java можно для начала забыть.
Согласен, выразился некорректно, но не будем доводить до абсурда, предполагая, что человек не понимает сути слова "поток" в контексте топика )
BlazkowiczВаше основное заблуждение в том что вы не можете отделить "задачу" от "потока".
Это ваше заблуждение насчет моего заблуждения (см. выше))
BlazkowiczГениальный код. Ну, нафига такое писать???
Мне тоже не нравится, можно и вообще бы обойтись скорее, одним AtomicInteger например, просто пока читал, зацепил этот пример, ну и воткнул, дабы оценить по пути как спецы JAVA воспримут код из букваря )
Понятно, что вариантов синхронизации может придумать много, а если использовать пул, то этот счетчик и вообще вроде как не нужен.
BlazkowiczЗапускать потоки из конструктора не безопасно, так как конструктор ещё не завершил исполнение, а поток уже имеет ссылку на объект и может с ним работать. Это чревато трудно-повторимыми косяками.
Полезный нюанс, спасибо, а Шилдту руки оторвать, чему детей учит ) (без шуток, согласен, есть такое дело)

BlazkowiczInterruptedException очень важный элемент при работе с задачами вроде вашей. Не стоит его игнорировать.
У нас лет десять назад шеф обещал за пустые catch-и увольнять.
Абсолютно согласен и сам на практике не грешу этим, но стараюсь просто не писать лишнего, приводя примеры, так-то можно написать и логирование и черта лысого, зачем это в примере? В моем понимании, в нормальной ситуации человек итак не будет гасить эксепшн пустым блоком обработки.

BlazkowiczОчень рекомендую прочесть Clean Code by Robert Cecil Martin.
Спасибо, посмотрю.
...
Рейтинг: 0 / 0
В неск. потоков
    #39052996
JDS
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Atum1как один из примеров ForkJoinPool
когда у вас простая вычислительная задача - вы разбиваете интервал на части, а потом в конце их нужно склеить
Если под ForkJoinPool подразумевается именно разбивка задачи и дальнейшее склеивание результатов, видимо, само название и говорит об этом, то склеивание мне не нужно, т.е. с головой хватит обычного FixedThreadPool-а.
Буду рад еще уточнениям, а так всем спасибо.
...
Рейтинг: 0 / 0
19 сообщений из 19, страница 1 из 1
Форумы / Java [игнор отключен] [закрыт для гостей] / В неск. потоков
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]