Star

hive 导入 key not found

  • Nebula 1.2.0
  • Exchange 1.1.0
    配置文件:
{
  # Spark 相关配置
  spark: {
    app: {
      name: Spark Writer
    }
    driver: {
      cores: 1
      maxResultSize: 1G
    }
    cores {
      max: 16
    }
  }
  # Nebula Graph 相关配置
  nebula: {
    address:{
      # 以下为 Nebula Graph 的 Graph 服务和 Meta 服务所在机器的 IP 地址及端口
      # 如果有多个地址,格式为 "ip1:port","ip2:port","ip3:port"
      # 不同地址之间以英文逗号 (,) 隔开
      graph:["10.86.87.15:3699"]
      meta:["10.86.87.15:45500"]
    }
    # 填写的账号必须拥有 Nebula Graph 相应图空间的写数据权限
    user: user
    pswd: password
    # 填写 Nebula Graph 中需要写入数据的图空间名称
    space: JC
    connection {
      timeout: 3000
      retry: 3
    }
    execution {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors
    }
    rate: {
      limit: 1024
      timeout: 1000
    }
  }
  # 处理标签
  tags: [
    # 设置标签相关信息
    {
      # Nebula Graph 中对应的标签名称。
      name: user
      type: {
        # 指定数据源文件格式,设置为 hive。
        source: hive
        # 指定点数据导入 Nebula Graph 的方式,
        # 可以设置为:client(以客户端形式导入)和 sst(以 SST 文件格式导入)。
        # 关于 SST 文件导入配置,参考文档:导入 SST 文件(https://
        # docs.nebula-graph.com.cn/nebula-exchange/
        # use-exchange/ex-ug-import-sst/)。
        sink: client
      }

      # 设置读取数据库 mooc 中 users 表数据的 SQL 语句
      exec: "select user_id,create_time from wujie.dim_usr_user_base_info where dt='2021-01-27'"

      # 在 fields 里指定 users 表中的列名称,其对应的 value
      # 会作为 Nebula Graph 中指定属性 userId (nebula.fields) 的数据源
      # fields 和 nebula.fields 里的配置必须一一对应
      # 如果需要指定多个列名称,用英文逗号(,)隔开
      fields: [user_id,create_time]
      nebula.fields: [user_id,create_time]

      # 指定表中某一列数据为 Nebula Graph 中点 VID 的来源。
      # vertex.field 的值必须与上述 fields 中的列名保持一致。
      # 如果数据不是 int 类型,则添加 vertex.policy 指定 VID 映射策略,建议设置为 "hash",参考以下 course 标签的设置。
      vertex: {
        field: user_id
        policy: "hash"
      }

      # 单次写入 Nebula Graph 的最大点数据量。
      batch: 256

      # Spark 分区数量
      partition: 32

      # isImplicit 的设置说明参考:https://github.com/vesoft-inc/nebula-java/
      # blob/v1.0/tools/exchange/src/main/resources/application.conf
      isImplicit: true
    }
    {
      name: phone
      type: {
        source: hive
        sink: client
      }
      exec: "select id from jc_antispam.dwd_identities_user_leader_bd_supplier_df where dt='2021-01-27' and id_type='phone' group by id"
      fields: [id]
      nebula.fields: [id]

      # 指定表中某一列数据为 Nebula Graph 中点 VID 的来源。
      # vertex.field 的值必须与上述 fields 中的列名保持一致。
      # 如果数据不是 int 类型,则添加 vertex.policy 指定 VID 映射策略,建议设置为 "hash"。
      vertex: {
        field: id
        policy: "hash"
      }
      batch: 256
      partition: 32
      isImplicit: true
    }

  ]

  # 处理边数据
  edges: [
    # 设置边类型 action 相关信息
    {
      # Nebula Graph 中对应的边类型名称。
      name: authorization

      type: {
        # 指定数据源文件格式,设置为 hive。
        source: hive

        # 指定边数据导入 Nebula Graph 的方式,
        # 可以设置为:client(以客户端形式导入)和 sst(以 SST 文件格式导入)。
        # 关于 SST 文件导入配置,参考文档:导入 SST 文件(https://
        # docs.nebula-graph.com.cn/nebula-exchange/
        # use-exchange/ex-ug-import-sst/)。
        sink: client
      }

      # 设置读取数据库 mooc 中 actions 表数据的 SQL 语句
      exec: "select id1,id2,rel_value,is_effect from jc_antispam.dwd_role_one_degree_rel_df where  id1_label = '用户' and id2_label = '手机号'  and rel_label = '授权'  and dt = '2021-01-26'"

      # 在 fields 里指定 actions 表中的列名称,其对应的 value
      # 会作为 Nebula Graph 中 action 的属性(nebula.fields)值来源
      # fields 和 nebula.fields 里的配置必须一一对应
      # 如果需要指定多个列名称,用英文逗号(,)隔开
      fields: [rel_value,is_effect]
      nebula.fields: [rel_value,is_effect]

      # 在 source 里,将 actions 表中某一列作为边起点数据源
      # 在 target 里,将 actions 表中某一列作为边终点数据源
      # 如果数据源是 int 或 long 类型,直接指定列名
      # 如果数据源不是 int 类型,则添加 vertex.policy 指定 VID 映射策略,建议设置为 "hash"
      source: {
        field: id1
        policy: "hash"
      }
      target: {
        field: id2
        policy: "hash"
      }

      # 单次写入 Nebula Graph 的最大点数据量。
      batch: 256

      # Spark 分区数量
      partition: 32
    }
  ]
}

报错信息

21/02/02 12:11:31 ERROR MetaClientImpl: Get tags Error: -23
Exception in thread "main" java.util.NoSuchElementException: key not found: user_id
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:59)
        at scala.collection.MapLike$class.apply(MapLike.scala:141)
        at scala.collection.AbstractMap.apply(Map.scala:59)
        at com.vesoft.nebula.tools.importer.utils.NebulaUtils$$anonfun$getDataSourceFieldType$1.apply(NebulaUtils.scala:65)
        at com.vesoft.nebula.tools.importer.utils.NebulaUtils$$anonfun$getDataSourceFieldType$1.apply(NebulaUtils.scala:64)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at com.vesoft.nebula.tools.importer.utils.NebulaUtils$.getDataSourceFieldType(NebulaUtils.scala:64)
        at com.vesoft.nebula.tools.importer.processor.VerticesProcessor.process(VerticesProcessor.scala:138)
        at com.vesoft.nebula.tools.importer.Exchange$$anonfun$main$2.apply(Exchange.scala:174)
        at com.vesoft.nebula.tools.importer.Exchange$$anonfun$main$2.apply(Exchange.scala:152)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at com.vesoft.nebula.tools.importer.Exchange$.main(Exchange.scala:152)
        at com.vesoft.nebula.tools.importer.Exchange.main(Exchange.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:780)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

检查一下 nebula 中有没有创建对应的 tag 或者 edge ?

cc @nicole @darionyaphet

是有创建的

报错是库里找不到吗?

这里报错了,你使用的应该是最新的 1.2 的 docker 镜像,客户端和最新的 docker 镜像不兼容,你可以使用 2020 年 12 月之前的镜像。

https://docs.nebula-graph.com.cn/manual-CN/3.build-develop-and-administration/2.install/deploy-nebula-with-swarm/
是通过集群部署的,现在要怎么改?

@yee 用户这边是不是需要使用旧的镜像重新部署

你现在部署的 nebula 镜像是什么版本的? 1.2.0 ?

通过Docker Swarm部署怎么看是哪个版本?

可以看 docker image 的 tag

docker-compose --version
docker-compose version 1.24.0, build 0aa59064

这个吗

docker image 的 tag,即下面这行中冒号后面的部分(此处为 nightly)


image: vesoft/nebula-metad:nightly

你把 docker 镜像的 tag 都修改成 v1.2.0,例如 vesoft/nebula-metad:v1.2.0,然后重新 deploy stack 文件,再验证试试。

问题原因是这个哈

好的, 改成v1.2.0 就应该可以了是吧

是的,应该可以的

1赞

改了之后报连接失败……


Application Id: application_1612237606203_991080, Tracking URL: http://bigdata-nmg-hdprm00.nmg01:8088/proxy/application_1612237606203_991080/


[Stage 1:>                                                        (0 + 32) / 32]21/02/03 21:19:58 ERROR TaskSetManager: Task 9 in stage 1.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 1.0 failed 4 times, most recent failure: Lost task 9.3 in stage 1.0 (TID 449, bigdata-nmg-hdp4805.nmg01.diditaxi.com, executor 43): java.lang.RuntimeException: Connection Failed
        at com.vesoft.nebula.tools.importer.writer.NebulaGraphClientWriter.prepare(ServerBaseWriter.scala:153)
        at com.vesoft.nebula.tools.importer.processor.VerticesProcessor.com$vesoft$nebula$tools$importer$processor$VerticesProcessor$$processEachPartition(VerticesProcessor.scala:76)
        at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$process$4.apply(VerticesProcessor.scala:274)
        at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$process$4.apply(VerticesProcessor.scala:274)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2079)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2079)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2039)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2060)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2079)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2104)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
        at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2345)
        at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2345)
        at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2345)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2824)
        at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2344)
        at com.vesoft.nebula.tools.importer.processor.VerticesProcessor.process(VerticesProcessor.scala:274)
        at com.vesoft.nebula.tools.importer.Exchange$$anonfun$main$2.apply(Exchange.scala:174)
        at com.vesoft.nebula.tools.importer.Exchange$$anonfun$main$2.apply(Exchange.scala:152)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at com.vesoft.nebula.tools.importer.Exchange$.main(Exchange.scala:152)
        at com.vesoft.nebula.tools.importer.Exchange.main(Exchange.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:780)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Connection Failed
        at com.vesoft.nebula.tools.importer.writer.NebulaGraphClientWriter.prepare(ServerBaseWriter.scala:153)
        at com.vesoft.nebula.tools.importer.processor.VerticesProcessor.com$vesoft$nebula$tools$importer$processor$VerticesProcessor$$processEachPartition(VerticesProcessor.scala:76)
        at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$process$4.apply(VerticesProcessor.scala:274)
        at com.vesoft.nebula.tools.importer.processor.VerticesProcessor$$anonfun$process$4.apply(VerticesProcessor.scala:274)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2079)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2079)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
[Stage 1:>                                                        (0 + 31) / 32]21/02/03 21:20:00 ERROR TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/exchange-1.1.0.jar, byteCount=193661549, body=FileSegmentManagedBuffer{file=/home/gulfstream_antispam/yinjianbo/exchange-1.1.0.jar, offset=0, length=193661549}} to /10.77.31.36:48484; closing connection
java.nio.channels.ClosedChannelException
        at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
21/02/03 21:20:00 ERROR TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/exchange-1.1.0.jar, byteCount=193661549, body=FileSegmentManagedBuffer{file=/home/gulfstream_antispam/yinjianbo/exchange-1.1.0.jar, offset=0, length=193661549}} to /10.77.31.36:48476; closing connection
java.nio.channels.ClosedChannelException
        at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
21/02/03 21:20:00 ERROR TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/exchange-1.1.0.jar, byteCount=193661549, body=FileSegmentManagedBuffer{file=/home/gulfstream_antispam/yinjianbo/exchange-1.1.0.jar, offset=0, length=193661549}} to /10.77.73.35:56360; closing connection
java.nio.channels.ClosedChannelException
        at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
21/02/03 21:20:00 ERROR TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/exchange-1.1.0.jar, byteCount=193661549, body=FileSegmentManagedBuffer{file=/home/gulfstream_antispam/yinjianbo/exchange-1.1.0.jar, offset=0, length=193661549}} to /10.77.73.35:56362; closing connection
java.nio.channels.ClosedChannelException
        at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
21/02/03 21:20:00 ERROR TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/exchange-1.1.0.jar, byteCount=193661549, body=FileSegmentManagedBuffer{file=/home/gulfstream_antispam/yinjianbo/exchange-1.1.0.jar, offset=0, length=193661549}} to /10.77.73.35:56376; closing connection
java.nio.channels.ClosedChannelException
        at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
21/02/03 21:20:00 ERROR TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/exchange-1.1.0.jar, byteCount=193661549, body=FileSegmentManagedBuffer{file=/home/gulfstream_antispam/yinjianbo/exchange-1.1.0.jar, offset=0, length=193661549}} to /10.83.47.64:42594; closing connection
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
        at sun.nio.ch.FileChannelImpl.transferToDirectlyInternal(FileChannelImpl.java:428)
        at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:493)
        at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:608)
        at io.netty.channel.DefaultFileRegion.transferTo(DefaultFileRegion.java:139)
        at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:121)
        at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:287)
        at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:237)
        at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:314)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:802)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:319)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:637)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:745)
21/02/03 21:20:00 ERROR TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/exchange-1.1.0.jar, byteCount=193661549, body=FileSegmentManagedBuffer{file=/home/gulfstream_antispam/yinjianbo/exchange-1.1.0.jar, offset=0, length=193661549}} to /10.83.164.14:52668; closing connection
java.io.IOException: Broken pipe
        at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
        at sun.nio.ch.FileChannelImpl.transferToDirectlyInternal(FileChannelImpl.java:428)
        at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:493)
        at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:608)
        at io.netty.channel.DefaultFileRegion.transferTo(DefaultFileRegion.java:139)
        at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:121)
        at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:287)
        at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:237)
        at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:314)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:802)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:319)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:637)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:745)

这个是graphd连接失败,你看下graphd服务是否正常,也麻烦贴一下exchange的配置文件

之前安装之后结果是这样的

exchange的配置文件

{
  # Spark 相关配置
  spark: {
    app: {
      name: Spark Writer
    }
    driver: {
      cores: 1
      maxResultSize: 1G
    }
    cores {
      max: 16
    }
  }
  # Nebula Graph 相关配置
  nebula: {
    address:{
      # 以下为 Nebula Graph 的 Graph 服务和 Meta 服务所在机器的 IP 地址及端口
      # 如果有多个地址,格式为 "ip1:port","ip2:port","ip3:port"
      # 不同地址之间以英文逗号 (,) 隔开
      graph:["10.86.87.15:3699"]
      meta:["10.86.87.15:45500"]
    }
    # 填写的账号必须拥有 Nebula Graph 相应图空间的写数据权限
    user: user
    pswd: password
    # 填写 Nebula Graph 中需要写入数据的图空间名称
    space: JC
    connection {
      timeout: 3000
      retry: 3
    }
    execution {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors
    }
    rate: {
      limit: 1024
      timeout: 1000
    }
  }
  # 处理标签
  tags: [
    # 设置标签相关信息
    {
      # Nebula Graph 中对应的标签名称。
      name: user
      type: {
        # 指定数据源文件格式,设置为 hive。
        source: hive
        # 指定点数据导入 Nebula Graph 的方式,
        # 可以设置为:client(以客户端形式导入)和 sst(以 SST 文件格式导入)。
        # 关于 SST 文件导入配置,参考文档:导入 SST 文件(https://
        # docs.nebula-graph.com.cn/nebula-exchange/
        # use-exchange/ex-ug-import-sst/)。
        sink: client
      }

      # 设置读取数据库 mooc 中 users 表数据的 SQL 语句
      exec: "select user_id,create_time from wujie.dim_usr_user_base_info where dt='2021-01-27'"

      # 在 fields 里指定 users 表中的列名称,其对应的 value
      # 会作为 Nebula Graph 中指定属性 userId (nebula.fields) 的数据源
      # fields 和 nebula.fields 里的配置必须一一对应
      # 如果需要指定多个列名称,用英文逗号(,)隔开
      fields: [user_id,create_time]
      nebula.fields: [user_id,create_time]

      # 指定表中某一列数据为 Nebula Graph 中点 VID 的来源。
      # vertex.field 的值必须与上述 fields 中的列名保持一致。
      # 如果数据不是 int 类型,则添加 vertex.policy 指定 VID 映射策略,建议设置为 "hash",参考以下 course 标签的设置。
      vertex: {
        field: user_id
        policy: "hash"
      }

      # 单次写入 Nebula Graph 的最大点数据量。
      batch: 256

      # Spark 分区数量
      partition: 32

      # isImplicit 的设置说明参考:https://github.com/vesoft-inc/nebula-java/
      # blob/v1.0/tools/exchange/src/main/resources/application.conf
      isImplicit: true
    }
    {
      name: phone
      type: {
        source: hive
        sink: client
      }
      exec: "select id from jc_antispam.dwd_identities_user_leader_bd_supplier_df where dt='2021-01-27' and id_type='phone' group by id"
      fields: [id]
      nebula.fields: [id]

      # 指定表中某一列数据为 Nebula Graph 中点 VID 的来源。
      # vertex.field 的值必须与上述 fields 中的列名保持一致。
      # 如果数据不是 int 类型,则添加 vertex.policy 指定 VID 映射策略,建议设置为 "hash"。
      vertex: {
        field: id
        policy: "hash"
      }
      batch: 256
      partition: 32
      isImplicit: true
    }

  ]

  # 处理边数据
  edges: [
    # 设置边类型 action 相关信息
    {
      # Nebula Graph 中对应的边类型名称。
      name: authorization

      type: {
        # 指定数据源文件格式,设置为 hive。
        source: hive

        # 指定边数据导入 Nebula Graph 的方式,
        # 可以设置为:client(以客户端形式导入)和 sst(以 SST 文件格式导入)。
        # 关于 SST 文件导入配置,参考文档:导入 SST 文件(https://
        # docs.nebula-graph.com.cn/nebula-exchange/
        # use-exchange/ex-ug-import-sst/)。
        sink: client
      }

      # 设置读取数据库 mooc 中 actions 表数据的 SQL 语句
      exec: "select id1,id2,rel_value,is_effect from jc_antispam.dwd_role_one_degree_rel_df where  id1_label = '用户' and id2_label = '手机号'  and rel_label = '授权'  and dt = '2021-01-26'"

      # 在 fields 里指定 actions 表中的列名称,其对应的 value
      # 会作为 Nebula Graph 中 action 的属性(nebula.fields)值来源
      # fields 和 nebula.fields 里的配置必须一一对应
      # 如果需要指定多个列名称,用英文逗号(,)隔开
      fields: [rel_value,is_effect]
      nebula.fields: [rel_value,is_effect]

      # 在 source 里,将 actions 表中某一列作为边起点数据源
      # 在 target 里,将 actions 表中某一列作为边终点数据源
      # 如果数据源是 int 或 long 类型,直接指定列名
      # 如果数据源不是 int 类型,则添加 vertex.policy 指定 VID 映射策略,建议设置为 "hash"
      source: {
        field: id1
        policy: "hash"
      }
      target: {
        field: id2
        policy: "hash"
      }

      # 单次写入 Nebula Graph 的最大点数据量。
      batch: 256

      # Spark 分区数量
      partition: 32
    }
  ]
}

浙ICP备20010487号