Гость
Целевая тема:
Создать новую тему:
Автор:
Форумы / Java [игнор отключен] [закрыт для гостей] / ExecutorCompletionService / 24 сообщений из 24, страница 1 из 1
22.04.2017, 20:12
    #39442741
R@mon
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
Друзья, доброго времени суток!
Помогите пожалуйста разобраться с проблемой.
Есть тестовое приложение которое запускает 2 потока для сканирования 2х каталогов.
В каждом из этих 2х потоках запускаются подпотоки по одному на каждый файл в каталоге.
Объясните пожалуйста почему программа виснет на строке:
Код: java
1.
Result result = service.take().get();



При этом, судя по логам метод FileScaner.call отрабатывает без проблем и возвращает результат.


Полный код:
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
import org.apache.log4j.Logger;
import java.io.File;
import java.util.concurrent.*;

public class Main {
    private static Logger logger = Logger.getLogger(Main.class);

    public static void main(String[] args) {
        logger.debug("main start");

        new Thread(new DirectoryScaner(new File("D:\\test\\txt\\in"))).start();
        new Thread(new DirectoryScaner(new File("D:\\test\\csv\\in"))).start();

        logger.debug("main end");
    }
}



Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
import org.apache.log4j.Logger;

import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.*;

public class DirectoryScaner implements Runnable {

    private static Logger logger = Logger.getLogger(DirectoryScaner.class);

    private File directory;

    public DirectoryScaner(File directory) {
        this.directory = directory;
        logger.debug("new DirectoryScaner " + directory);
    }

    public void run() {
        logger.debug("Start DirectoryScaner run for directory " + directory.getAbsolutePath());

        long random = (long)(Math.random() * 10 + 1)*1000;
        logger.debug("random " + random);
        try {
            Thread.sleep(random);
        } catch (InterruptedException e) {
            logger.error("Получили ошибку InterruptedException",e);
        }

        scan();

        logger.debug("End DirectoryScaner run for directory " + directory.getAbsolutePath());
    }

    private void scan(){
        logger.debug("Start DirectoryScaner scan for directory " + directory.getAbsolutePath());

        ExecutorService executor = Executors.newFixedThreadPool(1);
        CompletionService<Result> service = new ExecutorCompletionService<Result>(executor);

        File[] files = directory.listFiles(new FileFilter() {
            public boolean accept(File pathname) {
                return pathname.isFile();
            }
        });

        logger.debug("Найдено файлов " + files.length);
        for (File file : files) {
            executor.submit(new FileScaner(directory, file));
        }

        executor.shutdown();

        for (int i = 0; i < files.length; i++) {
            try {
                logger.debug("Повисли и ждем результата...");
                Result result = service.take().get();
                logger.debug("Получили результат " + result);

                File dest = new File(directory.getAbsoluteFile() + "\\out\\" + result.getFile().getName());
                result.getFile().renameTo(dest);
                logger.debug("file " + result.getFile() + " renameTo " + dest.getAbsolutePath());

            } catch (InterruptedException e) {
                logger.error("Получили ошибку InterruptedException",e);
            } catch (ExecutionException e) {
                logger.error("Получили ошибку ExecutionException",e);
            }
        }
        logger.debug("Start DirectoryScaner scan for directory " + directory.getAbsolutePath());
    }
}



Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
import org.apache.log4j.Logger;

import java.io.File;
import java.util.concurrent.Callable;

public class FileScaner implements Callable<Result> {

    private static Logger logger = Logger.getLogger(FileScaner.class);
    private File directory;
    private File file;

    public FileScaner(File directory, File file) {
        this.directory = directory;
        this.file = file;
        logger.debug("new FileScaner " + file);
    }

    @Override
    public Result call() throws Exception {

        logger.debug("Start FileScaner call for file " + file.getAbsolutePath());

        long random = (long)(Math.random() * 10 + 1)*1000;
        logger.debug("random " + random + " for file " + file.getAbsolutePath());

        try {
            Thread.sleep(random);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Result result = new Result();
        result.setFile(file);
        if (directory.equals(new File("D:\\test\\csv\\in"))){
            result.setException(new Exception("!!! call csv Exception"));
        }
        logger.debug("End FileScaner call for file" + file.getAbsolutePath() + " result " + result);

        return result;
    }
}



Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
import java.io.File;

public class Result {
    private File file;
    private Exception exception;

    public File getFile() {
        return file;
    }

    public void setFile(File file) {
        this.file = file;
    }

    public Exception getException() {
        return exception;
    }

    public void setException(Exception exception) {
        this.exception = exception;
    }

    @Override
    public String toString() {
        return "Result{" +
                "file=" + file +
                ", exception=" + exception +
                '}';
    }
}
...
Рейтинг: 0 / 0
22.04.2017, 20:23
    #39442744
Basil A. Sidorov
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
R@monЕсть тестовое приложение которое запускает 2 потока для сканирования 2х каталогов.
В каждом из этих 2х потоках запускаются подпотоки по одному на каждый файл в каталоге.Есть хоть одно разумное объяснение такой реализации?
...
Рейтинг: 0 / 0
22.04.2017, 20:47
    #39442753
R@mon
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
Basil A. Sidorov,
1. Мне нужно одновременно сканировать 8 каталогов. В каждом файлы своего формата. Два написал для примера.
2. FileScaner будет разбирать конкретный файл (размер файла 100-200 МБ), так что тут будет основная работа. Ну и строить из него List<DBFObj> или List<CSVObj> в зависимости от типа файла.
Что конкретно не так со структурой?
...
Рейтинг: 0 / 0
22.04.2017, 21:06
    #39442757
Basil A. Sidorov
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
Должны быть очевидны два фундаментальных изъяна вашего дизайна:
1. Дисковый ввод-вывод, в целом, является последовательным. SSD несколько изменяют ситуацию, но ускорить обработку "одновременным" чтением кучи файлов - вряд ли получится;
2. Я, конечно, разорялся на тему "не надо экономить потоки", но на моей прошлой работе было вполне рядовое хранилище, вполне средней системы, содержавшее несколько миллионов файлов. Даже на 64-разрядных системах проблемы начнутся гораздо раньше, чем вы сможете обработать "тридцать тыщ одних курьеров файлов".
...
Рейтинг: 0 / 0
22.04.2017, 21:14
    #39442759
Petro123
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
R@monМне нужно одновременно сканировать 8 каталогов.
а считывающая головка - одна(.
Одним потоком составьте список и отдать его в несколько потоков уже с готовым списком файлов.
...
Рейтинг: 0 / 0
22.04.2017, 21:22
    #39442762
R@mon
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
Друзья, возможно реализация в целом хромает и возможно даже на обе ноги. Я бы не хотел углубляться в этот вопрос.
Может кто-то подсказать, по сути вопроса, что не так с ExecutorCompletionService?
...
Рейтинг: 0 / 0
22.04.2017, 21:24
    #39442765
Petro123
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
R@monЯ бы не хотел углубляться в этот вопрос
тебе решать. Удачи!
...
Рейтинг: 0 / 0
22.04.2017, 21:33
    #39442767
Basil A. Sidorov
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
R@monДрузья, возможно реализация в целом хромает и возможно даже на обе ноги. Я бы не хотел углубляться в этот вопрос.Я тоже не хочу углубляться. Только в реализацию кривого дизайна.Может кто-то подсказать, по сути вопроса, что не так с ExecutorCompletionService?"Меня опять терзают смутные сомнения", что Stream API Java8 делает всё, что вам нужно и даже больше.
...
Рейтинг: 0 / 0
22.04.2017, 21:37
    #39442768
R@mon
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
Basil A. Sidorov,
У меня есть ограничения Java6
...
Рейтинг: 0 / 0
22.04.2017, 22:10
    #39442774
Basil A. Sidorov
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
R@monУ меня есть ограничения Java6Но доку-то всё равно надо читать?
ExecutorCompletionService организует очередь задач, на предоставленном вами исполнителе.
Вы, вместо того, чтобы дать сервисному компоненту спокойно работать вы бьете его по рукам, (за каким-то лешим) выключая из работы исполнителя: где тут shutdown ?!
...
Рейтинг: 0 / 0
22.04.2017, 22:35
    #39442782
R@mon
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
Basil A. Sidorov,
shutdown для исполнителя говорит лишь о том, что больше не принимаются задачи в очередь. Те что уже есть продолжают работать. Это так, что касается документации.
...
Рейтинг: 0 / 0
22.04.2017, 22:38
    #39442783
Basil A. Sidorov
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
Хотя смотрю на весь этот букет кода и классов ...
Вот задлянафига вся эта развесистая клюква, если последовательный проход по массиву со списком файлов справится точно так же, но гораздо проще?
...
Рейтинг: 0 / 0
22.04.2017, 22:50
    #39442784
R@mon
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
Basil A. Sidorov,
Показанный пример очень упрощен, что бы показать лишь суть происходящего.
Я не могу понять, что во всем этом я делаю не так. Почему не прокидывается ответ.
Вот нагуглил пример, все работает как надо.
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.*;

public class Test {

    public static class stringCallable implements Callable<String>{
        String mstring;

        stringCallable(String s) {mstring = s;}

        @Override
        public String call() throws Exception {

            Random r = new Random();
            int sec = r.nextInt(5);
            sec = sec * 1000;
            Thread.sleep(sec);
            return mstring+" - "+sec;
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        ArrayList<Callable<String>> list = new ArrayList<Callable<String>>();
        ExecutorService e = Executors.newFixedThreadPool(5);
        list.add(new stringCallable("1"));
        list.add(new stringCallable("2"));
        list.add(new stringCallable("3"));
        list.add(new stringCallable("4"));
        list.add(new stringCallable("5"));
        System.out.println ("Starting Solver");
        try {
            solve(e, list);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        } catch (ExecutionException e1) {
            e1.printStackTrace();
        }
        System.out.println ("End Block");
    }

    public static void solve(ExecutorService e, Collection<Callable<String>> solvers) throws InterruptedException, ExecutionException {

        CompletionService<String> ecs = new ExecutorCompletionService<String>(e);
        for (Callable<String> s : solvers){
            ecs.submit(s);
        }

        e.shutdown();

        int n = solvers.size();
        for (int i = 0; i < n; ++i) {
            String r = ecs.take().get();
            if (r != null)
                use(r);
        }
    }
    private static void use(String r) {
        System.out.println (r);
    }
}
...
Рейтинг: 0 / 0
22.04.2017, 22:53
    #39442785
Basil A. Sidorov
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
R@monshutdown для исполнителя говорит лишь о том, что больше не принимаются задачи в очередь. Те что уже есть продолжают работать. Это так, что касается документации."Иногда не помогает даже чтение газет".
Вы отправили тысячу заданий на однопоточный исполнитель. Это настолько быстро, что ещё до того, как будет выполнено ваше первое задание (перенос файла из каталога в каталог), исполнитель прекратит принимать новые задания.
Мне сложно оценить высоты полёта ваших мыслей, но есть в них что странное - лично я не стал бы даже гадать о возможных вариантах поведения сервисной обёртки над остановленным исполнителем.
...
Рейтинг: 0 / 0
22.04.2017, 22:55
    #39442786
Basil A. Sidorov
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
R@monВот нагуглил пример, все работает как надоВместо бессмысленного гуглежа надо сделать по примеру из доки, а ещё лучше - выкинуть изначально дрянную идею и ещё раз подумать над дизайном.
...
Рейтинг: 0 / 0
22.04.2017, 23:01
    #39442787
Basil A. Sidorov
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
sleep - отдельный косяк: хотите взять театральную паузу - используйте wait.
...
Рейтинг: 0 / 0
22.04.2017, 23:11
    #39442792
R@mon
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
Basil A. Sidorov,
Пример в точности по документации, ссылку на которую вы предоставляли.
Количество потоков тут не важно. Все готовые результаты хранятся в коллекции.
Если есть готовый результат, то service.take() получает его по первому запросу, если результата еще нет, то происходит блокировка и ожидание первого готового.
Я надеялся получить ответ (или предположение) на конкретный вопрос. Если у вас такого нет, надеюсь поможет кто-либо еще.
...
Рейтинг: 0 / 0
22.04.2017, 23:20
    #39442794
Basil A. Sidorov
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
R@monПример в точности по документации, ссылку на которую вы предоставляли.
и где в доке shutdown?
Код: sql
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
   void solve(Executor e,
              Collection<Callable<Result>> solvers)
     throws InterruptedException, ExecutionException {
       CompletionService<Result> ecs
           = new ExecutorCompletionService<Result>(e);
       for (Callable<Result> s : solvers)
           ecs.submit(s);
       int n = solvers.size();
       for (int i = 0; i < n; ++i) {
           Result r = ecs.take().get();
           if (r != null)
               use(r);
       }
   }

Количество потоков тут не важно.Но, почему-то, автор примера из гугла заготовил пять потоков под пять заданий.Все готовые результаты хранятся в коллекции.Толку-то: вопрос в том, что происходит с теми, кто даже не начал исполняться?Я надеялся получить ответ (или предположение) на конкретный вопрос. Если у вас такого нет, надеюсь поможет кто-либо еще.Т.е. ответ "выкинуть и передалать правильно" вас не устраивает?
Ладушки - успехов в костылестроении и граблехождении.
...
Рейтинг: 0 / 0
23.04.2017, 00:16
    #39442804
Basil A. Sidorov
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
Претензии к наличию shutdown - снимаются.
Претензии к кривизне дизайна - остаются.
...
Рейтинг: 0 / 0
24.04.2017, 09:21
    #39443052
R@mon
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
Выспался и еще раз прочитал код, все оказалось проще чем кажется.
Для того что бы service мог знать о задачах которые завершились надо в него и делать submit. Я же в примере делаю его в executor.
Поменял, все заработало.

Теперь что касается "кривизны дизайна".
Я так понимаю, что основной недочет в том, что большое количество потоков будет использовать большое количество файлов. При этом узким местом остается ввод\вывод.
Как избавиться от этого? Первым и основным потоком прочитать все файлы в память?
...
Рейтинг: 0 / 0
24.04.2017, 09:47
    #39443060
Petro123
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
R@monПервым и основным потоком прочитать все файлы в память?
не содержимое, а имена файлов. Идеально - не читая сами файлы.
...
Рейтинг: 0 / 0
24.04.2017, 10:00
    #39443075
R@mon
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
Petro123,
Так я вроде это и делаю...
Читаю имена файлов и передаю ссылку на один файл в каждый поток, что бы они не мешали друг другу. Каждый поток работает со своим файлом, нет?
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
        File[] files = directory.listFiles(new FileFilter() {
            public boolean accept(File pathname) {
                return pathname.isFile();
            }
        });

        logger.debug("Найдено файлов " + files.length);
        for (File file : files) {
            service.submit(new FileScaner(directory, file));
        }
...
Рейтинг: 0 / 0
24.04.2017, 10:19
    #39443085
Petro123
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
R@monЧитаю имена файлов и передаю ссылку на один файл в каждый поток, что бы они не мешали друг другу. Каждый поток работает со своим файлом, нет?
нет.
1. Я думал ты искать будешь по какому то критерию файлы. Это долгая операция и занимает все ресурсы машины. Её не распараллелить.
Потом ты СПИСОК как входнй параметр передаёшь в поток для разбора. И старт потока.
Итого на клик кнопки у тебя завис на время поиска и потом поток в фоне делает работу хоть 8 часов.
...
Рейтинг: 0 / 0
24.04.2017, 16:23
    #39443445
Basil A. Sidorov
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
ExecutorCompletionService
R@monЯ так понимаю, что основной недочет в том, что большое количество потоков будет использовать большое количество файлов. При этом узким местом остается ввод\вывод.
Как избавиться от этого? Первым и основным потоком прочитать все файлы в память?В оптимизированном коде легко запутаться, но общая стратегия для поточной обработки примерно такая.
Выделяется один (несколько) больших байтовых массивов (порядка сотен мегабайт каждый).
Из этих массивов "берутся" блоки (8-32КБ) в/из которых читает/пишет с которыми работают или один поток или два (отдельно на чтение и запись).
Т.е. файлов (в вашем случае) будет открыто по числу входных/выходных для типов/каталогов, т.к. быстрее, чем последовательная обработка по каждому типу всё равно не получится.
Технически это будет ByteBuffer поверх byte[] и реализация интерфейсов разборки/сборки (scatter/gather) из NIO.
Головоломная часть - управления блоками и синхронизация. Тут я даже псевдокод не готов предложить.
...
Рейтинг: 0 / 0
Форумы / Java [игнор отключен] [закрыт для гостей] / ExecutorCompletionService / 24 сообщений из 24, страница 1 из 1
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]