package com.kzxx.flink.test; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider; import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider; import org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat; import org.apache.flink.connector.nebula.sink.NebulaSinkFunction; import org.apache.flink.connector.nebula.statement.ExecutionOptions; import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.connector.nebula.connection.NebulaClientOptions; import org.apache.flink.types.Row; import org.apache.flink.util.Collector; import java.util.Arrays; public class NebulaTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource dataSource = env.readTextFile("hdfs://ns1/input/carfile"); SingleOutputStreamOperator row = dataSource.flatMap(new FlatMapFunction() { @Override public void flatMap(String s, Collector collector) throws Exception { JSONObject jsonObject = JSON.parseObject(s); String hphm = jsonObject.getString("car_num"); String hpzl = jsonObject.getString("car_num_type"); String hphmzl = jsonObject.getString("hphmzl"); String csys = jsonObject.getString("csys"); String hpys = jsonObject.getString("hpys"); String clpp1 = jsonObject.getString("clpp1"); if (null != hphm && null != hpzl && null != hphmzl && null != csys && null != hpys && null != clpp1) { Row row = new Row(6); row.setField(0, hphm); row.setField(1, hpzl); row.setField(2, csys); row.setField(3, hpys); row.setField(4, clpp1); row.setField(5, hphmzl); collector.collect(row); } } }); NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder() .setGraphAddress("172.20.62.117:9669, 172.20.62.118:9669, 172.20.62.119:9669") .setMetaAddress("172.20.62.117:9559, 172.20.62.118:9559, 172.20.62.119:9559") .setUsername("root") .setPassword("nebula") .build(); NebulaGraphConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions); NebulaMetaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions); ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder() // 设置图空间 .setGraphSpace("test_xunuo") // 设置Tag .setTag("Car") .setIdIndex(0) // 设置插入属性 .setFields(Arrays.asList("Hphm", "Hpzl", "Csys", "Hpys", "Clpp", "Hphmzl")) .setPositions(Arrays.asList(0, 1, 2, 3, 4, 5)) .builder(); NebulaBatchOutputFormat outPutFormat = new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider) .setExecutionOptions(executionOptions); NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat); row.addSink(nebulaSinkFunction); env.execute("write nebula"); } }