大头番茄
1
- NebulaGraph version: 3.8.0
- nebula-flink-connector version: 3.8.1
- Deployment method: docker-compose-lite
- Installation method: Docker
- Problem description: When using NebulaSinkFunction in nebula-flink-connector, building a NebulaVertexBatchOutputFormat first requires constructing a VertexExecutionOptions object, and constructing VertexExecutionOptions requires setting the WriteMode up front. WriteMode can be INSERT, DELETE, or UPDATE. But when the source is Flink CDC, the WriteMode of VertexExecutionOptions cannot be set in advance: it can only be determined once each Row record arrives. So NebulaSinkFunction cannot support a Flink CDC source, although the Flink SQL path does support this.
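For context, each Row deserialized from a Flink CDC source carries its change type in its RowKind, which is only known per record at runtime. A minimal sketch (the class name is made up for illustration):

```java
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class RowKindDemo {
    public static void main(String[] args) {
        // A CDC deserializer tags each Row with the change type of the
        // originating event; the tag is only known per record, at runtime.
        Row deleted = Row.ofKind(RowKind.DELETE, 42L, "Tom");
        System.out.println(deleted.getKind()); // prints DELETE
    }
}
```

The sink in question is built as follows: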
```java
public static NebulaSinkFunction<Row> getNebulaVertexBatchOutputFormat(
        GraphConfig graphConfig,
        NebulaGraphConnectionProvider graphConnectionProvider,
        NebulaMetaConnectionProvider metaConnectionProvider) {
    EnnVertexConfig ennVertexConfig = (EnnVertexConfig) graphConfig;
    VertexExecutionOptions executionOptions =
            new VertexExecutionOptions.ExecutionOptionBuilder()
                    .setGraphSpace(ennVertexConfig.getGraphSpace())
                    .setTag(ennVertexConfig.getTagName())
                    .setIdIndex(ennVertexConfig.getIndex()) // vid
                    .setFields(ennVertexConfig.getFields())
                    .setWriteMode(WriteModeEnum.INSERT)
                    .setPositions(ennVertexConfig.getPositions())
                    .setBatchSize(ennVertexConfig.getBatchSize())
                    .build();
    NebulaVertexBatchOutputFormat outputFormat =
            new NebulaVertexBatchOutputFormat(
                    graphConnectionProvider, metaConnectionProvider, executionOptions);
    return new NebulaSinkFunction<>(outputFormat);
}
```
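Because the WriteMode is baked into the options when they are built, covering all three change types with this API would mean constructing one sink per mode. A sketch of a hypothetical helper (buildSink is not part of the connector; it only parameterizes the builder above):

```java
// Hypothetical helper, not part of nebula-flink-connector: identical to the
// builder above except that the WriteMode is passed in as a parameter.
private static NebulaSinkFunction<Row> buildSink(
        GraphConfig graphConfig,
        NebulaGraphConnectionProvider graphConnectionProvider,
        NebulaMetaConnectionProvider metaConnectionProvider,
        WriteModeEnum writeMode) {
    EnnVertexConfig cfg = (EnnVertexConfig) graphConfig;
    VertexExecutionOptions options =
            new VertexExecutionOptions.ExecutionOptionBuilder()
                    .setGraphSpace(cfg.getGraphSpace())
                    .setTag(cfg.getTagName())
                    .setIdIndex(cfg.getIndex())
                    .setFields(cfg.getFields())
                    .setWriteMode(writeMode) // the only per-mode difference
                    .setPositions(cfg.getPositions())
                    .setBatchSize(cfg.getBatchSize())
                    .build();
    return new NebulaSinkFunction<>(new NebulaVertexBatchOutputFormat(
            graphConnectionProvider, metaConnectionProvider, options));
}
```

Inside the connector, NebulaVertexBatchExecutor handles batching like this: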
```java
@Override
public void addToBatch(Row record) {
    /* Convert the record into a NebulaGraph vertex; note that the Row's
       RowKind is not consulted here, so the change type is dropped */
    NebulaVertex vertex = converter.createVertex(record, executionOptions.getPolicy());
    if (vertex == null) {
        return;
    }
    nebulaVertexList.add(vertex);
}
```
```java
@Override
public void executeBatch(Session session) throws IOException {
    if (isBatchEmpty()) {
        return;
    }
    NebulaVertices nebulaVertices = new NebulaVertices(executionOptions.getLabel(),
            executionOptions.getFields(), nebulaVertexList, executionOptions.getPolicy(),
            executionOptions.isDeleteExecutedWithEdges());
    // Generate the write nGQL statement; the WriteMode was fixed when the options were built
    String statement = null;
    switch (executionOptions.getWriteMode()) {
        case INSERT:
            LOG.info("executing vertex insert");
            statement = nebulaVertices.getInsertStatement();
            break;
        case UPDATE:
            LOG.info("executing vertex update");
            statement = nebulaVertices.getUpdateStatement();
            break;
        case DELETE:
            statement = nebulaVertices.getDeleteStatement();
            LOG.info("executing vertex delete, statement={}", statement);
            break;
        default:
            throw new IllegalArgumentException("write mode is not supported");
    }
    executeStatement(session, statement);
    clearBatch();
}
```
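One direction a fix could take, sketched below, is to buffer vertices per RowKind in addToBatch and pick the statement per bucket in executeBatch. This is purely illustrative: the buffers field and both methods are hypothetical, not part of the connector.

```java
// Sketch only; assumes the surrounding NebulaVertexBatchExecutor context
// (converter, executionOptions, executeStatement) plus java.util imports.
private final Map<RowKind, List<NebulaVertex>> buffers = new EnumMap<>(RowKind.class);

void addToBatchByKind(Row record) {
    NebulaVertex vertex = converter.createVertex(record, executionOptions.getPolicy());
    if (vertex != null) {
        buffers.computeIfAbsent(record.getKind(), k -> new ArrayList<>()).add(vertex);
    }
}

void executeBatchByKind(Session session) throws IOException {
    for (Map.Entry<RowKind, List<NebulaVertex>> entry : buffers.entrySet()) {
        NebulaVertices vertices = new NebulaVertices(executionOptions.getLabel(),
                executionOptions.getFields(), entry.getValue(), executionOptions.getPolicy(),
                executionOptions.isDeleteExecutedWithEdges());
        // Choose the statement from the record's RowKind instead of a fixed WriteMode.
        String statement;
        switch (entry.getKey()) {
            case INSERT:
            case UPDATE_AFTER:
                statement = vertices.getInsertStatement();
                break;
            case DELETE:
                statement = vertices.getDeleteStatement();
                break;
            default: // UPDATE_BEFORE carries no new state to write
                continue;
        }
        executeStatement(session, statement);
    }
    buffers.clear();
}
```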
大头番茄
2
Let me add some detail. When using nebula-flink-connector's DataStream API, I ran into the following problem: my source is Flink CDC, and the source data is deserialized into the Row type, where each Row's RowKind can be INSERT, DELETE, or UPDATE. But the addToBatch method in nebula-flink-connector's NebulaVertexBatchExecutor does not distinguish insert from delete; the decision is made in executeBatch via executionOptions.getWriteMode(), and executionOptions is passed in when NebulaVertexBatchOutputFormat is constructed. That means the WriteMode is fixed to a single value when NebulaVertexBatchOutputFormat is initialized, so when a row is deleted on the MySQL side, executeBatch has no way to tell whether the WriteMode should be INSERT, UPDATE, or DELETE.
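Without changing the connector, one possible workaround is to split the stream by RowKind and attach one fixed-mode sink per branch. A minimal sketch under assumptions: the CDC source is faked with fromElements, and print() stands in for the mode-specific NebulaSinkFunction on each branch.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class CdcRoutingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for a deserialized Flink CDC stream.
        DataStream<Row> cdcRows = env
                .fromElements(
                        Row.ofKind(RowKind.INSERT, 1L, "Tom"),
                        Row.ofKind(RowKind.DELETE, 2L, "Jerry"))
                .returns(Types.ROW(Types.LONG, Types.STRING));

        // Route by RowKind; each branch would get its own NebulaSinkFunction
        // built with the matching WriteMode (print() is a placeholder).
        cdcRows.filter(r -> r.getKind() == RowKind.INSERT
                        || r.getKind() == RowKind.UPDATE_AFTER)
               .print("insert-or-update");
        cdcRows.filter(r -> r.getKind() == RowKind.DELETE)
               .print("delete");

        env.execute("route CDC rows by RowKind");
    }
}
```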
system
Closed
3
This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.