kafka-nebula导入数据出现数据丢失

通过kafka-nebula的过程出现边数据丢失,传130多万只有100多万左右边成功插入,exchange有读到130多万条数据并处理。但是通过maxcompute-nebula边能全部传入,无重复rank值,请问有没有什么办法解决,这是什么原因导致的问题呢?

kafka-nebula配置信息:

{
  # Spark 相关配置
  spark: {
    app: {
      name: Nebula Exchange 3.0.0
    }
    driver: {
      cores: 1
      maxResultSize: 0
    }
    cores: {
      max: 16
    }
  }

  # Nebula Graph 相关配置
  nebula: {
    address:{
      # 以下为 Nebula Graph 的 Graph 服务和 Meta 服务所在机器的 IP 地址及端口。
      # 如果有多个地址,格式为 "ip1:port","ip2:port","ip3:port"。
      # 不同地址之间以英文逗号 (,) 隔开。
      graph:["ip:9669"]
      meta:["ip:9559"]
    }
    # 填写的账号必须拥有 Nebula Graph 相应图空间的写数据权限。
    user: root
    pswd: ***
    # 填写 Nebula Graph 中需要写入数据的图空间名称。
    space: graphchain_prod
    connection: {
      timeout: 3000
      retry: 3
    }
    execution: {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors
    }
    rate: {
      limit: 1024
      timeout: 1000
    }
  }
# 处理边数据
  edges: [
    # 设置 Edge type common_transaction 相关信息
    {
      # Nebula Graph 中对应的 Edge type 名称。
      name: common_transaction

      type: {
        # 指定数据源文件格式,设置为 Kafka。
        source: kafka

        # 指定边数据导入 Nebula Graph 的方式,
        # 指定如何将点数据导入 Nebula Graph:Client 或 SST。
        sink: client
      }

      # Kafka 服务器地址。
      service: "***"
      # 消息类别。
      topic: "***"

      # 在 fields 里指定 Kafka value 中的字段名称,多个字段用英文逗号(,)隔开。Spark Structured Streaming 读取 Kafka 数据后会将其以 JSON 格式存储于 value 字段中,而这里的 fields 要配置 JSON 的 key 名。示例如下:
      fields: [hash,from_address,to_address,block_hash,block_no,currency,currency_name,currency_symbol,quantity,circa_quantity,fee,tx_time,blockchain,tx_index]
      # 设置与 fields 中的 key 对应的 Nebula Graph 属性名,key 的 value 将保存为相应的属性值。下方设置会将 degree 的 value 保存到 Nebula Graph 中的 degree 属性。
      nebula.fields: [hash,from_address,to_address,block_hash,block_no,currency,currency_name,currency_symbol,quantity,circa_quantity,fee,tx_time,blockchain,tx_index]

      # 在 source 里,将 topic 中某一列作为边的起始点数据源。
      # 在 target 里,将 topic 中某一列作为边的目的点数据源。
      source:{
          field:from_vid
      }

      target:{
          field:to_vid
      }

      # 指定一个列作为 rank 的源(可选)。
      #ranking: rank

      # 单批次写入 Nebula Graph 的数据条数。
      batch: 100

      # Spark 分区数量
      partition: 32

      # 读取消息的间隔。单位:秒。
      interval.seconds: 600
    }
	]
}

运行日志:

"stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[eth_common_transaction]]",
    "startOffset" : {
      "eth_common_transaction" : {
        "8" : 4011989,
        "11" : 4011989,
        "2" : 4011990,
        "5" : 4011987,
        "4" : 4011989,
        "7" : 4011987,
        "10" : 4011986,
        "1" : 4011989,
        "9" : 4011990,
        "3" : 4011988,
        "6" : 4011987,
        "0" : 4011990
      }
    },
    "endOffset" : {
      "eth_common_transaction" : {
        "8" : 4122239,
        "11" : 4122239,
        "2" : 4122240,
        "5" : 4122236,
        "4" : 4122256,
        "7" : 4122254,
        "10" : 4122253,
        "1" : 4122256,
        "9" : 4122326,
        "3" : 4122324,
        "6" : 4122323,
        "0" : 4122326
      }
    },
    "numInputRows" : 1323411,
    "inputRowsPerSecond" : 13234.11,
    "processedRowsPerSecond" : 6937.425300371139

请问 exchange 导入是全成功的对么?没有报错?然后 show stats 得到的少了?
NebulaGraph 的版本是 3.0.0?还是?

另外如果是边的话,可以确定 type,src,dst (因为rank确认了没有重复)的三元组没有重复的? Nebula 把四元组作为 key,没有单独的key存在,如果重复就是覆盖,不会 insert 新的数据。

1、导入全部成功
2、有一个超时报错
3、同样的数据,使用maxcompte导入数据是正常的,使用kafka导入数据缺少
4、nebula版本3.0.0
5、三元组没有重复

处理数据的日志:

22/08/04 10:28:10 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "740e7812-b2d3-45ca-b224-b87d6c1b645a",
  "runId" : "fbfeb4c5-e245-416f-bb75-d119ecacb35a",
  "name" : null,
  "timestamp" : "2022-08-04T02:25:00.000Z",
  "batchId" : 3,
  "numInputRows" : 1323411,
  "inputRowsPerSecond" : 13234.11,
  "processedRowsPerSecond" : 6937.425300371139,
  "durationMs" : {
    "addBatch" : 190674,
    "getBatch" : 1,
    "getEndOffset" : 0,
    "queryPlanning" : 45,
    "setOffsetRange" : 7,
    "triggerExecution" : 190764,
    "walCommit" : 20
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[eth_common_transaction]]",
    "startOffset" : {
      "eth_common_transaction" : {
        "8" : 4011989,
        "11" : 4011989,
        "2" : 4011990,
        "5" : 4011987,
        "4" : 4011989,
        "7" : 4011987,
        "10" : 4011986,
        "1" : 4011989,
        "9" : 4011990,
        "3" : 4011988,
        "6" : 4011987,
        "0" : 4011990
      }
22/08/04 10:28:10 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 100000 milliseconds, but spent 190765 milliseconds
22/08/04 10:28:10 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-b621d7e0-3288-4331-ae72-ddd4417f1404--791777280-driver-0] Resetting offset for partition eth_common_transaction-8 to offset 4124723.
22/08/04 10:28:10 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-b621d7e0-3288-4331-ae72-ddd4417f1404--791777280-driver-0] Resetting offset for partition eth_common_transaction-5 to offset 4124720.
22/08/04 10:28:10 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-b621d7e0-3288-4331-ae72-ddd4417f1404--791777280-driver-0] Resetting offset for partition eth_common_transaction-2 to offset 4124724.
22/08/04 10:28:10 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-b621d7e0-3288-4331-ae72-ddd4417f1404--791777280-driver-0] Resetting offset for partition eth_common_transaction-11 to offset 4124723.
22/08/04 10:28:10 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-b621d7e0-3288-4331-ae72-ddd4417f1404--791777280-driver-0] Resetting offset for partition eth_common_transaction-9 to offset 4124724.
22/08/04 10:28:10 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-b621d7e0-3288-4331-ae72-ddd4417f1404--791777280-driver-0] Resetting offset for partition eth_common_transaction-6 to offset 4124720.
22/08/04 10:28:10 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-b621d7e0-3288-4331-ae72-ddd4417f1404--791777280-driver-0] Resetting offset for partition eth_common_transaction-3 to offset 4124721.
22/08/04 10:28:10 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-b621d7e0-3288-4331-ae72-ddd4417f1404--791777280-driver-0] Resetting offset for partition eth_common_transaction-0 to offset 4124723.
22/08/04 10:28:10 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-b621d7e0-3288-4331-ae72-ddd4417f1404--791777280-driver-0] Resetting offset for partition eth_common_transaction-7 to offset 4124720.
22/08/04 10:28:10 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-b621d7e0-3288-4331-ae72-ddd4417f1404--791777280-driver-0] Resetting offset for partition eth_common_transaction-4 to offset 4124722.
22/08/04 10:28:10 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-b621d7e0-3288-4331-ae72-ddd4417f1404--791777280-driver-0] Resetting offset for partition eth_common_transaction-1 to offset 4124722.
22/08/04 10:28:10 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-b621d7e0-3288-4331-ae72-ddd4417f1404--791777280-driver-0] Resetting offset for partition eth_common_transaction-10 to offset 4124719.
22/08/04 10:28:10 INFO CheckpointFileManager: Writing atomically to file:/tmp/temporary-e54fb006-8937-4e18-9bf2-6e9a805a79bb/offsets/4 using temp file file:/tmp/temporary-e54fb006-8937-4e18-9bf2-6e9a805a79bb/offsets/.4.39bd0fe8-03ff-419f-8dc1-21ada0680be3.tmp
22/08/04 10:28:10 INFO CheckpointFileManager: Renamed temp file file:/tmp/temporary-e54fb006-8937-4e18-9bf2-6e9a805a79bb/offsets/.4.39bd0fe8-03ff-419f-8dc1-21ada0680be3.tmp to file:/tmp/temporary-e54fb006-8937-4e18-9bf2-6e9a805a79bb/offsets/4
22/08/04 10:28:10 INFO MicroBatchExecution: Committed offsets for batch 4. Metadata OffsetSeqMetadata(0,1659580090768,Map(spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider, spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion -> 2, spark.sql.streaming.multipleWatermarkPolicy -> min, spark.sql.streaming.aggregation.stateFormatVersion -> 2, spark.sql.shuffle.partitions -> 200))

感谢,能想办法给出最小重现么?比如构造100 条数据导入只有70 条。

从maxcompute进行导入的配置也贴一下吧。
如果kafka数据少 在你所配置的/tmp/errors 目录下应该有文件生成。

1 个赞

1、maxcompute配置:

{
  # Spark 相关配置
  spark: {
    app: {
      name: Nebula Exchange 3.0.0
    }
    driver: {
      cores: 1
      maxResultSize: 1G
    }
    cores: {
      max: 16
    }
  }

# Nebula Graph 相关配置
  nebula: {
    address:{
      # 以下为 Nebula Graph 的 Graph 服务和 Meta 服务所在机器的 IP 地址及端口。
      # 如果有多个地址,格式为 "ip1:port","ip2:port","ip3:port"。
      # 不同地址之间以英文逗号 (,) 隔开。
      graph:["*"]
      meta:["*"]
    }
    # 填写的账号必须拥有 Nebula Graph 相应图空间的写数据权限。
    user: root
    pswd: *
    # 填写 Nebula Graph 中需要写入数据的图空间名称。
    space: *
    connection: {
      timeout: 3000
      retry: 3
    }
    execution: {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors
    }
    rate: {
      limit: 1024
      timeout: 1000
    }
  }
# 处理边数据
  edges: [
    # 设置 Edge type common_transaction 相关信息
    {
      # Nebula Graph 中对应的 Edge type 名称。
      name: common_transaction

      type:{
        # 指定数据源文件格式,设置为 MaxCompute。
        source:maxcompute

        # 指定边数据导入 Nebula Graph 的方式,
        # 指定如何将点数据导入 Nebula Graph:Client 或 SST。
        sink:client
      }

      # MaxCompute 的表名
      table:dwd_eth_transactions_edge_updata

      # MaxCompute 的项目名
      project:graphchain

      # MaxCompute 服务的 odpsUrl 和 tunnelUrl,
      # 地址可在 https://help.aliyun.com/document_detail/34951.html 查看。
      odpsUrl:"http://service.cn-hongkong.maxcompute.aliyun-inc.com/api"
      tunnelUrl:"http://dt.cn-hongkong.maxcompute.aliyun-inc.com"

      # MaxCompute 服务的 accessKeyId 和 accessKeySecret。
      accessKeyId:LTAI5tFMsN2sak4cNbNX6Kwi
      accessKeySecret:Y2I6x1z5foYtYD5ykKHLpJMYy1mVUl

      # MaxCompute 表的分区描述,该配置可选。
      partitionSpec:"time=20220727"

      # 请确保 SQL 语句中的表名和上方 table 的值相同,该配置可选。 
      sentence:"select hash, from_address, to_address, block_hash, block_no, currency, currency_name, currency_symbol, quantity, circa_quantity, fee, tx_time, blockchain, tx_index, from_vid, to_vid, rank from dwd_eth_transactions_edge_updata"

      # 在 fields 里指定 follow 表中的列名称,其对应的 value 会作为 Nebula Graph 中指定属性。
      # fields 和 nebula.fields 里的配置必须一一对应。
      # 如果需要指定多个列名称,用英文逗号(,)隔开。
      fields:[hash, from_address, to_address, block_hash, block_no, currency, currency_name, currency_symbol, quantity, circa_quantity, fee, tx_time, blockchain, tx_index]
      nebula.fields:[hash, from_address, to_address, block_hash, block_no, currency, currency_name, currency_symbol, quantity, circa_quantity, fee, tx_time, blockchain, tx_index]

      # 在 source 里,将 filtration_transactions_data 表中某一列作为边的起始点数据源。
      source:{
        field: from_vid
      }

      # 在 target 里,将 filtration_transactions_data 表中某一列作为边的目的点数据源。
      target:{
        field: to_vid
      }

      # 指定一个列作为 rank 的源(可选)。
      ranking: rank

      # Spark 分区数量
      partition:32

      # 单批次写入 Nebula Graph 的数据条数。
      batch:100
    }
  ]
}

2、/tmp/errors 目录下没有文件生成

  1. maxcompute的配置文件中有配ranking,但你的fields字段中没有rank这个字段。
    并且kafka配置文件中没有配ranking,你再确认下两个数据源的配置是否一致吧
  2. 本地磁盘/tmp/errors/中没有文件生成,那看下hdfs上是否有
1 个赞

浙ICP备20010487号