|
Itrator-to-Iterator transformation (Spark->mapPartitions())
|
|||
---|---|---|---|
#18+
spark, mapPartitions(), столкнулся с проблемой переполнения хипа практически в той же ситуации https://stackoverflow.com/questions/42539315/apache-spark-effectively-using-mappartitions-in-java там предложили вместо такой конструкции, которая в mapPartitions() набивает огромный ArrayList Код: java 1. 2. 3. 4. 5. 6. 7. 8.
такую Код: java 1. 2. 3. 4. 5. 6.
вопрос - поможет ли вторая конструкция сберечь память ? ... |
|||
:
Нравится:
Не нравится:
|
|||
23.01.2022, 20:39 |
|
Itrator-to-Iterator transformation (Spark->mapPartitions())
|
|||
---|---|---|---|
#18+
hck2, А попробовать не? Ну во втором случае по идее мы приходим к lazy вычислению, и результат будет зависеть от терминальной операции, если ей для калькуляции нужна вся коллекция в любом случае, то от ООМ никуда не деться. А зачем вообще юзается mapPartitions? Судя по коду там простая трансформация - так почему бы не использовать обычный map ... |
|||
:
Нравится:
Не нравится:
|
|||
23.01.2022, 22:25 |
|
Itrator-to-Iterator transformation (Spark->mapPartitions())
|
|||
---|---|---|---|
#18+
забыл ник А попробовать не? ну достаточно сложно. в реальной апликации сложно понять потому как спарк и с массивом прожевывает в стиле запустил - вылетел с ом, рестарт экзекьютера и оп, получилось. я полагаю, что на первую попытку в памяти экзекьютера что-то болтается, но после рестарта экзекьютера массив бывает влазит. а синтетически - не умею максимальный хип замерять. умею дамп снимать, но это же в нужный момент надо сделать ... сложно. забыл ник Ну во втором случае по идее мы приходим к lazy вычислению, и результат будет зависеть от терминальной операции, если ей для калькуляции нужна вся коллекция в любом случае, то от ООМ никуда не деться. в моем случае приходит длинная строка (ну в смысле в датасете миллионы строк), а в этой mapPartitions() строка парсится и в массив пихаются ошибки валидации строки. потом датасет пишет в паркет с struct полем (тот самый массив). вот и мне кажется что по частям же return не отдаст, когда бы это код не выполнился. значит в какой-то момент ".map(s -> transformRow(s))" наплодит миллионы объектов забыл ник А зачем вообще юзается mapPartitions? Судя по коду там простая трансформация - так почему бы не использовать обычный map код не мой, полагаю что думали это самый быстрый вариант, а памяти можно докинуть. я думал на UDF переделать, но они к гадалке не ходи - будут на порядок медленнее. простой map ... действительно у него пожалуй больше шансов на успех. ... |
|||
:
Нравится:
Не нравится:
|
|||
23.01.2022, 23:07 |
|
Itrator-to-Iterator transformation (Spark->mapPartitions())
|
|||
---|---|---|---|
#18+
hck2 вопрос - поможет ли вторая конструкция сберечь память ? Должно сберечь. Просто flatMap то не вариант? ... |
|||
:
Нравится:
Не нравится:
|
|||
23.01.2022, 23:28 |
|
Itrator-to-Iterator transformation (Spark->mapPartitions())
|
|||
---|---|---|---|
#18+
IMHO. В спарк заложено штуки 4 стратегии использования памяти под RDD. Там типа memory, disk/memory, disk e.t.c. Нужно работать используя данный подход. И еще там надо почитать про cache/persist. Я щас не помню но какая-то разница есть. Короче нет надежды что у worker хватит памяти отработать твою колбасу целиком. ... |
|||
:
Нравится:
Не нравится:
|
|||
23.01.2022, 23:53 |
|
Itrator-to-Iterator transformation (Spark->mapPartitions())
|
|||
---|---|---|---|
#18+
SpringMan hck2 вопрос - поможет ли вторая конструкция сберечь память ? Должно сберечь. Просто flatMap то не вариант? flatMap вроде если мне на одну строку надо больше одной наплодить. у меня на одну строку хоть и много ошибок, но для спрака это одна ячейка со struct значением. mayton IMHO. В спарк заложено штуки 4 стратегии использования памяти под RDD. Там типа memory, disk/memory, disk e.t.c. Нужно работать используя данный подход. И еще там надо почитать про cache/persist. Я щас не помню но какая-то разница есть. не, вроде не так. RDD это по любому в памяти. после вычисления RDD, его можно закешировать/заперсистить в memory, disk/memory и т.п. что бы напимер сделав банальный терминальный count() заново не пришлось бы вычислять. то что у меня все ерроры валидации будут в памяти RDD это понятно, но сейчас, как я понимаю, кроме RDD они еще и дублируются в ArrayList<OutObj> out ... |
|||
:
Нравится:
Не нравится:
|
|||
24.01.2022, 00:21 |
|
Itrator-to-Iterator transformation (Spark->mapPartitions())
|
|||
---|---|---|---|
#18+
Ты можешь это проверить. Возьми RDD который читается из CSV файла который многократно превышает твою память или память воркера. И сделай count. ... |
|||
:
Нравится:
Не нравится:
|
|||
24.01.2022, 00:36 |
|
Itrator-to-Iterator transformation (Spark->mapPartitions())
|
|||
---|---|---|---|
#18+
mayton Ты можешь это проверить. Возьми RDD который читается из CSV файла который многократно превышает твою память или память воркера. И сделай count. не понял идеи. если csv превышает и я сделаю repartition(1), что бы весь файл зачитывал один еднственный executor, он упадет. да. в этом сомнений нет. вот если бы было возможно подготовить такой файл, какой в структуры RDD влазит ровно один раз, а в виде RDD+ArrayList не влезет. но я хз как RDD хранит стринги. может более плотно чем ArrayList, а может и наоборот. ... |
|||
:
Нравится:
Не нравится:
|
|||
24.01.2022, 00:53 |
|
Itrator-to-Iterator transformation (Spark->mapPartitions())
|
|||
---|---|---|---|
#18+
Чтоб не решать проблему XY. Тебе надо просто сделать это Код: java 1.
для каждой строчки исходного датасета. Верно? ... |
|||
:
Нравится:
Не нравится:
|
|||
24.01.2022, 00:59 |
|
Itrator-to-Iterator transformation (Spark->mapPartitions())
|
|||
---|---|---|---|
#18+
mayton для каждой строчки исходного датасета. Верно? Верно. ... |
|||
:
Нравится:
Не нравится:
|
|||
24.01.2022, 01:03 |
|
Itrator-to-Iterator transformation (Spark->mapPartitions())
|
|||
---|---|---|---|
#18+
Блин... Я готов признать как факт что ты видишь ошибку переполнения памяти. Но я отказываюсь поверить в то что Spark не умеет трансформировать сеты без оверхеда. По поводу этой конструкции. Код: java 1. 2. 3. 4. 5. 6.
Да я ее использовал. Только не в Spark. А в прикладном коде. Для превращения итератора в Stream. Это работает и стрим не потребляет памяти. Но можно потерять параллелизм. ... |
|||
:
Нравится:
Не нравится:
|
|||
24.01.2022, 01:08 |
|
Itrator-to-Iterator transformation (Spark->mapPartitions())
|
|||
---|---|---|---|
#18+
Несколько наводящих вопросов - 1) а почему все таки rdd? Если есть возможность я бы выкинул и заменил датасет/датафрейм 2) mapPartitions используется когда нужен некий общий контекст для всех строк партитишена, например коннекшен к Бд. В вашем случае такого не вижу, потому простой map и проще и безопаснее. Единственное, что могу предположить что в функции юзается какие-то регекспы или календарь, которые имеют Стейт и относительно дорогостоящи при создании. В таком случае может быть буст перформанса, но это такое... 3) что за ошибки то могут быть связаны со строкой? Может там стектрейс в строку хреначится на 10мб для каждой ))) ... |
|||
:
Нравится:
Не нравится:
|
|||
24.01.2022, 03:02 |
|
Itrator-to-Iterator transformation (Spark->mapPartitions())
|
|||
---|---|---|---|
#18+
mayton Да я ее использовал. Только не в Spark. А в прикладном коде. Для превращения итератора в Stream. Это работает и стрим не потребляет памяти. Но можно потерять параллелизм. спасибо! собственно я именно за этим ответом приходил значит пробовать смысл есть. забыл ник Несколько наводящих вопросов - 1) а почему все таки rdd? Если есть возможность я бы выкинул и заменил датасет/датафрейм 2) mapPartitions используется когда нужен некий общий контекст для всех строк партитишена, например коннекшен к Бд. В вашем случае такого не вижу, потому простой map и проще и безопаснее. Единственное, что могу предположить что в функции юзается какие-то регекспы или календарь, которые имеют Стейт и относительно дорогостоящи при создании. В таком случае может быть буст перформанса, но это такое... 3) что за ошибки то могут быть связаны со строкой? Может там стектрейс в строку хреначится на 10мб для каждой ))) 1) я то ипользую датасет api, но под низом у спарка это все равно RDD. я себе представлял данные в spark как набор байтов в объекте RDD, потому казалось, что у спарка не получится сделать ссылку из RDD на мой ArrayList, ему придется deep copy в RDD делать. а это представление у меня сложилось потому, что у спарка есть проект Tungsten, который научил спарк хранить данные в off-heap. 2) да, вытягивается конфигурация описывающая колонки соответствуящая файлику и подбирается класс для парсера 3) я думаю в партицию могут попасть миллионы строк, а у строки оказалось могут быть сотни ошибок. оказалось источник может набить строки непечатными символами, каждый символ запись в массивчике. ... |
|||
:
Нравится:
Не нравится:
|
|||
24.01.2022, 10:19 |
|
|
start [/forum/topic.php?fid=59&msg=40128899&tid=2120263]: |
0ms |
get settings: |
10ms |
get forum list: |
6ms |
check forum access: |
1ms |
check topic access: |
1ms |
track hit: |
35ms |
get topic data: |
2ms |
get forum data: |
1ms |
get page messages: |
262ms |
get tp. blocked users: |
0ms |
others: | 372ms |
total: | 690ms |
0 / 0 |