-
nebula版本:2.5.0
-
spark版本:2.4.5
-
部署方式:分布式
-
安装方式:源码编译
-
是否为线上版本:N
-
问题的具体描述:
user类TAG创建时,属性cc为空。之后用Nebula-Spark-Connector读取某个csv数据,其中包含cc列,需要把Nebula中cc属性值UPDATE。代码运行没有报错,但是打开Nebula发现cc属性值并没有更新。
- scala核心代码
import org.apache.spark.sql.SparkSession
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import com.vesoft.nebula.connector.{
NebulaConnectionConfig,
WriteMode,
WriteNebulaVertexConfig
}
import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
// 创建sparkSession
val spark = SparkSession.builder()
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
//PART1.写入数据
// 读取数据
val df = spark.read.csv("tmp/errors/cc")
.select("_c0","_c6")
.withColumnRenamed("_c0","id")
.withColumnRenamed("_c6","cc")
// 写入数据配置
//连接nebula配置
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("10.18.200.30:9559")
.withGraphAddress("10.18.200.30:9669")
.withConenctionRetry(2)
.build()
val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
.builder()
.withSpace("test_space")
.withTag("user")
.withVidField("id")
.withVidAsProp(false)
.withUser("root")
.withPasswd("nebula")
.withBatch(1000)
.withWriteMode(WriteMode.UPDATE)
.build()
// 写入数据
df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
//********稍后查看df********
//PART2.查看update后的数据
//读点
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("test_space")
.withLabel("user")
.withReturnCols(List("cc"))
.withNoColumn(false)
.withLimit(100)
.withPartitionNum(15)
.build()
val df_new = spark.read.nebula(config, nebulaReadVertexConfig)
.loadVerticesToDF().withColumnRenamed("_vertexId","id")
//********稍后查看df_new,与df对比********
运行完以上代码后:
查看df,cc列都是有值的,但是df_new,cc列依然都是空的
查看某个特定id: