Spark repartition和coalesce的区别

作者&投稿:支峰 (若有异议请与网页底部的电邮联系)
~ 有些时候,在很多partition的时候,我们想减少点partition的数量,不然写到HDFS上的文件数量也会很多很多。
我们使用reparation呢,还是coalesce。所以我们得了解这两个算子的内在区别。

要知道,repartition是一个消耗比较昂贵的操作算子,Spark出了一个优化版的repartition叫做coalesce,它可以尽量避免数据迁移,
但是你只能减少RDD的partition.

举个例子,有如下数据节点分布:

用coalesce,将partition减少到2个:

注意,Node1 和 Node3 不需要移动原始的数据

The repartition algorithm does a full shuffle and creates new partitions with data that’s distributed evenly.
Let’s create a DataFrame with the numbers from 1 to 12.

repartition 算法会做一个full shuffle然后均匀分布地创建新的partition。我们创建一个1-12数字的DataFrame测试一下。

刚开始数据是这样分布的:

我们做一个full shuffle,将其repartition为2个。

这是在我机器上数据分布的情况:
Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

The repartition method makes new partitions and evenly distributes the data in the new partitions (the data distribution is more even for larger data sets).
repartition方法让新的partition均匀地分布了数据(数据量大的情况下其实会更均匀)

coalesce用已有的partition去尽量减少数据shuffle。
repartition创建新的partition并且使用 full shuffle。
coalesce会使得每个partition不同数量的数据分布(有些时候各个partition会有不同的size)
然而,repartition使得每个partition的数据大小都粗略地相等。

coalesce 与 repartition的区别(我们下面说的coalesce都默认shuffle参数为false的情况)

repartition(numPartitions:Int):RDD[T]和coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T] repartition只是coalesce接口中shuffle为true的实现

有1w的小文件,资源也为--executor-memory 2g --executor-cores 2 --num-executors 5。
repartition(4):产生shuffle。这时会启动5个executor像之前介绍的那样依次读取1w个分区的文件,然后按照某个规则%4,写到4个文件中,这样分区的4个文件基本毫无规律,比较均匀。
coalesce(4):这个coalesce不会产生shuffle。那启动5个executor在不发生shuffle的时候是如何生成4个文件呢,其实会有1个或2个或3个甚至更多的executor在空跑(具体几个executor空跑与spark调度有关,与数据本地性有关,与spark集群负载有关),他并没有读取任何数据!

1.如果结果产生的文件数要比源RDD partition少,用coalesce是实现不了的,例如有4个小文件(4个partition),你要生成5个文件用coalesce实现不了,也就是说不产生shuffle,无法实现文件数变多。
2.如果你只有1个executor(1个core),源RDD partition有5个,你要用coalesce产生2个文件。那么他是预分partition到executor上的,例如0-2号分区在先executor上执行完毕,3-4号分区再次在同一个executor执行。其实都是同一个executor但是前后要串行读不同数据。与用repartition(2)在读partition上有较大不同(串行依次读0-4号partition 做%2处理)。

T表有10G数据 有100个partition 资源也为--executor-memory 2g --executor-cores 2 --num-executors 5。我们想要结果文件只有一个


平果县15688986136: Spark中repartition和coalesce的区别与使用场景解析 -
百顾纳科: repartition(numPartitions:Int):RDD[T]和coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T] 他们两个都是RDD的分区进行重新划分,repartition只是coalesce接口中shuffle为true的简易实现,(假设RDD有N个分区,需要重新划分成M个分...

平果县15688986136: spark repartition 设置多少合适 -
百顾纳科: 当前任务可用集群资源core的2到3倍,可以最大化利用集群资源.

平果县15688986136: spark中saveAsTextFile如何最终生成一个文件 -
百顾纳科: 在该语句之前加上repartition(1),即写作以下形式: people.repartition(1).saveAsTextFile("out.txt")

平果县15688986136: spark 怎么控制数输出文件个个数 -
百顾纳科: numSplits:来自job.getNumMapTasks(),即在job启动时用org.apache.Hadoop.mapred.JobConf.setNumMapTasks(int n)设置的值,给M-R框架的Map数量的提示.goalSize:是输入总大小与提示Map task数量的比值,即期望每个Mapper处理多少的数据

平果县15688986136: 怎样理解spark中的partition和block的关系
百顾纳科: hdfs中的block是分布式存储的最小单元,类似于盛放文件的盒子,一个文件可能要占多个盒子,但一个盒子里的内容只可能来自同一份文件.假设block设置为128M,你的文件是250M,那么这份文件占3个block(128+128+2).这样的设计虽然会...

平果县15688986136: Spark中的aggregate和aggregateByKey的区别及疑惑 -
百顾纳科: 触发shuffle的常见算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等. 要解决数据倾斜的问题,首先要定位数据倾斜发生在什么地方,首先是哪个stage,直接在Web UI上看就可以,然后查看运行耗时的task,...

平果县15688986136: 怎样理解spark中的partition和block的关系 -
百顾纳科: map 是把 function 作用到每个 element,针对的是 element. mapPartitions 是把 function 作用到每个 partition,针对的是 partition 内部的 iterator.

平果县15688986136: spark技术栈有哪些组件 -
百顾纳科: 当下Hadoop的主要应用场景在归档、搜索引擎(老本家)及数据仓库上面,各个机构使用Hadoop不同的组件来实现自己的用例.而在这3个场景之外还有一个比较冷门的场景——流处理,这块源于Hadoop 2.0可结合其他框架的特性,而在将来,Hadoop肯定会发展到联机数据处理.

平果县15688986136: spark sortshuffle为什么要按partition排序 -
百顾纳科: 1, Spark Shuffle在最开始的时候只支持Hash-based Shuffle:默认Mapper阶段会为Reducer阶段的每一个Task单独创建一个文件来保存该Task中要使用的数据. 优点:就是操作数据简单. 缺点:但是在一些情况下(例如数据量非常大的情况)会造成大量文件(M*R,其中M代表Mapper中的所有的并行任务数量,R代表Reducer中所有的并行任务数据)大数据的随机磁盘I/O操作且会形成大量的Memory(极易造成OOM). 2,Hash-based Shuffle产生的问题: 第一:不能够处理大规模的数据 第二:Spark不能够运行在大规模的分布式集群上!

平果县15688986136: Spark求平均值的三种方法 -
百顾纳科: 方法一:利用groupByKey //求平均 方法一: groupByKeytextFile.mapToPair(line -> new Tuple2<>(line.split(" ")[0], Integer.parseInt(line.split(" ")[1]))).groupByKey().mapToPair(info -> {double sum = 0;double count = 0;Iterator<Integer> ...

本站内容来自于网友发表,不代表本站立场,仅表示其个人看法,不对其真实性、正确性、有效性作任何的担保
相关事宜请发邮件给我们
© 星空见康网