应用exchange导入kafka数据报错

应用exchange导入kafka的数据,kafka消息的无论如何输入都exchange无法消费,报 Field “xxx” does not exist.

  • nebula 版本:
  • 部署方式(分布式 / 单机 / Docker / DBaaS):docker
  • 硬件信息
    • 磁盘( 推荐使用 SSD) hdd
    • CPU、内存信息:
  • 出问题的 Space 的创建方式:还没有到graphd
  • 问题的具体描述
    application.conf配置:

  # Spark relation config
  spark: {
    app: {
      name: Nebula Exchange 2.0
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    executor: {
        memory:1G
    }

    cores:{
      max: 16
    }
  }

  # Nebula Graph relation config
  nebula: {
    address:{
      graph:["10.40.60.225:9669"]
      meta:["10.40.60.225:32801"]
    }
    user: 1
    pswd: 1
    space: hias

    # parameters for SST import, not required
    path:{
        local:"/tmp"
        remote:"/sst"
        hdfs.namenode: "hdfs://name_node:9000"
    }

    connection {
      timeout: 3000
      retry: 3
    }

    execution {
      retry: 3
    }

    error: {
      max: 32
      # failed import job will be recorded in output path
      output: /tmp/errors
    }

    rate: {
      limit: 1024
      timeout: 1000
    }
  }

  # Processing tags
  # There are tag config examples for different dataSources.
  tags: [
    # KAFKA
    {
      name: company
      type: {
        source: kafka
        sink: client
      }
      service: "10.40.60.106:9092"
      topic: "topic-company"
      fields: [name, credit]
      nebula.fields: [name, credit]
      vertex: {
        field: id
      }
      partition: 5
      batch: 5
      interval.seconds: 10
    }
  ]

  # Processing edges
  # There are edge config examples for different dataSources.
  edges: [
    # KAFKA
    {
      name: invest
      type: {
        source: kafka
        sink: client
      }
      service: "10.40.60.106:9092"
      topic: "topic-invest"
      fields: [rate]
      nebula.fields: [rate]
      source: source
      target: target
      partition: 5
      batch: 1000
      interval.seconds: 10
    }
  ]
}

报错信息如下:

Caused by: java.lang.IllegalArgumentException: Field "id" does not exist.
Available fields: key, value, topic, partition, offset, timestamp, timestampType
        at org.apache.spark.sql.types.StructType$$anonfun$fieldIndex$1.apply(StructType.scala:303)
        at org.apache.spark.sql.types.StructType$$anonfun$fieldIndex$1.apply(StructType.scala:303)
        at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
        at scala.collection.AbstractMap.getOrElse(Map.scala:59)
        at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:302)
        at com.vesoft.nebula.exchange.processor.VerticesProcessor$$anonfun$3.apply(VerticesProcessor.scala:203)
        at com.vesoft.nebula.exchange.processor.VerticesProcessor$$anonfun$3.apply(VerticesProcessor.scala:201)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.mapelements_doConsume_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.deserializetoobject_doConsume_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at com.vesoft.nebula.exchange.processor.VerticesProcessor.com$vesoft$nebula$exchange$processor$VerticesProcessor$$processEachPartition(VerticesProcessor.scala:80)
        at com.vesoft.nebula.exchange.processor.VerticesProcessor$$anonfun$process$3$$anonfun$apply$4.apply(VerticesProcessor.scala:235)
        at com.vesoft.nebula.exchange.processor.VerticesProcessor$$anonfun$process$3$$anonfun$apply$4.apply(VerticesProcessor.scala:235)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2113)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2113)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

kafka消息格式如下:
{“id”:“123”,“name”:"测试数据"123,“credit”:“123”}

1 个赞

我编辑了下帖子,代码段包裹信息的 markdown 语法是

```
log
```

反斜杠是为了防止被解析我加上的,如果要使用的话是需要删掉反斜杠的哈(我删掉了你的反斜杠)

好的

顺便问下,我看你的配置信息里的 Exchange 是 2.0 的,你的 Nebula 版本信息是多少呢?因为 Exchange 2.0 对应的是 Nebula 2.0,Exchange 1.0 对应的是 Nebula 1.x 版本,1.x 和 2.0 还是有些区别的。所以最好下次补充下工具和 Nebula 的版本信息哈

1 个赞

你在spark-shell中执行下如下命令,看下读出来的kafka数据样式:

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.40.60.106:9092")
      .option("subscribe", "topic-company")
      .load()
      .show()

你配置的 “id”,“name”, “credit”应该和读出来的dataframe的schema不一致。

嗯 nebula版本也是2.0的。

1 个赞

我这边spark 版本没有show() Schema是这样的

我再补充下环境信息吧,
spark kafka 都是cdh集成的,
spark 版本是:2.4.0-cdh6.1.1
kafka 版本是:2.11-2.0.0-cdh6.1.1
nebula studio 和graphd 都已经用2.0版本了(这里当前阶段一切正常)。

1 个赞

看schema就可以。
spark读取你的kafka后得到的DataFrame的列分别是 “key”,“value”,“topic”,“partition”…
所以你在配置文件中配置的fields 和vertex.field应该是上面中的列名

fields: [name, credit]
      nebula.fields: [name, credit]
      vertex: {
        field: id
      }

“key”,“value”,“topic”,“partition”…这些域都是kafka中topic默认的域,
一般关键信息都是在value中,如果要使用value中的字段,除了修改exchange,还有没有其他方式?
这块不是很熟悉,麻烦了

上面的那个问题暂时先用key和value来处理,不过又遇到了新问题,
spark-submit之后,运行的很好,但只要一生产消息,就报错,错误如图:

该问题参考下面帖子的解决方案
使用exchange并发 spark-submit --master "local[16]" 报错 - #20 由 fengsen-neu

先说filesystem closed的问题吧。
我修改了spark-site.xml的配置,增加了

 <property>
    <name>fs.hdfs.impl.disable.cache</name>
    <value>true</value>
  </property>

也重启了,但是没有用。
最后找到两种解决方案:
方案1:

将ErrorHandler类中的fileSystem.close()注释掉之后可以了。

方案2:

application.conf 中这里

    # parameters for SST import, not required
    path:{
        local:"/tmp"
        remote:"/sst"
        hdfs.namenode: "hdfs://${ip}:9000"
    }

hdfs.namenode 填写真实的hdfs,貌似也不报错了。

schema只支持kafka自有域变量的问题,这个我修改的源码,思路是这样的:

从kafka中获取value域,将value通过from_json解析,然后将这个json作为Dataframe返回,不过,转为自定义的Dataframe时需要申明StructType,所以我又在配置文件中kafka部分增加了schema配置,用于记录StructType 的ddl。类似这样的格式:“source STRING, target STRING , rate DOUBLE”

目前这种方式跑通了,不过,我不知道这种方式是否合理。

2 个赞

这个配置不是加在spark的配置中,这是hadoop的配置,要加在hdfs-site.xml中。

请问这个有通用的解决办法吗?或者这个解决放大的代码有check in到社区分支吗?

请问这种方式是需要修改源码吗,具体怎么修改的呢? git上还没修复吗?

@jens 不好意思没有及时回复。目前 @bupt_guojun 同学正在修补这个问题,他搞定之后会贡献给社区,请通过 https://github.com/vesoft-inc/nebula-spark-utils/issues/130 关注进展。