通过Nebula-Spark-Connector更新节点属性无效

  • nebula版本:2.5.0

  • spark版本:2.4.5

  • 部署方式:分布式

  • 安装方式:源码编译

  • 是否为线上版本:N

  • 问题的具体描述:

user类TAG创建时,属性cc为空。之后用Nebula-Spark-Connector读取某个csv数据,其中包含cc列,需要把Nebula中cc属性值UPDATE。代码运行没有报错,但是打开Nebula发现cc属性值并没有更新。

  • scala核心代码
    import org.apache.spark.sql.SparkSession
    import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
    import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
    import com.vesoft.nebula.connector.{
      NebulaConnectionConfig,
      WriteMode,
      WriteNebulaVertexConfig
    }
    import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter


    //    创建sparkSession
    val spark = SparkSession.builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()


//PART1.写入数据

    //   读取数据
    val df = spark.read.csv("tmp/errors/cc")
      .select("_c0","_c6")
      .withColumnRenamed("_c0","id")
      .withColumnRenamed("_c6","cc")

    //  写入数据配置

    //连接nebula配置
    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress("10.18.200.30:9559")
        .withGraphAddress("10.18.200.30:9669")
        .withConenctionRetry(2)
        .build()

    val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
      .builder()
      .withSpace("test_space")
      .withTag("user")
      .withVidField("id")
      .withVidAsProp(false)
      .withUser("root")
      .withPasswd("nebula")
      .withBatch(1000)
      .withWriteMode(WriteMode.UPDATE)
      .build()

    //  写入数据
    df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
    //********稍后查看df********


//PART2.查看update后的数据

    //读点
    val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("test_space")
      .withLabel("user")
      .withReturnCols(List("cc"))
      .withNoColumn(false)
      .withLimit(100)
      .withPartitionNum(15)
      .build()

    
   val df_new = spark.read.nebula(config, nebulaReadVertexConfig)
      .loadVerticesToDF().withColumnRenamed("_vertexId","id")
    //********稍后查看df_new,与df对比********

运行完以上代码后:
查看df,cc列都是有值的,但是df_new,cc列依然都是空的


查看某个特定id:

过一段时间查询下呢?cc 属性更新了吗?

昨晚上投的任务,跑完没报错,cc属性没更新,今天早上来看cc属性还是没更新

你的tmp/errors/cc数据中的id 在nebula中已经存在了么,若id不存在update是不会成功的。
你要看下你的程序日志,更新失败的数据在日志中会有显示的。

1 个赞

tmp/errors/cc数据中的id 在nebula中全都存在。

事实上,这部分数据,本身就是从nebula中读取后,做了一些计算,得到cc列,先保存到tmp/errors/cc中了。

贴一下你的Spark执行日志吧,看一下批量update操作是否操作成功了

此话题已在最后回复的 7 天后被自动关闭。不再允许新回复。