nebula-exchange: error when reading from Kafka and writing to NebulaGraph

NebulaGraph version: 3.8.0
Spark version: 2.4.8
Kafka version: 3.6.2 (Kerberos enabled)
Hadoop version: 3.3.3

Problem description: I am using Spark to read data from Kafka and write it into NebulaGraph, but the job fails with the error below. What could be the cause?

/opt/datasophon/spark24/bin/spark-submit \
--executor-memory 6G \
--driver-memory 4G \
--executor-cores 2 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.maxExecutors=80 \
--master yarn \
--deploy-mode client \
--queue root.ad_hoc \
--files /home/user_210796824/kafka_client_jaas.conf,/data1/user_210796824/usr_data_collect_daas.keytab,/data1/user_210796824/nebula/kafka_application3.conf \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
--jars /data1/user_210796824/nebula/kafka-clients-3.6.2.jar,/data1/user_210796824/nebula/spark-sql-kafka-0-10_2.11-2.4.8.jar,/data1/user_210796824/nebula/nebula-exchange_spark_2.4-3.8.0.jar,/data1/user_210796824/nebula/spark-streaming-kafka-0-10_2.11-2.4.8.jar \
--class com.vesoft.nebula.exchange.Exchange   \
/data1/user_210796824/nebula/nebula-exchange_spark_2.4-3.8.0.jar \
-c kafka_application3.conf
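One thing worth double-checking in the command above (an observation, not a confirmed fix): files shipped with --files are localized into the working directory of each YARN container, so the relative JAAS path works for executors, but in client deploy mode the driver JVM runs on the submitting host, where a relative path resolves against the directory spark-submit was launched from. If the driver cannot find the JAAS file, a common adjustment is to give the driver the absolute local path:

```shell
# Hypothetical adjustment: executors keep the relative path (YARN
# localizes --files entries into each container's working dir), while
# the client-mode driver gets the absolute path on the submitting host.
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/user_210796824/kafka_client_jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
```

This is unrelated to the AnalysisException that ultimately fails the job, but it is a frequent stumbling block with Kerberized Kafka in client mode.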
2025-07-24 17:01:47,197 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2025-07-24 17:01:47,610 INFO config.Configs$: DataBase Config DataBaseConfigEntry:{graphAddress:List(10.252.131.10:9669, 10.252.131.11:9669, 10.252.131.12:9669), space:product_map, metaAddress:List(10.252.131.10:9559, 10.252.131.11:9559, 10.252.131.12:9559)}
2025-07-24 17:01:47,611 INFO config.Configs$: User Config UserConfigEntry{user:root, password:xxxxx}
2025-07-24 17:01:47,613 INFO config.Configs$: Connection Config Some(Config(SimpleConfigObject({"retry":3,"timeout":5000})))
2025-07-24 17:01:47,614 INFO config.Configs$: Execution Config ExecutionConfigEntry:{timeout:2147483647, retry:3}
2025-07-24 17:01:47,635 INFO config.Configs$: Source Config Kafka source server: wlcb-bdc-kafka-prd-10-252-128-123.belle.lan:9092,wlcb-bdc-kafka-prd-10-252-128-126.belle.lan:9092,wlcb-bdc-kafka-prd-10-252-128-127.belle.lan:9092 topic:nebula-exchange_brand startingOffsets:latest maxOffsetsPerTrigger:None
2025-07-24 17:01:47,636 INFO config.Configs$: Sink Config Kafka source server: wlcb-bdc-kafka-prd-10-252-128-123.belle.lan:9092,wlcb-bdc-kafka-prd-10-252-128-126.belle.lan:9092,wlcb-bdc-kafka-prd-10-252-128-127.belle.lan:9092 topic:nebula-exchange_brand startingOffsets:latest maxOffsetsPerTrigger:None
2025-07-24 17:01:47,637 INFO config.Configs$: name Brand  batch 10
2025-07-24 17:01:47,640 INFO config.Configs$: Tag Config: Tag name: Brand, source: Kafka source server: wlcb-bdc-kafka-prd-10-252-128-123.belle.lan:9092,wlcb-bdc-kafka-prd-10-252-128-126.belle.lan:9092,wlcb-bdc-kafka-prd-10-252-128-127.belle.lan:9092 topic:nebula-exchange_brand startingOffsets:latest maxOffsetsPerTrigger:None, sink: Nebula sink addresses: [10.252.131.10:9669, 10.252.131.11:9669, 10.252.131.12:9669], writeMode: insert, vertex field: brand_no, vertex policy: None, batch: 10, partition: 10, repartitionWithNebula: true, enableTagless: false, ignoreIndex: false, vertexUdf: None, filter: None.
2025-07-24 17:01:47,641 INFO exchange.Exchange$: >>>>> Config Configs(DataBaseConfigEntry:{graphAddress:List(10.252.131.10:9669, 10.252.131.11:9669, 10.252.131.12:9669), space:product_map, metaAddress:List(10.252.131.10:9559, 10.252.131.11:9559, 10.252.131.12:9559)},UserConfigEntry{user:root, password:xxxxx},ConnectionConfigEntry:{timeout:5000, retry:3},ExecutionConfigEntry:{timeout:2147483647, retry:3},ErrorConfigEntry:{errorPath:file:///tmp/errors, errorMaxSize:32},RateConfigEntry:{limit:1024, timeout:5000},SslConfigEntry:{enableGraph:false, enableMeta:false, signType:ca},,List(Tag name: Brand, source: Kafka source server: wlcb-bdc-kafka-prd-10-252-128-123.belle.lan:9092,wlcb-bdc-kafka-prd-10-252-128-126.belle.lan:9092,wlcb-bdc-kafka-prd-10-252-128-127.belle.lan:9092 topic:nebula-exchange_brand startingOffsets:latest maxOffsetsPerTrigger:None, sink: Nebula sink addresses: [10.252.131.10:9669, 10.252.131.11:9669, 10.252.131.12:9669], writeMode: insert, vertex field: brand_no, vertex policy: None, batch: 10, partition: 10, repartitionWithNebula: true, enableTagless: false, ignoreIndex: false, vertexUdf: None, filter: None.),List(),None)
2025-07-24 17:01:47,679 INFO spark.SparkContext: Running Spark version 2.4.8
2025-07-24 17:01:47,702 INFO spark.SparkContext: Submitted application: com.vesoft.nebula.exchange.Exchange
2025-07-24 17:01:47,824 INFO spark.SecurityManager: Changing view acls to: user_210796824,usr_data_collect_daas
2025-07-24 17:01:47,824 INFO spark.SecurityManager: Changing modify acls to: user_210796824,usr_data_collect_daas
2025-07-24 17:01:47,824 INFO spark.SecurityManager: Changing view acls groups to: 
2025-07-24 17:01:47,824 INFO spark.SecurityManager: Changing modify acls groups to: 
2025-07-24 17:01:47,824 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(user_210796824, usr_data_collect_daas); groups with view permissions: Set(); users  with modify permissions: Set(user_210796824, usr_data_collect_daas); groups with modify permissions: Set()
2025-07-24 17:01:48,092 INFO util.Utils: Successfully started service 'sparkDriver' on port 16365.
2025-07-24 17:01:48,113 INFO spark.SparkEnv: Registering MapOutputTracker
2025-07-24 17:01:48,128 INFO spark.SparkEnv: Registering BlockManagerMaster
2025-07-24 17:01:48,130 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
2025-07-24 17:01:48,131 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
2025-07-24 17:01:48,138 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-1fdd0bad-f642-44f3-93af-1be4d6a3cc41
2025-07-24 17:01:48,152 INFO memory.MemoryStore: MemoryStore started with capacity 2004.6 MB
2025-07-24 17:01:48,163 INFO spark.SparkEnv: Registering OutputCommitCoordinator
2025-07-24 17:01:48,223 INFO util.log: Logging initialized @1875ms to org.spark_project.jetty.util.log.Slf4jLog
2025-07-24 17:01:48,280 INFO server.Server: jetty-9.4.z-SNAPSHOT; built: unknown; git: unknown; jvm 1.8.0_312-b07
2025-07-24 17:01:48,303 INFO server.Server: Started @1956ms
2025-07-24 17:01:48,339 INFO server.AbstractConnector: Started ServerConnector@27f1bbe0{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
2025-07-24 17:01:48,340 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
2025-07-24 17:01:48,363 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5e3a39cd{/jobs,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,363 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@13cd7ea5{/jobs/json,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,364 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@102d92c4{/jobs/job,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,365 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@60723d6a{/jobs/job/json,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,365 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@18324f97{/stages,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,366 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@64f555e7{/stages/json,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,366 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@71178a52{/stages/stage,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,367 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@44afefd5{/stages/stage/json,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,368 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@9a7a808{/stages/pool,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,368 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@72209d93{/stages/pool/json,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,369 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2687f956{/storage,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,369 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1ded7b14{/storage/json,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,370 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@29be7749{/storage/rdd,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,371 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5f84abe8{/storage/rdd/json,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,371 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4650a407{/environment,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,372 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@30135202{/environment/json,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,373 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6a4d7f76{/executors,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,373 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@10ec523c{/executors/json,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,374 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@53dfacba{/executors/threadDump,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,374 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@79767781{/executors/threadDump/json,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,383 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@78411116{/static,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,384 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6075b2d3{/,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,385 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@33abde31{/api,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,386 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7a18e8d{/jobs/job/kill,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,387 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3028e50e{/stages/stage/kill,null,AVAILABLE,@Spark}
2025-07-24 17:01:48,388 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://wlcb-bdc-olap-prd-10-252-128-34-belle-lan:4040
2025-07-24 17:01:48,406 INFO spark.SparkContext: Added JAR file:/data1/user_210796824/nebula/nebula-exchange_spark_2.4-3.8.0.jar at spark://wlcb-bdc-olap-prd-10-252-128-34-belle-lan:16365/jars/nebula-exchange_spark_2.4-3.8.0.jar with timestamp 1753347708406
2025-07-24 17:01:48,457 INFO util.Utils: Using initial executors = 0, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
2025-07-24 17:01:55,060 INFO yarn.Client: Requesting a new application from cluster with 129 NodeManagers
2025-07-24 17:01:55,076 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (25600 MB per container)
2025-07-24 17:01:55,077 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
2025-07-24 17:01:55,077 INFO yarn.Client: Setting up container launch context for our AM
2025-07-24 17:01:55,079 INFO yarn.Client: Setting up the launch environment for our AM container
2025-07-24 17:01:55,085 INFO yarn.Client: Preparing resources for our AM container
2025-07-24 17:01:55,128 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
2025-07-24 17:01:55,840 INFO yarn.Client: Uploading resource file:/tmp/spark-4d5fa2d1-920a-47ed-97dd-7740fa69804f/__spark_libs__3023537785368633875.zip -> hdfs://belleservice/user/usr_data_collect_daas/.sparkStaging/application_1751335818709_1092017/__spark_libs__3023537785368633875.zip
2025-07-24 17:01:57,042 INFO yarn.Client: Uploading resource file:/data1/user_210796824/nebula/kafka-clients-3.6.2.jar -> hdfs://belleservice/user/usr_data_collect_daas/.sparkStaging/application_1751335818709_1092017/kafka-clients-3.6.2.jar
2025-07-24 17:01:57,122 INFO yarn.Client: Uploading resource file:/data1/user_210796824/nebula/spark-sql-kafka-0-10_2.11-2.4.8.jar -> hdfs://belleservice/user/usr_data_collect_daas/.sparkStaging/application_1751335818709_1092017/spark-sql-kafka-0-10_2.11-2.4.8.jar
2025-07-24 17:01:57,592 INFO yarn.Client: Uploading resource file:/data1/user_210796824/nebula/nebula-exchange_spark_2.4-3.8.0.jar -> hdfs://belleservice/user/usr_data_collect_daas/.sparkStaging/application_1751335818709_1092017/nebula-exchange_spark_2.4-3.8.0.jar
2025-07-24 17:01:58,418 INFO yarn.Client: Uploading resource file:/data1/user_210796824/nebula/spark-streaming-kafka-0-10_2.11-2.4.8.jar -> hdfs://belleservice/user/usr_data_collect_daas/.sparkStaging/application_1751335818709_1092017/spark-streaming-kafka-0-10_2.11-2.4.8.jar
2025-07-24 17:01:58,886 INFO yarn.Client: Uploading resource file:/home/user_210796824/kafka_client_jaas.conf -> hdfs://belleservice/user/usr_data_collect_daas/.sparkStaging/application_1751335818709_1092017/kafka_client_jaas.conf
2025-07-24 17:01:59,352 INFO yarn.Client: Uploading resource file:/data1/user_210796824/usr_data_collect_daas.keytab -> hdfs://belleservice/user/usr_data_collect_daas/.sparkStaging/application_1751335818709_1092017/usr_data_collect_daas.keytab
2025-07-24 17:01:59,836 INFO yarn.Client: Uploading resource file:/data1/user_210796824/nebula/kafka_application3.conf -> hdfs://belleservice/user/usr_data_collect_daas/.sparkStaging/application_1751335818709_1092017/kafka_application3.conf
2025-07-24 17:02:00,417 INFO yarn.Client: Uploading resource file:/tmp/spark-4d5fa2d1-920a-47ed-97dd-7740fa69804f/__spark_conf__3783415543804579027.zip -> hdfs://belleservice/user/usr_data_collect_daas/.sparkStaging/application_1751335818709_1092017/__spark_conf__.zip
2025-07-24 17:02:00,913 INFO spark.SecurityManager: Changing view acls to: user_210796824,usr_data_collect_daas
2025-07-24 17:02:00,914 INFO spark.SecurityManager: Changing modify acls to: user_210796824,usr_data_collect_daas
2025-07-24 17:02:00,914 INFO spark.SecurityManager: Changing view acls groups to: 
2025-07-24 17:02:00,914 INFO spark.SecurityManager: Changing modify acls groups to: 
2025-07-24 17:02:00,914 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(user_210796824, usr_data_collect_daas); groups with view permissions: Set(); users  with modify permissions: Set(user_210796824, usr_data_collect_daas); groups with modify permissions: Set()
2025-07-24 17:02:02,979 INFO security.HadoopFSDelegationTokenProvider: getting token for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1690191162_1, ugi=usr_data_collect_daas@HADOOP.COM (auth:KERBEROS)]]
2025-07-24 17:02:02,991 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 56523297 for usr_data_collect_daas on ha-hdfs:belleservice
2025-07-24 17:02:03,106 WARN conf.HiveConf: HiveConf of name hive.server2.metrics.enabled does not exist
2025-07-24 17:02:03,106 WARN conf.HiveConf: HiveConf of name hive.driver.parallel.compilation does not exist
2025-07-24 17:02:03,106 WARN conf.HiveConf: HiveConf of name hive.server2.authentication.ldap.userDNPattern does not exist
2025-07-24 17:02:03,106 WARN conf.HiveConf: HiveConf of name hive.metastore.port does not exist
2025-07-24 17:02:03,106 WARN conf.HiveConf: HiveConf of name hive.service.metrics.codahale.reporter.classes does not exist
2025-07-24 17:02:04,099 INFO yarn.Client: Submitting application application_1751335818709_1092017 to ResourceManager
2025-07-24 17:02:04,122 INFO impl.YarnClientImpl: Submitted application application_1751335818709_1092017
2025-07-24 17:02:04,124 INFO cluster.SchedulerExtensionServices: Starting Yarn extension services with app application_1751335818709_1092017 and attemptId None
2025-07-24 17:02:05,132 INFO yarn.Client: Application report for application_1751335818709_1092017 (state: ACCEPTED)
2025-07-24 17:02:05,140 INFO yarn.Client: 
         client token: Token { kind: YARN_CLIENT_TOKEN, service:  }
         diagnostics: AM container is launched, waiting for AM container to Register with RM
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: root.ad_hoc
         start time: 1753347724107
         final status: UNDEFINED
         tracking URL: http://wlcb-bdc-manager-prd-10-252-128-39-belle-lan:8088/proxy/application_1751335818709_1092017/
         user: usr_data_collect_daas
2025-07-24 17:02:06,141 INFO yarn.Client: Application report for application_1751335818709_1092017 (state: ACCEPTED)
2025-07-24 17:02:07,143 INFO yarn.Client: Application report for application_1751335818709_1092017 (state: ACCEPTED)
2025-07-24 17:02:08,145 INFO yarn.Client: Application report for application_1751335818709_1092017 (state: ACCEPTED)
2025-07-24 17:02:09,146 INFO yarn.Client: Application report for application_1751335818709_1092017 (state: ACCEPTED)
2025-07-24 17:02:10,148 INFO yarn.Client: Application report for application_1751335818709_1092017 (state: ACCEPTED)
2025-07-24 17:02:11,150 INFO yarn.Client: Application report for application_1751335818709_1092017 (state: ACCEPTED)
2025-07-24 17:02:12,152 INFO yarn.Client: Application report for application_1751335818709_1092017 (state: ACCEPTED)
2025-07-24 17:02:13,154 INFO yarn.Client: Application report for application_1751335818709_1092017 (state: ACCEPTED)
2025-07-24 17:02:14,155 INFO yarn.Client: Application report for application_1751335818709_1092017 (state: ACCEPTED)
2025-07-24 17:02:15,157 INFO yarn.Client: Application report for application_1751335818709_1092017 (state: ACCEPTED)
2025-07-24 17:02:16,159 INFO yarn.Client: Application report for application_1751335818709_1092017 (state: RUNNING)
2025-07-24 17:02:16,159 INFO yarn.Client: 
         client token: Token { kind: YARN_CLIENT_TOKEN, service:  }
         diagnostics: N/A
         ApplicationMaster host: 10.252.128.4
         ApplicationMaster RPC port: -1
         queue: root.ad_hoc
         start time: 1753347724107
         final status: UNDEFINED
         tracking URL: http://wlcb-bdc-manager-prd-10-252-128-39-belle-lan:8088/proxy/application_1751335818709_1092017/
         user: usr_data_collect_daas
2025-07-24 17:02:16,163 INFO cluster.YarnClientSchedulerBackend: Application application_1751335818709_1092017 has started running.
2025-07-24 17:02:16,179 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 19639.
2025-07-24 17:02:16,179 INFO netty.NettyBlockTransferService: Server created on wlcb-bdc-olap-prd-10-252-128-34-belle-lan:19639
2025-07-24 17:02:16,182 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
2025-07-24 17:02:16,221 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, wlcb-bdc-olap-prd-10-252-128-34-belle-lan, 19639, None)
2025-07-24 17:02:16,229 INFO storage.BlockManagerMasterEndpoint: Registering block manager wlcb-bdc-olap-prd-10-252-128-34-belle-lan:19639 with 2004.6 MB RAM, BlockManagerId(driver, wlcb-bdc-olap-prd-10-252-128-34-belle-lan, 19639, None)
2025-07-24 17:02:16,235 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, wlcb-bdc-olap-prd-10-252-128-34-belle-lan, 19639, None)
2025-07-24 17:02:16,235 INFO storage.BlockManager: external shuffle service port = 7337
2025-07-24 17:02:16,236 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, wlcb-bdc-olap-prd-10-252-128-34-belle-lan, 19639, None)
2025-07-24 17:02:16,364 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> wlcb-bdc-manager-prd-10-252-128-39-belle-lan,wlcb-bdc-manager-prd-10-252-128-40-belle-lan, PROXY_URI_BASES -> http://wlcb-bdc-manager-prd-10-252-128-39-belle-lan:8088/proxy/application_1751335818709_1092017,http://wlcb-bdc-manager-prd-10-252-128-40-belle-lan:8088/proxy/application_1751335818709_1092017, RM_HA_URLS -> wlcb-bdc-manager-prd-10-252-128-39-belle-lan:8088,wlcb-bdc-manager-prd-10-252-128-40-belle-lan:8088), /proxy/application_1751335818709_1092017
2025-07-24 17:02:16,370 INFO ui.JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /metrics/json.
2025-07-24 17:02:16,378 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@26c7b1c6{/metrics/json,null,AVAILABLE,@Spark}
2025-07-24 17:02:16,633 INFO scheduler.EventLoggingListener: Logging events to hdfs:/user/spark/spark3ApplicationHistory/application_1751335818709_1092017
2025-07-24 17:02:16,635 INFO util.Utils: Using initial executors = 0, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
2025-07-24 17:02:16,643 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
2025-07-24 17:02:16,665 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
2025-07-24 17:02:16,688 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark-client://YarnAM)
2025-07-24 17:02:16,809 INFO exchange.Exchange$: >>>>> Processing Tag Brand
2025-07-24 17:02:16,809 INFO exchange.Exchange$: >>>>> field keys: brand_no, brand_cname, brand_ename
2025-07-24 17:02:16,809 INFO exchange.Exchange$: >>>>> nebula keys: brand_no, brand_cname, brand_ename
2025-07-24 17:02:16,809 INFO exchange.Exchange$: >>>>> Loading from Kafka wlcb-bdc-kafka-prd-10-252-128-123.belle.lan:9092,wlcb-bdc-kafka-prd-10-252-128-126.belle.lan:9092,wlcb-bdc-kafka-prd-10-252-128-127.belle.lan:9092 and subscribe nebula-exchange_brand
2025-07-24 17:02:16,836 INFO internal.SharedState: loading hive config file: file:/opt/datasophon/spark24/conf/hive-site.xml
2025-07-24 17:02:16,853 INFO internal.SharedState: spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir ('/user/hive/warehouse').
2025-07-24 17:02:16,853 INFO internal.SharedState: Warehouse path is '/user/hive/warehouse'.
2025-07-24 17:02:16,861 INFO ui.JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /SQL.
2025-07-24 17:02:16,862 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5edb4fe0{/SQL,null,AVAILABLE,@Spark}
2025-07-24 17:02:16,862 INFO ui.JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /SQL/json.
2025-07-24 17:02:16,863 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2cefcf3c{/SQL/json,null,AVAILABLE,@Spark}
2025-07-24 17:02:16,863 INFO ui.JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /SQL/execution.
2025-07-24 17:02:16,864 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@139051fa{/SQL/execution,null,AVAILABLE,@Spark}
2025-07-24 17:02:16,864 INFO ui.JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /SQL/execution/json.
2025-07-24 17:02:16,865 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2f54cc62{/SQL/execution/json,null,AVAILABLE,@Spark}
2025-07-24 17:02:16,866 INFO ui.JettyUtils: Adding filter org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter to /static/sql.
2025-07-24 17:02:16,867 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2235906{/static/sql,null,AVAILABLE,@Spark}
2025-07-24 17:02:17,297 INFO state.StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
        at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:52)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:63)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:61)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:67)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:67)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:73)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:69)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:78)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3364)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:751)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:710)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:719)
        at com.vesoft.nebula.exchange.Exchange$.com$vesoft$nebula$exchange$Exchange$$filterDf(Exchange.scala:403)
        at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:134)
        at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:116)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2025-07-24 17:02:17,723 INFO spark.SparkContext: Invoking stop() from shutdown hook
2025-07-24 17:02:17,741 INFO server.AbstractConnector: Stopped Spark@27f1bbe0{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
2025-07-24 17:02:17,743 INFO ui.SparkUI: Stopped Spark web UI at http://wlcb-bdc-olap-prd-10-252-128-34-belle-lan:4040
2025-07-24 17:02:17,749 INFO cluster.YarnClientSchedulerBackend: Interrupting monitor thread
2025-07-24 17:02:17,780 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
2025-07-24 17:02:17,788 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
2025-07-24 17:02:17,792 INFO cluster.SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
2025-07-24 17:02:17,795 INFO cluster.YarnClientSchedulerBackend: Stopped
2025-07-24 17:02:18,230 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
2025-07-24 17:02:18,249 INFO memory.MemoryStore: MemoryStore cleared
2025-07-24 17:02:18,250 INFO storage.BlockManager: BlockManager stopped
2025-07-24 17:02:18,260 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
2025-07-24 17:02:18,266 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
2025-07-24 17:02:18,279 INFO spark.SparkContext: Successfully stopped SparkContext
2025-07-24 17:02:18,279 INFO util.ShutdownHookManager: Shutdown hook called
2025-07-24 17:02:18,280 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-4d5fa2d1-920a-47ed-97dd-7740fa69804f
2025-07-24 17:02:18,285 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-c91fbeb7-bfb0-4f9f-a4e3-8d49db8ea2c9
2025-07-24 17:02:18,291 INFO util.ShutdownHookManager: Deleting directory /tmp/temporaryReader-461a49ca-ac33-4452-b8aa-f7acc1f198f3
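The failure point in the stack trace is com.vesoft.nebula.exchange.Exchange$.filterDf (Exchange.scala:403), which calls Dataset.show(). Because the Kafka source is opened via readStream, the DataFrame is a streaming one, and Spark's analyzer (UnsupportedOperationChecker) rejects any batch-style action on it before a single record is read. A minimal sketch of this rule, assuming a local Spark 2.4 session with the spark-sql-kafka-0-10 package on the classpath (broker and topic names are placeholders, not from the cluster above):

```scala
// Sketch only: reproduces the analyzer behavior behind the reported
// AnalysisException. Requires spark-sql_2.11 2.4.x and
// spark-sql-kafka-0-10_2.11 on the classpath.
import org.apache.spark.sql.SparkSession

object StreamingShowRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("streaming-show-repro")
      .getOrCreate()

    val df = spark.readStream            // streaming source => df.isStreaming == true
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // placeholder
      .option("subscribe", "some-topic")                // placeholder
      .load()

    // Any batch-style action on a streaming DataFrame fails at analysis time:
    // df.show()  // AnalysisException: Queries with streaming sources must be
    //            // executed with writeStream.start();;

    // The only legal way to materialize a streaming source:
    val query = df.writeStream
      .format("console")
      .start()
    query.awaitTermination()
  }
}
```

So the exception is raised on the driver during query analysis, not by Kafka or Kerberos; it suggests the tag-processing path in this Exchange version calls show() on the streaming source, which is worth checking against your Exchange version and Kafka source configuration.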

The kafka_application3.conf configuration file is as follows:

{
  spark: {
    app: {
      name: NebulaGraph Exchange 3.6.0
    }
    driver: {
      cores: 1
      maxResultSize: 1G
    }
    cores: {
      max: 16
    }
  }

  nebula: {
    address:{
      graph:["10.252.131.10:9669","10.252.131.11:9669","10.252.131.12:9669"]
      meta:["10.252.131.10:9559","10.252.131.11:9559","10.252.131.12:9559"]
    }
    user: root
    pswd: nebula
    space: product_map
    connection: {
      timeout: 5000
      retry: 3
    }
    execution: {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors
    }
    rate: {
      limit: 1024
      timeout: 5000
    }
  }
  
tags: [
    {
      name: Brand
      type: {
        source: kafka
        sink: client
      }
      service: "wlcb-bdc-kafka-prd-10-252-128-123.belle.lan:9092"
      topic: nebula-exchange_brand
      securityProtocol: SASL_PLAINTEXT
      mechanism: GSSAPI
      kerberos: true
      kerberosServiceName: kafka
      fields: [brand_no, brand_cname, brand_ename]
      nebula.fields: [brand_no, brand_cname, brand_ename]
      vertex: {
        field: brand_no
        policy:hash
      }
      writeMode: INSERT
      batch: 10
      partition: 10
      interval.seconds: 10
      startingOffsets: earliest
      maxOffsetsPerTrigger:10000
    }
  ]
}

Could someone from the official team please take a look? Thanks!