Гость
Целевая тема:
Создать новую тему:
Автор:
Форумы / Java [игнор отключен] [закрыт для гостей] / Логика метода WebCrawler#stop [Concurrency in practice 7.2.5] / 3 сообщений из 3, страница 1 из 1
23.02.2017, 12:18
    #39409534
questioner
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Логика метода WebCrawler#stop [Concurrency in practice 7.2.5]
Экзекутор, который умеет возвращать не только не начатые задачи, но и те, что прерваны в прогрессе:
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
public class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService exec;
    private final Set<Runnable> tasksCancelledAtShutdown =
            Collections.synchronizedSet(new HashSet<Runnable>());

    public TrackingExecutor(ExecutorService exec) {
        this.exec = exec;
    }

    public void shutdown() {
        exec.shutdown();
    }

    public List<Runnable> shutdownNow() {
        return exec.shutdownNow();
    }

    public boolean isShutdown() {
        return exec.isShutdown();
    }

    public boolean isTerminated() {
        return exec.isTerminated();
    }

    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        return exec.awaitTermination(timeout, unit);
    }

    public List<Runnable> getCancelledTasks() {
        if (!exec.isTerminated())
            throw new IllegalStateException(/*...*/);
        return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    }

    public void execute(final Runnable runnable) {
        exec.execute(new Runnable() {
            public void run() {
                try {
                    runnable.run();
                } finally {
                    if (isShutdown()
                            && Thread.currentThread().isInterrupted())
                        tasksCancelledAtShutdown.add(runnable);
                }
            }
        });
    }
}



и есть краулер, который этот класс использует

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
public abstract class WebCrawler {
    private volatile TrackingExecutor exec;
    @GuardedBy("this") private final Set<URL> urlsToCrawl = new HashSet<URL>();

    private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>();
    private static final long TIMEOUT = 500;
    private static final TimeUnit UNIT = MILLISECONDS;

    public WebCrawler(URL startUrl) {
        urlsToCrawl.add(startUrl);
    }

    public synchronized void start() {
        exec = new TrackingExecutor(Executors.newCachedThreadPool());
        for (URL url : urlsToCrawl) submitCrawlTask(url);
        urlsToCrawl.clear();
    }

    public synchronized void stop() throws InterruptedException {
        try {
            saveUncrawled(exec.shutdownNow());
            if (exec.awaitTermination(TIMEOUT, UNIT))
                saveUncrawled(exec.getCancelledTasks());
        } finally {
            exec = null;
        }
    }

    protected abstract List<URL> processPage(URL url);

    private void saveUncrawled(List<Runnable> uncrawled) {
        for (Runnable task : uncrawled)
            urlsToCrawl.add(((CrawlTask) task).getPage());
    }

    private void submitCrawlTask(URL u) {
        exec.execute(new CrawlTask(u));
    }

    private class CrawlTask implements Runnable {
        private final URL url;

        CrawlTask(URL url) {
            this.url = url;
        }

        private int count = 1;

        boolean alreadyCrawled() {
            return seen.putIfAbsent(url, true) != null;
        }

        void markUncrawled() {
            seen.remove(url);
            System.out.printf("marking %s uncrawled%n", url);
        }

        public void run() {
            for (URL link : processPage(url)) {
                if (Thread.currentThread().isInterrupted())
                    return;
                submitCrawlTask(link);
            }
        }

        public URL getPage() {
            return url;
        }
    }
}


Мне не ясен код метода stop:

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
public synchronized void stop() throws InterruptedException {
        try {
            saveUncrawled(exec.shutdownNow()); //1
            if (exec.awaitTermination(TIMEOUT, UNIT)) //2 
                saveUncrawled(exec.getCancelledTasks()); //3
        } finally {
            exec = null; //4
        }
    }



1 - exec.shutdownNow() - возвращает ещё не начатые задачи

Во второй строчке мы ждём пока задачи исполнят нашу фразу о прерывании. Этого может и не произойти. Как я понял могут быть false positive результаты из-за того, что задача например не предусматривает cancellation. А ждать нам надо ибо только в конце исполнения эти задачи попадут в коллекцию внутри экзекутора.

В случае если дождались, то понятно, добавляем задачи - радуемся жизни.

4- а вот если не дождались, то что это за чудо код? зачем он?
...
Рейтинг: 0 / 0
23.02.2017, 21:04
    #39409720
no56892
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Логика метода WebCrawler#stop [Concurrency in practice 7.2.5]
questioner,
автор4- а вот если не дождались, то что это за чудо код? зачем он?
Что бы если интерраптнуть на awaitTermination - сделать доступным для GC TrackingExecutor (естественно после того как все потоки там завершатся рано или поздно).

Если awaitTermination не дождется - то рабочие потоки доработают -> либо NPE в submitCrawlTask, либо будут складывать в уже не нужную tasksCancelledAtShutdown, ну либо успеют засунуть туда).
...
Рейтинг: 0 / 0
23.02.2017, 21:22
    #39409723
no56892
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Логика метода WebCrawler#stop [Concurrency in practice 7.2.5]
no56892questioner,
автор4- а вот если не дождались, то что это за чудо код? зачем он?
Что бы если интерраптнуть на awaitTermination - сделать доступным для GC TrackingExecutor (естественно после того как все потоки там завершатся рано или поздно).

+ в случае если isTerminated() == false, тоже IllegalStateException будет.
...
Рейтинг: 0 / 0
Форумы / Java [игнор отключен] [закрыт для гостей] / Логика метода WebCrawler#stop [Concurrency in practice 7.2.5] / 3 сообщений из 3, страница 1 из 1
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]