Error using the Connector from pyspark

Nebula version: 3.1.0
Installation package used: nebula-graph-3.1.0.el7.x86_64.rpm

Spark version: 2.4.0
pyspark version: 2.4.0
Scala version: 2.11.12

Connector version: 3.0.0
Download URL: https://repo1.maven.org/maven2/com/vesoft/nebula-spark-connector/3.0.0/nebula-spark-connector-3.0.0.jar

pyspark code:

from pyspark.sql import SparkSession


if __name__ == "__main__":
    spark = SparkSession.builder.appName("es_group").getOrCreate()
    df = spark.read.format("com.vesoft.nebula.connector.NebulaDataSource")\
        .option("type", "vertex")\
        .option("spaceName", "investor")\
        .option("label", "company")\
        .option("returnCols", "name")\
        .option("metaAddress", "192.168.7.155:9559")\
        .option("partitionNumber", 1)\
        .load()
    df.show()
    spark.stop()
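
Note that the snippet above doesn't attach the connector jar itself, so it has to be supplied at launch time (e.g. via --jars on spark-submit). For reference, a minimal sketch of doing the same from the session builder — the jar path is illustrative, not from the original post:

from pyspark.sql import SparkSession

# Sketch: ship the connector jar with the job instead of relying on the
# cluster classpath. The local path below is an assumption.
spark = (
    SparkSession.builder
    .appName("es_group")
    .config("spark.jars", "/path/to/nebula-spark-connector-3.0.0.jar")
    .getOrCreate()
)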

Error output:

22/08/11 14:50:25 INFO connector.NebulaDataSource: create reader
22/08/11 14:50:25 INFO connector.NebulaDataSource: options {spacename=investor, paths=[], returncols=name, metaaddress=192.168.7.155:9559, label=company, type=vertex, partitionnumber=1}
22/08/11 14:50:25 INFO reader.NebulaDataSourceVertexReader: dataset's schema: StructType(StructField(_vertexId,StringType,false), StructField(name,StringType,true))
22/08/11 14:50:26 INFO connector.NebulaDataSource: create reader
22/08/11 14:50:26 INFO connector.NebulaDataSource: options {spacename=investor, paths=[], returncols=name, metaaddress=192.168.7.155:9559, label=company, type=vertex, partitionnumber=1}
22/08/11 14:50:26 INFO v2.DataSourceV2Strategy: 
Pushing operators to class com.vesoft.nebula.connector.NebulaDataSource
Pushed Filters: 
Post-Scan Filters: 
Output: _vertexId#0, name#1
         
22/08/11 14:50:26 ERROR util.Utils: uncaught error in thread spark-listener-group-eventLog, stopping SparkContext
java.lang.NoSuchMethodError: com.fasterxml.jackson.core.JsonGenerator.writeTypePrefix(Lcom/fasterxml/jackson/core/type/WritableTypeId;)Lcom/fasterxml/jackson/core/type/WritableTypeId;
	at com.fasterxml.jackson.databind.jsontype.impl.TypeSerializerBase.writeTypePrefix(TypeSerializerBase.java:46)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeWithType(BeanSerializerBase.java:600)
	at com.fasterxml.jackson.databind.ser.impl.TypeWrappedSerializer.serialize(TypeWrappedSerializer.java:32)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
	at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3905)
	at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:3219)
	at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:106)
	at org.apache.spark.scheduler.EventLoggingListener.org$apache$spark$scheduler$EventLoggingListener$$logEvent(EventLoggingListener.scala:148)
	at org.apache.spark.scheduler.EventLoggingListener.onOtherEvent(EventLoggingListener.scala:283)
	at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:82)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1350)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
22/08/11 14:50:26 ERROR util.Utils: throw uncaught fatal error in thread spark-listener-group-eventLog
java.lang.NoSuchMethodError: com.fasterxml.jackson.core.JsonGenerator.writeTypePrefix(Lcom/fasterxml/jackson/core/type/WritableTypeId;)Lcom/fasterxml/jackson/core/type/WritableTypeId;
	at com.fasterxml.jackson.databind.jsontype.impl.TypeSerializerBase.writeTypePrefix(TypeSerializerBase.java:46)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeWithType(BeanSerializerBase.java:600)
	at com.fasterxml.jackson.databind.ser.impl.TypeWrappedSerializer.serialize(TypeWrappedSerializer.java:32)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
	at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3905)
	at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:3219)
	at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:106)
	at org.apache.spark.scheduler.EventLoggingListener.org$apache$spark$scheduler$EventLoggingListener$$logEvent(EventLoggingListener.scala:148)
	at org.apache.spark.scheduler.EventLoggingListener.onOtherEvent(EventLoggingListener.scala:283)
	at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:82)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1350)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
Exception in thread "spark-listener-group-eventLog" java.lang.NoSuchMethodError: com.fasterxml.jackson.core.JsonGenerator.writeTypePrefix(Lcom/fasterxml/jackson/core/type/WritableTypeId;)Lcom/fasterxml/jackson/core/type/WritableTypeId;
	at com.fasterxml.jackson.databind.jsontype.impl.TypeSerializerBase.writeTypePrefix(TypeSerializerBase.java:46)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeWithType(BeanSerializerBase.java:600)
	at com.fasterxml.jackson.databind.ser.impl.TypeWrappedSerializer.serialize(TypeWrappedSerializer.java:32)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
	at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3905)
	at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:3219)
	at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:106)
	at org.apache.spark.scheduler.EventLoggingListener.org$apache$spark$scheduler$EventLoggingListener$$logEvent(EventLoggingListener.scala:148)
	at org.apache.spark.scheduler.EventLoggingListener.onOtherEvent(EventLoggingListener.scala:283)
	at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:82)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1350)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
22/08/11 14:50:26 INFO server.AbstractConnector: Stopped Spark@57fadbb6{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
22/08/11 14:50:26 INFO ui.SparkUI: Stopped Spark web UI at http://st-police-cdh000:4040
22/08/11 14:50:26 INFO cluster.YarnClientSchedulerBackend: Interrupting monitor thread
22/08/11 14:50:26 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
22/08/11 14:50:26 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
22/08/11 14:50:26 INFO cluster.SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
22/08/11 14:50:26 INFO cluster.YarnClientSchedulerBackend: Stopped
22/08/11 14:50:27 INFO codegen.CodeGenerator: Code generated in 140.281051 ms
Traceback (most recent call last):
  File "/home/test/nebula_read_spark.py", line 14, in <module>
    df.show()
  File "/opt/cloudera/parcels/CDH-6.2.1-1.cdh6.2.1.p0.1425774/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 378, in show
  File "/opt/cloudera/parcels/CDH-6.2.1-1.cdh6.2.1.p0.1425774/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/cloudera/parcels/CDH-6.2.1-1.cdh6.2.1.p0.1425774/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/opt/cloudera/parcels/CDH-6.2.1-1.cdh6.2.1.p0.1425774/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o72.showString.
: java.lang.NoSuchMethodError: com.fasterxml.jackson.core.JsonGenerator.writeStartObject(Ljava/lang/Object;)V
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:151)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
	at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3905)
	at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:3219)
	at org.apache.spark.rdd.RDDOperationScope.toJson(RDDOperationScope.scala:52)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:142)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

22/08/11 14:50:27 INFO storage.DiskBlockManager: Shutdown hook called
22/08/11 14:50:27 INFO util.ShutdownHookManager: Shutdown hook called

I don't really know Java/Scala/Spark, but it looks like multiple versions of this package are present and an incompatible one got picked?

It shouldn't be a version problem — following the example below I can get it to work, but submitting the job via spark-submit produces exactly the situation described above.

Yeah — that example is one I researched and wrote myself, and I haven't tried the submit route yet, only the pyspark shell. What difference between the two runtimes could cause this?

cc @nicole

Check whether the Jackson jars in the Spark environment your spark-submit uses conflict with the ones in the connector.
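
One way to check this from pyspark itself — a minimal diagnostic sketch, assuming a live session bound to spark (the PackageVersion classes are standard in Jackson 2.x):

jvm = spark.sparkContext._jvm

# The static PackageVersion.VERSION fields report the versions actually loaded.
print("jackson-core    :", jvm.com.fasterxml.jackson.core.json.PackageVersion.VERSION.toString())
print("jackson-databind:", jvm.com.fasterxml.jackson.databind.cfg.PackageVersion.VERSION.toString())

# And which jar jackson-databind was loaded from:
mapper = jvm.com.fasterxml.jackson.databind.ObjectMapper()
print(mapper.getClass().getProtectionDomain().getCodeSource().getLocation().toString())

If the conflict theory is right, jackson-databind should report 2.9+ while jackson-core reports something older: writeTypePrefix(WritableTypeId) only exists from jackson-core 2.9 onward, which matches the NoSuchMethodError above.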

As @nicole suggests — the passage I quoted earlier does seem to be exactly this: fasterxml.jackson is present in multiple versions in the spark-submit runtime, the version situation differs between submit and the local pyspark shell, and at submit time the incompatible one gets selected?

I mainly do Python development, so I'm not very familiar with this area. Anyway, this is what the jars in my Spark installation look like — which packages should I replace?
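
A quick way to list the candidates from Python — a sketch; the SPARK_HOME fallback below is guessed from the CDH parcel path in the traceback:

import glob
import os

# List the Jackson jars bundled with the Spark runtime that spark-submit uses.
spark_home = os.environ.get(
    "SPARK_HOME",
    "/opt/cloudera/parcels/CDH-6.2.1-1.cdh6.2.1.p0.1425774/lib/spark",
)
for jar in sorted(glob.glob(os.path.join(spark_home, "jars", "*jackson*"))):
    print(jar)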

Try swapping jackson-databind for 2.6.7 or 2.4.2.
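
If replacing jars under the CDH parcel is awkward, another option is Spark's experimental user-classpath-first switches — a hedged sketch, untested against this exact setup; the jar paths are assumptions:

from pyspark.sql import SparkSession

# Prefer user-supplied jars (the connector plus a matching jackson-databind)
# over the cluster's bundled ones.
# Note: spark.driver.userClassPathFirst only takes effect in cluster mode;
# in yarn-client mode the driver classpath has to be fixed at launch instead.
spark = (
    SparkSession.builder
    .appName("es_group")
    .config("spark.jars",
            "/path/to/nebula-spark-connector-3.0.0.jar,"
            "/path/to/jackson-databind-2.6.7.jar")
    .config("spark.driver.userClassPathFirst", "true")
    .config("spark.executor.userClassPathFirst", "true")
    .getOrCreate()
)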

Once you've sorted out the dependency conflict, feel free to write up and share a post, and to contribute a submit example/FAQ :+1:t2:

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.