20200617 说说你理解的sparkRdd的 partion


说说你理解的sparkRdd的 partion(☆☆☆☆☆)

Spark中提供了通用的接口来抽象每个Rdd,这些接口包括

  1. 分区信息
  2. 依赖关系
  3. 函数,基于父Rdd的计算方法
  4. 划分策略和数据位置的元数据

举个🌰
一个HDFS文件的RDD将文件的每个文件块表示为一个分区,并且知道每个文件块的位置,对RDD 进行map之后分区将具有相同的划分

RDD分区的作用

一个HDFS文件的RDD将文件的每个文件块表示为一个分区,并且知道每个文件块的位置信息。这些对应着数据块的分区分布到集群的节点中,因此,分区的多少涉及对这个RDD进行并行计算的粒度。首先,分区是一个逻辑概念, 变换前后的新旧分区在物理上可能是同一块内存或者是存储。需要注意的是,如果没有指定分区数将使用默认值,而默认值是该程序所分配到CPU核数,如果是从HDFS文件创建,默认为文件的数据块数。

移动计算而不移动数据

在Spark形成任务有向无环图时,会尽可能地把计算分配到靠近数据的位置,减少数据的网络传输。当RDD分区被缓存, 则计算应该被发送到缓存分区所在的节点进行,另外,RDD的血统也会影响子RDD的位置,回溯RDD的血统,直到找到具有首选位置属性的父RDD,并据此决定子RDD的位置。

RDD分区函数

分区的划分对于shuffle类操作很关键,决定了该操作的父RDD和子RDD的依赖类型。比如之前提到的join操作,如果是协同划分的话,两个父RDD之间, 父RDD与子RDD之间能形成一致的分区安排。即同一个Key保证被映射到同一个分区,这样就是窄依赖。而如果不是协同划分,就会形成宽依赖。所谓的协同划分就是指定分区划分器以产生前后一致的分区安排。
Spark提供两种划分器,HashPartitioner (哈希分区划分器),(RangePartitioner) 范围分区划分器. 需要注意的是分区划分器只存在于PairRDD中,普通非(K,V)类型的Partitioner为None.

在以下程序中,首先构造一个MappedRDD,其partitioner的值为none,然后对RDD进行groupByKey操作group_rdd变量,对于groupByKey操作而言,这里创建了新的HashPartitioner对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
scala> var part=sc.textFile("file:/hadoop/spark/README.md")
part: org.apache.spark.rdd.RDD[String] = /hadoop/spark/README.md MapPartitionsRDD[12] at textFile at <console>:24
scala> part.partitioner
res11: Option[org.apache.spark.Partitioner] = None
val group_rdd=part.map(x=>(x,x)).groupByKey(new org.apache.spark.HashPartitioner(4))
group_rdd: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[16] scala> group_rdd.partitioner
res14: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@4)
at groupByKey at <console>:26
#查看每个分区的内容
scala> part.mapPartitionsWithIndex{(partid,iter)=>{
| var part_map=scala.collection.mutable.Map[String,List[String]]()
| var part_name="part_"+partid
| part_map(part_name)=List[String]()
| while(iter.hasNext){
| part_map(part_name):+=iter.next()}
| part_map.iterator}}.collect
res19: Array[(String, List[String])] = Array((part_0,List(# Apache Spark, "", Spark is a fast and general cluster computing system for Big Data. It provides, high-level APIs in Scala, Java, Python, and R, and an optimized engine that, supports general computation graphs for data analysis. It also supports a, rich set of higher-level tools including Spark SQL for SQL and DataFrames,, MLlib for machine learning, GraphX for graph processing,, and Spark Streaming for stream processing., "", <http://spark.apache.org/>, "", "", ## Online Documentation, "", You can find the latest Spark documentation, including a programming, guide, on the [project web page](http://spark.apache.org/documentation.html)., This README file only contains basic setup instructions., "", ## Building Spark, "", Spark ..

使用以下分区函数进行分区

  • coalesce(numpartitions:Int,shuffle:Boolean=false):RDD[T]

  • repartition(numPartitions:Int):RDD[T]

    coalesce和repartition都是对RDD进行重新分区。coalesce操作使用HashPartitioner进行重分区,第一个参数为重分区的数目,第二个为是否shuffle,默认情况为false。repartition操作是coalesce函数第二个参数为true的实现。如果分区的数目大于原来的分区数,那么必须指定shuffle参数为true,否则分区数不变。

  • glom():RDD[Array[T]]

    glom操作是RDD中的每一个分区所有类型为T的数据转变成元素类型为T的数组[Array[T]]

  • mapPartitions

    mapPartitions操作和map类似,只不过映射的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器,mapPartitionsWithIndex作用类似于mapPartitions,只是输入参数多了一个分区索引。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
scala> var rdd1=sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at makeRDD at <console>:24
#mapPartitions累加每个分区的数
scala> var rdd3=rdd1.mapPartitions{x=>{
| var result=List[Int]()
| var i=0
| while(x.hasNext){
| i+=x.next()}
| result.::(i).iterator}}
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[24] at mapPartitions at <console>:26
scala> rdd3.collect
res20: Array[Int] = Array(3, 12)
scala> var rdd2=rdd1.mapPartitionsWithIndex{
| (x,iter)=>{
| var result=List[String]()
| var i=0
| while(iter.hasNext){
| i+=iter.next()}
| result.::(x+"|"+i).iterator}}
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[25] at mapPartitionsWithIndex at <console>:26
scala> rdd2.collect
res21: Array[String] = Array(0|3, 1|12)
  • partitionBy(partitioner:Partitioner):RDD[(K,V)]

partitionBy操作根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
scala> var rdd1=sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[28] at makeRDD at <console>:24
scala> var rdd2=rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[29] at partitionBy at <console>:26
#查看分区中的元素
scala> rdd2.mapPartitionsWithIndex{
     | (partIdx,iter)=>{
     | var part_map=scala.collection.mutable.Map[String,List[(Int,String)]]()
     | while(iter.hasNext){
     | var part_name="part_"+partIdx
     | var elem=iter.next()
     | if(part_map.contains(part_name)){
     | var elems=part_map(part_name)
     |  elems::=elem
     | part_map(part_name)=elems
     | }else{
     | part_map(part_name)=List[(Int,String)]{elem}
     | }}
     | part_map.iterator}}.collect
res23: Array[(String, List[(Int, String)])] = Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A))))

分区数的计算

  1. 通过scala 集合方式parallelize生成rdd,如, val rdd = sc.parallelize(1 to 10)这种方式下,如果在parallelize操作时没有指定分区数,则rdd的分区数 = sc.defaultParallelism

  2. 通过textFile方式生成的rdd,如val rdd = sc.textFile(“path/file”)

有两种情况:

  • 从本地文件file:///生成的rdd,操作时如果没有指定分区数,则默认分区数规则为:(按照官网的描述,本地file的分片规则,应该按照hdfs的block大小划分,但实测的结果是固定按照32M来分片,可能是bug,不过不影响使用因为spark能用所有hadoop接口支持的存储系统,所以spark textFile使用hadoop接口访问本地文件时和访问hdfs还是有区别的)rdd的分区数 = max(本地file的分片数, sc.defaultMinPartitions)

  • 从hdfs分布式文件系统hdfs://生成的rdd,操作时如果没有指定分区数,则默认分区数规则为:rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions)

spark.default.parallelism的默认值(前提是配置文件spark-default.conf中没有显示的配置,如果配置了,则spark.default.parallelism = 配置的值)

参考

https://blog.csdn.net/justlpf/article/details/80107582
https://blog.csdn.net/weixin_37353303/article/details/86575171


文章作者: Callable
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Callable !
评论
  目录