Nebula 2.5.0: Exchange reports java.io.NotSerializableException: com.google.common.net.HostAndPort when generating SST files

  • Nebula version: 2.5.0

  • Deployment (distributed, CDH 6.3.2 cluster)

  • Production deployment: No

  • Hardware

    • Disk: 200 GB
    • CPU / memory: 4 cores, 16 GB RAM
  • Problem description

  • spark-submit command used:

#!/bin/bash

set -x

spark-submit \
--master yarn \
--files /data/nebula-datas/player-csv-sst.conf \
--deploy-mode cluster \
--class com.vesoft.nebula.exchange.Exchange   /data/softs/nebula-sst/nebula-exchange-2.5.0.jar  \
-c /data/nebula-datas/player-csv-sst.conf

From the error log, when the job is submitted and the Spark tasks execute, an object is not serializable. I'm using the player sample data from the official manual and followed the steps exactly, so I don't know whether something in the spark-submit command is wrong; our environment runs in yarn-cluster mode. There is also a second problem: the job kept reporting that the configuration file does not exist. We use yarn-cluster mode and pass the configuration file with --files at startup, so why can't the running job find it? After I created the corresponding directory on every CDH node, copied the file there, and then specified --files /data/nebula-datas/player-csv-sst.conf, the job could reach RUNNING, but it then fails with the error above.

  • Related meta / storage / graph info log messages (as text where possible, so they can be searched)
21/09/13 14:04:33 INFO conf.HiveConf: Found configuration file file:/etc/hive/conf.cloudera.hive/hive-site.xml
21/09/13 14:04:33 INFO security.YARNHadoopDelegationTokenManager: Attempting to load user's ticket cache.
21/09/13 14:04:33 INFO yarn.Client: Submitting application application_1628391375886_0163 to ResourceManager
21/09/13 14:04:34 INFO impl.YarnClientImpl: Submitted application application_1628391375886_0163
21/09/13 14:04:35 INFO yarn.Client: Application report for application_1628391375886_0163 (state: ACCEPTED)
21/09/13 14:04:35 INFO yarn.Client:
         client token: N/A
         diagnostics: AM container is launched, waiting for AM container to Register with RM
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: root.users.root
         start time: 1631513073997
         final status: UNDEFINED
         tracking URL: http://CDH01:8088/proxy/application_1628391375886_0163/
         user: root
21/09/13 14:04:36 INFO yarn.Client: Application report for application_1628391375886_0163 (state: ACCEPTED)
21/09/13 14:04:37 INFO yarn.Client: Application report for application_1628391375886_0163 (state: ACCEPTED)
21/09/13 14:04:38 INFO yarn.Client: Application report for application_1628391375886_0163 (state: ACCEPTED)
21/09/13 14:04:39 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:04:39 INFO yarn.Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: CDH02
         ApplicationMaster RPC port: 35514
         queue: root.users.root
         start time: 1631513073997
         final status: UNDEFINED
         tracking URL: http://CDH01:8088/proxy/application_1628391375886_0163/
         user: root
21/09/13 14:04:40 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:04:41 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:04:42 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:04:43 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:04:44 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:04:45 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:04:46 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:04:47 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:04:48 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:04:49 INFO yarn.Client: Application report for application_1628391375886_0163 (state: ACCEPTED)
21/09/13 14:04:49 INFO yarn.Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: root.users.root
         start time: 1631513073997
         final status: UNDEFINED
         tracking URL: http://CDH01:8088/proxy/application_1628391375886_0163/
         user: root
21/09/13 14:04:50 INFO yarn.Client: Application report for application_1628391375886_0163 (state: ACCEPTED)
21/09/13 14:04:51 INFO yarn.Client: Application report for application_1628391375886_0163 (state: ACCEPTED)
21/09/13 14:04:52 INFO yarn.Client: Application report for application_1628391375886_0163 (state: ACCEPTED)
21/09/13 14:04:53 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:04:53 INFO yarn.Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: CDH02
         ApplicationMaster RPC port: 37239
         queue: root.users.root
         start time: 1631513073997
         final status: UNDEFINED
         tracking URL: http://CDH01:8088/proxy/application_1628391375886_0163/
         user: root
21/09/13 14:04:54 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:04:55 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:04:56 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:04:57 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:04:58 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:04:59 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:05:00 INFO yarn.Client: Application report for application_1628391375886_0163 (state: RUNNING)
21/09/13 14:05:01 INFO yarn.Client: Application report for application_1628391375886_0163 (state: FINISHED)
21/09/13 14:05:01 INFO yarn.Client:
         client token: N/A
         diagnostics: User class threw exception: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2346)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:618)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
        at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3037)
        at org.apache.spark.sql.Dataset.rdd(Dataset.scala:3035)
        at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2735)
        at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2735)
        at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2735)
        at org.apache.spark.sql.Dataset$$anonfun$withNewRDDExecutionId$1.apply(Dataset.scala:3349)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3345)
        at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2734)
        at com.vesoft.nebula.exchange.processor.VerticesProcessor.process(VerticesProcessor.scala:179)
        at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:153)
        at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:128)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:128)
        at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:673)
Caused by: java.io.NotSerializableException: com.google.common.net.HostAndPort
Serialization stack:
        - object not serializable (class: com.google.common.net.HostAndPort, value: 192.168.1.118:9559)
        - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
        - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@7e86dfd0)
        - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
        - object (class scala.collection.immutable.$colon$colon, List(192.168.1.118:9559))
        - field (class: com.vesoft.nebula.exchange.processor.VerticesProcessor$$anonfun$process$1, name: address$1, type: class scala.collection.immutable.List)
        - object (class com.vesoft.nebula.exchange.processor.VerticesProcessor$$anonfun$process$1, <function1>)
        - field (class: org.apache.spark.sql.execution.MapPartitionsExec, name: func, type: interface scala.Function1)
        - object (class org.apache.spark.sql.execution.MapPartitionsExec, MapPartitions <function1>, obj#32: scala.Tuple2
+- DeserializeToObject createexternalrow(_c0#10.toString, _c1#11.toString, _c2#12.toString, StructField(_c0,StringType,true), StructField(_c1,StringType,true), StructField(_c2,StringType,true)), obj#31: org.apache.spark.sql.Row
   +- Exchange RoundRobinPartitioning(32)
      +- *(1) FileScan csv [_c0#10,_c1#11,_c2#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://192.168.1.115:8020/data/nebula/vertex_player.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_c0:string,_c1:string,_c2:string>
)
        - field (class: org.apache.spark.sql.execution.InputAdapter, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
        - object (class org.apache.spark.sql.execution.InputAdapter, MapPartitions <function1>, obj#32: scala.Tuple2
+- DeserializeToObject createexternalrow(_c0#10.toString, _c1#11.toString, _c2#12.toString, StructField(_c0,StringType,true), StructField(_c1,StringType,true), StructField(_c2,StringType,true)), obj#31: org.apache.spark.sql.Row
   +- Exchange RoundRobinPartitioning(32)
      +- *(1) FileScan csv [_c0#10,_c1#11,_c2#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://192.168.1.115:8020/data/nebula/vertex_player.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_c0:string,_c1:string,_c2:string>
)
        - field (class: org.apache.spark.sql.execution.SerializeFromObjectExec, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
        - object (class org.apache.spark.sql.execution.SerializeFromObjectExec, SerializeFromObject [input[0, scala.Tuple2, true]._1 AS value#27 AS _1#33, input[0, scala.Tuple2, true]._2 AS value#28 AS _2#34]
+- MapPartitions <function1>, obj#32: scala.Tuple2
   +- DeserializeToObject createexternalrow(_c0#10.toString, _c1#11.toString, _c2#12.toString, StructField(_c0,StringType,true), StructField(_c1,StringType,true), StructField(_c2,StringType,true)), obj#31: org.apache.spark.sql.Row
      +- Exchange RoundRobinPartitioning(32)
         +- *(1) FileScan csv [_c0#10,_c1#11,_c2#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://192.168.1.115:8020/data/nebula/vertex_player.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_c0:string,_c1:string,_c2:string>
)
        - field (class: org.apache.spark.sql.execution.ProjectExec, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
        - object (class org.apache.spark.sql.execution.ProjectExec, Project [_1#33 AS key#35, _2#34 AS value#36]
+- SerializeFromObject [input[0, scala.Tuple2, true]._1 AS value#27 AS _1#33, input[0, scala.Tuple2, true]._2 AS value#28 AS _2#34]
   +- MapPartitions <function1>, obj#32: scala.Tuple2
      +- DeserializeToObject createexternalrow(_c0#10.toString, _c1#11.toString, _c2#12.toString, StructField(_c0,StringType,true), StructField(_c1,StringType,true), StructField(_c2,StringType,true)), obj#31: org.apache.spark.sql.Row
         +- Exchange RoundRobinPartitioning(32)
            +- *(1) FileScan csv [_c0#10,_c1#11,_c2#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://192.168.1.115:8020/data/nebula/vertex_player.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_c0:string,_c1:string,_c2:string>
)
        - field (class: org.apache.spark.sql.execution.SortExec, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
        - object (class org.apache.spark.sql.execution.SortExec, Sort [key#35 ASC NULLS FIRST], false, 0
+- Project [_1#33 AS key#35, _2#34 AS value#36]
   +- SerializeFromObject [input[0, scala.Tuple2, true]._1 AS value#27 AS _1#33, input[0, scala.Tuple2, true]._2 AS value#28 AS _2#34]
      +- MapPartitions <function1>, obj#32: scala.Tuple2
         +- DeserializeToObject createexternalrow(_c0#10.toString, _c1#11.toString, _c2#12.toString, StructField(_c0,StringType,true), StructField(_c1,StringType,true), StructField(_c2,StringType,true)), obj#31: org.apache.spark.sql.Row
            +- Exchange RoundRobinPartitioning(32)
               +- *(1) FileScan csv [_c0#10,_c1#11,_c2#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://192.168.1.115:8020/data/nebula/vertex_player.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_c0:string,_c1:string,_c2:string>
)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 4)
        - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, name: references$1, type: class [Ljava.lang.Object;)
        - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, <function2>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
        ... 47 more

         ApplicationMaster host: CDH02
         ApplicationMaster RPC port: 37239
         queue: root.users.root
         start time: 1631513073997
         final status: FAILED
         tracking URL: http://CDH01:8088/proxy/application_1628391375886_0163/
         user: root
21/09/13 14:05:01 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: com.google.common.net.HostAndPort

Which version of Exchange are you using? From the error message, :thinking: it looks like a configuration problem. Please paste your storage, graph, and meta configurations.

Hi, the nebula-graphd.conf configuration is as follows:

--local_conf=true
########## basics ##########
# Whether to run as a daemon process
--daemonize=true
# The file to host the process id
--pid_file=pids/nebula-graphd.pid
# Whether to enable optimizer
--enable_optimizer=true
# The default charset when a space is created
--default_charset=utf8
# The default collate when a space is created
--default_collate=utf8_bin
# Whether to use the configuration obtained from the configuration file
--local_config=true

########## logging ##########
# The directory to host logging files
--log_dir=logs
# Log level, 0, 1, 2, 3 for INFO, WARNING, ERROR, FATAL respectively
--minloglevel=0
# Verbose log level, 1, 2, 3, 4, the higher of the level, the more verbose of the logging
--v=0
# Maximum seconds to buffer the log messages
--logbufsecs=0
# Whether to redirect stdout and stderr to separate output files
--redirect_stdout=true
# Destination filename of stdout and stderr, which will also reside in log_dir.
--stdout_log_file=graphd-stdout.log
--stderr_log_file=graphd-stderr.log
# Copy log messages at or above this level to stderr in addition to logfiles. The numbers of severity levels INFO, WARNING, ERROR, and FATAL are 0, 1, 2, and 3, respectively.
--stderrthreshold=2

########## query ##########
# Whether to treat partial success as an error.
# This flag is only used for Read-only access, and Modify access always treats partial success as an error.
--accept_partial_success=false

########## networking ##########
# Comma separated Meta Server Addresses
--meta_server_addrs=192.168.1.118:9559
# Local IP used to identify the nebula-graphd process.
# Change it to an address other than loopback if the service is distributed or
# will be accessed remotely.
--local_ip=192.168.1.118
# Network device to listen on
--listen_netdev=any
# Port to listen on
--port=9669
# To turn on SO_REUSEPORT or not
--reuse_port=false
# Backlog of the listen socket, adjust this together with net.core.somaxconn
--listen_backlog=1024
# Seconds before the idle connections are closed, 0 for never closed
--client_idle_timeout_secs=0
# Seconds before the idle sessions are expired, 0 for no expiration
--session_idle_timeout_secs=0
# The number of threads to accept incoming connections
--num_accept_threads=1
# The number of networking IO threads, 0 for # of CPU cores
--num_netio_threads=0
# The number of threads to execute user queries, 0 for # of CPU cores
--num_worker_threads=0
# HTTP service ip
--ws_ip=192.168.1.118
# HTTP service port
--ws_http_port=19669
# HTTP2 service port
--ws_h2_port=19670
# storage client timeout
--storage_client_timeout_ms=60000
# Port to listen on Meta with HTTP protocol, it corresponds to ws_http_port in metad's configuration file
--ws_meta_http_port=19559

########## authentication ##########
# Enable authorization
--enable_authorize=false
# User login authentication type, password for nebula authentication, ldap for ldap authentication, cloud for cloud authentication
--auth_type=password

########## memory ##########
# System memory high watermark ratio
--system_memory_high_watermark_ratio=0.8

  • The nebula-metad.conf configuration is as follows:
--local_conf=true
########## basics ##########
# Whether to run as a daemon process
--daemonize=true
# The file to host the process id
--pid_file=pids/nebula-metad.pid

########## logging ##########
# The directory to host logging files
--log_dir=logs
# Log level, 0, 1, 2, 3 for INFO, WARNING, ERROR, FATAL respectively
--minloglevel=0
# Verbose log level, 1, 2, 3, 4, the higher of the level, the more verbose of the logging
--v=0
# Maximum seconds to buffer the log messages
--logbufsecs=0
# Whether to redirect stdout and stderr to separate output files
--redirect_stdout=true
# Destination filename of stdout and stderr, which will also reside in log_dir.
--stdout_log_file=metad-stdout.log
--stderr_log_file=metad-stderr.log
# Copy log messages at or above this level to stderr in addition to logfiles. The numbers of severity levels INFO, WARNING, ERROR, and FATAL are 0, 1, 2, and 3, respectively.
--stderrthreshold=2

########## networking ##########
# Comma separated Meta Server addresses
--meta_server_addrs=192.168.1.118:9559
# Local IP used to identify the nebula-metad process.
# Change it to an address other than loopback if the service is distributed or
# will be accessed remotely.
--local_ip=192.168.1.118
# Meta daemon listening port
--port=9559
# HTTP service ip
--ws_ip=192.168.1.118
# HTTP service port
--ws_http_port=19559
# HTTP2 service port
--ws_h2_port=19560
# Port to listen on Storage with HTTP protocol, it corresponds to ws_http_port in storage's configuration file
--ws_storage_http_port=19779

########## storage ##########
# Root data path, here should be only single path for metad
--data_path=data/meta

########## Misc #########
# The default number of parts when a space is created
--default_parts_num=100
# The default replica factor when a space is created
--default_replica_factor=1

--heartbeat_interval_secs=10

  • The nebula-storaged.conf configuration is as follows:
--local_conf=true
########## basics ##########
# Whether to run as a daemon process
--daemonize=true
# The file to host the process id
--pid_file=pids/nebula-storaged.pid
# Whether to use the configuration obtained from the configuration file
--local_config=true

########## logging ##########
# The directory to host logging files
--log_dir=logs
# Log level, 0, 1, 2, 3 for INFO, WARNING, ERROR, FATAL respectively
--minloglevel=0
# Verbose log level, 1, 2, 3, 4, the higher of the level, the more verbose of the logging
--v=0
# Maximum seconds to buffer the log messages
--logbufsecs=0
# Whether to redirect stdout and stderr to separate output files
--redirect_stdout=true
# Destination filename of stdout and stderr, which will also reside in log_dir.
--stdout_log_file=storaged-stdout.log
--stderr_log_file=storaged-stderr.log
# Copy log messages at or above this level to stderr in addition to logfiles. The numbers of severity levels INFO, WARNING, ERROR, and FATAL are 0, 1, 2, and 3, respectively.
--stderrthreshold=2

########## networking ##########
# Comma separated Meta server addresses
--meta_server_addrs=192.168.1.118:9559
# Local IP used to identify the nebula-storaged process.
# Change it to an address other than loopback if the service is distributed or
# will be accessed remotely.
--local_ip=192.168.1.118
# Storage daemon listening port
--port=9779
# HTTP service ip
--ws_ip=192.168.1.118
# HTTP service port
--ws_http_port=19779
# HTTP2 service port
--ws_h2_port=19780
# heartbeat with meta service
--heartbeat_interval_secs=10

######### Raft #########
# Raft election timeout
--raft_heartbeat_interval_secs=30
# RPC timeout for raft client (ms)
--raft_rpc_timeout_ms=500
## recycle Raft WAL
--wal_ttl=14400

########## Disk ##########
# Root data path. Split by comma. e.g. --data_path=/disk1/path1/,/disk2/path2/
# One path per Rocksdb instance.
--data_path=data/storage

# Minimum reserved bytes of each data path
--minimum_reserved_bytes=268435456

# The default reserved bytes for one batch operation
--rocksdb_batch_size=4096
# The default block cache size used in BlockBasedTable.
# The unit is MB.
--rocksdb_block_cache=4
# The type of storage engine, `rocksdb', `memory', etc.
--engine_type=rocksdb

# Compression algorithm, options: no,snappy,lz4,lz4hc,zlib,bzip2,zstd
# For the sake of binary compatibility, the default value is snappy.
# Recommend to use:
#   * lz4 to gain more CPU performance, with the same compression ratio with snappy
#   * zstd to occupy less disk space
#   * lz4hc for the read-heavy write-light scenario
--rocksdb_compression=lz4

# Set different compressions for different levels
# For example, if --rocksdb_compression is snappy,
# "no:no:lz4:lz4::zstd" is identical to "no:no:lz4:lz4:snappy:zstd:snappy"
# In order to disable compression for level 0/1, set it to "no:no"
--rocksdb_compression_per_level=

# Whether or not to enable rocksdb's statistics, disabled by default
--enable_rocksdb_statistics=false

# Statslevel used by rocksdb to collection statistics, optional values are
#   * kExceptHistogramOrTimers, disable timer stats, and skip histogram stats
#   * kExceptTimers, Skip timer stats
#   * kExceptDetailedTimers, Collect all stats except time inside mutex lock AND time spent on compression.
#   * kExceptTimeForMutex, Collect all stats except the counters requiring to get time inside the mutex lock.
#   * kAll, Collect all stats
--rocksdb_stats_level=kExceptHistogramOrTimers

# Whether or not to enable rocksdb's prefix bloom filter, disabled by default.
--enable_rocksdb_prefix_filtering=false

############## rocksdb Options ##############
# rocksdb DBOptions in json, each name and value of option is a string, given as "option_name":"option_value" separated by comma
--rocksdb_db_options={}
# rocksdb ColumnFamilyOptions in json, each name and value of option is string, given as "option_name":"option_value" separated by comma
--rocksdb_column_family_options={"write_buffer_size":"67108864","max_write_buffer_number":"4","max_bytes_for_level_base":"268435456"}
# rocksdb BlockBasedTableOptions in json, each name and value of option is string, given as "option_name":"option_value" separated by comma
--rocksdb_block_based_table_options={"block_size":"8192"}

Thanks, please help take a look.

Regarding the configuration file: both the README and the docs explain what to do when submitting in yarn-cluster mode; please take a look.

HostAndPort from the guava package cannot be serialized. Check whether the guava dependency jar was automatically uploaded to the HDFS path of the applicationId. The HostAndPort utility class does implement serialization.
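For example, to see what spark-submit actually staged for a given application on HDFS (the namenode address and application id below are just examples taken from the logs in this thread):

hdfs dfs -ls hdfs://CDH01:8020/user/root/.sparkStaging/application_1628391375886_0163/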


My understanding is that the guava package is bundled inside the nebula-exchange jar. When I ran spark-submit, the startup log showed it being uploaded to the job's HDFS directory, as shown below:

21/09/13 15:25:08 INFO yarn.Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
21/09/13 15:25:08 INFO yarn.Client: Setting up container launch context for our AM
21/09/13 15:25:08 INFO yarn.Client: Setting up the launch environment for our AM container
21/09/13 15:25:08 INFO yarn.Client: Preparing resources for our AM container
21/09/13 15:25:08 INFO yarn.Client: Uploading resource file:/data/softs/nebula-sst/nebula-exchange-2.5.0.jar -> hdfs://CDH01:8020/user/root/.sparkStaging/application_1628391375886_0164/nebula-exchange-2.5.0.jar
21/09/13 15:25:08 INFO yarn.Client: Uploading resource file:/data/nebula-datas/player-csv-sst.conf -> hdfs://CDH01:8020/user/root/.sparkStaging/application_1628391375886_0164/player-csv-sst.conf
21/09/13 15:25:09 INFO yarn.Client: Uploading resource file:/tmp/spark-3b9df7de-8b38-473c-96c3-875fc56f21b3/__spark_conf__9093740899145509425.zip -> hdfs://CDH01:8020/user/root/.sparkStaging/application_1628391375886_0164/__spark_conf__.zip
21/09/13 15:25:09 INFO spark.SecurityManager: Changing view acls to: root
21/09/13 15:25:09 INFO spark.SecurityManager: Changing modify acls to: root
21/09/13 15:25:09 INFO spark.SecurityManager: Changing view acls groups to:
21/09/13 15:25:09 INFO spark.SecurityManager: Changing modify acls groups to:
21/09/13 15:25:09 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/09/13 15:25:09 INFO conf.HiveConf: Found configuration file file:/etc/hive/conf.cloudera.hive/hive-site.xml

As for the configuration file, I followed the submission method from the README. I can't see anything abnormal in my submit command's parameters; I compared them against the README and the docs and also checked the Spark website, and I still don't see which spark-submit parameter is wrong…

One more note: we are on CDH 6.3.2, with Spark 2.4.0 and Hadoop 3.0.0.

The submit command in the README also includes these two parameters; try adding them.

The guava package is bundled inside the Exchange jar, and we have not hit this serialization problem when testing in yarn-cluster mode. Check whether a guava jar exists under the spark/jars directory in your environment.
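For example, on a CDH parcel installation the Spark jars directory is usually the one below (the path is an assumption based on your layout); listing it shows which guava version Spark actually puts on the classpath:

ls -l /opt/cloudera/parcels/CDH/lib/spark/jars/ | grep -i guava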

Hi, in our CDH 6.3.2 the guava jars under spark/jars are by default the two shown above. What version are these two jars in your spark yarn-cluster environment? I can download the right one manually and upload it via --conf.

guava-14.0.1.jar

Hi, after trying this out I found the following:
with the parameters you provided I re-ran the job and still got exactly the same error as above. The submit script is:

#!/bin/bash
set -x
spark-submit \
--master yarn \
--files /data/nebula-datas/player-csv-sst.conf \
--deploy-mode cluster \
--conf spark.driver.extraClassPath=./guava-14.0.1.jar \
--conf spark.executor.extraClassPath=./guava-14.0.1.jar \
--class com.vesoft.nebula.exchange.Exchange   /data/softs/nebula-sst/nebula-exchange-2.5.0.jar  \
-c /data/nebula-datas/player-csv-sst.conf

Continuing the experiment: in the CDH spark jars directory, guava-11.0.2.jar is actually a symlink pointing to /opt/cloudera/parcels/CDH/jars/guava-11.0.2.jar, and under /opt/cloudera/parcels/CDH/jars/ there are multiple guava versions, as shown in the image below.

After deleting that symlink, I created a new symlink for guava-14.0.1.jar with the following commands:

cd /opt/cloudera/parcels/CDH/lib/spark/jars
rm -rf guava-11.0.2.jar
ln -s ../../../jars/guava-14.0.1.jar  guava-14.0.1.jar
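
For reference, a swap like this can be double-checked by confirming that the new symlink resolves and that HostAndPort in that guava version declares java.io.Serializable (a sketch, assuming the CDH parcel layout used above):

ls -l /opt/cloudera/parcels/CDH/lib/spark/jars/ | grep guava
# extract the class and inspect its declaration
unzip -o /opt/cloudera/parcels/CDH/jars/guava-14.0.1.jar com/google/common/net/HostAndPort.class -d /tmp/guava-check
javap /tmp/guava-check/com/google/common/net/HostAndPort.class | grep -i serializable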

Running spark-submit then generated the SST files successfully, as shown below:
image

At this point I also removed the two parameters below and the job still ran fine, which shows that manually adding the jar this way had no effect. Does that mean the guava dependency in the CDH cluster has to be modified?

--conf spark.driver.extraClassPath=./guava-14.0.1.jar \
--conf spark.executor.extraClassPath=./guava-14.0.1.jar \

Actually, the guava package does not need to be uploaded to the HDFS cluster.

  1. The --conf options mentioned earlier, added to the spark-submit command, are there to solve your "configuration file does not exist" problem; they have nothing to do with the guava package (see the sketch after this list).
  2. The guava package has to be placed under SPARK_HOME/jars on the Spark cluster. Is it necessary to modify the CDH cluster's guava dependency? For now, yes; if other applications depend on a different guava version, you can write a script that switches the jars automatically.
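
For reference, a minimal sketch of a yarn-cluster submit that both ships the config file with --files and puts the container working directory on the classpath, so Exchange can resolve the file by its bare name (paths and file names are the ones from your earlier posts, used here only as examples):

spark-submit \
--master yarn \
--deploy-mode cluster \
--files /data/nebula-datas/player-csv-sst.conf \
--conf spark.driver.extraClassPath=./ \
--conf spark.executor.extraClassPath=./ \
--class com.vesoft.nebula.exchange.Exchange /data/softs/nebula-sst/nebula-exchange-2.5.0.jar \
-c player-csv-sst.conf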

Is this what you mean? Why does it still not work when I try it?
image

My submit command is now as follows:

#!/bin/bash

set -x

spark-submit \
--master yarn \
--conf spark.driver.extraClassPath=./player-csv-sst.conf \
--conf spark.executor.extraClassPath=./player-csv-sst.conf \
--deploy-mode cluster \
--class com.vesoft.nebula.exchange.Exchange   /data/softs/nebula-sst/nebula-exchange-2.5.0.jar  \
-c player-csv-sst.conf

The spark-submit script and my configuration file are now in the same directory, and I've added --conf, so why can it still not find the configuration file? The error is as follows:

Please ignore that last reply; the configuration-file problem has been solved. I would still like to confirm my understanding, though: Exchange is just a Spark application, and in principle whatever it needs at runtime is bundled in its own jar, which should be fine. It now looks like the problem is that the Spark 2.4.0 shipped with CDH 6.3.2 has several guava versions among its dependencies, but CDH's symlink only points to the lowest one, while our Spark application needs a higher guava version, so the low-version guava in the Spark environment is incompatible with an application that needs the higher version. Is that right?

Yes. If your environment didn't have so many guava versions, you wouldn't need to do anything else.

The SST files have now been generated and I can see them on HDFS, but downloading the SST files from the console fails. The error is as follows:
image

  • The generated SST files are as follows:

  • Downloading them to the local file system with hdfs dfs -get works fine. One more note: our CDH cluster and the Nebula services I deployed are not on the same machines, but I have configured the hadoop environment on this machine, as follows:

  • The console only reports an error without saying exactly where it failed. How should I troubleshoot this, and where can I find the logs?

Hadoop needs to be installed, with HADOOP_HOME configured, on every machine that hosts a storage instance. Also, did you add the port configuration to Nebula as described in the docs? 导入SST文件数据 - Nebula Graph Database 手册
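A quick way to confirm that prerequisite on each storaged machine (a sketch; the commands assume the shell environment of the user running storaged):

echo "HADOOP_HOME=$HADOOP_HOME"      # must be set for the user running storaged
$HADOOP_HOME/bin/hdfs version        # the hdfs client must be runnable on this host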

Actually, I'd rather recommend downloading the SST files manually into the nebula_home/data/storage/nebula/space_id/download directory; it's quicker than installing hadoop :joy:

But about manually downloading into the nebula_home/data/storage/nebula/space_id/download directory, I have the following questions:

    1. Do the SST files have to be downloaded to every node of our Nebula cluster? For example, with a 5-node Nebula cluster, must I download them into nebula_home/data/storage/nebula/space_id/download on all 5 servers?
    2. Our Nebula cluster and the CDH cluster are on different machines, so what is a reasonable way to move the files? Should a script run hdfs dfs -get /sst file:///sst (a local directory on a CDH node) and then copy the result to nebula_home/data/storage/nebula/space_id/download?
  1. Yes.
  2. Exactly the way you described: hdfs dfs -get on the CDH cluster, then scp to the Nebula cluster. A sketch is shown below.
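
For example, a minimal copy script along those lines (host names, NEBULA_HOME, and SPACE_ID are placeholders for your environment):

#!/bin/bash
# Run on a CDH node that can reach both HDFS and the Nebula machines.
NEBULA_HOME=/usr/local/nebula     # placeholder install path
SPACE_ID=1                        # placeholder; look it up with DESCRIBE SPACE (see below)
hdfs dfs -get /sst /tmp/sst       # pull the generated SST files to a local directory
for host in nebula01 nebula02 nebula03 nebula04 nebula05; do
  ssh "$host" "mkdir -p $NEBULA_HOME/data/storage/nebula/$SPACE_ID/download"
  scp -r /tmp/sst/* "$host:$NEBULA_HOME/data/storage/nebula/$SPACE_ID/download/"
done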

Following up on the previous question: I checked the deployed nebula_home/data/storage/nebula/space_id/download directory.
One more question: how do I find out what my space_id is? How can I look it up?

https://docs.nebula-graph.com.cn/2.5.0/3.ngql-guide/9.space-statements/4.describe-space/
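
In short, DESCRIBE SPACE prints an ID column, and that ID is the space_id used in the .../nebula/<space_id>/download path. A sketch using nebula-console (address, credentials, and space name are placeholders):

nebula-console -addr 192.168.1.118 -port 9669 -u root -p nebula \
  -e 'DESCRIBE SPACE basketballplayer;'    # the ID column is the space_id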
