- nebula 版本:2.0.1
- nebula-spark-connector 版本:2.0.0(来自 GitHub 仓库 vesoft-inc/nebula-spark-utils 的 v2.0.0 标签,参考其中的 nebula-spark-connector/README_CN.md)
- 部署方式(分布式 / 单机 / Docker / DBaaS):单机
- 是否为线上版本:N
- 问题的具体描述:使用 nebula-spark-connector 将 Hive 数据导入 Nebula Graph 2.0.1 时报错。
建立点的语句:
CREATE TAG APPLYINFO(APP_NO string,PRODUCT_CD string,ECIF_NO string,GPS_L string,GPS_B string,NETWORK_TYPE string,CREATED_DATETIME timestamp,LAST_MODIFIED_DATETIME timestamp);
建立边的语句:
CREATE EDGE PERSON2CARD(ID_NO string,CARD_NO string,CREATED_DATETIME timestamp,LAST_MODIFIED_DATETIME timestamp);
使用 spark-connector 将 Hive 数据写入 Nebula Graph 2.0.1 的代码见下文(包括创建 DataFrame 的程序和写入点的函数)。
运行该写入脚本时报错,spark-connector 端的错误日志如下:
- 相关的 meta / storage / graph info 日志信息
graph错误信息:E0602 15:30:18.195520 84304 HeaderServerChannel.cpp:114] Received invalid request from client: N6apache6thrift9transport19TTransportExceptionE: Header transport frame is too large: 1583156490 (hex 0x5e5d0d0a) (transport N6apache6thrift5async12TAsyncSocketE, address *******, port 24569)
// Build the source DataFrame from a Hive table.
// NOTE(review): `appName` and `dataBase` are defined outside this snippet.
val spark: SparkSession =
  SparkSession.builder()
    .master("yarn")
    .appName(appName)
    .enableHiveSupport()
    .getOrCreate()
// Switches the session's current database. The query below names its table
// fully (`tmp.table1`), so that query does not depend on this statement.
spark.sql(s"use $dataBase")
// NOTE(review): Spark may report Hive column names in lower case, while Nebula
// property names are case-sensitive (the tag declares APP_NO, PRODUCT_CD, ...).
// Confirm that dataFrame.columns match the tag's property names.
val dataSet1: Dataset[Row] =
  spark.sql("select app_no_tag_id,APP_NO,PRODUCT_CD,ECIF_NO,GPS_L,GPS_B,NETWORK_TYPE,CREATED_DATETIME,LAST_MODIFIED_DATETIME from tmp.table1")
val dataFrame: DataFrame = dataSet1.toDF()
/**
 * Writes the rows of `dataFrame` as vertices of tag `tagName` in graph space `space`
 * through nebula-spark-connector.
 *
 * The DataFrame is evaluated twice (once by `show()`, once by the write), so it is
 * cached before the first action. The cache is released afterwards, but only if this
 * function is the one that applied it; a cache set up by the caller is left untouched.
 *
 * @param dataFrame    source rows. The column named `vidField` supplies the vertex ID.
 *                     NOTE(review): with `withVidAsProp(false)` the remaining columns are
 *                     presumably written as tag properties by name; confirm they match the tag.
 * @param metaAddress  value passed to `withMetaAddress`.
 *                     NOTE(review): confirm this targets the meta service (host:port).
 * @param graphAddress value passed to `withGraphAddress`.
 *                     NOTE(review): confirm this targets the graph service (host:port).
 * @param userName     Nebula user name.
 * @param passwd       Nebula password.
 * @param space        target graph space.
 * @param tagName      target tag.
 * @param vidField     name of the column that holds the vertex ID.
 * @param batch        batch size passed to `withBatch`.
 */
def writeVertex(
    dataFrame: DataFrame,
    metaAddress: String,
    graphAddress: String,
    userName: String,
    passwd: String,
    space: String,
    tagName: String,
    vidField: String,
    batch: Int): Unit = {
  // Cache before the first action so show() and the write share one computation.
  // If the caller already cached this DataFrame, reuse that cache and leave it alone.
  val cachedHere = dataFrame.storageLevel == StorageLevel.NONE
  if (cachedHere) dataFrame.persist(StorageLevel.MEMORY_AND_DISK_SER)
  try {
    dataFrame.show()
    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress(metaAddress)
        .withGraphAddress(graphAddress)
        // The method name is spelled exactly like this in the connector's builder API
        // (as in its README); do not "correct" it.
        .withConenctionRetry(3)
        .build()
    val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
      .builder()
      .withUser(userName)
      .withPasswd(passwd)
      .withSpace(space)
      .withTag(tagName)
      .withVidField(vidField)
      .withVidAsProp(false)
      .withBatch(batch)
      // NOTE(review): the "hash" policy presumably hashes vidField values into int64
      // VIDs, which would require a space created with vid_type INT64. Verify this.
      .withVidPolicy("hash")
      .build()
    dataFrame.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
  } finally {
    // Release the cached blocks even when the write fails.
    if (cachedHere) dataFrame.unpersist()
  }
}