- Nebula Graph 版本:2.5.0(使用 RPM 包安装)
- 部署方式: 单机
- 安装方式:源RPM
- 是否为线上版本: N
- 硬件信息
- 磁盘( 推荐使用 SSD)
- CPU、内存信息
- 问题的具体描述
使用的示例代码如下:
/**
 * Minimal Flink job that pushes a handful of hard-coded records into Nebula Graph
 * through {@code NebulaSinkFunction}.
 *
 * <p>The job builds an in-memory stream of string lists, converts every list to a
 * Flink {@code Row}, and hands the rows to a Nebula sink configured for the graph
 * space {@code en_relation} and the tag {@code en_tag}.
 */
public class FlinkConnectorExample {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkConnectorExample.class);

    /**
     * Entry point: creates the execution environment, builds the sample source and
     * wires it to the Nebula sink.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        sinkVertexData(env, constructVertexSourceData(env));
    }

    /**
     * Builds the sample data source.
     *
     * <p>Eight records are produced, each a fixed-size list of three strings. Given
     * the sink configuration in {@link #sinkVertexData}, these are presumably
     * (vertex id, name, age) -- confirm against the target tag schema.
     *
     * @param env environment used to create the stream
     * @return a stream backed by the in-memory sample collection
     */
    public static DataStream<List<String>> constructVertexSourceData(
            StreamExecutionEnvironment env) {
        List<List<String>> samples = new ArrayList<>();
        samples.add(Arrays.asList("15", "Bob", "38"));
        samples.add(Arrays.asList("16", "Tina", "39"));
        samples.add(Arrays.asList("17", "Jena", "30"));
        samples.add(Arrays.asList("18", "Tom", "30"));
        samples.add(Arrays.asList("19", "Viki", "35"));
        samples.add(Arrays.asList("20", "Jime", "33"));
        samples.add(Arrays.asList("21", "Jhon", "36"));
        samples.add(Arrays.asList("22", "Crea", "30"));
        return env.fromCollection(samples);
    }

    /**
     * Converts the source records to rows, attaches the Nebula sink and runs the job.
     *
     * <p>No write mode is set on the execution options, so whatever the connector
     * uses by default applies (INSERT, according to the original author's note).
     * If the job fails, the exception is logged and the JVM exits with status -1.
     *
     * @param env          environment the job is executed on
     * @param playerSource stream of records; element 0 is configured as the id index,
     *                     elements 1 and 2 are mapped to the fields {@code name} and
     *                     {@code age}
     */
    public static void sinkVertexData(StreamExecutionEnvironment env,
                                      DataStream<List<String>> playerSource) {
        // Graph and meta service endpoints of the single-node deployment.
        NebulaClientOptions clientOptions =
                new NebulaClientOptions.NebulaClientOptionsBuilder()
                        .setGraphAddress("127.0.0.1:9669")
                        .setMetaAddress("127.0.0.1:9559")
                        .build();
        NebulaGraphConnectionProvider graphProvider =
                new NebulaGraphConnectionProvider(clientOptions);
        NebulaMetaConnectionProvider metaProvider =
                new NebulaMetaConnectionProvider(clientOptions);

        // Target space/tag plus the mapping from record positions to tag fields.
        ExecutionOptions vertexOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("en_relation")
                .setTag("en_tag")
                .setIdIndex(0)
                .setFields(Arrays.asList("name", "age"))
                .setPositions(Arrays.asList(1, 2))
                .setBatch(2)
                .builder();

        NebulaBatchOutputFormat batchFormat =
                new NebulaBatchOutputFormat(graphProvider, metaProvider)
                        .setExecutionOptions(vertexOptions);
        NebulaSinkFunction sink = new NebulaSinkFunction(batchFormat);

        // Copy every list element into the Row slot with the same index.
        DataStream<Row> rows = playerSource.map(fields -> {
            Row converted = new Row(fields.size());
            int slot = 0;
            for (String value : fields) {
                converted.setField(slot++, value);
            }
            return converted;
        });
        rows.addSink(sink);

        try {
            env.execute("Write Nebula");
        } catch (Exception e) {
            LOG.error("error when write Nebula Graph, ", e);
            System.exit(-1);
        }
    }
}
运行后报错
后台服务显示运行正常
studio连接也是正常的,请大佬支招