使用PartitionBy按键分割和有效计算RDD组

我已经实现了一个解决方案RDD[K, V]通过密钥对RDD[K, V]进行分组,并使用partitionByPartitioner根据每个组(K, RDD[V])计算数据。 尽管如此,我不确定它是否真的有效率,我想要有你的观点。

下面是一个示例:根据[K: Int, V: Int] ,计算每个K组的V s均值,知道它应该是分布的,并且V值可能非常大。 这应该给:

List[K, V] => (K, mean(V))

简单的Partitioner类:

class MyPartitioner(maxKey: Int) extends Partitioner {

    def numPartitions = maxKey

    def getPartition(key: Any): Int = key match {
      case i: Int if i < maxKey => i
    }
  }

分区代码:

val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7))

      val rdd = sc.parallelize(l)
      val p =  rdd.partitionBy(new MyPartitioner(4)).cache()

      p.foreachPartition(x => {
        try {
          val r = sc.parallelize(x.toList)
          val id = r.first() //get the K partition id
          val v = r.map(x => x._2)
          println(id._1 + "->" + mean(v))
        } catch {
          case e: UnsupportedOperationException => 0
        }
      })

输出是:

1->13, 2->4, 3->7

我的问题是:

  • 调用partitionBy时真的发生了什么? (对不起,我没有找到足够的规格)
  • 根据分区映射真的很高效,因为知道在我的生产环境中,对于非常多的值(如样本为100万),它不会有太多密钥(对于样本为50)
  • paralellize(x.toList)的成本是多少? 这是否一致? (我需要RDD输入mean()
  • 你会怎么做呢?
  • 问候


    你的代码不应该工作。 您无法将SparkContext对象传递给执行者。 (它不是可Serializable 。)我也不明白为什么你需要。

    要计算平均值,您需要计算总和和计数并计算它们的比率。 默认的分区将会很好。

    def meanByKey(rdd: RDD[(Int, Int)]): RDD[(Int, Double)] = {
      case class SumCount(sum: Double, count: Double)
      val sumCounts = rdd.aggregateByKey(SumCount(0.0, 0.0))(
        (sc, v) => SumCount(sc.sum + v, sc.count + 1.0),
        (sc1, sc2) => SumCount(sc1.sum + sc2.sum, sc1.count + sc2.count))
      sumCounts.map(sc => sc.sum / sc.count)
    }
    

    这是一种高效的单程计算,可以很好地推广。

    链接地址: http://www.djcxy.com/p/83629.html

    上一篇: Using PartitionBy to split and efficiently compute RDD groups by Key

    下一篇: Why is assignment of Double to Single allowed with Option Strict On