我使用了nebula-algorithm算法包进行二次开发,用到了连通分量算法connectedcomponent,包版本号:3.0.0
由于业务上的点ID为string类型,参考了官方提供的example:即使用了dense_rank函数全局排序后转码:
idDF.withColumn(“encodedId”, dense_rank().over(Window.orderBy(“id”)))
近期由于数据量逐渐变大,全局排序这块变慢,且逐渐成为性能瓶颈,调研后做了一个版本的优化:
val partition_max_size: Long = 1 * 10000 * 10000;
val encodeIdDF = idDF.withColumn(“partition_index”, spark_partition_id()) // 分区编号
.withColumn(“row_number”, dense_rank().over(Window.partitionBy(“partition_index”).orderBy(“id”))) // 分区内排序号
.withColumn(“encodedId”, col(“partition_index”) * partition_max_size + col(“row_number”)) // 根据partition_index、row_number,计算出全局id
即在分区内排序:encodedId = partition_index * partition_max_size + row_number;
验证过每个分区数据量均少于partition_max_size,且生成的encodeIdDF数据集中encodedId字段是满足全局唯一的,整个算法也正常跑通没有出现异常,耗时极大缩短了;
但是算法输出结果集上发现了异常:在全局排序版本的结果中,原来被划分为同一个连通分量值的点ID集合,在优化后的版本中被划分为多个连通分量值,且这个现象并非个例,结果数据几乎整体都变了;
请教这个问题是如何导致的呢?