转载地址:www.jianshu.com/p/027dd699b6d9
案例来源:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
Spark算子大致上可分三大类算子:
1、Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Value型的数据。
2、Key-Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Key-Value型的数据。
3、Action算子,这类算子会触发SparkContext提交作业。
一、Value型Transformation算子
1)map:map是对RDD中的每个元素都执行一个指定函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.map(_.length) val c = a.zip(b)c.collect res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
2)flatMap:与map类似,将原RDD中的每个元素通过函数f转换为新的元素,并将这些元素放入一个集合,构成新的RDD
val a = sc.parallelize(1 to 10, 5) a.flatMap(1 to _).collect res47: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect res85: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)
3)mapPartiions:mapPartitions是map的一个变种。map的输入函数应用于RDD中的每个元素,而mapPartitions的输入函数应用于每个分区,也就是把每个分区中的内容作为整体来处理的。
val x = sc.parallelize(1 to 10, 3) x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
4)glom(形成一个Array数组):将每个分区内的元素组成一个数组,分区不变
val a = sc.parallelize(1 to 100, 3) a.glom.collect res8: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33), Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66), Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100))
5)union:合并同一数据类型元素,但不去重。合并后返回同类型的数据元素
val a = sc.parallelize(1 to 3, 1) val b = sc.parallelize(5 to 7, 1) (a ++ b).collect res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)
6)cartesian(笛卡尔操作):对输入RDD内的所有元素计算笛卡尔积
val x = sc.parallelize(List(1,2,3,4,5)) val y = sc.parallelize(List(6,7,8,9,10)) x.cartesian(y).collect res0: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10), (5,9), (5,10))
7)groupBy(生成相应的key,相同的放在一起)
val a = sc.parallelize(1 to 9, 3) a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect res42: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), (odd,ArrayBuffer(1, 3, 5, 7, 9)))
8)filter
val a = sc.parallelize(1 to 10, 3) val b = a.filter(_ % 2 == 0) b.collect res3: Array[Int] = Array(2, 4, 6, 8, 10)
9)distinct(去重)
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) c.distinct.collect res6: Array[String] = Array(Dog, Gnu, Cat, Rat)
10)subtract(去掉含有重复的项)
val a = sc.parallelize(1 to 9, 3) val b = sc.parallelize(1 to 3, 3) val c = a.subtract(b) c.collect res3: Array[Int] = Array(6, 9, 4, 7, 5, 8)
11)sample
val a = sc.parallelize(1 to 10000, 3) a.sample(false, 0.1, 0).count res24: Long = 960
12)takesample
val x = sc.parallelize(1 to 1000, 3) x.takeSample(true, 100, 1) res3: Array[Int] = Array(339, 718, 810, 105, 71, 268, 333, 360, 341, 300, 68, 848, 431, 449, 773, 172, 802, 339, 431, 285, 937, 301, 167, 69, 330, 864, 40, 645, 65, 349, 613, 468, 982, 314, 160, 675, 232, 794, 577, 571, 805, 317, 136, 860, 522, 45, 628, 178, 321, 482, 657, 114, 332, 728, 901, 290, 175, 876, 227, 130, 863, 773, 559, 301, 694, 460, 839, 952, 664, 851, 260, 729, 823, 880, 792, 964, 614, 821, 683, 364, 80, 875, 813, 951, 663, 344, 546, 918, 436, 451, 397, 670, 756, 512, 391, 70, 213, 896, 123, 858)
13)cache、persist
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) c.getStorageLevel res0: org.apache.spark.storage.StorageLevel = StorageLevel(false, false, false, false, 1) c.cache c.getStorageLevel res2: org.apache.spark.storage.StorageLevel = StorageLevel(false, true, false, true, 1)
二、Key-Value型Transformation算子
处理数据类型为Key-Value的Transmation算子,大致可以分为三类:
1.输入输出分区1对1
2.聚集操作
3.连接
输入输出分区1对1
1)mapValues:顾名思义就是输入函数应用于RDD中KV(Key-Value)类型元素中的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为Key-Value对的RDD
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.mapValues("x" + _ + "x").collect res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))
聚集操作-对一个RDD聚集
2)combineByKey:与reduceByKey类似,相当于将元素(int, int)KV对变换为(int, Seq[int])新的KV对
val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3) val c = b.zip(a) val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y) d.collect res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
3)reduceByKey:对元素为KV对的RDD中Key相同的元素的Value进行reduce操作,即两个值合并为一个值。因此,Key相同的多个元素的值被合并为一个值,然后与原RDD中的Key组成一个新的KV对
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2) val b = a.map(x => (x.length, x)) b.reduceByKey(_ + _).collect res86: Array[(Int, String)] = Array((3,dogcatowlgnuant)) val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.reduceByKey(_ + _).collect res87: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))
聚集操作-对一个RDD聚集
4)partitionBy:根据KV对的Key对RDD进行分区
(对RDD进行分区操作)
聚集操作-对两个RDD聚集
5)cogroup:一组强大的函数,可以对多达3个RDD根据key进行分组,将每个Key相同的元素分别聚集为一个集合
val a = sc.parallelize(List(1, 2, 1, 3), 1) val b = a.map((_, "b")) val c = a.map((_, "c")) b.cogroup(c).collect res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array( (2,(ArrayBuffer(b),ArrayBuffer(c))), (3,(ArrayBuffer(b),ArrayBuffer(c))), (1,(ArrayBuffer(b, b),ArrayBuffer(c, c))) )
连接
6)join:本质是对两个含有KV对元素的RDD进行coGroup算子协同划分,再通过flatMapValues将合并的数据分散。
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.keyBy(_.length) val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val d = c.keyBy(_.length) b.join(d).collect res0: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))
7)leftOutJoin:相当于在join基础上判断一侧的RDD是否为空,如果为空,则填充空,如果有数据,则将数据进行连接计算,然后返回结果。
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.keyBy(_.length) val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val d = c.keyBy(_.length) b.leftOuterJoin(d).collect res1: Array[(Int, (String, Option[String]))] = Array((6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (3,(dog,Some(dog))), (3,(dog,Some(cat))), (3,(dog,Some(gnu))), (3,(dog,Some(bee))), (3,(rat,Some(dog))), (3,(rat,Some(cat))), (3,(rat,Some(gnu))), (3,(rat,Some(bee))), (8,(elephant,None)))
8)rightOutJoin:相当于在join基础上判断一侧的RDD是否为空,如果为空,则填充空,如果有数据,则将数据进行连接计算,然后返回结果。
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.keyBy(_.length) val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val d = c.keyBy(_.length) b.rightOuterJoin(d).collect res2: Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(dog),gnu)), (3,(Some(dog),bee)), (3,(Some(rat),dog)), (3,(Some(rat),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),bee)), (4,(None,wolf)), (4,(None,bear)))
三、Actions算子
Action算子—无输出
Action算子—HDFS
Action算子—Scala集合及数据类型
1)foreach:
是对RDD中的每个元素执行无参数的f函数,返回Unit。定义如下:
def foreach(f: T => Unit)
val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3) c.foreach(x => println(x + "s are yummy")) lions are yummy gnus are yummy crocodiles are yummy ants are yummy whales are yummy dolphins are yummy spiders are yummy
2)saveAsTextFile:函数将RDD保存为文本至HDFS指定目录,每次输出一行。
val a = sc.parallelize(1 to 10000, 3) a.saveAsTextFile("mydata_a") 14/04/03 21:11:36 INFO FileOutputCommitter: Saved output of task 'attempt_201404032111_0000_m_000002_71' to file:/home/cloudera/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_a
3)saveAsObjectFile:将RDD分区中每10个元素保存为一个数组并将其序列化,映射为(null, BytesWritable(Y))的元素,以SequenceFile的格式写入HDFS
val x = sc.parallelize(1 to 100, 3) x.saveAsObjectFile("objFile") val y = sc.objectFile[Int]("objFile") y.collect res52: Array[Int] = Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
4)collect:将RDD分散存储的元素转换为单机上的Scala数组并返回,类似于toArray功能
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) c.collect res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)
5)collectAsMap:与collect类似,将元素类型为key-value对的RDD,转换为Scala Map并返回,保存元素的KV结构。
val a = sc.parallelize(List(1, 2, 1, 3), 1) val b = a.zip(a) b.collectAsMap res1: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)
6)reduceByKeyLocally:先reduce,然后collectAsMap。
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2) val b = a.map(x => (x.length, x)) b.reduceByKey(_ + _).collect res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))
7)lookup:扫描RDD的所有元素,选择与参数匹配的Key,并将其Value以Scala sequence的形式返回
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.lookup(5) res0: Seq[String] = WrappedArray(tiger, eagle)
8)count:返回RDD中的元素个数。
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) c.count res2: Long = 4
9)top:返回RDD中最大/最小的K个元素。
val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2) c.top(2) res28: Array[Int] = Array(9, 8)
10)reduce:对RDD中的所有元素进行reduceLeft操作。
val a = sc.parallelize(1 to 100, 3) a.reduce(_ + _) res41: Int = 5050
11)fold:与reduce类似,不同的是每次对分区内的value聚集时,分区内初始化的值为zero value。
val a = sc.parallelize(List(1,2,3), 3) a.fold(0)(_ + _) res59: Int = 6
12)aggregate:允许用户对RDD使用两个不同的reduce函数,第一个reduce函数对各个分区内的数据聚集,每个分区得到一个结果。第二个reduce函数对每个分区的结果进行聚集,最终得到一个总的结果。aggregate相当于对RDD内的元素数据归并聚集,且这种聚集是可以并行的。而fold与reduced的聚集是串行的。
val z = sc.parallelize(List(1,2,3,4,5,6), 2) // lets first print out the contents of the RDD with partition labels def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = { iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator } z.mapPartitionsWithIndex(myfunc).collect res28: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6]) z.aggregate(0)(math.max(_, _), _ + _) res40: Int = 9
参考:http://www.ithao123.cn/content-2407104.html
Comments
comments powered by zero