- nebula 版本:3.2.1
- 部署方式:分布式
- 安装方式:RPM
- 是否为线上版本: N
配置exchange导入kafka数据,配置文件如下:
{
spark: {
app: {
name: Nebula Exchange 3.0.0
}
driver: {
cores: 1
maxResultSize: 1G
}
executor: {
memory:1G
}
cores: {
max: 16
}
}
nebula: {
address:{
graph:["192.168.1.35:9669","192.168.1.36:9669","192.168.1.37:9669"]
meta:["192.168.1.35:9559","192.168.1.36:9559","192.168.1.37:9559"]
}
user: xxxx
pswd: xxxx
space: test_kafka
connection: {
timeout: 3000
retry: 3
}
execution: {
retry: 3
}
error: {
max: 32
output: /mnt/errors
}
rate: {
limit: 1024
timeout: 1000
}
}
tags: [
{
name: sample
type: {
source: kafka
sink: client
}
service:"192.168.1.128:9092"
topic:"vertex_sample"
fields: [hash,hash_type]
nebula.fields: [hash,hash_type]
vertex: {
field:sample_id
}
batch: 256
partition: 32
}
]
}
spark版本:Spark version 2.3.2.3.1.0.0-78
exchange版本:nebula-exchange_spark_2.2-3.0.0.jar
报错如下:
23/02/11 10:58:57 WARN ClientUtils: Removing server xxxxx:6667 from bootstrap.servers as DNS resolution failed for xxxxx
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:718)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
at com.vesoft.nebula.exchange.reader.KafkaReader.read(StreamingBaseReader.scala:54)
at com.vesoft.nebula.exchange.Exchange$.com$vesoft$nebula$exchange$Exchange$$createDataSource(Exchange.scala:247)
at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:105)
at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:95)
at scala.collection.immutable.List.foreach(List.scala:381)
at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:95)
at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:60)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:654)
... 25 more
23/02/11 10:58:57 INFO SparkContext: Invoking stop() from shutdown hook
23/02/11 10:58:57 INFO AbstractConnector: Stopped Spark@24528a25{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
23/02/11 10:58:57 INFO SparkUI: Stopped Spark web UI at http://b4-udap73:4040
23/02/11 10:58:57 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/02/11 10:58:57 INFO MemoryStore: MemoryStore cleared
23/02/11 10:58:57 INFO BlockManager: BlockManager stopped
23/02/11 10:58:57 INFO BlockManagerMaster: BlockManagerMaster stopped
23/02/11 10:58:57 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/02/11 10:58:57 INFO SparkContext: Successfully stopped SparkContext
23/02/11 10:58:57 INFO ShutdownHookManager: Shutdown hook called
23/02/11 10:58:57 INFO ShutdownHookManager: Deleting directory /tmp/spark-311d2883-0f5f-4178-8b5e-cd0dfc129fa5
23/02/11 10:58:57 INFO ShutdownHookManager: Deleting directory /tmp/temporaryReader-270c7285-ee9c-44f4-8a92-45d4b76ff721
23/02/11 10:58:57 INFO ShutdownHookManager: Deleting directory /tmp/spark-a22ca912-882c-4999-82aa-03eb07233d27