Importing Kafka data into NebulaGraph with Exchange

  • NebulaGraph version: 3.2.1
  • Deployment: distributed
  • Installation method: RPM
  • Production environment: No

Exchange is configured to import data from Kafka; the configuration file is as follows:
{
  spark: {
    app: {
      name: Nebula Exchange 3.0.0
    }
    driver: {
      cores: 1
      maxResultSize: 1G
    }
    executor: {
      memory: 1G
    }

    cores: {
      max: 16
    }
  }

  nebula: {
    address:{
      graph:["192.168.1.35:9669","192.168.1.36:9669","192.168.1.37:9669"]
      meta:["192.168.1.35:9559","192.168.1.36:9559","192.168.1.37:9559"]
    }
    user: xxxx
    pswd: xxxx

    space: test_kafka
    connection: {
      timeout: 3000
      retry: 3
    }
    execution: {
      retry: 3
    }
    error: {
      max: 32
      output: /mnt/errors
    }
    rate: {
      limit: 1024
      timeout: 1000
    }
  }
  tags: [
    {
      name: sample
      type: {
        source: kafka
        sink: client
      }
      service: "192.168.1.128:9092"
      topic: "vertex_sample"
      fields: [hash,hash_type]
      nebula.fields: [hash,hash_type]
      vertex: {
        field: sample_id
      }

      batch: 256
      partition: 32
    }
  ]
}

Spark version: 2.3.2.3.1.0.0-78
Exchange version: nebula-exchange_spark_2.2-3.0.0.jar
The error output is as follows:

23/02/11 10:58:57 WARN ClientUtils: Removing server xxxxx:6667 from bootstrap.servers as DNS resolution failed for xxxxx
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:718)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
	at com.vesoft.nebula.exchange.reader.KafkaReader.read(StreamingBaseReader.scala:54)
	at com.vesoft.nebula.exchange.Exchange$.com$vesoft$nebula$exchange$Exchange$$createDataSource(Exchange.scala:247)
	at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:105)
	at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:95)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:95)
	at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
	at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:60)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:654)
	... 25 more
23/02/11 10:58:57 INFO SparkContext: Invoking stop() from shutdown hook
23/02/11 10:58:57 INFO AbstractConnector: Stopped Spark@24528a25{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
23/02/11 10:58:57 INFO SparkUI: Stopped Spark web UI at http://b4-udap73:4040
23/02/11 10:58:57 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/02/11 10:58:57 INFO MemoryStore: MemoryStore cleared
23/02/11 10:58:57 INFO BlockManager: BlockManager stopped
23/02/11 10:58:57 INFO BlockManagerMaster: BlockManagerMaster stopped
23/02/11 10:58:57 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/02/11 10:58:57 INFO SparkContext: Successfully stopped SparkContext
23/02/11 10:58:57 INFO ShutdownHookManager: Shutdown hook called
23/02/11 10:58:57 INFO ShutdownHookManager: Deleting directory /tmp/spark-311d2883-0f5f-4178-8b5e-cd0dfc129fa5
23/02/11 10:58:57 INFO ShutdownHookManager: Deleting directory /tmp/temporaryReader-270c7285-ee9c-44f4-8a92-45d4b76ff721
23/02/11 10:58:57 INFO ShutdownHookManager: Deleting directory /tmp/spark-a22ca912-882c-4999-82aa-03eb07233d27
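The root cause in the trace is the `Caused by` line: the Kafka client could not resolve the hostname in `bootstrap.servers`, so it removed the broker and was left with no resolvable addresses. A minimal sketch that reproduces the same resolvability check outside of Spark (the hostnames below are placeholders; the real broker address is redacted as `xxxxx` in the log):

```python
# Hedged sketch: check whether a Kafka bootstrap address resolves, mimicking
# the DNS validation the Kafka client performs at consumer construction time.
# The addresses used here are placeholders, not the redacted one from the log.
import socket

def broker_resolvable(bootstrap):
    """Return True if the host part of 'host:port' resolves via DNS."""
    host, _, port = bootstrap.rpartition(":")
    try:
        socket.getaddrinfo(host, int(port))
        return True
    except socket.gaierror:
        return False

print(broker_resolvable("192.168.1.128:9092"))          # IP literals always resolve
print(broker_resolvable("no-such-broker.invalid:6667")) # False: DNS lookup fails
```

If the check fails for your broker hostname, fix DNS or add an `/etc/hosts` entry on every Spark executor host, or use the broker's IP address directly in the config.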

Which version of Exchange are you using?

You can check the version compatibility table in the docs: https://docs.nebula-graph.com.cn/3.4.0/nebula-exchange/about-exchange/ex-ug-what-is-exchange/#_4
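As I read that compatibility table (please verify against the linked page), each Exchange jar is built for one Spark line, and Spark 2.3 is not one of them, so running `nebula-exchange_spark_2.2` on a Spark 2.3.2 cluster is a mismatch. A small sketch of that mapping:

```python
# Hedged sketch of the Exchange-jar-to-Spark mapping as I read the docs'
# compatibility table; verify against the linked page before relying on it.
JAR_TO_SPARK = {
    "nebula-exchange_spark_2.2": "2.2.x",
    "nebula-exchange_spark_2.4": "2.4.x",
    "nebula-exchange_spark_3.0": "3.x",
}

def expected_spark_line(jar_name):
    """Return the Spark line a given Exchange jar targets, or None if unknown."""
    for prefix, spark_line in JAR_TO_SPARK.items():
        if jar_name.startswith(prefix):
            return spark_line
    return None

# The jar used in this thread targets Spark 2.2.x, not the 2.3.2 in use.
print(expected_spark_line("nebula-exchange_spark_2.2-3.0.0.jar"))  # 2.2.x
```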


This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.