提问参考模版:
-
nebula 版本:
-
部署方式:单机
-
安装方式:RPM
-
是否为线上版本:N
-
硬件信息
- 磁盘( 30G SSD)
- CPU: 4核、内存信息: 8G
-
问题的具体描述
我们需要做实时入图性能测试, 模拟生成反欺诈模型的数据发送给kafka,然后flink 从kafka中消费数据,然后flink 再将数据写入Nebula, 目前遇到的问题是,flink 消费kafka数据正常,也将flink转换成Nebula 的Row对象也正常, 但是就是调用 流的addSink(nebulaEdgeSinkFunction) 方法时,kafka接受的数据流只接受第一条数据,而且重复消费第一条,一直在重复,删除topic 后,创建topic 重新执行,flink 消费kafka数据只消费第一条而且卡在这里,当去除掉addSink(nebulaEdgeSinkFunction) 方法时,flink 消费kafka的数据流正常,下面我结合代码以及结果详细说明 -
创建kafka 相关topic
bin/kafka-topics.sh --create --topic ods_nature_person_vertex --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh --create --topic ods_money_transfer_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh --create --topic ods_public_telephone_number_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh --create --topic ods_public_device_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh --create --topic ods_public_home_address_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh --create --topic ods_public_home_telephone_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh --create --topic ods_public_ip_address_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh --create --topic ods_public_work_address_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh --create --topic ods_public_work_telephone_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
- 模拟发送数据给kafka ods_money_transfer_nature_person 这个topic 启动 flink 程序,查看是否正确消费到数据以及是否能入图
- 去掉 moneyTransferDs.addSink(nebulaEdgeSinkFunction); 行后,flink 消费kafka流就正常了,如下:
其中sink 代码是从github 拉取 源码中的例子,抽取出来的,主要如下:
public static NebulaSinkFunction getNebulaEdgeSinkFunction( String graphSpace,
String edgeName,
List<String> fields,
List<Integer> positions) {
log.debug("开始获取NebulaEdgeSinkFunction");
NebulaClientOptions nebulaClientOptions = getClientOptions();
NebulaGraphConnectionProvider graphConnectionProvider =
new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider =
new NebulaMetaConnectionProvider(nebulaClientOptions);
ExecutionOptions executionOptions = new EdgeExecutionOptions.ExecutionOptionBuilder()
.setGraphSpace(graphSpace)
.setEdge(edgeName)
.setSrcIndex(0)
.setDstIndex(1)
.setRankIndex(4)
.setFields(fields)
.setPositions(positions)
.setBatch(2)
.builder();
NebulaBatchOutputFormat outPutFormat =
new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
.setExecutionOptions(executionOptions);
return new NebulaSinkFunction(outPutFormat);
}
- 删除topic 重新创建,flink 流消费就会一直卡在第一条数据地方
bin/kafka-topics.sh --delete --topic ods_money_transfer_nature_person --bootstrap-server node01:9092
bin/kafka-topics.sh --create --topic ods_money_transfer_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
- 启动flink 程序以及发送数据
经过反复测试,都是这样的结果,实在想不明白,把topic 删除重新创建后,加入addSink 代码 flink 就会消费第一条数据,而且卡在这里,如上图,请帮忙一起分析一下其中问题,有需要我把我整个工程,以及图模型提供出来,帮助一起分析下,目前已排除kafka 环境本身问题,我在另一个kafka环境测试过,是一样的结果,请大佬帮忙看下
PS:点入图没有问题,已入图,目前再测试入第一条边资金转账就一直卡在这里