powered by simpleCommunicator - 2.0.30     © 2024 Programmizd 02
Map
Форумы / Java [игнор отключен] [закрыт для гостей] / Itrator-to-Iterator transformation (Spark->mapPartitions())
13 сообщений из 13, страница 1 из 1
Itrator-to-Iterator transformation (Spark->mapPartitions())
    #40128852
hck2
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
spark, mapPartitions(), столкнулся с проблемой переполнения хипа практически в той же ситуации
https://stackoverflow.com/questions/42539315/apache-spark-effectively-using-mappartitions-in-java

там предложили вместо такой конструкции, которая в mapPartitions() набивает огромный ArrayList
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
    ArrayList<OutObj> out = new ArrayList<>();
    while(iter.hasNext()) {
        InObj current = iter.next();
        out.add(someChange(current));
    }
    return out.iterator();
});


такую
Код: java
1.
2.
3.
4.
5.
6.
rdd.mapPartitions((Iterator<InObj> iter) -> {
    Iterable<InObj> iterable = () -> iter;
    return StreamSupport.stream(iterable.spliterator(), false)
            .map(s -> transformRow(s)) // or whatever transformation
            .iterator();
});


вопрос - поможет ли вторая конструкция сберечь память ?
...
Рейтинг: 0 / 0
Itrator-to-Iterator transformation (Spark->mapPartitions())
    #40128875
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
hck2,

А попробовать не?

Ну во втором случае по идее мы приходим к lazy вычислению, и результат будет зависеть от терминальной операции, если ей для калькуляции нужна вся коллекция в любом случае, то от ООМ никуда не деться.
А зачем вообще юзается mapPartitions? Судя по коду там простая трансформация - так почему бы не использовать обычный map
...
Рейтинг: 0 / 0
Itrator-to-Iterator transformation (Spark->mapPartitions())
    #40128884
hck2
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
забыл ник

А попробовать не?

ну достаточно сложно. в реальной апликации сложно понять потому как спарк и с массивом прожевывает в стиле запустил - вылетел с ом, рестарт экзекьютера и оп, получилось. я полагаю, что на первую попытку в памяти экзекьютера что-то болтается, но после рестарта экзекьютера массив бывает влазит.
а синтетически - не умею максимальный хип замерять. умею дамп снимать, но это же в нужный момент надо сделать ... сложно.

забыл ник

Ну во втором случае по идее мы приходим к lazy вычислению, и результат будет зависеть от терминальной операции, если ей для калькуляции нужна вся коллекция в любом случае, то от ООМ никуда не деться.

в моем случае приходит длинная строка (ну в смысле в датасете миллионы строк), а в этой mapPartitions() строка парсится и в массив пихаются ошибки валидации строки. потом датасет пишет в паркет с struct полем (тот самый массив).
вот и мне кажется что по частям же return не отдаст, когда бы это код не выполнился. значит в какой-то момент ".map(s -> transformRow(s))" наплодит миллионы объектов

забыл ник

А зачем вообще юзается mapPartitions? Судя по коду там простая трансформация - так почему бы не использовать обычный map

код не мой, полагаю что думали это самый быстрый вариант, а памяти можно докинуть. я думал на UDF переделать, но они к гадалке не ходи - будут на порядок медленнее. простой map ... действительно у него пожалуй больше шансов на успех.
...
Рейтинг: 0 / 0
Itrator-to-Iterator transformation (Spark->mapPartitions())
    #40128885
SpringMan
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
hck2
вопрос - поможет ли вторая конструкция сберечь память ?

Должно сберечь. Просто flatMap то не вариант?
...
Рейтинг: 0 / 0
Itrator-to-Iterator transformation (Spark->mapPartitions())
    #40128890
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
IMHO. В спарк заложено штуки 4 стратегии использования памяти под RDD. Там типа memory, disk/memory, disk e.t.c.

Нужно работать используя данный подход. И еще там надо почитать про cache/persist. Я щас не помню но какая-то
разница есть.

Короче нет надежды что у worker хватит памяти отработать твою колбасу целиком.
...
Рейтинг: 0 / 0
Itrator-to-Iterator transformation (Spark->mapPartitions())
    #40128892
hck2
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
SpringMan
hck2
вопрос - поможет ли вторая конструкция сберечь память ?

Должно сберечь. Просто flatMap то не вариант?

flatMap вроде если мне на одну строку надо больше одной наплодить. у меня на одну строку хоть и много ошибок, но для спрака это одна ячейка со struct значением.


mayton
IMHO. В спарк заложено штуки 4 стратегии использования памяти под RDD. Там типа memory, disk/memory, disk e.t.c.

Нужно работать используя данный подход. И еще там надо почитать про cache/persist. Я щас не помню но какая-то
разница есть.

не, вроде не так. RDD это по любому в памяти. после вычисления RDD, его можно закешировать/заперсистить в memory, disk/memory и т.п. что бы напимер сделав банальный терминальный count() заново не пришлось бы вычислять.
то что у меня все ерроры валидации будут в памяти RDD это понятно, но сейчас, как я понимаю, кроме RDD они еще и дублируются в ArrayList<OutObj> out
...
Рейтинг: 0 / 0
Itrator-to-Iterator transformation (Spark->mapPartitions())
    #40128895
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Ты можешь это проверить. Возьми RDD который читается из CSV файла который многократно превышает
твою память или память воркера. И сделай count.
...
Рейтинг: 0 / 0
Itrator-to-Iterator transformation (Spark->mapPartitions())
    #40128896
hck2
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
mayton
Ты можешь это проверить. Возьми RDD который читается из CSV файла который многократно превышает
твою память или память воркера. И сделай count.

не понял идеи. если csv превышает и я сделаю repartition(1), что бы весь файл зачитывал один еднственный executor, он упадет. да.
в этом сомнений нет.
вот если бы было возможно подготовить такой файл, какой в структуры RDD влазит ровно один раз, а в виде RDD+ArrayList не влезет. но я хз как RDD хранит стринги. может более плотно чем ArrayList, а может и наоборот.
...
Рейтинг: 0 / 0
Itrator-to-Iterator transformation (Spark->mapPartitions())
    #40128898
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Чтоб не решать проблему XY.

Тебе надо просто сделать это

Код: java
1.
s -> transformRow(s)



для каждой строчки исходного датасета. Верно?
...
Рейтинг: 0 / 0
Itrator-to-Iterator transformation (Spark->mapPartitions())
    #40128899
hck2
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
mayton

для каждой строчки исходного датасета. Верно?

Верно.
...
Рейтинг: 0 / 0
Itrator-to-Iterator transformation (Spark->mapPartitions())
    #40128900
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Блин... Я готов признать как факт что ты видишь ошибку переполнения памяти.
Но я отказываюсь поверить в то что Spark не умеет трансформировать сеты без
оверхеда.

По поводу этой конструкции.

Код: java
1.
2.
3.
4.
5.
6.
rdd.mapPartitions((Iterator<InObj> iter) -> {
    Iterable<InObj> iterable = () -> iter;
    return StreamSupport.stream(iterable.spliterator(), false)
            .map(s -> transformRow(s)) // or whatever transformation
            .iterator();
});



Да я ее использовал. Только не в Spark. А в прикладном коде. Для превращения итератора в Stream.
Это работает и стрим не потребляет памяти. Но можно потерять параллелизм.
...
Рейтинг: 0 / 0
Itrator-to-Iterator transformation (Spark->mapPartitions())
    #40128905
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Несколько наводящих вопросов -
1) а почему все таки rdd? Если есть возможность я бы выкинул и заменил датасет/датафрейм
2) mapPartitions используется когда нужен некий общий контекст для всех строк партитишена, например коннекшен к Бд. В вашем случае такого не вижу, потому простой map и проще и безопаснее. Единственное, что могу предположить что в функции юзается какие-то регекспы или календарь, которые имеют Стейт и относительно дорогостоящи при создании. В таком случае может быть буст перформанса, но это такое...
3) что за ошибки то могут быть связаны со строкой? Может там стектрейс в строку хреначится на 10мб для каждой )))
...
Рейтинг: 0 / 0
Itrator-to-Iterator transformation (Spark->mapPartitions())
    #40128931
hck2
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
mayton

Да я ее использовал. Только не в Spark. А в прикладном коде. Для превращения итератора в Stream.
Это работает и стрим не потребляет памяти. Но можно потерять параллелизм.

спасибо! собственно я именно за этим ответом приходил
значит пробовать смысл есть.

забыл ник
Несколько наводящих вопросов -
1) а почему все таки rdd? Если есть возможность я бы выкинул и заменил датасет/датафрейм
2) mapPartitions используется когда нужен некий общий контекст для всех строк партитишена, например коннекшен к Бд. В вашем случае такого не вижу, потому простой map и проще и безопаснее. Единственное, что могу предположить что в функции юзается какие-то регекспы или календарь, которые имеют Стейт и относительно дорогостоящи при создании. В таком случае может быть буст перформанса, но это такое...
3) что за ошибки то могут быть связаны со строкой? Может там стектрейс в строку хреначится на 10мб для каждой )))


1) я то ипользую датасет api, но под низом у спарка это все равно RDD. я себе представлял данные в spark как набор байтов в объекте RDD, потому казалось, что у спарка не получится сделать ссылку из RDD на мой ArrayList, ему придется deep copy в RDD делать. а это представление у меня сложилось потому, что у спарка есть проект Tungsten, который научил спарк хранить данные в off-heap.
2) да, вытягивается конфигурация описывающая колонки соответствуящая файлику и подбирается класс для парсера
3) я думаю в партицию могут попасть миллионы строк, а у строки оказалось могут быть сотни ошибок. оказалось источник может набить строки непечатными символами, каждый символ запись в массивчике.
...
Рейтинг: 0 / 0
13 сообщений из 13, страница 1 из 1
Форумы / Java [игнор отключен] [закрыт для гостей] / Itrator-to-Iterator transformation (Spark->mapPartitions())
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]