nebula-exchange edge write error

  • nebula version: 3.0.0

  • deployment: single machine (standalone)

  • installation method: TAR

  • production deployment: Y

  • hardware info

    • disk: 300 GB
    • CPU / memory: 4 cores, 16 GB
  • problem description
    A write error occurs when importing MaxCompute data with Spark; the cause is unknown.

  • relevant log output

22/06/06 02:27:07 ERROR NebulaGraphClientWriter: write edge failed for SyntaxError: Out of range: near 'xxxxxxxx'

Please add the Exchange version number, and paste your config file as well.

Version: commit b20fa1ad8a561f02b75849e058da06aecb906843 (HEAD → v3.0.0, tag: v3.0.0, origin/v3.0.0)
I'm not sure which part of the config you need, so here is the whole thing with sensitive values masked.

{
  # Spark related config
  spark: {
    app: {
      name: Nebula Exchange 3.0.0
    }

    master:local

    driver: {
      cores: 2
      maxResultSize: 4G
    }

    executor: {
        memory:4G
    }

    cores:{
      max: 4
    }
  }

  # NebulaGraph related config
  nebula: {
    account:{
      graph:["127.0.0.1:9669"]
      meta:["127.0.0.1:9559"]
    }
    user: root
    pswd: nebula
    space: bigdata

    # whether the graph and meta connections use SSL encrypted transmission
    ssl:{
        # if enable is false, other params of ssl are invalid.
        enable:{
            graph:false
            meta:false
        }
        # ssl sign type: CA or SELF
        sign.type:ca

        # if sign.type is CA, make sure ca.param is configured. If you submit the Exchange application to a cluster, make sure each worker has the CA files.
        ca.param: {
            caCrtFilePath:"/path/caCrtFilePath"
            crtFilePath:"/path/crtFilePath"
            keyFilePath:"/path/keyFilePath"
        }

        # if sign.type is SELF, make sure self.param is configured. If you submit the Exchange application to a cluster, make sure each worker has the certificate files.
        self.param: {
            crtFilePath:"/path/crtFilePath"
            keyFilePath:"/path/keyFilePath"
            password:"nebula"
        }
    }


    # nebula client connection parameters
    connection {
      # socket connect & execute timeout, unit: millisecond
      timeout: 30000
    }

    error: {
      # max number of failures; if the number of failures exceeds max, the application exits.
      max: 32
      # failed import job will be recorded in output path
      output: /tmp/errors
    }

    # use Google's RateLimiter to limit the requests sent to NebulaGraph
    rate: {
      # the stable throughput of RateLimiter                                                                                               
      limit: 1024
      # Acquires a permit from RateLimiter, unit: MILLISECONDS
      # if it can't be obtained within the specified timeout, then give up the request.
      timeout: 1000
    }
  }

  # Processing tags
  # There are tag config examples for different data sources.
  tags: [
    # MaxCompute
    {
      name: account
      type:{
        source:maxcompute
        sink:client
      }
      table:dwd_account
      project:bigdata
      odpsUrl:"http://service.cn-hongkong.maxcompute.aliyun.com/api"
      tunnelUrl:"http://dt.cn-hongkong.maxcompute.aliyun.com"
      accessKeyId:accessKey
      accessKeySecret:accessSecret
      # default numPartitions is 1
      numPartitions:1
      # the maxcompute sql sentence only uses the table name; make sure it matches {table}'s value.
      sentence:"select * from dwd_account"
      fields:[account, label]
      nebula.fields:[account, label]
      vertex:{
        field: account
      }
      partition:1
      batch:10
    }
  ]

  # Processing edges
  # There are edge config examples for different data sources.
  edges: [
    # MaxCompute
    {
      name: account_rel
      type:{
        source:maxcompute
        sink:client
      }
      table:dwd_account_rel
      project:bigdata
      odpsUrl:"http://service.cn-hongkong.maxcompute.aliyun.com/api"
      tunnelUrl:"http://dt.cn-hongkong.maxcompute.aliyun.com"
      accessKeyId:accessKey
      accessKeySecret:accessSecret
      # maxcompute sql sentence only uses table name.
      sentence:"select * from dwd_account_rel"
      fields:[id,src_account,dst_account]
      nebula.fields:[id,src_account,dst_account]
      source:{
        field: src_account
      }
      target:{
        field: dst_account
      }
      ranking: account_group_no
      partition:1
      batch:10
    }
  ]
}

One more question: my import is very slow, and since this is my first attempt I don't really know how to tune it. There are roughly 2.5 million vertices and roughly 140 million edges, and when the error hit, the write had already been running for over an hour, so retrying like that hurts a bit. I think once you've confirmed the cause I'll need to test with a small data set first.

Um... is this config an ordinary laptop? Roughly how much data is there?

No, Spark runs on a server we used for testing before, 4 cores / 16 GB. The MaxCompute data is currently a bit over 10 GB. Let's set the speed issue aside for now; any progress on the error? I'm fairly sure a small-data run succeeded before.

Is this because it's sensitive, i.e. there is actually key id information here but you replaced it with the corresponding field names?

If that's the case, try changing the batch on the edge side from 10 to 5 and see whether it still errors.
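For reference, a minimal sketch of that change against the config posted above; only the batch value in the edges entry moves, everything else stays as-is:

  edges: [
    {
      name: account_rel
      # ... other settings unchanged ...
      batch: 5    # was 10; fewer rows per write request while troubleshooting
    }
  ]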

Right, those are masked; the real values are correct.

Will lowering the batch increase the import time?

What causes the error write edge failed for SyntaxError: Out of range: near?

Could it be caused by problems in the data itself?

Yes, it will, - -. I just wanted you to test whether that error is caused by the batch being set too large (though I don't think 10 is large).

:thinking: I went through your config and it looks fine... unless there is something wrong with your raw data, such as an extra field or the like?

Then what about increasing the import speed? Do I need to scale out the Spark cluster?

Just tune the batch parameter. If you're on spinning disks, switching to SSDs will also give a noticeable performance boost.

It's already Alibaba Cloud ESSD. How high can batch usually be set?

- -. That's tuning: increase it 2x at a time, from 10 to 20, then 40, and watch the elapsed time...
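A hedged illustration of that tuning loop (the values are only examples); in the edges entry of the same config, double batch between runs and compare the import time:

  # edges entry of the same config: double batch per run and compare elapsed time
  batch: 20    # run 2; next runs try 40, then 80, and back off to the last good value if errors return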

Are there any requirements for the Spark cluster configuration? Any guidance on a baseline or recommended setup? Please give me some pointers.

I saw a post earlier saying that updates are, in principle, slower than inserts, so the space would need to be deleted and rebuilt. Is there some way to clear it, like truncating a table in MySQL? I'm not sure whether that analogy is apt, since I don't know the underlying mechanism.
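For context, a hedged sketch of the closest nGQL equivalent, assuming the intent is to drop the space from the config above and recreate it before re-importing; the CREATE SPACE options here are illustrative, not taken from the thread:

  DROP SPACE IF EXISTS bigdata;
  # recreate the space (and its tags/edge types) before running Exchange again
  CREATE SPACE bigdata (partition_num = 10, replica_factor = 1, vid_type = FIXED_STRING(32));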