- Nebula version: 3.2.1
- Deployment: single machine
- Installation method: RPM
- In production: N
- Hardware
  - Disk (SSD recommended)
  - CPU: 8 cores, RAM: 16 GB
- Java version: 8 or 11
- Maven version: 3.8.8
- Flink version: 1.11.6
- nebula-flink-connector version: 3.3.0 or 3.0.0
The output of flink-root-client-localhost.localdomain.log is as follows:
2023-04-06 03:46:41,094 INFO org.apache.flink.client.cli.CliFrontend [] - --------------------------------------------------------------------------------
2023-04-06 03:46:41,099 INFO org.apache.flink.client.cli.CliFrontend [] - Starting Command Line Client (Version: 1.11.6, Scala: 2.11, Rev:826a237, Date:2021-12-14T23:38:16+01:00)
2023-04-06 03:46:41,099 INFO org.apache.flink.client.cli.CliFrontend [] - OS current user: root
2023-04-06 03:46:41,100 INFO org.apache.flink.client.cli.CliFrontend [] - Current Hadoop/Kerberos user: <no hadoop dependency found>
2023-04-06 03:46:41,100 INFO org.apache.flink.client.cli.CliFrontend [] - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 11/11.0.18+9-LTS-195
2023-04-06 03:46:41,100 INFO org.apache.flink.client.cli.CliFrontend [] - Maximum heap size: 4000 MiBytes
2023-04-06 03:46:41,100 INFO org.apache.flink.client.cli.CliFrontend [] - JAVA_HOME: /home/java/jdk-11.0.18
2023-04-06 03:46:41,101 INFO org.apache.flink.client.cli.CliFrontend [] - No Hadoop Dependency available
2023-04-06 03:46:41,101 INFO org.apache.flink.client.cli.CliFrontend [] - JVM Options:
2023-04-06 03:46:41,101 INFO org.apache.flink.client.cli.CliFrontend [] - -Dlog.file=/home/flink/flink-1.11.6/log/flink-root-client-localhost.localdomain.log
2023-04-06 03:46:41,101 INFO org.apache.flink.client.cli.CliFrontend [] - -Dlog4j.configuration=file:/home/flink/flink-1.11.6/conf/log4j-cli.properties
2023-04-06 03:46:41,102 INFO org.apache.flink.client.cli.CliFrontend [] - -Dlog4j.configurationFile=file:/home/flink/flink-1.11.6/conf/log4j-cli.properties
2023-04-06 03:46:41,102 INFO org.apache.flink.client.cli.CliFrontend [] - -Dlogback.configurationFile=file:/home/flink/flink-1.11.6/conf/logback.xml
2023-04-06 03:46:41,102 INFO org.apache.flink.client.cli.CliFrontend [] - Program Arguments:
2023-04-06 03:46:41,105 INFO org.apache.flink.client.cli.CliFrontend [] - run
2023-04-06 03:46:41,105 INFO org.apache.flink.client.cli.CliFrontend [] - /home/demo-flink-1.0-SNAPSHOT.jar
2023-04-06 03:46:41,105 INFO org.apache.flink.client.cli.CliFrontend [] - Classpath: /home/flink/flink-1.11.6/lib/flink-csv-1.11.6.jar:/home/flink/flink-1.11.6/lib/flink-json-1.11.6.jar:/home/flink/flink-1.11.6/lib/flink-shaded-zookeeper-3.4.14.jar:/home/flink/flink-1.11.6/lib/flink-table_2.11-1.11.6.jar:/home/flink/flink-1.11.6/lib/flink-table-blink_2.11-1.11.6.jar:/home/flink/flink-1.11.6/lib/log4j-1.2-api-2.16.0.jar:/home/flink/flink-1.11.6/lib/log4j-api-2.16.0.jar:/home/flink/flink-1.11.6/lib/log4j-core-2.16.0.jar:/home/flink/flink-1.11.6/lib/log4j-slf4j-impl-2.16.0.jar:/home/flink/flink-1.11.6/lib/flink-dist_2.11-1.11.6.jar:::
2023-04-06 03:46:41,105 INFO org.apache.flink.client.cli.CliFrontend [] - --------------------------------------------------------------------------------
2023-04-06 03:46:41,111 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost
2023-04-06 03:46:41,112 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2023-04-06 03:46:41,112 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 6600m
2023-04-06 03:46:41,112 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 6728m
2023-04-06 03:46:41,113 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2023-04-06 03:46:41,113 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 1
2023-04-06 03:46:41,113 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2023-04-06 03:46:41,153 INFO org.apache.flink.client.cli.CliFrontend [] - Loading FallbackYarnSessionCli
2023-04-06 03:46:41,238 INFO org.apache.flink.core.fs.FileSystem [] - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2023-04-06 03:46:41,342 INFO org.apache.flink.runtime.security.modules.HadoopModuleFactory [] - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2023-04-06 03:46:41,380 INFO org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file will be created as /tmp/jaas-10805020939677187048.conf.
2023-04-06 03:46:41,395 INFO org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory [] - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2023-04-06 03:46:41,397 INFO org.apache.flink.client.cli.CliFrontend [] - Running 'run' command.
2023-04-06 03:46:41,416 INFO org.apache.flink.client.cli.CliFrontend [] - Building program from JAR file
2023-04-06 03:46:41,447 INFO org.apache.flink.client.ClientUtils [] - Starting program (detached: false)
2023-04-06 03:46:41,935 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class java.util.Arrays$ArrayList is not public so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2023-04-06 03:46:42,660 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2023-04-06 03:46:42,951 INFO org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
2023-04-06 03:46:50,724 INFO org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
2023-04-06 03:46:55,508 ERROR org.example.FlinkConnectorExample [] - error when write Nebula Graph,
java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 172cf2c9203855ee49e49e39c0bad35c)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1651) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.example.FlinkConnectorExample.sinkVertexData(FlinkConnectorExample.java:152) ~[classes!/:1.0-SNAPSHOT]
at org.example.FlinkConnectorExample.main(FlinkConnectorExample.java:50) ~[classes!/:1.0-SNAPSHOT]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49) ~[demo-flink-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:108) ~[demo-flink-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:58) ~[demo-flink-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:88) ~[demo-flink-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:319) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:147) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:765) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:233) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:992) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1070) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) [flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1070) [flink-dist_2.11-1.11.6.jar:1.11.6]
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 172cf2c9203855ee49e49e39c0bad35c)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:122) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:313) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) ~[?:?]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:119) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:313) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) ~[?:?]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:206) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:189) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:639) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:396) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source) ~[?:?]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:130) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:487) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:566) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at java.lang.Thread.run(Thread.java:750) ~[?:?]
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.userFunction of type org.apache.flink.api.common.functions.Function in instance of org.apache.flink.streaming.api.operators.StreamMap
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301) ~[?:?]
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2436) ~[?:?]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) ~[?:?]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2430) ~[?:?]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2354) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2212) ~[?:?]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) ~[?:?]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:502) ~[?:?]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:460) ~[?:?]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:130) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:487) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:566) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:752) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569) ~[flink-dist_2.11-1.11.6.jar:1.11.6]
at java.lang.Thread.run(Thread.java:750) ~[?:?]
The pom.xml is configured as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>demo-flink</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.5</version>
</parent>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.14.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.4</version>
</dependency>
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>nebula-flink-connector</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.4.5</version>
<configuration>
<mainClass>org.example.FlinkConnectorExample</mainClass>
</configuration>
</plugin>
</plugins>
</build>
</project>
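One thing I notice when comparing this pom with the client log above: the cluster is Flink 1.11.6 / Scala 2.11, but the pom pulls in flink-java 1.14.6, flink-streaming-java_2.11 1.14.6, and flink-clients_2.12 1.14.4, and the jar is repackaged by spring-boot-maven-plugin. Below is an untested sketch of the dependency and build changes I would try for `flink run` (versions aligned to the cluster, Flink artifacts marked provided, maven-shade-plugin instead of Spring Boot repackaging); the shade-plugin version and the BOOT-INF remark are my assumptions, not something I have confirmed:

```xml
<!-- Sketch only (my assumption): align the Flink artifacts with the
     1.11.6 / Scala 2.11 cluster shown in the log, and mark them provided -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.11.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.11.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.11.6</version>
    <scope>provided</scope>
</dependency>

<!-- Sketch only: package a plain fat jar with maven-shade-plugin instead of
     spring-boot-maven-plugin, so classes stay at the jar root rather than
     under BOOT-INF/; the plugin version here is an assumption -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>org.example.FlinkConnectorExample</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```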
The jar contains the following code, taken from the example provided by Nebula:
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
package org.example;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.connector.nebula.connection.NebulaClientOptions;
import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat;
import org.apache.flink.connector.nebula.sink.NebulaSinkFunction;
import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.statement.VertexExecutionOptions;
import org.apache.flink.connector.nebula.utils.SSLSighType;
import org.apache.flink.connector.nebula.utils.WriteModeEnum;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * make sure the space has been created in your Nebula Graph; space schema:
*
* <p>"CREATE SPACE `flinkSink` (partition_num = 100, replica_factor = 3, charset = utf8,
* collate = utf8_bin, vid_type = INT64, atomic_edge = false) ON default"
*
* <p>"USE `flinkSink`"
*
* <p>"CREATE TAG IF NOT EXISTS person(col1 string, col2 fixed_string(8), col3 int8, col4 int16,
* col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double,
* col12 float, col13 time, col14 geography);"
*
* <p>"CREATE EDGE IF NOT EXISTS friend(col1 string, col2 fixed_string(8), col3 int8, col4 int16,
* col5 int32, col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool, col11 double,
* col12 float, col13 time, col14 geography);"
*/
public class FlinkConnectorExample {
private static final Logger LOG = LoggerFactory.getLogger(FlinkConnectorExample.class);
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
DataStream<List<String>> playerSource = constructVertexSourceData(env);
sinkVertexData(env, playerSource);
updateVertexData(env, playerSource);
deleteVertexData(env, playerSource);
//
DataStream<List<String>> friendSource = constructEdgeSourceData(env);
sinkEdgeData(env, friendSource);
updateEdgeData(env, friendSource);
deleteEdgeData(env, friendSource);
System.out.println("1111111111111111111111");
}
/**
* construct flink data source
*/
public static DataStream<List<String>> constructVertexSourceData(
StreamExecutionEnvironment env) {
List<List<String>> player = new ArrayList<>();
List<String> fields1 = Arrays.asList("61", "aba", "abcdefgh", "1", "1111", "22222",
"6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
"11:12:12", "POINT(1 3)");
List<String> fields2 = Arrays.asList("62", "aba", "abcdefgh", "1", "1111", "22222",
"6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
"11:12:12", "POINT(1 3)");
List<String> fields3 = Arrays.asList("63", "aba", "abcdefgh", "1", "1111", "22222",
"6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
"11:12:12", "POINT(1 3)");
List<String> fields4 = Arrays.asList("64", "aba", "abcdefgh", "1", "1111", "22222",
"6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
"11:12:12", "LINESTRING(1 3,2 4)");
List<String> fields5 = Arrays.asList("65", "aba", "abcdefgh", "1", "1111", "22222",
"6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
"11:12:12", "LINESTRING(1 3,2 4)");
List<String> fields6 = Arrays.asList("66", "aba", "abcdefgh", "1", "1111", "22222",
"6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
"11:12:12", "LINESTRING(1 3,2 4)");
List<String> fields7 = Arrays.asList("67", "李四", "abcdefgh", "1", "1111", "22222",
"6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "true", "1.2", "1.0",
"11:12:12", "polygon((0 1,1 2,2 3,0 1))");
List<String> fields8 = Arrays.asList("68", "aba", "张三", "1", "1111", "22222", "6412233",
"2019-01-01", "2019-01-01T12:12:12", "435463424", "true", "1.2", "1.0", "11:12:12",
"POLYGON((0 1,1 2,2 3,0 1))");
player.add(fields1);
player.add(fields2);
player.add(fields3);
player.add(fields4);
player.add(fields5);
player.add(fields6);
player.add(fields7);
player.add(fields8);
DataStream<List<String>> playerSource = env.fromCollection(player);
return playerSource;
}
private static NebulaClientOptions getClientOptions() {
// not enable ssl for graph
NebulaClientOptions nebulaClientOptions =
new NebulaClientOptions.NebulaClientOptionsBuilder()
.setGraphAddress("192.168.20.10:9669")
.setMetaAddress("192.168.20.10:9559")
.build();
return nebulaClientOptions;
}
/**
* sink Nebula Graph with default INSERT mode
*/
public static void sinkVertexData(StreamExecutionEnvironment env,
DataStream<List<String>> playerSource) {
        NebulaClientOptions nebulaClientOptions = getClientOptions(); // connection pool
NebulaGraphConnectionProvider graphConnectionProvider =
new NebulaGraphConnectionProvider(nebulaClientOptions); // graph
NebulaMetaConnectionProvider metaConnectionProvider =
new NebulaMetaConnectionProvider(nebulaClientOptions); // meta
System.out.println("22222222222222222");
ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
.setGraphSpace("flinkSink")
.setTag("person")
.setIdIndex(0)
.setFields(Arrays.asList("col1", "col2", "col3", "col4", "col5", "col6", "col7",
"col8",
"col9", "col10", "col11", "col12", "col13", "col14"))
.setPositions(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
.setBatch(2)
.builder();
NebulaBatchOutputFormat outPutFormat =
new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
.setExecutionOptions(executionOptions);
NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
DataStream<Row> dataStream = playerSource.map(row -> {
org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size());
for (int i = 0; i < row.size(); i++) {
record.setField(i, row.get(i));
}
return record;
});
dataStream.addSink(nebulaSinkFunction);
try {
System.out.println("33333333333333333");
env.execute("Write Nebula");
System.out.println("4444444444444444");
} catch (Exception e) {
LOG.error("error when write Nebula Graph, ", e);
System.exit(-1);
}
}
/**
* sink Nebula Graph with UPDATE mode
*/
public static void updateVertexData(StreamExecutionEnvironment env,
DataStream<List<String>> playerSource) {
NebulaClientOptions nebulaClientOptions = getClientOptions();
NebulaGraphConnectionProvider graphConnectionProvider =
new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider =
new NebulaMetaConnectionProvider(nebulaClientOptions);
ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
.setGraphSpace("flinkSink")
.setTag("person")
.setIdIndex(0)
.setFields(Arrays.asList("col1", "col2"))
.setPositions(Arrays.asList(1, 2))
.setWriteMode(WriteModeEnum.UPDATE)
.setBatch(2)
.builder();
NebulaBatchOutputFormat outPutFormat =
new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
.setExecutionOptions(executionOptions);
NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
DataStream<Row> dataStream = playerSource.map(row -> {
org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size());
for (int i = 0; i < row.size(); i++) {
record.setField(i, row.get(i));
}
return record;
});
dataStream.addSink(nebulaSinkFunction);
try {
env.execute("Update Nebula Vertex");
} catch (Exception e) {
LOG.error("error when update Nebula Graph Vertex, ", e);
System.exit(-1);
}
}
/**
* sink Nebula Graph with DELETE mode
*/
public static void deleteVertexData(StreamExecutionEnvironment env,
DataStream<List<String>> playerSource) {
NebulaClientOptions nebulaClientOptions = getClientOptions();
NebulaGraphConnectionProvider graphConnectionProvider =
new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider =
new NebulaMetaConnectionProvider(nebulaClientOptions);
ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
.setGraphSpace("flinkSink")
.setTag("person")
.setIdIndex(0)
.setFields(Arrays.asList("col1", "col2"))
.setPositions(Arrays.asList(1, 2))
.setWriteMode(WriteModeEnum.DELETE)
.setBatch(2)
.builder();
NebulaBatchOutputFormat outPutFormat =
new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
.setExecutionOptions(executionOptions);
NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
DataStream<Row> dataStream = playerSource.map(row -> {
org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size());
for (int i = 0; i < row.size(); i++) {
record.setField(i, row.get(i));
}
return record;
});
dataStream.addSink(nebulaSinkFunction);
try {
env.execute("Update Nebula Vertex");
} catch (Exception e) {
LOG.error("error when update Nebula Graph Vertex, ", e);
System.exit(-1);
}
}
/**
* construct flink data source
*/
public static DataStream<List<String>> constructEdgeSourceData(StreamExecutionEnvironment env) {
List<List<String>> friend = new ArrayList<>();
List<String> fields1 = Arrays.asList("61", "62", "aba", "abcdefgh", "1", "1111", "22222",
"6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
"11:12:12", "POINT(1 3)");
List<String> fields2 = Arrays.asList("62", "63", "aba", "abcdefgh", "1", "1111", "22222",
"6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
"11:12:12", "POINT(1 3)");
List<String> fields3 = Arrays.asList("63", "64", "aba", "abcdefgh", "1", "1111", "22222",
"6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
"11:12:12", "POINT(1 3)");
List<String> fields4 = Arrays.asList("64", "65", "aba", "abcdefgh", "1", "1111", "22222",
"6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
"11:12:12", "LINESTRING(1 3,2 4)");
List<String> fields5 = Arrays.asList("65", "66", "aba", "abcdefgh", "1", "1111", "22222",
"6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
"11:12:12", "LINESTRING(1 3,2 4)");
List<String> fields6 = Arrays.asList("66", "67", "aba", "abcdefgh", "1", "1111", "22222",
"6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "false", "1.2", "1.0",
"11:12:12", "LINESTRING(1 3,2 4)");
List<String> fields7 = Arrays.asList("67", "68", "李四", "abcdefgh", "1", "1111", "22222",
"6412233", "2019-01-01", "2019-01-01T12:12:12", "435463424", "true", "1.2", "1.0",
"11:12:12", "polygon((0 1,1 2,2 3,0 1))");
List<String> fields8 = Arrays.asList("68", "61", "aba", "张三", "1", "1111", "22222",
"6412233",
"2019-01-01", "2019-01-01T12:12:12", "435463424", "true", "1.2", "1.0", "11:12:12",
"POLYGON((0 1,1 2,2 3,0 1))");
friend.add(fields1);
friend.add(fields2);
friend.add(fields3);
friend.add(fields4);
friend.add(fields5);
friend.add(fields6);
friend.add(fields7);
friend.add(fields8);
DataStream<List<String>> playerSource = env.fromCollection(friend);
return playerSource;
}
/**
* sink Nebula Graph
*/
public static void sinkEdgeData(StreamExecutionEnvironment env,
DataStream<List<String>> playerSource) {
NebulaClientOptions nebulaClientOptions = getClientOptions();
NebulaGraphConnectionProvider graphConnectionProvider =
new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider =
new NebulaMetaConnectionProvider(nebulaClientOptions);
ExecutionOptions executionOptions = new EdgeExecutionOptions.ExecutionOptionBuilder()
.setGraphSpace("flinkSink")
.setEdge("friend")
.setSrcIndex(0)
.setDstIndex(1)
.setRankIndex(4)
.setFields(Arrays.asList("col1", "col2", "col3", "col4", "col5", "col6", "col7",
"col8", "col9", "col10", "col11", "col12", "col13", "col14"))
.setPositions(Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15))
.setBatch(2)
.builder();
NebulaBatchOutputFormat outPutFormat =
new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
.setExecutionOptions(executionOptions);
NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
DataStream<Row> dataStream = playerSource.map(row -> {
org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size());
for (int i = 0; i < row.size(); i++) {
record.setField(i, row.get(i));
}
return record;
});
dataStream.addSink(nebulaSinkFunction);
try {
env.execute("Write Nebula");
} catch (Exception e) {
LOG.error("error when write Nebula Graph, ", e);
System.exit(-1);
}
}
/**
* sink Nebula Graph with UPDATE mode
*/
public static void updateEdgeData(StreamExecutionEnvironment env,
DataStream<List<String>> playerSource) {
NebulaClientOptions nebulaClientOptions = getClientOptions();
NebulaGraphConnectionProvider graphConnectionProvider =
new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider =
new NebulaMetaConnectionProvider(nebulaClientOptions);
ExecutionOptions executionOptions = new EdgeExecutionOptions.ExecutionOptionBuilder()
.setGraphSpace("flinkSink")
.setEdge("friend")
.setSrcIndex(0)
.setDstIndex(1)
.setRankIndex(4)
.setFields(Arrays.asList("col1", "col2"))
.setPositions(Arrays.asList(2, 3))
.setWriteMode(WriteModeEnum.UPDATE)
.setBatch(2)
.builder();
NebulaBatchOutputFormat outPutFormat =
new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
.setExecutionOptions(executionOptions);
NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
DataStream<Row> dataStream = playerSource.map(row -> {
org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size());
for (int i = 0; i < row.size(); i++) {
record.setField(i, row.get(i));
}
return record;
});
dataStream.addSink(nebulaSinkFunction);
try {
env.execute("Update Nebula Vertex");
} catch (Exception e) {
LOG.error("error when update Nebula Graph Vertex, ", e);
System.exit(-1);
}
}
/**
* sink Nebula Graph with DELETE mode
*/
public static void deleteEdgeData(StreamExecutionEnvironment env,
DataStream<List<String>> playerSource) {
NebulaClientOptions nebulaClientOptions = getClientOptions();
NebulaGraphConnectionProvider graphConnectionProvider =
new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider =
new NebulaMetaConnectionProvider(nebulaClientOptions);
ExecutionOptions executionOptions = new EdgeExecutionOptions.ExecutionOptionBuilder()
.setGraphSpace("flinkSink")
.setEdge("friend")
.setSrcIndex(0)
.setDstIndex(1)
.setRankIndex(4)
.setFields(Arrays.asList("col1", "col2"))
.setPositions(Arrays.asList(2, 3))
.setWriteMode(WriteModeEnum.DELETE)
.setBatch(2)
.builder();
NebulaBatchOutputFormat outPutFormat =
new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
.setExecutionOptions(executionOptions);
NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
DataStream<Row> dataStream = playerSource.map(row -> {
org.apache.flink.types.Row record = new org.apache.flink.types.Row(row.size());
for (int i = 0; i < row.size(); i++) {
record.setField(i, row.get(i));
}
return record;
});
dataStream.addSink(nebulaSinkFunction);
try {
env.execute("Update Nebula Vertex");
} catch (Exception e) {
LOG.error("error when update Nebula Graph Vertex, ", e);
System.exit(-1);
}
}
}
Steps to reproduce: I build the jar with Maven in IntelliJ IDEA on Windows, upload it to CentOS 7, and submit it with `flink run xxx.jar`; once the job starts running it fails with the exception above (the SerializedLambda cannot be assigned to the userFunction field). However, running the same jar directly with `java -jar xxx.jar` does not fail and writes the data into Nebula successfully (under both Java 8 and Java 11).
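My unconfirmed guess is that the failure comes from the operator lambdas in the `map(...)` calls: when the Spring Boot-repackaged jar is submitted with `flink run`, the lambda's capturing class seems to be loaded by a different classloader on the TaskManager side, so the deserialized SerializedLambda cannot be assigned to `userFunction`. As an experiment, each lambda could be replaced with a named serializable class like the sketch below (the class name `ListToRowMapper` is mine, not from the connector):

```java
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.types.Row;

/** Hypothetical replacement for the `playerSource.map(row -> ...)` lambdas above. */
public class ListToRowMapper implements MapFunction<List<String>, Row> {
    @Override
    public Row map(List<String> row) {
        // Copy each string field of the incoming list into a Flink Row.
        Row record = new Row(row.size());
        for (int i = 0; i < row.size(); i++) {
            record.setField(i, row.get(i));
        }
        return record;
    }
}
```

Each sink method would then build the stream with `DataStream<Row> dataStream = playerSource.map(new ListToRowMapper());` before `dataStream.addSink(nebulaSinkFunction);`, and the job would still be submitted with `flink run xxx.jar`. I have not yet verified whether this alone, without the packaging change sketched after the pom, fixes the `flink run` case.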