nebula导入hive里面的json失败:key not found

nebula 1.2.0
exchange 1.1.0

userInfo.json:

[{"userId": 1}, {"userId": 2}, {"userId": 3}, {"userId": 4}]

user表:

>>>describe user
>>>
===================
| Field  | Type   |
===================
| userId | string |
-------------------

报错:

Exception in thread "main" java.util.NoSuchElementException: key not found: userId
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:59)
        at scala.collection.MapLike$class.apply(MapLike.scala:141)
        at scala.collection.AbstractMap.apply(Map.scala:59)
        at com.vesoft.nebula.tools.importer.utils.NebulaUtils$$anonfun$getDataSourceFieldType$1.apply(NebulaUtils.scala:65)
        at com.vesoft.nebula.tools.importer.utils.NebulaUtils$$anonfun$getDataSourceFieldType$1.apply(NebulaUtils.scala:64)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at com.vesoft.nebula.tools.importer.utils.NebulaUtils$.getDataSourceFieldType(NebulaUtils.scala:64)
        at com.vesoft.nebula.tools.importer.processor.VerticesProcessor.process(VerticesProcessor.scala:137)
        at com.vesoft.nebula.tools.importer.Exchange$$anonfun$main$2.apply(Exchange.scala:174)
        at com.vesoft.nebula.tools.importer.Exchange$$anonfun$main$2.apply(Exchange.scala:152)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at com.vesoft.nebula.tools.importer.Exchange$.main(Exchange.scala:152)
        at com.vesoft.nebula.tools.importer.Exchange.main(Exchange.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

这是application.conf:

{
  # Spark relation config
  spark: {
    app: {
      name: Spark Writer
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    cores {
      max: 16
    }
  }

  # Nebula Graph relation config
  nebula: {
    address:{
      graph:["9.135.95.249:13708"]
      meta:["9.135.95.249:22343"]
    }
    user: root
    pswd: nebula
    space: loadCsv

    connection {
      timeout: 3000
      retry: 3
    }

    execution {
      retry: 3
    }

    error: {
      max: 32
      output: /tmp/errors
    }

    rate: {
      limit: 1024
      timeout: 1000
    }
  }

  # Processing tags
  tags: [

    # Loading tag from HDFS and data type is json
    {
      name: user
      type: {
        source: json
        sink: client
      }
      path: "hdfs://9.135.146.42/plato/userInfo.json"
      fields: ["userId"]
      nebula.fields: ["userId"]
      vertex: {
        field:userId,
        policy:"hash"
      }
      separator: ","
      header: true
      batch: 256
      partition: 32
      isImplicit: true
    }
  ]
}

导入csv也一样不知道发生什么事了
真的可以说是非常之绝望了

你可以参考 hive 导入 key not found

嗯嗯,之前看过,但是我不是按照docker部署的,我是安装在本地的,而且我没有报这个错:

ERROR meta.MetaClientImpl: Get tags Error: -23

你看下你在nebula-graph里面创建的schema,你的userinfo对应到nebula-graph的tag,查询下,然后截图。

这个是因为配置文件中配置的 nebula.fields: [“userId”]字段在NebulaGraph中不存在。 检查下NebulaGraph与Hive中userId相对应的字段叫什么然后修改下配置文件。

好的我怕数据类型不对,做了两个,都是同样问题
image
image
我的原表如下:
image

你的nebula-graph是用rpm/deb安装的吗?用的是哪个安装包?还有你的metad有几个?
看这个链接 https://docs.nebula-graph.com.cn/manual-CN/1.overview/2.quick-start/2.FAQ/#_4 ,把二进制版本号发下。

我用的git安装的,我的metad只有一个,二进制版本信息如下

nebula-graphd version , Build Time: Jan 19 2021 18:20:05
This source code is licensed under Apache 2.0 License, attached with Common Clause Condition 1.0.

你是下载源码自己编译安装的吧,连版本号和commitid都没有,至少也要提供commit id,不然我们没办法知道你具体都版本。你可以用你 java client对应都版本通过metaclient去取下 loadCsv 的 user 这个tag,看能不能正常取到。例子参考 https://github.com/vesoft-inc/nebula-java/blob/v1.0/examples/src/main/java/com/vesoft/nebula/examples/MetaClientExample.java

你好, dingding 请问下, 我尝试用Flink连接Nebula, 发现graph可以连接, 但是meta连接不上; 这个错误是连接不上meta? 还是连接上了meta后获取schema出错(我之前因为删除space没有自动删除对应的目录文件, 所以自己删除过space目录)


这个错误不是连不上meta,是你访问的space下面没有你要访问的player, 你通过 console 切到 flinkSink 这个space下面,然后show tags,结果截图下。

你的意思就是要提前创建space和tag, edge :joy:, 我还以为是自己自动创建,

肯定是你提前创建好的呀

1 个赞

感谢大佬指导, 对了, 顺便请教下, 我们准备测试下用 kafka → flink → Nebula的导入速度, 用这套代码没问题吧
image

可以的

1 个赞

hello, dingding
请教下有关Flink的导入Nebula, 我看你们的demo是用的source是一个list然后sink写到Nebula中, 但是实际场景中肯定不是list的source, 我请教下我这里是kafaka的source的话, 这块应该是如何写的呢?

我也是可以自己吧kakfa中的数据处理成一条消息对应假如50行记录, 就是咨询下你, 在用Flink对接Nebula的话, 一条消息是一行记录和多行记录, 它的这写入速度有区别吗??? 最后这个到底是一条一条的写入Nebula还是批量写???


在这个帖子中已回复你的问题

1 个赞