exchange导入kafka数据只消费了单个topic的问题【已解决】

  • nebula 版本:2.6.1

  • 部署方式:单机

  • 安装方式:Docker

  • 是否为线上版本:Y

  • 硬件信息

    • 磁盘( 推荐使用 SSD)
    • CPU、内存信息
  • 问题的具体描述

按照官网示例在使用exchange导入kafka数据时,配置文件中本来配置了多个点、多个边,但是spark消费的时候只消费了第一个tags的数据,其他点边数据均未被消费。

  • 相关的 meta / storage / graph info 日志信息(尽量使用文本形式方便检索)

spark-submit打印日志如下:

22/01/13 18:47:40 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "74093b86-7570-4662-a5a4-965240e70b43",
  "runId" : "44b10b31-e2f8-40c1-9825-a22088f16dc4",
  "name" : null,
  "timestamp" : "2022-01-13T10:47:40.003Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getEndOffset" : 1,
    "setOffsetRange" : 6,
    "triggerExecution" : 7
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic_player]]",
    "startOffset" : {
      "topic_player" : {
        "0" : 16
      }
    },
    "endOffset" : {
      "topic_player" : {
        "0" : 16
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "ForeachBatchSink"
  }
}

运行脚本如下:

spark-submit\
  --master "local"\
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3\
  --class com.vesoft.nebula.exchange.Exchange\
  /Users/xxx/code/nebula_test/jars/nebula-exchange-2.6.1.jar\
  -c /Users/xxx/code/nebula_test/nebula_exchange_kafka.config

nebula_exchange_kafka.config文件内容:

{
  # Spark 相关配置
  spark: {
    app: {
      name: Nebula Exchange 2.6.1
    }
    driver: {
      cores: 1
      maxResultSize: 1G
    }
    cores {
      max: 8
    }
  }

  # Nebula Graph 相关配置
  nebula: {
    address:{
      # 以下为 Nebula Graph 的 Graph 服务和 Meta 服务所在机器的 IP 地址及端口。
      # 如果有多个地址,格式为 "ip1:port","ip2:port","ip3:port"。
      # 不同地址之间以英文逗号 (,) 隔开。
      graph:["127.0.0.1:9669"]
      meta:["127.0.0.1:9559"]
    }
    # 填写的账号必须拥有 Nebula Graph 相应图空间的写数据权限。
    user: root
    pswd: nebula
    # 填写 Nebula Graph 中需要写入数据的图空间名称。
    space: kafka_basketballplayer
    connection {
      timeout: 30000
      retry: 3
    }
    execution {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors
    }
    rate: {
      limit: 1024
      timeout: 1000
    }
  }
  # 处理点
  tags: [
    # 设置 Tag player 相关信息。
    {
      # Nebula Graph 中对应的 Tag 名称。
      name: player
      type: {
        # 指定数据源文件格式,设置为 Kafka。
        source: kafka
        # 指定如何将点数据导入 Nebula Graph:Client 或 SST。
        sink: client
      }
      # Kafka 服务器地址。
      service: "192.168.170.46:9092"
      # 消息类别。
      topic: "topic_player"

      # Kafka 数据有固定的域名称:key、value、topic、partition、offset、timestamp、timestampType。
      # Spark 读取为 DataFrame 后,如果需要指定多个字段,用英文逗号(,)隔开。
      # 在 fields 里指定字段名称,例如用 key 对应 Nebula 中的 name, value 对应 Nebula 中的 age,示例如下:
      fields: [key,value]
      nebula.fields: [name,age]

      # 指定表中某一列数据为 Nebula Graph 中点 VID 的来源。
      # 这里的值 key 和上面的 key 重复,表示 key 既作为 VID,也作为属性 name。
      vertex:{
          field:key
      }

      # 单批次写入 Nebula Graph 的数据条数。
      batch: 1

      # Spark 分区数量
      partition: 1
      # 读取消息的间隔。单位:秒。
      interval.seconds: 10
    }
    # 设置 Tag team 相关信息。
    {
      name: team
      type: {
        source: kafka
        sink: client
      }
      service: "192.168.170.46:9092"
      topic: "topic_team"
      fields: [key]
      nebula.fields: [name]
      vertex:{
          field:key
      }
      batch: 1
      partition: 1
      interval.seconds: 10
    }

  ]

  # 处理边数据
  edges: [
    # 设置 Edge type follow 相关信息
    {
      # Nebula Graph 中对应的 Edge type 名称。
      name: follow

      type: {
        # 指定数据源文件格式,设置为 Kafka。
        source: kafka

        # 指定边数据导入 Nebula Graph 的方式,
        # 指定如何将点数据导入 Nebula Graph:Client 或 SST。
        sink: client
      }

      # Kafka 服务器地址。
      service: "192.168.170.46:9092"
      # 消息类别。
      topic: "topic_follow"

      # Kafka 数据有固定的域名称:key、value、topic、partition、offset、timestamp、timestampType。
      # Spark 读取为 DataFrame 后,如果需要指定多个字段,用英文逗号(,)隔开。
      # 在 fields 里指定字段名称,例如用 key 对应 Nebula 中的 degree,示例如下:
      fields: [key]
      nebula.fields: [degree]

      # 在 source 里,将 topic 中某一列作为边的起始点数据源。
      # 在 target 里,将 topic 中某一列作为边的目的点数据源。
      source:{
          field:timestamp
      }

      target:{
          field:offset
      }

      # 单批次写入 Nebula Graph 的数据条数。
      batch: 1

      # Spark 分区数量
      partition: 1

      # 读取消息的间隔。单位:秒。
      interval.seconds: 10
    }

    # 设置 Edge type serve 相关信息
    {
      name: serve
      type: {
        source: kafka
        sink: client
      }
      service: "192.168.170.46:9092"
      topic: "topic_serve"

      fields: [timestamp,offset]
      nebula.fields: [start_year,end_year]
      source:{
          field:key
      }

      target:{
          field:value
      }

      batch: 1
      partition: 1
      interval.seconds: 10
    }
  ]
}

我理解不同的顶点和边数据应该在不同topic中吧

对,目前是在不同的topic中,我copy官网的示例,2个点和2个边分别对应不同的kafka topic,但是运行起来后发现除了第一个点对应的topic有被消费外,其他数据均没有被消费,很奇怪。

这个没有报错吗 :rofl: 好像是会对配置进行检查,如果配置里超过一个source是kafk,应该会报错才对。

我之前提交过一个版本,现在不知道有没有改。
exchange消费kafka的时候一个程序只能消费一个tag/edge,你这里有多个tag和edge,得开多个不同的程序消费才行。

是的,@sworduo 的pr中有加入对kafka的check,这个是在2.6.1版本之后合入的,所以用户用的这个版本没有这个校验。

你有多个topic的话,可以分到不同的配置文件中,起不同的exchange任务。 因为流式数据源会一直去读取你的kafka数据,不会中断的, 后面的配置是走不过去的。

1 个赞

目前是没有报错的,不过加上了-D参数进行配置文件校验后,有提示Processing多个TAG和EDGE,如图所示。

OK,多谢~

此话题已在最后回复的 7 天后被自动关闭。不再允许新回复。

浙ICP备20010487号