powered by simpleCommunicator - 2.0.53     © 2025 Programmizd 02
Форумы / NoSQL, Big Data [игнор отключен] [закрыт для гостей] / Spark 2.0
12 сообщений из 12, страница 1 из 1
Spark 2.0
    #39511924
мигель1
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Привет!
RDD, DataFrame, DataSet в чем разница?
...
Рейтинг: 0 / 0
Spark 2.0
    #39512168
Yo.!
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
...
Рейтинг: 0 / 0
Spark 2.0
    #39512176
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
тут лучше имхо, вместе с недостатками каждой модели - https://stackoverflow.com/questions/37301226/difference-between-dataset-api-and-dataframe
...
Рейтинг: 0 / 0
Spark 2.0
    #39543537
JJZ
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Эксперты по Apache Spark есть вопрос:
я правильно понимаю, что Spark внутри себя все равно использует подход MAP-SHUFFLE-REDUCE для вычислений в кластере, только
1) операции lazy(пытается оптимизировать весь dataflow до начала выполнения);
2) старается сделать по максимуму в оперативной памяти без сброса на диск промежуточных вычислений

Спасибо
...
Рейтинг: 0 / 0
Spark 2.0
    #39562321
Фотография dbms_photoshop
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
JJZя правильно понимаю, что Spark внутри себя все равно использует подход MAP-SHUFFLE-REDUCE для вычислений в кластере
Учи английский. На эту тему куча материалов в сети, книг, курсов (на coursera и прочих) и даже роликов на ютубе для самых ленивых.

Попробую помочь стартануть.
Запускаешь spark-shell (да, для того, чтоб поигаться с данными не надо билдить код и использовать spark-submit).
После запуска будет строка следующего содержания
Код: plaintext
Spark context Web UI available at http://

По этому адресу можно будет смотреть визуализированную информацию про выполнение
Understanding your Apache Spark Application Through Visualization .
Наример, если требуется выполнить SQL, то строится DAG.
Выполнение по факту состоит из jobs, они разбиваются на stages, а те в свою очередь на tasks.

Предположим, что создана таблица t (DDL например тут 20818940 ), тогда можно выполнить такой код
Код: sql
1.
2.
3.
4.
5.
6.
7.
8.
9.
val df = spark.sql("select type, value from t")
df.rdd.toDebugString
df.explain(true)
df.show

val df = spark.sql("select type, count(*) from t group by type")
df.rdd.toDebugString
df.explain(true)
df.show



df.rdd.toDebugString показывает RDD Lineage
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
scala> df.rdd.toDebugString
res0: String =
(2) MapPartitionsRDD[4] at rdd at <console>:26 []
 |  MapPartitionsRDD[3] at rdd at <console>:26 []
 |  MapPartitionsRDD[2] at rdd at <console>:26 []
 |  FileScanRDD[1] at rdd at <console>:26 []

scala> df.rdd.toDebugString
res3: String =
(400) MapPartitionsRDD[14] at rdd at <console>:26 []
  |   MapPartitionsRDD[13] at rdd at <console>:26 []
  |   MapPartitionsRDD[12] at rdd at <console>:26 []
  |   ShuffledRowRDD[11] at rdd at <console>:26 []
  +-(2) MapPartitionsRDD[10] at rdd at <console>:26 []
     |  MapPartitionsRDD[9] at rdd at <console>:26 []
     |  FileScanRDD[8] at rdd at <console>:26 []
Ключевое различие между двумя примерами, что для группировки выполняется Shuffle по нодам, где она выполняется.
Для простого сканирования в Shuffle нет необходимости.

df.explain(true) показывает
Код: plaintext
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
scala> df.explain(true)
== Parsed Logical Plan ==
'Project ['type, 'value]
+- 'UnresolvedRelation `t`

== Analyzed Logical Plan ==
type: string, value: int
Project [type#20, value#21]
+- SubqueryAlias t
   +- Relation[position#18,id#19,type#20,value#21] parquet

== Optimized Logical Plan ==
Project [type#20, value#21]
+- Relation[position#18,id#19,type#20,value#21] parquet

== Physical Plan ==
*Project [type#20, value#21]
+- *FileScan parquet t[type#20,value#21] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://.../t],
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<type:string,value:int>
Часто говоря о SQL data frames хватился Catalyst Optimizer ( Deep Dive into Spark SQL’s Catalyst Optimizer ).
Соответственно смотря на "Analyzed Logical Plan" vs "Optimized Logical Plan" можно видеть что было оптимизировано.
В моем примере - ничего.

Если интересует более глубокое погружение - можно смотреть конкретный сгенерированный код
Код: sql
1.
2.
import org.apache.spark.sql.execution.debug._
df.debugCodegen


JJZ1) операции lazy(пытается оптимизировать весь dataflow до начала выполнения);Lazy имеет перпендикулярное отношение к спарку и его оптимизациям.
lazy это концепт из scala. Означает, что значение вычисляет тогда когда нужен его результат, а не тогда когда оно определено в коде.

JJZ2) старается сделать по максимуму в оперативной памяти без сброса на диск промежуточных вычисленийОн не старается по максимуму, а просто все делает в памяти. Если памяти не хватает, то валится out of memory exception.
...
Рейтинг: 0 / 0
Spark 2.0
    #39562322
Фотография dbms_photoshop
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
dbms_photoshopВыполнение по факту состоит из jobs, они разбиваются на stages, а те в свою очередь на tasks.Прикрепленный файл показывает картину для приведенного примера.
Первый запрос 1 stage (stage 0), второй - 2 stages (stage 1 and stage 2)

Число tasks было такое же как и число stages.

Простыми словами число jobs и stages зависит от сложности логики. Чем замысловатее логика - тем сложнее DAG.

Число tasks во многом определяется данными.
Если много partitions, то может быть огромное число tasks при сканировании.
Или же если много данных, то при shuffle число tasks может достигать числа nodes в кластере.
...
Рейтинг: 0 / 0
Spark 2.0
    #39564793
Фотография Apex
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
dbms_photoshoplazy это концепт из scala.

Только появилась эта концепция и была реализована задолго до Scala.

dbms_photoshopОн не старается по максимуму, а просто все делает в памяти. Если памяти не хватает, то валится out of memory exception.
Это не совсем так, ряд операций могут писать временные данные на диск, если памяти не хватает.
...
Рейтинг: 0 / 0
Spark 2.0
    #39564794
Фотография dbms_photoshop
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Apexdbms_photoshoplazy это концепт из scala.

Только появилась эта концепция и была реализована задолго до Scala.Ответ был дан в контексте дискуссии, хотя может стоило уточнить.
Apexdbms_photoshopОн не старается по максимуму, а просто все делает в памяти. Если памяти не хватает, то валится out of memory exception.
Это не совсем так, ряд операций могут писать временные данные на диск, если памяти не хватает.Можно детальнее со ссылками?
Походу есть оптимизация, если результат shuffle используется многократно, то может быть записан на диск тынц .
...
Рейтинг: 0 / 0
Spark 2.0
    #39568050
Фотография Apex
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
dbms_photoshopМожно детальнее со ссылками?
Apache Spark FAQ
Does my data need to fit in memory to use Spark?No. Spark's operators spill data to disk if it does not fit in memory, allowing it to run well on any sized data. Likewise, cached datasets that do not fit in memory are either spilled to disk or recomputed on the fly when needed, as determined by the RDD's storage level.
...
Рейтинг: 0 / 0
Spark 2.0
    #39568678
Фотография dbms_photoshop
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Apexdbms_photoshopМожно детальнее со ссылками?
Apache Spark FAQ
Does my data need to fit in memory to use Spark?No. Spark's operators spill data to disk if it does not fit in memory, allowing it to run well on any sized data. Likewise, cached datasets that do not fit in memory are either spilled to disk or recomputed on the fly when needed, as determined by the RDD's storage level.
Я думаю это относится к явному вызову cache() или persist().

Если же памяти не хватает для выполнения запроса то либо прервется всё выполнение с чем-то типа
Код: plaintext
Container killed by YARN for exceeding memory limits.
либо будут отваливаться контейнеры, например так
Код: plaintext
Container killed on request. Exit code is 143
либо еще как-то в зависимости от специфики выполняемого и конфигурации
(в моем случае менеджер ресурсов YARN)
...
Рейтинг: 0 / 0
Spark 2.0
    #39568736
Фотография Apex
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
dbms_photoshopApexпропущено...

Apache Spark FAQ
пропущено...
Я думаю это относится к явному вызову cache() или persist().


cache() или persist() используют storage memory, однако данные из execution memory тоже могут быть сброшены на диск. Косвенное подтверждение можно найти здесь:

https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java#L138

dbms_photoshopЕсли же памяти не хватает для выполнения запроса то либо прервется всё выполнение с чем-то типа
Код: plaintext
Container killed by YARN for exceeding memory limits.
либо будут отваливаться контейнеры, например так
Код: plaintext
Container killed on request. Exit code is 143
либо еще как-то в зависимости от специфики выполняемого и конфигурации
(в моем случае менеджер ресурсов YARN)
Это вообще не из той оперы, такое случается когда менеджер ресурсов YARN не может дать больше, чем просит Spark. К проблемам OOM самого Spark это не имеет отношения.
...
Рейтинг: 0 / 0
Spark 2.0
    #39569013
Фотография dbms_photoshop
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
ApexЭто вообще не из той оперы, такое случается когда менеджер ресурсов YARN не может дать больше, чем просит Spark. К проблемам OOM самого Spark это не имеет отношения.Ну воспроизвести довольно просто.
Создаем табличку с 1М строк и запрос с картезианцем и аналитикой без партиционирования.
Код: sql
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
create table t_big as
select * from
(with t10 as
(
select 0 id union all select 1 union all select 2 union all select 3 union all select 4
union all select 5 id union all select 6 union all select 7 union all select 8 union all select 9
),
t1000 as
(select t1.id * 1e0 + t2.id * 1e1 + t3.id * 1e2 id from t10 t1, t10 t2, t10 t3)
select t1.id + t2.id * 1e3 id, lpad('x', 1e3, 'x') padding
  from t1000 t1, t1000 t2) t
order by 1;


Код: sql
1.
2.
3.
4.
5.
6.
7.
8.
9.
spark.sql("""
select *
from
(select id, padding, rn
  from (select tt.*, row_number() over (order by id asc) rn from t_big tt) x) a
cross join  
(select id, padding, rn
  from (select tt.*, row_number() over (order by id asc) rn from t_big tt) x) b
""").show(1)


Код: plaintext
spark-shell --driver-memory 600M --executor-memory 600M
Код: sql
1.
2.
3.
4.
ERROR cluster.YarnScheduler: Lost executor nnn on qqq.com: Container marked as failed: container_eee on host: qqq.com. Exit status: 143.
Diagnostics: Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
Killed by external signal


Код: plaintext
spark-shell --driver-memory 10G --executor-memory 10G
Код: sql
1.
2.
3.
4.
5.
6.
+---+--------------------+---+---+--------------------+---+
| id|             padding| rn| id|             padding| rn|
+---+--------------------+---+---+--------------------+---+
|  0|xxxxxxxxxxxxxxxxx...|  1|  0|xxxxxxxxxxxxxxxxx...|  1|
+---+--------------------+---+---+--------------------+---+
only showing top 1 row


https://gsamaras.wordpress.com/code/spark-container-exited-with-a-non-zero-exit-code-143/
...
Рейтинг: 0 / 0
12 сообщений из 12, страница 1 из 1
Форумы / NoSQL, Big Data [игнор отключен] [закрыт для гостей] / Spark 2.0
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]