Гость
Целевая тема:
Создать новую тему:
Автор:
Форумы / Java [игнор отключен] [закрыт для гостей] / Многопоточные коллекции / 20 сообщений из 20, страница 1 из 1
18.02.2017, 22:15
    #39407311
3ю4
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
Вопрос касается таких вещей, как CopyOnWriteArrayList и CopyOnWriteArraySet.
Допустим, я хочу создать CopyOnWriteArrayList, который содержит CopyOnWriteArraySet`ы, которые уже содержат какие-то Object`ы.
Есть объект Storage с методом public boolean setObject(n, o), где n - номер Set`a, o - номер объекта. В нём заданный объект добавляется (add) в выбранный Set.
Теперь представим ситуацию, когда много потоков решают одновременно вызвать setObject и положить заведомо разные объекты (hash и equals адекватные, коллизии исключаем) в коллекции.
Вопрос: почему в таком случае объекты иногда просто теряются в процессе добавления?
Тесты показали, что в случае с synchronized setObject объекты не теряются, но ведь вся фишка этих коллекций, что они должны работать в многопотоке сами по себе.
Расскажите, в каком месте теряются мои Object`ы?
...
Рейтинг: 0 / 0
18.02.2017, 22:51
    #39407317
redwhite90
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
3ю4,

Думаю код будет как минимум не лишним...
...
Рейтинг: 0 / 0
18.02.2017, 23:08
    #39407322
3ю4
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
Вот такой Storage.

import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;

public class Storage {

List<Set<User>> list;

public Storage(int n) {
list = new CopyOnWriteArrayList<Set<User>>();
for (int i = 0; i < n; i++) {
Set<User> set = new CopyOnWriteArraySet<User>();
list.add(set);
}
}

public boolean setUser(int n, User o) {
return list.get(n).add(o);
}
}

Каждый поток пытается положить объект вызовом такой строки в run():

public void run() {
......
storage.setUser(n, user);
......
}

В конечном итоге логи показывают, что метод был вызван для всех 1200 юзеров, но прошло лишь 1199. И то такая ошибка одна лишь на 10-20 проходов программы. С synchronized setUser повторить ошибку не удаётся, все 1200 проходят.
...
Рейтинг: 0 / 0
18.02.2017, 23:33
    #39407328
3ю4
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
Засунул в setUser перед return вот такую конструкцию.
Код: java
1.
2.
3.
4.
5.
6.
synchronized (num) {
            num.incrementAndGet();
            if (num.get() > 1198) {
                LOG.info("INSIDE " + num);
            }
        }


В логах показало, что насчитало 1200, но реально добавило 1199.
...
Рейтинг: 0 / 0
18.02.2017, 23:43
    #39407329
redwhite90
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
3ю4,

Первое, что в глаза бросается, что

List<Set<User>> list; должен быть final

ну и всё равно это не весь код. Как стартуют потоки?
...
Рейтинг: 0 / 0
18.02.2017, 23:56
    #39407333
3ю4
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
redwhite90, весь код большой (с FixedThreadPool, с синхронизаторами, локами и пр.), одновременно пытаются добавить своего юзера до 30 потоков, стучась в
Код: java
1.
storage.setUser(n, user);


Проблему я локализовал ровно до него. И даже чуть дальше. Поэтому остальной код только запутает читателей.
Метод
Код: java
1.
2.
3.
    public boolean setUser(int n, User o) {
        return list.get(n).add(o);
    }


отрабатывает всегда 1200 раз, но добавляет 1199 (а иногда и 1198) юзеров.
...
Рейтинг: 0 / 0
19.02.2017, 00:42
    #39407344
redwhite90
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
3ю4,

эксепшены внутри run кетчить пробовали?

submit используете?

В любом случае представленного кода недостаточно, чтобы что-то понять(как минимум я не вижу проблемы)
Оснований, что многопоточные коллекции работают верно, а Ваш код нет - нет никаких


Совет по поводу final пробовали?

как объект(все объекты, которые шарятся) ваш публикуется?
...
Рейтинг: 0 / 0
19.02.2017, 00:43
    #39407345
redwhite90
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
redwhite90

Оснований, что многопоточные коллекции работают верно, а Ваш код нет - нет никаких


Оснований, что многопоточные коллекции работают не верно, а Ваш код верно -нет никаких
...
Рейтинг: 0 / 0
19.02.2017, 00:44
    #39407346
redwhite90
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
3ю4,

ну и попробуйте упростить Ваш пример до того, который можно здесь показать, а то гадание получается
...
Рейтинг: 0 / 0
19.02.2017, 00:49
    #39407347
questioner
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
3ю4,

Может с equals не так что-то и юзер просто повторяется?
...
Рейтинг: 0 / 0
19.02.2017, 01:31
    #39407349
3ю4
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
redwhite903ю4,

эксепшены внутри run кетчить пробовали?

submit используете?

В любом случае представленного кода недостаточно, чтобы что-то понять(как минимум я не вижу проблемы)
Оснований, что многопоточные коллекции работают верно, а Ваш код нет - нет никаких


Совет по поводу final пробовали?

как объект(все объекты, которые шарятся) ваш публикуется?
Не понимаю, при чём тут submit или final? Сабмитаются все потоки сразу. Есть несколько Condition, которые отпускаются, после чего вызывается упомянутый выше метод несколькими потоками. Если на методе указать synchronized, то всё идеально. Более 100 запусков с synchronized к ошибке не приводят. Без него буквально на первых 5-10 уже недосчитываюсь юзера.
questioner3ю4,

Может с equals не так что-то и юзер просто повторяется?
Это в принципе исключено, даже hashCode переопределил. Да и опять же, с synchronized никаких проблем.
Я подозреваю, что когда два потока добавляют юзеров в Set (а там вроде же создаётся сперва копия), каким-то образом листу передаётся лишь одно добавление. Ведь синхронизированными являются операции set.add(o) и list.get(n) по отдельности, но вот операция {set=list.get(n); set.add(o)} не атомарная. И вся проблема в моей нубской строке list.get(n).add(o).
...
Рейтинг: 0 / 0
19.02.2017, 01:58
    #39407352
3ю4
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
В общем, повесил прямо в run() до и после вызова setUser методы lock.lock() и lock.unlock(). Теперь работает нормально.
...
Рейтинг: 0 / 0
19.02.2017, 02:05
    #39407353
redwhite90
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
3ю4Не понимаю, при чём тут submit или final?

Storage не ThreadSafe.

http://stackoverflow.com/questions/3929342/choose-between-executorservices-submit-and-executorservices-execute

3ю4Это в принципе исключено, даже hashCode переопределил.
Серьезный подход даже hashCode
3ю4Я подозреваю, что когда два потока добавляют юзеров в Set (а там вроде же создаётся сперва копия), каким-то образом листу передаётся лишь одно добавление.

мутационная операция только одна будет происходить в один момент времени вторая будет дожидаться первую
3ю4Ведь синхронизированными являются операции set.add(o) и list.get(n) по отдельности, но вот операция {set=list.get(n); set.add(o)} не атомарная. И вся проблема в моей нубской строке list.get(n).add(o).
а зачем тут атомарность?
...
Рейтинг: 0 / 0
19.02.2017, 02:08
    #39407355
redwhite90
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
3ю4,

Вы понимаете, что данных недостаточно?

даже про лок... вот откуда нам знать как он создаётся и куда передаётся?
...
Рейтинг: 0 / 0
19.02.2017, 10:49
    #39407389
забыл ник
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
3ю4Ведь синхронизированными являются операции set.add(o) и list.get(n) по отдельности, но вот операция {set=list.get(n); set.add(o)} не атомарная. И вся проблема в моей нубской строке list.get(n).add(o).

Именно. Многопоточные коллекции это не панацея, надо достаточно глубоко понимать как они работают
...
Рейтинг: 0 / 0
19.02.2017, 14:56
    #39407438
questioner
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
забыл ник3ю4Ведь синхронизированными являются операции set.add(o) и list.get(n) по отдельности, но вот операция {set=list.get(n); set.add(o)} не атомарная. И вся проблема в моей нубской строке list.get(n).add(o).

Именно. Многопоточные коллекции это не панацея, надо достаточно глубоко понимать как они работают

А зачем там атомарность?
Как неудачно могут упорядочиться операции? Как это может влиять на исполнение?
...
Рейтинг: 0 / 0
19.02.2017, 15:59
    #39407449
no56892
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
Автор, твой приведенный здесь код корректен с этой точки зрения (хотя нафига первый лист синхронизировать если после конструктора он не изменяется, тупо final List[] и все, плюс copyonwrite стоило бы заменить на что-то более подходящее для частой записи).
К такому поведению может привести:
1. при добавлении такой элемент в сете уже есть в широком смысле (например, новый User расшаривается, а потом добавляется в коллекцию, а в др потоке меняют поле на основе кот вычиляется hashcode)
2. кто-то после добавления элемента и между вызовом size() туда туда лазит и удаляет.

Попробуй не переопределять equals/hashcode в User, увидишь разницу.
...
Рейтинг: 0 / 0
19.02.2017, 19:25
    #39407493
no56892
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
Дело было вечером, делать было нечего...

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class Test {

    private static class Storage<T> {

        final private Set<T>[] list;

        @SuppressWarnings("unchecked")
        public Storage(final int size) {
	//ВАРИАНТ 1:
            list = Stream.generate(() -> Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>()))
                    .limit(size)
                    .toArray(Set[]::new);
	//ВАРИАНТ 2:
            /*list = Stream.generate(CopyOnWriteArraySet::new)
                    .limit(size)
                    .toArray(Set[]::new);*/
        }

        public boolean put(T element, int box) {
            return list[box].add(element);
        }

	//for test, NotThreadSafe
        public int getSize() {
            int size = 0;
            for(Set<T> set : list) {
                size += set.size();
            }
            return size;
        }

    }

    public static void main (String[] args) throws java.lang.Exception {
        for(int r = 0; r < 10; r++) {
            final AtomicInteger counter = new AtomicInteger();
            final Storage<Integer> sss = new Storage<>(10);
            final CountDownLatch cdl = new CountDownLatch(100);

            long start = System.currentTimeMillis();
            for (int i = 0; i < 100; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 1000; j++) {
                        sss.put(counter.getAndIncrement(), ThreadLocalRandom.current().nextInt(10));
                    }
                    cdl.countDown();
                }).start();
            }
            cdl.await();
            System.out.println("ActualSize: " + sss.getSize() + " took:" + (System.currentTimeMillis() - start) + " ms");
        }
    }
}



100 потоков, каждый пишет по 1000 элементов, 10 раз пробуем. Результаты:
ВАРИАНТ 1(Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>())):
ActualSize: 100000 took:199 ms
ActualSize: 100000 took:124 ms
ActualSize: 100000 took:147 ms
ActualSize: 100000 took:71 ms
ActualSize: 100000 took:75 ms
ActualSize: 100000 took:80 ms
ActualSize: 100000 took:84 ms
ActualSize: 100000 took:86 ms
ActualSize: 100000 took:97 ms
ActualSize: 100000 took:68 ms

ВАРИАНТ 2(CopyOnWriteArraySet):
ActualSize: 100000 took:1755 ms
ActualSize: 100000 took:2166 ms
ActualSize: 100000 took:1868 ms
ActualSize: 100000 took:1600 ms
ActualSize: 100000 took:1465 ms
ActualSize: 100000 took:1477 ms
ActualSize: 100000 took:1539 ms
ActualSize: 100000 took:1563 ms
ActualSize: 100000 took:1601 ms
ActualSize: 100000 took:1601 ms
...
Рейтинг: 0 / 0
20.02.2017, 08:11
    #39407594
just_vladimir
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
no56892Дело было вечером, делать было нечего...

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class Test {

    private static class Storage<T> {

        final private Set<T>[] list;

        @SuppressWarnings("unchecked")
        public Storage(final int size) {
	//ВАРИАНТ 1:
            list = Stream.generate(() -> Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>()))
                    .limit(size)
                    .toArray(Set[]::new);
	//ВАРИАНТ 2:
            /*list = Stream.generate(CopyOnWriteArraySet::new)
                    .limit(size)
                    .toArray(Set[]::new);*/
        }

        public boolean put(T element, int box) {
            return list[box].add(element);
        }

	//for test, NotThreadSafe
        public int getSize() {
            int size = 0;
            for(Set<T> set : list) {
                size += set.size();
            }
            return size;
        }

    }

    public static void main (String[] args) throws java.lang.Exception {
        for(int r = 0; r < 10; r++) {
            final AtomicInteger counter = new AtomicInteger();
            final Storage<Integer> sss = new Storage<>(10);
            final CountDownLatch cdl = new CountDownLatch(100);

            long start = System.currentTimeMillis();
            for (int i = 0; i < 100; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 1000; j++) {
                        sss.put(counter.getAndIncrement(), ThreadLocalRandom.current().nextInt(10));
                    }
                    cdl.countDown();
                }).start();
            }
            cdl.await();
            System.out.println("ActualSize: " + sss.getSize() + " took:" + (System.currentTimeMillis() - start) + " ms");
        }
    }
}



100 потоков, каждый пишет по 1000 элементов, 10 раз пробуем. Результаты:
ВАРИАНТ 1(Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>())):
ActualSize: 100000 took:199 ms
ActualSize: 100000 took:124 ms
ActualSize: 100000 took:147 ms
ActualSize: 100000 took:71 ms
ActualSize: 100000 took:75 ms
ActualSize: 100000 took:80 ms
ActualSize: 100000 took:84 ms
ActualSize: 100000 took:86 ms
ActualSize: 100000 took:97 ms
ActualSize: 100000 took:68 ms

ВАРИАНТ 2(CopyOnWriteArraySet):
ActualSize: 100000 took:1755 ms
ActualSize: 100000 took:2166 ms
ActualSize: 100000 took:1868 ms
ActualSize: 100000 took:1600 ms
ActualSize: 100000 took:1465 ms
ActualSize: 100000 took:1477 ms
ActualSize: 100000 took:1539 ms
ActualSize: 100000 took:1563 ms
ActualSize: 100000 took:1601 ms
ActualSize: 100000 took:1601 ms
RTFM
...
Рейтинг: 0 / 0
20.02.2017, 08:12
    #39407595
just_vladimir
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Многопоточные коллекции
just_vladimirno56892Дело было вечером, делать было нечего...

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class Test {

    private static class Storage<T> {

        final private Set<T>[] list;

        @SuppressWarnings("unchecked")
        public Storage(final int size) {
	//ВАРИАНТ 1:
            list = Stream.generate(() -> Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>()))
                    .limit(size)
                    .toArray(Set[]::new);
	//ВАРИАНТ 2:
            /*list = Stream.generate(CopyOnWriteArraySet::new)
                    .limit(size)
                    .toArray(Set[]::new);*/
        }

        public boolean put(T element, int box) {
            return list[box].add(element);
        }

	//for test, NotThreadSafe
        public int getSize() {
            int size = 0;
            for(Set<T> set : list) {
                size += set.size();
            }
            return size;
        }

    }

    public static void main (String[] args) throws java.lang.Exception {
        for(int r = 0; r < 10; r++) {
            final AtomicInteger counter = new AtomicInteger();
            final Storage<Integer> sss = new Storage<>(10);
            final CountDownLatch cdl = new CountDownLatch(100);

            long start = System.currentTimeMillis();
            for (int i = 0; i < 100; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 1000; j++) {
                        sss.put(counter.getAndIncrement(), ThreadLocalRandom.current().nextInt(10));
                    }
                    cdl.countDown();
                }).start();
            }
            cdl.await();
            System.out.println("ActualSize: " + sss.getSize() + " took:" + (System.currentTimeMillis() - start) + " ms");
        }
    }
}



100 потоков, каждый пишет по 1000 элементов, 10 раз пробуем. Результаты:
ВАРИАНТ 1(Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>())):
ActualSize: 100000 took:199 ms
ActualSize: 100000 took:124 ms
ActualSize: 100000 took:147 ms
ActualSize: 100000 took:71 ms
ActualSize: 100000 took:75 ms
ActualSize: 100000 took:80 ms
ActualSize: 100000 took:84 ms
ActualSize: 100000 took:86 ms
ActualSize: 100000 took:97 ms
ActualSize: 100000 took:68 ms

ВАРИАНТ 2(CopyOnWriteArraySet):
ActualSize: 100000 took:1755 ms
ActualSize: 100000 took:2166 ms
ActualSize: 100000 took:1868 ms
ActualSize: 100000 took:1600 ms
ActualSize: 100000 took:1465 ms
ActualSize: 100000 took:1477 ms
ActualSize: 100000 took:1539 ms
ActualSize: 100000 took:1563 ms
ActualSize: 100000 took:1601 ms
ActualSize: 100000 took:1601 ms
RTFM
Хотя лучше вот
YouTube Video
...
Рейтинг: 0 / 0
Форумы / Java [игнор отключен] [закрыт для гостей] / Многопоточные коллекции / 20 сообщений из 20, страница 1 из 1
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]