exchange读取kafka写入nebula报错

  • nebula 版本:3.2.0
  • 安装方式:Docker
  • spark版本:cdh6.3.2自带的spark2.4.0
  • exchange版本:3.0.0
  • 是否为线上版本:N
  • 硬件信息
    • 磁盘( HDD)
    • CPU、内存信息 (20C128G)
  • 问题的具体描述
  • Can not define any other configs when kafka exists
[root@hadoop nebula-exchange]# spark-submit \
> --name "kafkaToNebula" \
> --master "local" \
> --class com.vesoft.nebula.exchange.Exchange \
> /opt/nebula-exchange/nebula-exchange_spark_2.4/target/nebula-exchange_spark_2.4-3.0.0.jar \
> -c /opt/nebula-exchange/kafka_application.conf
22/09/14 11:02:10 INFO config.Configs$: DataBase Config com.vesoft.exchange.common.config.DataBaseConfigEntry@2833c390
22/09/14 11:02:10 INFO config.Configs$: User Config com.vesoft.exchange.common.config.UserConfigEntry@7f08e74
22/09/14 11:02:10 INFO config.Configs$: Connection Config Some(Config(SimpleConfigObject({"retry":3,"timeout":3000})))
22/09/14 11:02:10 INFO config.Configs$: Execution Config com.vesoft.exchange.common.config.ExecutionConfigEntry@c757796a
22/09/14 11:02:10 INFO config.Configs$: Source Config Kafka source server: 10.30.6.71:9092 topic:topic_name1 startingOffsets:latest maxOffsetsPerTrigger:None
22/09/14 11:02:10 INFO config.Configs$: Sink Config Kafka source server: 10.30.6.71:9092 topic:topic_name1 startingOffsets:latest maxOffsetsPerTrigger:None
22/09/14 11:02:10 INFO config.Configs$: name player  batch 10
22/09/14 11:02:10 INFO config.Configs$: Tag Config: Tag name: player, source: Kafka source server: 10.30.6.71:9092 topic:topic_name1 startingOffsets:latest maxOffsetsPerTrigger:None, sink: Nebula sink addresses: [10.30.6.71:32788], vertex field: personId, vertex policy: None, batch: 10, partition: 10.
Exception in thread "main" java.lang.IllegalArgumentException: Can not define any other configs when kafka exists
        at com.vesoft.exchange.common.config.Configs$$anonfun$parse$1.apply(Configs.scala:349)
        at com.vesoft.exchange.common.config.Configs$$anonfun$parse$1.apply(Configs.scala:347)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at com.vesoft.exchange.common.config.Configs$.parse(Configs.scala:347)
        at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:38)
        at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:926)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:935)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
22/09/14 11:02:11 INFO util.ShutdownHookManager: Shutdown hook called
22/09/14 11:02:11 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-3f711747-116f-448b-a5f8-6eb9da652a72
  • kafka_application.conf
{
  # Spark 相关配置
  spark: {
    app: {
      name: Nebula Exchange 3.0.0
    }
    driver: {
      cores: 1
      maxResultSize: 1G
    }
    cores: {
      max: 12
    }
  }

  # NebulaGraph 相关配置
  nebula: {
    address:{
      # 以下为 NebulaGraph 的 Graph 服务和 Meta 服务所在机器的 IP 地址及端口。
      # 如果有多个地址,格式为 "ip1:port","ip2:port","ip3:port"。
      # 不同地址之间以英文逗号 (,) 隔开。
      #graph:["10.30.6.71:9669"]
      #meta:["10.30.6.71:9559"]
      graph:["10.30.6.71:32788"]
      meta:["10.30.6.71:32772"]
    }
    # 填写的账号必须拥有 NebulaGraph 相应图空间的写数据权限。
    user: root
    pswd: yw12580..
    # 填写 NebulaGraph 中需要写入数据的图空间名称。
    space: basketballplayer
    connection: {
      timeout: 3000
      retry: 3
    }
    execution: {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors
    }
    rate: {
      limit: 1024
      timeout: 1000
    }
  }
  # 处理点
  tags: [
    # 设置 Tag player 相关信息。
    {
      # NebulaGraph 中对应的 Tag 名称。
      name: player
      type: {
        # 指定数据源文件格式,设置为 Kafka。
        source: kafka
        # 指定如何将点数据导入 NebulaGraph:Client 或 SST。
        sink: client
      }
      # Kafka 服务器地址。
      service: "10.30.6.71:9092"
      # 消息类别。
      topic: "topic_name1"

      # 在 fields 里指定 Kafka value 中的字段名称,多个字段用英文逗号(,)隔开。Spark Structured Streaming 读取 Kafka 数据后会将其以 JSON 格式存储于 value 字段中,而这里的 fields 要配置 JSON 的 key 名。示例如下:
      fields: [personName, personAge]
      # 设置与 fields 中的 key 对应的 NebulaGraph 属性名,key 的 value 将保存为相应的属性值。下方设置会将 personName 的 value 保存到 NebulaGraph 中的 name 属性,personAge 的 value 则保存到 age 属性。
      nebula.fields: [name, age]

      # 指定表中某一列数据为 NebulaGraph 中点 VID 的来源。
      # 这里的值 key 和上面的 key 重复,表示 key 既作为 VID,也作为属性 name。
      vertex:{
          field:personId
      }

      # 单批次写入 NebulaGraph 的数据条数。
      batch: 10

      # Spark 分区数量
      partition: 10
      # 读取消息的间隔。单位:秒。
      interval.seconds: 10
    }
    # 设置 Tag team 相关信息。
    {
      name: team
      type: {
        source: kafka
        sink: client
      }
      service: "10.30.6.71:9092"
      topic: "topic_name2"
      fields: [key]
      nebula.fields: [name]
      vertex:{
          field:teamId
      }
      batch: 10
      partition: 10
      interval.seconds: 10
    }

  ]

  # 处理边数据
  edges: [
    # 设置 Edge type follow 相关信息
    {
      # NebulaGraph 中对应的 Edge type 名称。
      name: follow

      type: {
        # 指定数据源文件格式,设置为 Kafka。
        source: kafka

        # 指定边数据导入 NebulaGraph 的方式,
        # 指定如何将点数据导入 NebulaGraph:Client 或 SST。
        sink: client
      }

      # Kafka 服务器地址。
      service: "10.30.6.71:9092"
      # 消息类别。
      topic: "topic_name3"

      # 在 fields 里指定 Kafka value 中的字段名称,多个字段用英文逗号(,)隔开。Spark Structured Streaming 读取 Kafka 数据后会将其以 JSON 格式存储于 value 字段中,而这里的 fields 要配置 JSON 的 key 名。示例如下:
      fields: [degree]
      # 设置与 fields 中的 key 对应的 NebulaGraph 属性名,key 的 value 将保存为相应的属性值。下方设置会将 degree 的 value 保存到 NebulaGraph 中的 degree 属性。
      nebula.fields: [degree]

      # 在 source 里,将 topic 中某一列作为边的起始点数据源。
      # 在 target 里,将 topic 中某一列作为边的目的点数据源。
      source:{
          field:srcPersonId
      }

      target:{
          field:dstPersonId
      }

      # 指定一个列作为 rank 的源(可选)。
      #ranking: rank

      # 单批次写入 NebulaGraph 的数据条数。
      batch: 10

      # Spark 分区数量
      partition: 10

      # 读取消息的间隔。单位:秒。
      interval.seconds: 10
    }

    # 设置 Edge type serve 相关信息
    {
      name: serve
      type: {
        source: kafka
        sink: client
      }
      service: "10.30.6.71:9092"
      topic: "topic_name4"

      fields: [startYear,endYear]
      nebula.fields: [start_year,end_year]
      source:{
          field:personId
      }

      target:{
          field:teamId
      }

      # 指定一个列作为 rank 的源(可选)。
      #ranking: rank

      batch: 10
      partition: 10
      interval.seconds: 10
    }
  ]
}

由于kafka是流式读取,所以当存在一个kafka配置时,不允许再有其他tag/edge的配置。
你如果是多个topic,请分别配置多个exchange 配置文件,然后提交多个任务。

当配置一个topic时,任务可以提交成功,但kafka数据无法写入,是我的数据格式或者配置文件有问题吗?exchange的streaming程序也没有每10s输出结果。

  • kafka player主题数据格式:
{"personId":"player101","personName":"JoJ0","personAge":34}
  • kafka_application.conf
{
  spark: {
    app: {
      name: Nebula Exchange 3.0.0
    }
    driver: {
      cores: 1
      maxResultSize: 1G
    }
    cores: {
      max: 8
    }
  }

  nebula: {
    address:{
      graph:["10.30.6.71:32788"]
      meta:["10.30.6.71:32772"]
    }
    user: root
    pswd: yw12580..
    space: basketballplayer
    connection: {
      timeout: 3000
      retry: 3
    }
    execution: {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors
    }
    rate: {
      limit: 1024
      timeout: 1000
    }
  }
  tags: [
    {
      name: player
      type: {
        source: kafka
        sink: client
      }
      service: "10.30.6.71:9092"
      topic: "player"

      fields: [personName, personAge]
      nebula.fields: [name, age]

      vertex:{
          field:personId
      }

      batch: 10
      partition: 10
      interval.seconds: 10
   }
  ]
}

commitId是null呢,你的startoffset没设置的话默认是lastest,配一下startOffset呢

1 个赞

刚测试了下,不设置startoffset默认是earliest,加上该参数仍然是earliest,可以提供一份可运行的afka_application.conf吗?我照着修改下。


浙ICP备20010487号