Running nebula flink connector with `flink run` throws ClassCastException

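  • nebula version: 3.2.1
  • Deployment: single node
  • Installation method: RPM
  • In production: N
  • Hardware
    • Disk (SSD recommended)
    • CPU 8 cores, 16 GB memory
  • Detailed description of the problem
  • Relevant meta / storage / graph info logs (text form preferred so they are searchable)
    Java version: 8 or 11
    Maven version: 3.8.8
    Flink version: 1.11.6
    nebula flink connector version: 3.3.0 or 3.0.0
    Output of flink-root-client-localhost.localdomain.log: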
2023-04-06 03:46:41,094 INFO  org.apache.flink.client.cli.CliFrontend                      [] - --------------------------------------------------------------------------------
2023-04-06 03:46:41,099 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Starting Command Line Client (Version: 1.11.6, Scala: 2.11, Rev:826a237, Date:2021-12-14T23:38:16+01:00)
2023-04-06 03:46:41,099 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  OS current user: root
2023-04-06 03:46:41,100 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Current Hadoop/Kerberos user: <no hadoop dependency found>
2023-04-06 03:46:41,100 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 11/11.0.18+9-LTS-195
2023-04-06 03:46:41,100 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Maximum heap size: 4000 MiBytes
2023-04-06 03:46:41,100 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  JAVA_HOME: /home/java/jdk-11.0.18
2023-04-06 03:46:41,101 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  No Hadoop Dependency available
2023-04-06 03:46:41,101 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  JVM Options:
2023-04-06 03:46:41,101 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Dlog.file=/home/flink/flink-1.11.6/log/flink-root-client-localhost.localdomain.log
2023-04-06 03:46:41,101 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Dlog4j.configuration=file:/home/flink/flink-1.11.6/conf/log4j-cli.properties
2023-04-06 03:46:41,102 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Dlog4j.configurationFile=file:/home/flink/flink-1.11.6/conf/log4j-cli.properties
2023-04-06 03:46:41,102 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Dlogback.configurationFile=file:/home/flink/flink-1.11.6/conf/logback.xml
2023-04-06 03:46:41,102 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Program Arguments:
2023-04-06 03:46:41,105 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     run
2023-04-06 03:46:41,105 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     /home/demo-flink-1.0-SNAPSHOT.jar
2023-04-06 03:46:41,105 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Classpath: /home/flink/flink-1.11.6/lib/flink-csv-1.11.6.jar:/home/flink/flink-1.11.6/lib/flink-json-1.11.6.jar:/home/flink/flink-1.11.6/lib/flink-shaded-zookeeper-3.4.14.jar:/home/flink/flink-1.11.6/lib/flink-table_2.11-1.11.6.jar:/home/flink/flink-1.11.6/lib/flink-table-blink_2.11-1.11.6.jar:/home/flink/flink-1.11.6/lib/log4j-1.2-api-2.16.0.jar:/home/flink/flink-1.11.6/lib/log4j-api-2.16.0.jar:/home/flink/flink-1.11.6/lib/log4j-core-2.16.0.jar:/home/flink/flink-1.11.6/lib/log4j-slf4j-impl-2.16.0.jar:/home/flink/flink-1.11.6/lib/flink-dist_2.11-1.11.6.jar:::
2023-04-06 03:46:41,105 INFO  org.apache.flink.client.cli.CliFrontend                      [] - --------------------------------------------------------------------------------
2023-04-06 03:46:41,111 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.address, localhost
2023-04-06 03:46:41,112 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.port, 6123
2023-04-06 03:46:41,112 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.memory.process.size, 6600m
2023-04-06 03:46:41,112 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 6728m
2023-04-06 03:46:41,113 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2023-04-06 03:46:41,113 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: parallelism.default, 1
2023-04-06 03:46:41,113 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2023-04-06 03:46:41,153 INFO  org.apache.flink.client.cli.CliFrontend                      [] - Loading FallbackYarnSessionCli
2023-04-06 03:46:41,238 INFO  org.apache.flink.core.fs.FileSystem                          [] - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2023-04-06 03:46:41,342 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory [] - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2023-04-06 03:46:41,380 INFO  org.apache.flink.runtime.security.modules.JaasModule         [] - Jaas file will be created as /tmp/jaas-10805020939677187048.conf.
2023-04-06 03:46:41,395 INFO  org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory [] - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2023-04-06 03:46:41,397 INFO  org.apache.flink.client.cli.CliFrontend                      [] - Running 'run' command.
2023-04-06 03:46:41,416 INFO  org.apache.flink.client.cli.CliFrontend                      [] - Building program from JAR file
2023-04-06 03:46:41,447 INFO  org.apache.flink.client.ClientUtils                          [] - Starting program (detached: false)
2023-04-06 03:46:41,935 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - Class java.util.Arrays$ArrayList is not public so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2023-04-06 03:46:42,660 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2023-04-06 03:46:42,951 INFO  org.apache.flink.configuration.Configuration                 [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
2023-04-06 03:46:50,724 INFO  org.apache.flink.configuration.Configuration                 [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
2023-04-06 03:46:55,508 ERROR org.example.FlinkConnectorExample                            [] - error when write Nebula Graph, 
java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 172cf2c9203855ee49e49e39c0bad35c)
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
	at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1651) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.example.FlinkConnectorExample.sinkVertexData(FlinkConnectorExample.java:152) ~[classes!/:1.0-SNAPSHOT]
	at org.example.FlinkConnectorExample.main(FlinkConnectorExample.java:50) ~[classes!/:1.0-SNAPSHOT]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49) ~[demo-flink-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:108) ~[demo-flink-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:58) ~[demo-flink-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
	at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88) ~[demo-flink-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:319) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:147) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:765) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:233) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:992) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1070) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) [flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1070) [flink-dist_2.11-1.11.6.jar:1.11.6]
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 172cf2c9203855ee49e49e39c0bad35c)
	at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:122) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
	at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:313) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:119) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
	at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:313) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:206) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:189) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:639) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:396) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source) ~[?:?]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:?]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:130) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:487) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:566) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at java.lang.Thread.run(Thread.java:750) ~[?:?]
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.userFunction of type org.apache.flink.api.common.functions.Function in instance of org.apache.flink.streaming.api.operators.StreamMap
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301) ~[?:?]
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431) ~[?:?]
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2436) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) ~[?:?]
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) ~[?:?]
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) ~[?:?]
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) ~[?:?]
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502) ~[?:?]
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460) ~[?:?]
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:130) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:487) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:566) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
	at java.lang.Thread.run(Thread.java:750) ~[?:?]
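
For context on the innermost `Caused by`: the message says the task tried to assign a raw `java.lang.invoke.SerializedLambda` to `userFunction`. The sketch below uses only the JDK (no Flink) to show the mechanism that message refers to. A serializable lambda is written to the stream as `SerializedLambda` and is turned back into the functional interface at read time by the class that defined it. `LambdaSerializationDemo`, `SerializableMapper` and `lengthMapper` are hypothetical names made up for illustration, and this does not reproduce the failure above; it only shows the step that apparently did not complete on the TaskManager.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class LambdaSerializationDemo {

    /** Hypothetical stand-in for a serializable user function (not a Flink type). */
    interface SerializableMapper<T, R> extends Serializable {
        R map(T value);
    }

    /** Returns a lambda; javac also generates a $deserializeLambda$ method in this class. */
    static SerializableMapper<String, Integer> lengthMapper() {
        return s -> s.length();
    }

    /** Serializes an object with plain Java serialization. */
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Deserializes the bytes and records every class name the stream asks to resolve. */
    static Object deserialize(byte[] data, List<String> seenClassNames) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                seenClassNames.add(desc.getName());
                return super.resolveClass(desc);
            }
        }) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        Object restored = deserialize(serialize(lengthMapper()), seen);

        // The stream carries SerializedLambda, not the lambda's own runtime class.
        System.out.println("classes in stream: " + seen);

        // After resolution in the same JVM, the object is usable as the interface again.
        @SuppressWarnings("unchecked")
        SerializableMapper<String, Integer> mapper =
                (SerializableMapper<String, Integer>) restored;
        System.out.println("length of \"nebula\": " + mapper.map("nebula"));
    }
}
```

Within a single JVM the round trip succeeds because the defining class is available to resolve the lambda. One thing that might be worth trying in the job itself, as an assumption rather than a confirmed fix, is replacing the `map` lambda with an anonymous `MapFunction` class, which is serialized under its own class name instead of as `SerializedLambda`.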

The pom.xml configuration is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>demo-flink</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.5</version>
    </parent>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.14.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.14.4</version>
        </dependency>
        <dependency>
            <groupId>com.vesoft</groupId>
            <artifactId>nebula-flink-connector</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.4.5</version>
                <configuration>
                    <mainClass>org.example.FlinkConnectorExample</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

The contents of the jar are as follows; it uses the example provided by nebula:

/* Copyright (c) 2020 vesoft inc. All rights reserved.
 *
 * This source code is licensed under Apache 2.0 License.
 */

package org.example;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.connector.nebula.connection.NebulaClientOptions;
import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat;
import org.apache.flink.connector.nebula.sink.NebulaSinkFunction;
import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.statement.VertexExecutionOptions;
import org.apache.flink.connector.nebula.utils.SSLSighType;
import org.apache.flink.connector.nebula.utils.WriteModeEnum;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Make sure your Nebula Graph has created the space. Space schema:
 *
 * <p>"CREATE SPACE `flinkSink` (partition_num = 100, replica_factor = 3, charset = utf8,
 * collate = utf8_bin, vid_type = INT64, atomic_edge = false) ON default"
 *
 * <p>"USE `flinkSink`"
 *
 * <p>"CREATE TAG IF NOT EXISTS person(col1 string, col2 fixed_string(8), col3 int8, col4 int16,
 * col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double,
 * col12 float, col13 time, col14 geography);"
 *
 * <p>"CREATE EDGE IF NOT EXISTS friend(col1 string, col2 fixed_string(8), col3 int8, col4 int16,
 * col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double,
 * col12 float, col13 time, col14 geography);"
 */
public class FlinkConnectorExample {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkConnectorExample.class);

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        DataStream<List<String>> playerSource = constructVertexSourceData(env);
        sinkVertexData(env, playerSource);
        updateVertexData(env, playerSource);
        deleteVertexData(env, playerSource);
//
        DataStream<List<String>> friendSource = constructEdgeSourceData(env);
        sinkEdgeData(env, friendSource);
        updateEdgeData(env, friendSource);
        deleteEdgeData(env, friendSource);
        System.out.println("1111111111111111111111");
    }


    /**
     * construct flink data source
     */
    public static DataStream<List<String>> constructVertexSourceData(
            StreamExecutionEnvironment env) {
        List<List<String>> player = new ArrayList<>();
        List<String> fields1 = Arrays.asList("61", "aba", "abcdefgh", "1", "1111", "22222",
                "6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
                "11:12:12", "POINT(1 3)");
        List<String> fields2 = Arrays.asList("62", "aba", "abcdefgh", "1", "1111", "22222",
                "6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
                "11:12:12", "POINT(1 3)");
        List<String> fields3 = Arrays.asList("63", "aba", "abcdefgh", "1", "1111", "22222",
                "6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
                "11:12:12", "POINT(1 3)");
        List<String> fields4 = Arrays.asList("64", "aba", "abcdefgh", "1", "1111", "22222",
                "6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
                "11:12:12", "LINESTRING(1 3,2 4)");
        List<String> fields5 = Arrays.asList("65", "aba", "abcdefgh", "1", "1111", "22222",
                "6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
                "11:12:12", "LINESTRING(1 3,2 4)");
        List<String> fields6 = Arrays.asList("66", "aba", "abcdefgh", "1", "1111", "22222",
                "6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
                "11:12:12", "LINESTRING(1 3,2 4)");
        List<String> fields7 = Arrays.asList("67", "李四", "abcdefgh", "1", "1111", "22222",
                "6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "true", "1.2", "1.0",
                "11:12:12", "polygon((0 1,1 2,2 3,0 1))");
        List<String> fields8 = Arrays.asList("68", "aba", "张三", "1", "1111", "22222", "6412233",
                "2019-01-01", "2019-01-01T12:12:12", "435463424", "true", "1.2", "1.0", "11:12:12",
                "POLYGON((0 1,1 2,2 3,0 1))");
        player.add(fields1);
        player.add(fields2);
        player.add(fields3);
        player.add(fields4);
        player.add(fields5);
        player.add(fields6);
        player.add(fields7);
        player.add(fields8);
        DataStream<List<String>> playerSource = env.fromCollection(player);
        return playerSource;
    }

    private static NebulaClientOptions getClientOptions() {
        // not enable ssl for graph
        NebulaClientOptions nebulaClientOptions =
                new NebulaClientOptions.NebulaClientOptionsBuilder()
                        .setGraphAddress("192.168.20.10:9669")
                        .setMetaAddress("192.168.20.10:9559")
                        .build();

        return nebulaClientOptions;
    }

    /**
     * sink Nebula Graph with default INSERT mode
     */
    public static void sinkVertexData(StreamExecutionEnvironment env,
                                      DataStream<List<String>> playerSource) {
        NebulaClientOptions nebulaClientOptions = getClientOptions();  // connection pool
        NebulaGraphConnectionProvider graphConnectionProvider =
                new NebulaGraphConnectionProvider(nebulaClientOptions);  // graph
        NebulaMetaConnectionProvider metaConnectionProvider =
                new NebulaMetaConnectionProvider(nebulaClientOptions);  // meta
        System.out.println("22222222222222222");
        ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("flinkSink")
                .setTag("person")
                .setIdIndex(0)
                .setFields(Arrays.asList("col1", "col2", "col3", "col4", "col5", "col6", "col7",
                        "col8",
                        "col9", "col10", "col11", "col12", "col13", "col14"))
                .setPositions(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
                .setBatch(2)
                .builder();

        NebulaBatchOutputFormat outPutFormat =
                new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
                        .setExecutionOptions(executionOptions);
        NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
        DataStream<Row> dataStream = playerSource.map(row -> {
            org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size());
            for (int i = 0; i < row.size(); i++) {
                record.setField(i, row.get(i));
            }
            return record;
        });

        dataStream.addSink(nebulaSinkFunction);
        try {
            System.out.println("33333333333333333");
            env.execute("Write Nebula");
            System.out.println("4444444444444444");
        } catch (Exception e) {
            LOG.error("error when write Nebula Graph, ", e);
            System.exit(-1);
        }
    }

    /**
     * sink Nebula Graph with UPDATE mode
     */
    public static void updateVertexData(StreamExecutionEnvironment env,
                                        DataStream<List<String>> playerSource) {
        NebulaClientOptions nebulaClientOptions = getClientOptions();
        NebulaGraphConnectionProvider graphConnectionProvider =
                new NebulaGraphConnectionProvider(nebulaClientOptions);
        NebulaMetaConnectionProvider metaConnectionProvider =
                new NebulaMetaConnectionProvider(nebulaClientOptions);

        ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("flinkSink")
                .setTag("person")
                .setIdIndex(0)
                .setFields(Arrays.asList("col1", "col2"))
                .setPositions(Arrays.asList(1, 2))
                .setWriteMode(WriteModeEnum.UPDATE)
                .setBatch(2)
                .builder();

        NebulaBatchOutputFormat outPutFormat =
                new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
                        .setExecutionOptions(executionOptions);
        NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
        DataStream<Row> dataStream = playerSource.map(row -> {
            org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size());
            for (int i = 0; i < row.size(); i++) {
                record.setField(i, row.get(i));
            }
            return record;
        });
        dataStream.addSink(nebulaSinkFunction);
        try {
            env.execute("Update Nebula Vertex");
        } catch (Exception e) {
            LOG.error("error when update Nebula Graph Vertex, ", e);
            System.exit(-1);
        }
    }

    /**
     * sink Nebula Graph with DELETE mode
     */
    public static void deleteVertexData(StreamExecutionEnvironment env,
                                        DataStream<List<String>> playerSource) {
        NebulaClientOptions nebulaClientOptions = getClientOptions();
        NebulaGraphConnectionProvider graphConnectionProvider =
                new NebulaGraphConnectionProvider(nebulaClientOptions);
        NebulaMetaConnectionProvider metaConnectionProvider =
                new NebulaMetaConnectionProvider(nebulaClientOptions);

        ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("flinkSink")
                .setTag("person")
                .setIdIndex(0)
                .setFields(Arrays.asList("col1", "col2"))
                .setPositions(Arrays.asList(1, 2))
                .setWriteMode(WriteModeEnum.DELETE)
                .setBatch(2)
                .builder();

        NebulaBatchOutputFormat outPutFormat =
                new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
                        .setExecutionOptions(executionOptions);
        NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
        DataStream<Row> dataStream = playerSource.map(row -> {
            org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size());
            for (int i = 0; i < row.size(); i++) {
                record.setField(i, row.get(i));
            }
            return record;
        });
        dataStream.addSink(nebulaSinkFunction);
        try {
            env.execute("Delete Nebula Vertex");
        } catch (Exception e) {
            LOG.error("error when delete Nebula Graph Vertex, ", e);
            System.exit(-1);
        }
    }


    /**
     * construct flink data source
     */
    public static DataStream<List<String>> constructEdgeSourceData(StreamExecutionEnvironment env) {
        List<List<String>> friend = new ArrayList<>();
        List<String> fields1 = Arrays.asList("61", "62", "aba", "abcdefgh", "1", "1111", "22222",
                "6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
                "11:12:12", "POINT(1 3)");
        List<String> fields2 = Arrays.asList("62", "63", "aba", "abcdefgh", "1", "1111", "22222",
                "6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
                "11:12:12", "POINT(1 3)");
        List<String> fields3 = Arrays.asList("63", "64", "aba", "abcdefgh", "1", "1111", "22222",
                "6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
                "11:12:12", "POINT(1 3)");
        List<String> fields4 = Arrays.asList("64", "65", "aba", "abcdefgh", "1", "1111", "22222",
                "6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
                "11:12:12", "LINESTRING(1 3,2 4)");
        List<String> fields5 = Arrays.asList("65", "66", "aba", "abcdefgh", "1", "1111", "22222",
                "6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
                "11:12:12", "LINESTRING(1 3,2 4)");
        List<String> fields6 = Arrays.asList("66", "67", "aba", "abcdefgh", "1", "1111", "22222",
                "6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
                "11:12:12", "LINESTRING(1 3,2 4)");
        List<String> fields7 = Arrays.asList("67", "68", "李四", "abcdefgh", "1", "1111", "22222",
                "6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "true", "1.2", "1.0",
                "11:12:12", "polygon((0 1,1 2,2 3,0 1))");
        List<String> fields8 = Arrays.asList("68", "61", "aba", "张三", "1", "1111", "22222",
                "6412233",
                "2019-01-01", "2019-01-01T12:12:12", "435463424", "true", "1.2", "1.0", "11:12:12",
                "POLYGON((0 1,1 2,2 3,0 1))");
        friend.add(fields1);
        friend.add(fields2);
        friend.add(fields3);
        friend.add(fields4);
        friend.add(fields5);
        friend.add(fields6);
        friend.add(fields7);
        friend.add(fields8);
        DataStream<List<String>> playerSource = env.fromCollection(friend);
        return playerSource;
    }

    /**
     * sink Nebula Graph
     */
    public static void sinkEdgeData(StreamExecutionEnvironment env,
                                    DataStream<List<String>> playerSource) {
        NebulaClientOptions nebulaClientOptions = getClientOptions();
        NebulaGraphConnectionProvider graphConnectionProvider =
                new NebulaGraphConnectionProvider(nebulaClientOptions);
        NebulaMetaConnectionProvider metaConnectionProvider =
                new NebulaMetaConnectionProvider(nebulaClientOptions);

        ExecutionOptions executionOptions = new EdgeExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("flinkSink")
                .setEdge("friend")
                .setSrcIndex(0)
                .setDstIndex(1)
                .setRankIndex(4)
                .setFields(Arrays.asList("col1", "col2", "col3", "col4", "col5", "col6", "col7",
                        "col8", "col9", "col10", "col11", "col12", "col13", "col14"))
                .setPositions(Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15))
                .setBatch(2)
                .builder();

        NebulaBatchOutputFormat outPutFormat =
                new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
                        .setExecutionOptions(executionOptions);
        NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
        DataStream<Row> dataStream = playerSource.map(row -> {
            org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size());
            for (int i = 0; i < row.size(); i++) {
                record.setField(i, row.get(i));
            }
            return record;
        });
        dataStream.addSink(nebulaSinkFunction);
        try {
            env.execute("Write Nebula");
        } catch (Exception e) {
            LOG.error("error when write Nebula Graph, ", e);
            System.exit(-1);
        }
    }

    /**
     * sink Nebula Graph with UPDATE mode
     */
    public static void updateEdgeData(StreamExecutionEnvironment env,
                                      DataStream<List<String>> playerSource) {
        NebulaClientOptions nebulaClientOptions = getClientOptions();
        NebulaGraphConnectionProvider graphConnectionProvider =
                new NebulaGraphConnectionProvider(nebulaClientOptions);
        NebulaMetaConnectionProvider metaConnectionProvider =
                new NebulaMetaConnectionProvider(nebulaClientOptions);

        ExecutionOptions executionOptions = new EdgeExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("flinkSink")
                .setEdge("friend")
                .setSrcIndex(0)
                .setDstIndex(1)
                .setRankIndex(4)
                .setFields(Arrays.asList("col1", "col2"))
                .setPositions(Arrays.asList(2, 3))
                .setWriteMode(WriteModeEnum.UPDATE)
                .setBatch(2)
                .builder();

        NebulaBatchOutputFormat outPutFormat =
                new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
                        .setExecutionOptions(executionOptions);
        NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
        DataStream<Row> dataStream = playerSource.map(row -> {
            org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size());
            for (int i = 0; i < row.size(); i++) {
                record.setField(i, row.get(i));
            }
            return record;
        });
        dataStream.addSink(nebulaSinkFunction);
        try {
            env.execute("Update Nebula Edge");
        } catch (Exception e) {
            LOG.error("error when update Nebula Graph Edge, ", e);
            System.exit(-1);
        }
    }


    /**
     * sink Nebula Graph with DELETE mode
     */
    public static void deleteEdgeData(StreamExecutionEnvironment env,
                                      DataStream<List<String>> playerSource) {
        NebulaClientOptions nebulaClientOptions = getClientOptions();
        NebulaGraphConnectionProvider graphConnectionProvider =
                new NebulaGraphConnectionProvider(nebulaClientOptions);
        NebulaMetaConnectionProvider metaConnectionProvider =
                new NebulaMetaConnectionProvider(nebulaClientOptions);

        ExecutionOptions executionOptions = new EdgeExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("flinkSink")
                .setEdge("friend")
                .setSrcIndex(0)
                .setDstIndex(1)
                .setRankIndex(4)
                .setFields(Arrays.asList("col1", "col2"))
                .setPositions(Arrays.asList(2, 3))
                .setWriteMode(WriteModeEnum.DELETE)
                .setBatch(2)
                .builder();

        NebulaBatchOutputFormat outPutFormat =
                new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
                        .setExecutionOptions(executionOptions);
        NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
        DataStream<Row> dataStream = playerSource.map(row -> {
            org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size());
            for (int i = 0; i < row.size(); i++) {
                record.setField(i, row.get(i));
            }
            return record;
        });
        dataStream.addSink(nebulaSinkFunction);
        try {
            env.execute("Delete Nebula Edge");
        } catch (Exception e) {
            LOG.error("error when delete Nebula Graph Edge, ", e);
            System.exit(-1);
        }
    }
}

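Steps to reproduce: I build the jar with Maven in IDEA on Windows, upload it to CentOS 7, and submit it with flink run xxx.jar. After submission the job fails at runtime with a ClassCastException about casting userFunction to Function. Running the same jar directly with java -jar xxx.jar does not fail and writes the data to Nebula successfully (with both Java 8 and Java 11).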

Based on the Flink log, take a look at this post: https://blog.csdn.net/lisi1129/article/details/101453563

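If the configuration change from that post does not solve it, check the Flink dependencies in your own project. I ran into the same error, and it turned out that the Flink versions did not match.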
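
One way to check for that kind of dependency mismatch is to print where the classes involved in the cast are actually loaded from, once under flink run and once under java -jar, and compare. The sketch below is only a diagnostic aid: ClassOrigin is a hypothetical helper (not part of Flink or the connector), and the three class names in main are my assumption about which classes take part in the failing cast, so replace them with whatever your stack trace names.

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical diagnostic helper; not part of Flink or the Nebula connector. */
final class ClassOrigin {

    /** Describes which jar/directory a class came from and which class loader loaded it. */
    static String describe(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        URL location = (source == null) ? null : source.getLocation();
        ClassLoader loader = type.getClassLoader();
        return type.getName()
                + " | from: " + (location == null ? "<JDK/bootstrap>" : location.toString())
                + " | loader: " + (loader == null ? "<bootstrap>" : loader.getClass().getName());
    }

    /** Same as above, but resolves the class by name through the context class loader. */
    static String describe(String className) {
        try {
            ClassLoader context = Thread.currentThread().getContextClassLoader();
            return describe(Class.forName(className, false, context));
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " | not loadable: " + e;
        }
    }

    public static void main(String[] args) {
        // Assumed class names; adjust them to the ones in your ClassCastException message.
        System.out.println(describe("org.apache.flink.api.common.functions.Function"));
        System.out.println(describe("org.apache.flink.streaming.api.functions.sink.SinkFunction"));
        System.out.println(describe("org.apache.flink.connector.nebula.sink.NebulaSinkFunction"));
    }
}
```

To see the flink run view, call ClassOrigin.describe(...) from the job's own main before env.execute(...). If the Flink classes are reported as coming from inside the job jar instead of Flink's lib directory, or the same class shows up under two different loaders, that would suggest the jar bundles its own copy of Flink, possibly at a different version than the 1.11.6 installation.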
