如何将Exchange不支持的数据源中的数据导入Nebula

Exchange 是Nebula的多源数据导入工具, 目前已支持 CSV、JSON、HBASE、MySQL、ClickHouse、MaxCompute、Kafka等多种数据源。

问:如果你想将 不在以上所列的数据源中的数据导入Nebula 如果做?

答:Nebula Spark Connector是Nebula 的一个Spark连接器,你可以自行编写简单的程序来读取你的数据源数据,并将数据写入Nebula,具体代码示例见 nebula-spark-connector/NebulaSparkWriterExample.scala at 54bce417756a570c07dcf85f4c9dc7a8c36afb84 · vesoft-inc/nebula-spark-connector · GitHub

Mysql 数据源导入Nebula中,使用Spark Connector的代码示例:(只需要自己写读取数据源的几行代码,写入nebula的代码是一样的)

def writeVertex(spark: SparkSession): Unit = {
    LOG.info("start to write nebula vertices")
    var df = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://127.0.0.1:3360/db?useUnicode=true&characterEncoding=utf-8")
      .option("dbtable", "table")
      .option("user", "root")
      .option("password", "nebula")
      .load()
      
    df = df.select("select id, a as name, b as age from table")

    val config = getNebulaConnectionConfig()
    val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
      .builder()
      .withSpace("test")
      .withTag("person")
      .withVidField("id")
      .withVidAsProp(false)
      .withBatch(1000)
      .build()
    df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
  }
1 个赞