项目github地址:bitcarmanlee easy-algorithm-interview-and-practice
欢迎大家star,留言,一起学习进步
日常工作中,排序是道绕过不过去的侃,我们每天都会面对各种各样的排序需求。那么在spark中如何排序呢?我们来看一些很有代表性的例子。
1.最简单的排序假设有个RDD[Int]类型的数据,需要按数据大小进行排序,那这个排序算最简单的:
sc.parallelize(Array(1,3,2,4,6,5)).sortBy(x => x).collect()代码运行的结果:
Array[Int] = Array(1, 2, 3, 4, 5, 6) 2.kv结构的排序在kv结构的数据中,按value排序是常见的需求:
sc.parallelize(Array(("a", 1), ("c", 3), ("b", 2), ("d", 4))).sortBy(_._2)代码运行的结果:
Array[(String, Int)] = Array((a,1), (b,2), (c,3), (d,4)) 3.定制排序规则有如下结构的数据:
<10 6094308<100 234975<20 2286079<200 1336431希望按照<后面的数字大小排序,得到如下结果:
<10 6094308<20 2286079<100 234975<200 1336431代码如下:
val array = Array(("<10",6094308), ("<100",234975), ("<20",2286079),("<200",1336431));sc.parallelize(array).sortBy({item => item._1.substring(1, item._1.length).toInt}).collect()要理解上述代码的原理,我们需要分析一下sortBy的源码。
/** * Return this RDD sorted by the given key function. */ def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope { this.keyBy[K](f) .sortByKey(ascending, numPartitions) .values }sortBy必需传入的一个参数为f: (T) => K,T为array中的元素类型。
/** * Creates tuples of the elements in this RDD by applying `f`. */ def keyBy[K](f: T => K): RDD[(K, T)] = withScope { val cleanedF = sc.clean(f) map(x => (cleanedF(x), x)) }传入的f: (T) => K作用在keyBy方法上,生成了一个RDD[(K, T)]的数据。
然后调用sortByKey,最后取出里面的T,得到的就是原始array中的类型!
我们再来看看sortByKey的源码
/** * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling * `collect` or `save` on the resulting RDD will return or output an ordered list of records * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in * order of the keys). */ // TODO: this currently doesn't work on P other than Tuple2! def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)] = self.withScope { val part = new RangePartitioner(numPartitions, self, ascending) new ShuffledRDD[K, V, V](self, part) .setKeyOrdering(if (ascending) ordering else ordering.reverse) }大家看到sortByKey的源码可能会有疑惑,难道sortByKey不能指定排序方式么?不能像sortBy那样传入一个函数么?
其实是可以的。sortByKey位于OrderedRDDFunctions类中,OrderedRDDFunctions中有一个隐藏变量:
我们重写这个变量以后,就可以改变排序规则。
以第三部分的需求为例,我们用sortByKey可以这么做:
最后的输出结果为:
Array[(String, Int)] = Array((<10,6094308), (<20,2286079), (<100,234975), (<200,1336431))同样达到了我们的目的!