pyspark + nebula-spark-connector 读取数据时参数.option("returnCols")可以返回点的tag吗

nebula-spark-connector 2.6.1

如下代码,returnCols只能返回指定点或边的属性吗,能否返回类似tage(vertex)作为column呢?

self.dfc = self.spark.read.format(
    "com.vesoft.nebula.connector.NebulaDataSource").option(
    "type", "edge").option(
    "spaceName", nebula_config.get("space")).option(
    "label", f"{self.tag[i]}").option(
    "returnCols", "creation_time").option(
    "metaAddress", nebula_config.get("metaAddress")).option(
    "partitionNumber", nebula_config.get("partitionNumber")).load()

type edge 的话,底层只会用 scanedges 哈,spark 的读(非 query)其实是跳过了 graphd,直接扫 storaged,所以确实没法取回点上信息哈

你是想在返回结果中增加一列用来标识每一行数据是属于哪个tag吗?

ps: spark connector读到的数据不止有属性,还有点的id ,边的src id,dst id和rank值。

1 个赞

好滴,我是想读出数据做图算法后再将结果写回到nebula,但是似乎pyspark并不能像scala一样通过配置自动写回?

由于我的_srcId和_dstId类型较多,尝试用spark-connector来反写,但是一次也只能够写一种vertex tag类型

现在的思路改为用spark-connector读出不同的tag数据再联表,尝试了一下可以解决,不知道后期效率如何~

对的,想读出具体的tag来做筛选,但是不符合底层逻辑了,在改变思路 :joy:

@nicole 无论 scala, python 我猜测从程序也有调用 main 的方式吧?

不过如果不调用 main,可以参考我的代码,这里有写回的实现

https://github.com/wey-gu/nebulagraph-ai/blob/main/ng_ai/nebula_writer.py

其实可以实现的,每次从nebula读出来的数据是一个dataframe,你可以采用df.withColumn的方法把tag名加到dataframe的每一行中。
但我没明白在原数据中增加tag名是想在写入的时候做什么操作?

不用调main,直接用writer写就好

1 个赞

嗯嗯,现在也是这样做的

是因为我在进行图计算的过程中边类型比较多,导致会有大量不同tag下的_srcid或_dstid,最后写回图谱时需要有一个column把不同的tag筛出来再写入,大体这样的一个思路

感谢,我研究一下~

此话题已在最后回复的 7 天后被自动关闭。不再允许新回复。