exchange2.1.0 导入kafka数据遇到reseting offset问题

exchange2.1.0 使用导入命令后一直在重复这个步骤 请问有什么解决办法吗?

{
  # Spark相关配置
  spark: {
    app: {
      name: Nebula Exchange 2.1.0
    }
    driver: {
      cores: 1
      maxResultSize: 1G
    }
    cores {
      max: 16
    }
  }


  # Nebula Graph相关配置
  nebula: {
    address:{
      # 以下为Nebula Graph的Graph服务和Meta服务所在机器的IP地址及端口。
      # 如果有多个地址,格式为 "ip1:port","ip2:port","ip3:port"。
      # 不同地址之间以英文逗号 (,) 隔开。
      graph:["121.37.153.8:9669"]
      meta:["121.37.153.8:9559"]
    }
    # 填写的账号必须拥有Nebula Graph相应图空间的写数据权限。
    user: root
    pswd: nebula
    # 填写Nebula Graph中需要写入数据的图空间名称。
    space: BCC
    connection {
      timeout: 3000
      retry: 3
    }
    execution {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors
    }
    rate: {
      limit: 1024
      timeout: 1000
    }
  }
  # 处理点
  tags: [
    # 设置Tag player相关信息。
    {
      # Nebula Graph中对应的Tag名称。
      name: board
      type: {
        # 指定数据源文件格式,设置为Kafka。
        source: kafka
        # 指定如何将点数据导入Nebula Graph:Client或SST。
        sink: client
      }
      # Kafka服务器地址。
      service: "121.37.153.8:9092"
      # 消息类别。
      topic: "test"

      # 在fields里指定player表中的列名称,其对应的value会作为Nebula Graph中指定属性。
      # fields和nebula.fields里的配置必须一一对应。
      # 如果需要指定多个列名称,用英文逗号(,)隔开。
      fields: [board_name]
      nebula.fields: [board_name]

      # 指定表中某一列数据为Nebula Graph中点VID的来源。
      vertex:{
          field:board_id
      }


      # 单次写入 Nebula Graph 的最大数据条数。
      batch: 10

      # Spark 分区数量
      partition: 10
      # 读取消息的间隔。单位:秒。
      interval.seconds: 10
    }
    # 设置Tag team相关信息。
  ]

}

{“board_id”: “board-1”, “board_name”: “主板”}
{“board_id”: “board-2”, “board_name”: “电源单板”}
{“board_id”: “board-3”, “board_name”: “主板2”}
{“board_id”: “board-4”, “board_name”: “电源单板2”}
这是kafka的数据格式

有人帮忙解答一下吗

当前exchange的kafka数据源只支持配置内置的几个key,未对value做json解析,不能配置board_name。
可配选项:key, value, topic, partition, offset, timestamp, timestampType 字段

resetting offset 的问题,你看下当前配置的topic中是否有数据流

那用flink可以配置自定义的key吗

目前不支持自定义key,你可以自己解析一下value中的json

浙ICP备20010487号