使用Flink-connector 消费kafka 数据不做任何计算然后实时入图只消费kafka第一条数据而且重复消费

提问参考模版:

  • nebula 版本:

  • 部署方式:单机

  • 安装方式:RPM

  • 是否为线上版本:N

  • 硬件信息

    • 磁盘( 30G SSD)
    • CPU: 4核、内存信息: 8G
  • 问题的具体描述
    我们需要做实时入图性能测试, 模拟生成反欺诈模型的数据发送给kafka,然后flink 从kafka中消费数据,然后flink 再将数据写入Nebula, 目前遇到的问题是,flink 消费kafka数据正常,也将flink转换成Nebula 的Row对象也正常, 但是就是调用 流的addSink(nebulaEdgeSinkFunction) 方法时,kafka接受的数据流只接受第一条数据,而且重复消费第一条,一直在重复,删除topic 后,创建topic 重新执行,flink 消费kafka数据只消费第一条而且卡在这里,当去除掉addSink(nebulaEdgeSinkFunction) 方法时,flink 消费kafka的数据流正常,下面我结合代码以及结果详细说明

  • 创建kafka 相关topic

bin/kafka-topics.sh  --create --topic ods_nature_person_vertex --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh  --create --topic ods_money_transfer_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh  --create --topic ods_public_telephone_number_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh  --create --topic ods_public_device_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh  --create --topic ods_public_home_address_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh  --create --topic ods_public_home_telephone_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh  --create --topic ods_public_ip_address_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh  --create --topic ods_public_work_address_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh  --create --topic ods_public_work_telephone_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1

  • 模拟发送数据给kafka ods_money_transfer_nature_person 这个topic 启动 flink 程序,查看是否正确消费到数据以及是否能入图

  • 去掉 moneyTransferDs.addSink(nebulaEdgeSinkFunction); 行后,flink 消费kafka流就正常了,如下:

其中sink 代码是从github 拉取 源码中的例子,抽取出来的,主要如下:

public static NebulaSinkFunction getNebulaEdgeSinkFunction( String graphSpace,
                                                  String edgeName,
                                                  List<String> fields,
                                                  List<Integer> positions) {
        log.debug("开始获取NebulaEdgeSinkFunction");
        NebulaClientOptions nebulaClientOptions = getClientOptions();
        NebulaGraphConnectionProvider graphConnectionProvider =
                new NebulaGraphConnectionProvider(nebulaClientOptions);
        NebulaMetaConnectionProvider metaConnectionProvider =
                new NebulaMetaConnectionProvider(nebulaClientOptions);
        ExecutionOptions executionOptions = new EdgeExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace(graphSpace)
                .setEdge(edgeName)
                .setSrcIndex(0)
                .setDstIndex(1)
                .setRankIndex(4)
                .setFields(fields)
                .setPositions(positions)
                .setBatch(2)
                .builder();
        NebulaBatchOutputFormat outPutFormat =
                new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
                        .setExecutionOptions(executionOptions);
        return new NebulaSinkFunction(outPutFormat);
    }
  • 删除topic 重新创建,flink 流消费就会一直卡在第一条数据地方
bin/kafka-topics.sh  --delete  --topic ods_money_transfer_nature_person --bootstrap-server node01:9092
bin/kafka-topics.sh  --create --topic ods_money_transfer_nature_person --bootstrap-server node01:9092 --replication-factor 1 --partitions 1

经过反复测试,都是这样的结果,实在想不明白,把topic 删除重新创建后,加入addSink 代码 flink 就会消费第一条数据,而且卡在这里,如上图,请帮忙一起分析一下其中问题,有需要我把我整个工程,以及图模型提供出来,帮助一起分析下,目前已排除kafka 环境本身问题,我在另一个kafka环境测试过,是一样的结果,请大佬帮忙看下
PS:点入图没有问题,已入图,目前再测试入第一条边资金转账就一直卡在这里

1 个赞

用flink进行点入图和边入图的方式有什么区别么,addSink时指定点或者边的处理逻辑都是一样的,只是转换成的nebula的insert语句不同啊。

是的,就很奇怪,而且代码完全是拉源码中的实例代码对着写的,就改了space等信息

我这两天反复实验了几次,还是一样的结果,只要加了sinkEdge方法就卡住,重新在其他节点上部署了kafka,测试下来还是一样的问题,能否我这边提供完整的图模型,项目代码,你们这边把工程拉一下直接本地测试一下是否会出现同样情况?

您这边可以做下这样的测试:
flink 消费边数据, 但将kafka中边数据作为点写入nebula图中。 或者 flink消费点数据,将kafka中的点数据作为边写入nebula图中。
看下数据是否会卡住

1 个赞

已解决,问题在于flink 数据流在sinkEdge方法的时候,设置了setRankIndex(4),其实我的边只有一个列,所以造成flink 流sink 不进去,从而offset 提交失败,造成不断重试,我是这么理解的,但为什么没有报数组越界的错误呢,有点奇怪

1 个赞

如果问题解决了,你可以勾选自己的回复为解决方案哈。然后剩下的为什么没有报错,等 Nicole 到时候回复你哈。

sinkEdge内部在获取rank列时是通过flink Row 的fields数组来获取的,如果你的一个row中没有5个field的话是会报数组越界的。 没有报错的话可以debug看下你们flink读取kafka数据后一个row里面有多少列(可能有数据的是一列,其他的均为空)。


image

2 个赞

此话题已在最后回复的 7 天后被自动关闭。不再允许新回复。