Importing Kafka data into Nebula with Exchange: no matter what format the Kafka messages are sent in, Exchange cannot consume them and reports Field "xxx" does not exist.
- Nebula version:
- Deployment (distributed / standalone / Docker / DBaaS): Docker
- Hardware info
  - Disk (SSD recommended): HDD
  - CPU / memory:
- How the affected space was created: the data has not even reached graphd yet
- Detailed description of the problem
application.conf configuration:
{
  # Spark relation config
  spark: {
    app: {
      name: Nebula Exchange 2.0
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    executor: {
      memory: 1G
    }

    cores: {
      max: 16
    }
  }

  # Nebula Graph relation config
  nebula: {
    address: {
      graph: ["10.40.60.225:9669"]
      meta: ["10.40.60.225:32801"]
    }
    user: 1
    pswd: 1
    space: hias

    # parameters for SST import, not required
    path: {
      local: "/tmp"
      remote: "/sst"
      hdfs.namenode: "hdfs://name_node:9000"
    }

    connection {
      timeout: 3000
      retry: 3
    }

    execution {
      retry: 3
    }

    error: {
      max: 32
      # failed import job will be recorded in output path
      output: /tmp/errors
    }

    rate: {
      limit: 1024
      timeout: 1000
    }
  }

  # Processing tags
  # There are tag config examples for different dataSources.
  tags: [
    # KAFKA
    {
      name: company
      type: {
        source: kafka
        sink: client
      }
      service: "10.40.60.106:9092"
      topic: "topic-company"
      fields: [name, credit]
      nebula.fields: [name, credit]
      vertex: {
        field: id
      }
      partition: 5
      batch: 5
      interval.seconds: 10
    }
  ]

  # Processing edges
  # There are edge config examples for different dataSources.
  edges: [
    # KAFKA
    {
      name: invest
      type: {
        source: kafka
        sink: client
      }
      service: "10.40.60.106:9092"
      topic: "topic-invest"
      fields: [rate]
      nebula.fields: [rate]
      source: source
      target: target
      partition: 5
      batch: 1000
      interval.seconds: 10
    }
  ]
}
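For reference, a minimal sketch of what Spark's built-in Kafka source exposes when reading the topic configured above (illustration only, not Exchange's actual code; the object name is made up). Its schema contains only Kafka's fixed columns, which matches the "Available fields" list in the error below:

import org.apache.spark.sql.SparkSession

object KafkaSchemaCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-schema-check")
      .getOrCreate()

    // Same broker and topic as the "service" / "topic" settings above
    val kafkaDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.40.60.106:9092")
      .option("subscribe", "topic-company")
      .load()

    // Prints: key, value, topic, partition, offset, timestamp, timestampType
    kafkaDf.printSchema()
  }
}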
The error message is as follows:
Caused by: java.lang.IllegalArgumentException: Field "id" does not exist.
Available fields: key, value, topic, partition, offset, timestamp, timestampType
at org.apache.spark.sql.types.StructType$$anonfun$fieldIndex$1.apply(StructType.scala:303)
at org.apache.spark.sql.types.StructType$$anonfun$fieldIndex$1.apply(StructType.scala:303)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:59)
at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:302)
at com.vesoft.nebula.exchange.processor.VerticesProcessor$$anonfun$3.apply(VerticesProcessor.scala:203)
at com.vesoft.nebula.exchange.processor.VerticesProcessor$$anonfun$3.apply(VerticesProcessor.scala:201)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.mapelements_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.deserializetoobject_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at com.vesoft.nebula.exchange.processor.VerticesProcessor.com$vesoft$nebula$exchange$processor$VerticesProcessor$$processEachPartition(VerticesProcessor.scala:80)
at com.vesoft.nebula.exchange.processor.VerticesProcessor$$anonfun$process$3$$anonfun$apply$4.apply(VerticesProcessor.scala:235)
at com.vesoft.nebula.exchange.processor.VerticesProcessor$$anonfun$process$3$$anonfun$apply$4.apply(VerticesProcessor.scala:235)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2113)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2113)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The Kafka message format is as follows:
{"id":"123","name":"测试数据123","credit":"123"}
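If I read the error right, fields such as "id", "name" and "credit" would only exist after the JSON in the Kafka "value" column is parsed, roughly like this (a sketch only; kafkaDf refers to the DataFrame from the sketch above, and the schema is just assumed from my message format):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Schema assumed from the message shown above
val messageSchema = StructType(Seq(
  StructField("id", StringType),
  StructField("name", StringType),
  StructField("credit", StringType)
))

// Parse the raw Kafka "value" column into named columns; only after this
// step would a column like "id" exist
val parsed = kafkaDf
  .select(from_json(col("value").cast(StringType), messageSchema).as("data"))
  .select("data.*")

Is Exchange supposed to do this parsing for the kafka source, or does the message need to be sent in a different format?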