nebula-flink-connector: the DataStream API cannot be used together with a flink-cdc source

  • nebula version: 3.8.0
  • nebula-flink-connector version: 3.8.1
  • Deployment method: docker-compose-lite
  • Installation method: Docker
  • Problem description: when using NebulaSinkFunction in nebula-flink-connector, constructing a NebulaVertexBatchOutputFormat first requires building a VertexExecutionOptions object, and that builder requires the WriteMode (INSERT, DELETE, or UPDATE) to be set up front. When the source is Flink CDC, however, the WriteMode cannot be fixed in advance: it is only known once each Row-typed record arrives, because every record carries its own change type. As a result, NebulaSinkFunction cannot work with a Flink CDC source, even though the Flink SQL path does support this.
public static NebulaSinkFunction<Row> getNebulaVertexBatchOutputFormat(
        GraphConfig graphConfig,
        NebulaGraphConnectionProvider graphConnectionProvider,
        NebulaMetaConnectionProvider metaConnectionProvider) {
    EnnVertexConfig ennVertexConfig = (EnnVertexConfig) graphConfig;
    VertexExecutionOptions executionOptions =
            new VertexExecutionOptions.ExecutionOptionBuilder()
                    .setGraphSpace(ennVertexConfig.getGraphSpace())
                    .setTag(ennVertexConfig.getTagName())
                    .setIdIndex(ennVertexConfig.getIndex()) // vid
                    .setFields(ennVertexConfig.getFields())
                    // the WriteMode must be fixed here, before any record is seen
                    .setWriteMode(WriteModeEnum.INSERT)
                    .setPositions(ennVertexConfig.getPositions())
                    .setBatchSize(ennVertexConfig.getBatchSize())
                    .build();
    NebulaVertexBatchOutputFormat outputFormat =
            new NebulaVertexBatchOutputFormat(graphConnectionProvider, metaConnectionProvider, executionOptions);
    return new NebulaSinkFunction<>(outputFormat);
}

For reference, the relevant methods from the connector's NebulaVertexBatchExecutor:

@Override
public void addToBatch(Row record) {
    // convert the record into a NebulaGraph vertex
    NebulaVertex vertex = converter.createVertex(record, executionOptions.getPolicy());
    if (vertex == null) {
        return;
    }
    nebulaVertexList.add(vertex);
}

@Override
public void executeBatch(Session session) throws IOException {
    if (isBatchEmpty()) {
        return;
    }
    NebulaVertices nebulaVertices = new NebulaVertices(executionOptions.getLabel(),
            executionOptions.getFields(), nebulaVertexList, executionOptions.getPolicy(),
            executionOptions.isDeleteExecutedWithEdges());
    // generate the write nGQL statement
    String statement = null;
    switch (executionOptions.getWriteMode()) {
        case INSERT:
            LOG.info("executing vertex insert");
            statement = nebulaVertices.getInsertStatement();
            break;
        case UPDATE:
            LOG.info("executing vertex update");
            statement = nebulaVertices.getUpdateStatement();
            break;
        case DELETE:
            statement = nebulaVertices.getDeleteStatement();
            LOG.info("executing vertex delete, statement={}", statement);
            break;
        default:
            throw new IllegalArgumentException("write mode is not supported");
    }
    executeStatement(session, statement);
    clearBatch();
}
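
The write mode is thus resolved once per batch from executionOptions, not per record. A minimal, hypothetical sketch of what a RowKind-aware executor could look like (this is NOT the connector's actual code; vertex conversion and statement building are stubbed out as comments):

import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// Hypothetical sketch only: bucket records by their RowKind so that
// executeBatch can emit one statement kind per bucket, instead of relying
// on a single WriteMode fixed at construction time.
public class RowKindAwareBatchExecutor {

    private final Map<RowKind, List<Row>> buckets = new EnumMap<>(RowKind.class);

    public void addToBatch(Row record) {
        // each CDC record already knows its own change type
        buckets.computeIfAbsent(record.getKind(), k -> new ArrayList<>()).add(record);
    }

    public void executeBatch() {
        buckets.forEach((kind, rows) -> {
            if (rows.isEmpty()) {
                return;
            }
            switch (kind) {
                case INSERT:
                    // build and run an INSERT VERTEX statement for `rows`
                    break;
                case UPDATE_AFTER:
                    // build and run an UPDATE VERTEX statement for `rows`
                    break;
                case DELETE:
                    // build and run a DELETE VERTEX statement for `rows`
                    break;
                default:
                    // UPDATE_BEFORE carries the old row image; skip it here
                    break;
            }
        });
        buckets.clear();
    }
}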

A follow-up:
When using the nebula-flink-connector DataStream API, I found a problem: my source is Flink CDC, and I deserialize the source data into Row, so each Row carries a RowKind of INSERT, DELETE, or UPDATE (UPDATE_BEFORE/UPDATE_AFTER). But the addToBatch method in the connector's NebulaVertexBatchExecutor does not distinguish insert from delete; the decision is made in executeBatch via executionOptions.getWriteMode(), and executionOptions is passed in when NebulaVertexBatchOutputFormat is constructed. In other words, the write mode is fixed to a constant at initialization time, so when a row is deleted on the MySQL side, executeBatch has no way to tell whether the WriteMode should be INSERT, UPDATE, or DELETE.
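
One possible workaround with the current API, as a sketch: split the changelog stream by RowKind on the user side and attach one sink per write mode. Here buildVertexSink is a hypothetical helper, i.e. the getNebulaVertexBatchOutputFormat method above with the write mode passed in as a parameter instead of hard-coded:

import org.apache.flink.connector.nebula.sink.NebulaSinkFunction;
import org.apache.flink.connector.nebula.utils.WriteModeEnum;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class CdcRowKindRouting {

    // Hypothetical helper: the getNebulaVertexBatchOutputFormat method from
    // above, parameterized by write mode instead of hard-coding INSERT.
    static NebulaSinkFunction<Row> buildVertexSink(WriteModeEnum writeMode) {
        throw new UnsupportedOperationException("wire in your own configs here");
    }

    static void route(DataStream<Row> cdcRows) {
        // Route each change type to a sink whose fixed WriteMode matches it.
        // UPDATE_BEFORE rows carry the old image and are dropped here.
        cdcRows.filter(r -> r.getKind() == RowKind.INSERT)
                .addSink(buildVertexSink(WriteModeEnum.INSERT));
        cdcRows.filter(r -> r.getKind() == RowKind.UPDATE_AFTER)
                .addSink(buildVertexSink(WriteModeEnum.UPDATE));
        cdcRows.filter(r -> r.getKind() == RowKind.DELETE)
                .addSink(buildVertexSink(WriteModeEnum.DELETE));
    }
}

Each NebulaSinkFunction then keeps a valid, fixed VertexExecutionOptions; the cost is running three sink operators instead of one.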
