Nebula Exchange: problems importing data from MySQL

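Hi everyone, I have the following questions from using Exchange.
Question 1: Do the field types have to match strictly between the table and Nebula?
For example: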

fields: [name,bas_id,phone]
nebula.fields: [name,c_no,phone]

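bas_id is a bigint column in the MySQL table, and c_no is a string property in Nebula. With this mapping, will Exchange convert the type automatically, or do the types have to match strictly?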

Question 2: When running the jar I hit the problem shown in the screenshot. Is it caused by missing permissions on /var/log/spark/lineage?

spark-submit --master "local"  --class com.vesoft.nebula.exchange.Exchange  ./nebula-exchange-2.6.1.jar  -c ./mysql_application.conf

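  1. The data types do not have to match strictly. Data is imported according to the Nebula data type. For example, bas_id is bigint in MySQL and the corresponding c_no is string in Nebula, so the value is imported as a string.
  2. It is because your environment has no MySQL driver. Download the mysql-connector-java-xxx.jar that matches your MySQL version and put it in the spark_home/jars directory.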
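
To illustrate answer 1, here is a minimal Python sketch of converting source values to the type declared on the Nebula side. It is not Exchange's code: the function `convert_for_nebula`, the type names it accepts, and the sample row are hypothetical, and only the field names (name, bas_id, c_no, phone) come from the question.

```python
# Illustrative sketch only; this is not how Exchange is implemented internally.

def convert_for_nebula(value, nebula_type):
    """Convert a source value to the type declared in the Nebula schema."""
    if value is None:
        return None
    if nebula_type == "string":
        return str(value)
    if nebula_type == "int":
        return int(value)
    if nebula_type == "double":
        return float(value)
    raise ValueError(f"unsupported Nebula type in this sketch: {nebula_type}")


# Hypothetical sample row read from MySQL (bas_id is a bigint there).
row = {"name": "Alice", "bas_id": 1234567890123, "phone": "13800000000"}

# fields -> nebula.fields mapping taken from the question.
field_mapping = {"name": "name", "bas_id": "c_no", "phone": "phone"}

# Assumed Nebula property types; the question only states that c_no is a string.
nebula_schema = {"name": "string", "c_no": "string", "phone": "string"}

props = {
    field_mapping[column]: convert_for_nebula(value, nebula_schema[field_mapping[column]])
    for column, value in row.items()
}
print(props)
```

The point of the sketch is that the target type drives the conversion, so a bigint source value ends up as a string when the Nebula property is a string.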

I am importing data from MySQL, so why does it report a Hive exception?

Config file

{
  # Spark configuration
  spark: {
    app: {
      name: Nebula Exchange 2.6.1
    }
    driver: {
      cores: 1
      maxResultSize: 1G
    }
    cores {
      max: 16
    }
  }

  # Nebula Graph configuration
  nebula: {
    address:{
    graph:["10.*.7.*:9669", "10.*.7.*:9669", "10.*.7.*:9669"]
    meta:["10.*.7.*:9559", "10.*.7.*:9559", "10.*.7.*:9559"]
    }
    # The account must have write permission on the target Nebula Graph space.
    user: root
    pswd: 123
    # Name of the Nebula Graph space to write data into.
    space: my_xiao_mi
    connection {
      timeout: 3000
      retry: 3
    }
    execution {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors
    }
    rate: {
      limit: 1024
      timeout: 1000
    }
  }
  # Vertex processing
  tags: [
    # Settings for Tag player.
    {
      # Name of the corresponding Tag in Nebula Graph.
      name: Person
      type: {
        # Data source format; set to MySQL.
        source: mysql
        # How vertex data is imported into Nebula Graph: Client or SST.
        sink: client
      }

      host:10.*.7.*
      port:3306
      database:"lgsbiz_test"
      table:"bas_addressee"
      user:"lgsbiz_sf_test"
      password:"aB3lv78AxL5R"
      sentence:"select name,bas_id,phone from  lgsbiz_test.bas_addressee where DATE_FORMAT(creation_time, '%Y-%m-%d') ='2019-05-17';"

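      # fields lists column names from the player table; their values are used as the specified Nebula Graph properties.
      # Entries in fields and nebula.fields must correspond one to one.
      # Separate multiple column names with commas (,).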
      fields: [name,bas_id,phone]
      nebula.fields: [name,c_no,phone]

      # Column whose values are used as the vertex VID in Nebula Graph.
      vertex: {
        field:bas_id
      }

      # Number of records written to Nebula Graph per batch.
      batch: 256

      # Number of Spark partitions
      partition: 32
    }
    # Settings for Tag team.
    {
      name: Phone
      type: {
        source: mysql
        sink: client
      }

      host:10.*.7.*
      port:3306
      database:"lgsbiz_test"
      table:"bas_addressee"
      user:"lgsbiz_sf_test"
      password:"aB3lv78AxL5R"
      sentence:"select phone from  lgsbiz_test.bas_addressee where DATE_FORMAT(creation_time, '%Y-%m-%d') ='2019-05-17';"

      fields: [phone]
      nebula.fields: [phone]
      vertex: {
        field: phone
      }
      batch: 256
      partition: 32
    }

  ]

  # Edge processing
  edges: [
    # Settings for Edge type follow
    {
      # Name of the corresponding Edge type in Nebula Graph.
      name: LIKES

      type: {
        # Data source format; set to MySQL.
        source: mysql

        # How edge data is imported into Nebula Graph: Client or SST.
        sink: client
      }

      host:10.*.7.*
      port:3306
      database:"lgsbiz_test"
      table:"bas_addressee"
      user:"lgsbiz_sf_test"
      password:"aB3lv78AxL5R"
      sentence:"select phone,bas_id ,creation_time from  lgsbiz_test.bas_addressee where DATE_FORMAT(creation_time, '%Y-%m-%d') ='2019-05-17';"

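      # fields lists column names from the follow table; their values are used as the specified Nebula Graph properties.
      # Entries in fields and nebula.fields must correspond one to one.
      # Separate multiple column names with commas (,).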
      fields: [creation_time]
      nebula.fields: [creationDate]

      # In source, a column of the follow table is used as the edge's source vertex.
      # In target, a column of the follow table is used as the edge's destination vertex.
      source: {
        field: bas_id
      }

      target: {
        field: phone
      }

      # Number of records written to Nebula Graph per batch.
      batch: 256

      # Number of Spark partitions
      partition: 32
    }

    # Settings for Edge type serve
    {
      name: USE
      type: {
        source: mysql
        sink: client
      }

      host:10.*.7.*
      port:3306
      database:"lgsbiz_test"
      table:"bas_addressee"
      user:"lgsbiz_sf_test"
      password:"aB3lv78AxL5R"
      sentence:"select phone,bas_id  from  lgsbiz_test.bas_addressee where DATE_FORMAT(creation_time, '%Y-%m-%d') ='2019-05-17';"

      fields: [-]
      nebula.fields: [-]
      source: {
        field: bas_id
      }
      target: {
        field: phone
      }
      batch: 256
      partition: 32
    }
  ]
}

Please post your import command. Did you add -h?

This is the import command:

spark-submit --master "local"  --class com.vesoft.nebula.exchange.Exchange  ./nebula-exchange-2.6.1.jar  -c ./mysql_application.conf

I did not add -h.

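Judging from the screenshot above, a Spark environment was already installed on this machine. That environment is configured with Hive, and its version differs from the Spark version we use. As a result, Spark finds the Hive installation at load time, but the method it tries to call does not exist there.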

21/11/25 17:38:32 INFO client.HiveClientImpl: Warehouse location for Hive client (version 2.1.1) is /user/hive/warehouse
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Lorg/apache/hadoop/hive/conf/HiveConf;Lorg/apache/hadoop/hive/metastore/HiveMetaHookLoader;Ljava/util/concurrent/ConcurrentHashMap;Ljava/lang/String;Z)Lorg/apache/hadoop/hive/metastore/IMetaStoreClient;
        at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3819)
        at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3871)
        at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3851)
        at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:4105)
        at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:254)
        at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:237)
        at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:394)
        at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:338)
        at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:318)
        at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:294)
        at org.apache.spark.sql.hive.client.HiveClientImpl.org$apache$spark$sql$hive$client$HiveClientImpl$$client(HiveClientImpl.scala:254)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:276)
        at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:221)
        at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:220)
        at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:266)
        at org.apache.spark.sql.hive.client.HiveClientImpl.databaseExists(HiveClientImpl.scala:356)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:217)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:217)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:217)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
        at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:216)
        at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:114)
        at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:102)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder.org$apache$spark$sql$hive$HiveSessionStateBuilder$$externalCatalog(HiveSessionStateBuilder.scala:39)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder$$anonfun$1.apply(HiveSessionStateBuilder.scala:54)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder$$anonfun$1.apply(HiveSessionStateBuilder.scala:54)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog$lzycompute(SessionCatalog.scala:90)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog(SessionCatalog.scala:90)
        at org.apache.spark.sql.query.analysis.QueryAnalysis$.hiveCatalog(QueryAnalysis.scala:63)
        at org.apache.spark.sql.query.analysis.QueryAnalysis$.getLineageInfo(QueryAnalysis.scala:88)
        at com.cloudera.spark.lineage.NavigatorQueryListener.onSuccess(ClouderaNavigatorListener.scala:60)
        at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:124)
        at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:123)
        at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:145)
        at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:143)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
        at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
        at org.apache.spark.sql.util.ExecutionListenerManager.org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling(QueryExecutionListener.scala:143)
        at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply$mcV$sp(QueryExecutionListener.scala:123)
        at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:123)
        at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:123)
        at org.apache.spark.sql.util.ExecutionListenerManager.readLock(QueryExecutionListener.scala:156)
        at org.apache.spark.sql.util.ExecutionListenerManager.onSuccess(QueryExecutionListener.scala:122)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3367)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3406)
        at org.apache.spark.sql.Dataset.createOrReplaceTempView(Dataset.scala:3095)
        at com.vesoft.nebula.exchange.reader.MySQLReader.read(ServerBaseReader.scala:93)
        at com.vesoft.nebula.exchange.Exchange$.com$vesoft$nebula$exchange$Exchange$$createDataSource(Exchange.scala:275)
        at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:134)
        at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:126)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:126)
        at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:926)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:935)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

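Spark SQL supports executing SQL statements. To run the statement configured in your config file, Exchange creates a temporary view from the DataFrame, and that step uses the Hive environment bound to Spark.
Your problem is caused by a conflict between the hive metastore version in your environment and the one in Exchange. There are similar posts on the forum; replace the hive metastore jar in your environment.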
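
Before replacing anything, it can help to see which hive metastore jars the local Spark installation actually ships. The following is a minimal Python sketch under stated assumptions: the helper `find_hive_metastore_jars` is hypothetical, it assumes the jars follow the usual `hive-metastore-<version>.jar` naming, and it assumes they live under `$SPARK_HOME/jars`, which may not hold for every distribution.

```python
import os
import re
from pathlib import Path


def find_hive_metastore_jars(jars_dir):
    """Return (file name, version) pairs for hive-metastore jars in jars_dir."""
    pattern = re.compile(r"^hive-metastore-(.+)\.jar$")
    found = []
    for path in sorted(Path(jars_dir).glob("hive-metastore-*.jar")):
        match = pattern.match(path.name)
        if match:
            found.append((path.name, match.group(1)))
    return found


if __name__ == "__main__":
    # Assumption: SPARK_HOME points at the Spark installation used by spark-submit.
    spark_home = os.environ.get("SPARK_HOME")
    if spark_home:
        for name, version in find_hive_metastore_jars(Path(spark_home) / "jars"):
            print(name, version)
```

If the version printed here differs from the Hive client version reported in the log (2.1.1 in the trace above), that mismatch is a likely candidate for the `NoSuchMethodError`.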

There is no obvious error, but the data did not get into Nebula.

21/11/29 16:22:42 INFO exchange.Exchange$: Loading from mysql config: MySql source host: *.*.7.*, port: 3306, database: lgsbiz_test, table: bas_addressee, user: lgsbiz_sf_test, password: *******, sentence: select phone,bas_id  from  bas_addressee where DATE_FORMAT(creation_time, '%Y-%m-%d') ='2019-05-17'
Mon Nov 29 16:22:42 CST 2021 WARN: Caught while disconnecting...

EXCEPTION STACK TRACE:



** BEGIN NESTED EXCEPTION **

javax.net.ssl.SSLException
MESSAGE: closing inbound before receiving peer's close_notify

STACKTRACE:

javax.net.ssl.SSLException: closing inbound before receiving peer's close_notify
        at sun.security.ssl.Alert.createSSLException(Alert.java:133)
        at sun.security.ssl.Alert.createSSLException(Alert.java:117)
        at sun.security.ssl.TransportContext.fatal(TransportContext.java:340)
        at sun.security.ssl.TransportContext.fatal(TransportContext.java:296)
        at sun.security.ssl.TransportContext.fatal(TransportContext.java:287)
        at sun.security.ssl.SSLSocketImpl.shutdownInput(SSLSocketImpl.java:737)
        at sun.security.ssl.SSLSocketImpl.shutdownInput(SSLSocketImpl.java:716)
        at com.mysql.cj.protocol.a.NativeProtocol.quit(NativeProtocol.java:1312)
        at com.mysql.cj.NativeSession.quit(NativeSession.java:182)
        at com.mysql.cj.jdbc.ConnectionImpl.realClose(ConnectionImpl.java:1750)
        at com.mysql.cj.jdbc.ConnectionImpl.close(ConnectionImpl.java:720)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:71)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:210)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:317)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
        at com.vesoft.nebula.exchange.reader.MySQLReader.read(ServerBaseReader.scala:92)
        at com.vesoft.nebula.exchange.Exchange$.com$vesoft$nebula$exchange$Exchange$$createDataSource(Exchange.scala:275)
        at com.vesoft.nebula.exchange.Exchange$$anonfun$main$3.apply(Exchange.scala:175)
        at com.vesoft.nebula.exchange.Exchange$$anonfun$main$3.apply(Exchange.scala:168)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:168)
        at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:926)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:935)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


** END NESTED EXCEPTION **


Exception in thread "main" java.util.NoSuchElementException: key not found: -
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:59)
        at scala.collection.MapLike$class.apply(MapLike.scala:141)
        at scala.collection.AbstractMap.apply(Map.scala:59)
        at com.vesoft.nebula.exchange.utils.NebulaUtils$$anonfun$getDataSourceFieldType$1.apply(NebulaUtils.scala:43)
        at com.vesoft.nebula.exchange.utils.NebulaUtils$$anonfun$getDataSourceFieldType$1.apply(NebulaUtils.scala:42)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at com.vesoft.nebula.exchange.utils.NebulaUtils$.getDataSourceFieldType(NebulaUtils.scala:42)
        at com.vesoft.nebula.exchange.processor.EdgeProcessor.process(EdgeProcessor.scala:103)
        at com.vesoft.nebula.exchange.Exchange$$anonfun$main$3.apply(Exchange.scala:190)
        at com.vesoft.nebula.exchange.Exchange$$anonfun$main$3.apply(Exchange.scala:168)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:168)
        at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:926)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:935)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
21/11/29 16:22:42 INFO spark.SparkContext: Invoking stop() from shutdown hook

Isn't the log posted above an obvious error?
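
The fatal line in that log is `java.util.NoSuchElementException: key not found: -`, raised from `NebulaUtils.getDataSourceFieldType` while an edge is processed. The USE edge in the config sets `fields: [-]` and `nebula.fields: [-]`, so a plausible reading is that `-` is treated as a literal property name and looked up in the edge type's schema, where it does not exist. The Python sketch below only mimics that kind of lookup; `missing_fields` is a hypothetical helper, and the empty property list for USE is an assumption because the thread does not show the schema.

```python
def missing_fields(configured_fields, schema_properties):
    """Return configured names that are absent from the schema, in order."""
    known = set(schema_properties)
    return [name for name in configured_fields if name not in known]


# Assumption: the USE edge type has no properties in Nebula.
use_edge_properties = []

# The placeholder "-" is not a property name, so it is reported as missing.
print(missing_fields(["-"], use_edge_properties))  # ['-']

# An empty field list has nothing to look up.
print(missing_fields([], use_edge_properties))  # []
```

If the edge really has no properties, an empty list for both `fields` and `nebula.fields` may be what was intended; that should be checked against the Exchange documentation for the version in use.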

I removed that section. Now there are no errors, but there is still no data in the database.

Post your config file and import log. If there is no data, there must be an error.

Command

spark-submit --master "local"  --class com.vesoft.nebula.exchange.Exchange  ./nebula-exchange-2.6.1-rel.jar  -c ./mysql_application_rel.conf

Config file

{
  # Spark configuration
  spark: {
    app: {
      name: Nebula Exchange 2.6.1
    }
    driver: {
      cores: 1
      maxResultSize: 1G
    }
    cores {
      max: 16
    }
  }

  # Nebula Graph configuration
  nebula: {
    address:{
    graph:["10.23.7.5:9669", "10.23.7.6:9669", "10.23.7.7:9669"]
    meta:["10.23.7.5:9559", "10.23.7.6:9559", "10.23.7.7:9559"]
    }
    # The account must have write permission on the target Nebula Graph space.
    user: root
    pswd: 123
    # Name of the Nebula Graph space to write data into.
    space: my_xiao_mi
    connection {
      timeout: 3000
      retry: 3
    }
    execution {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors
    }
    rate: {
      limit: 1024
      timeout: 1000
    }
  }
  # Vertex processing
  tags: [
    # Settings for Tag player.
    {
      # Name of the corresponding Tag in Nebula Graph.
      name: Person
      type: {
        # Data source format; set to MySQL.
        source: mysql
        # How vertex data is imported into Nebula Graph: Client or SST.
        sink: client
      }

      host:10.23.7.1
      port:3306
      database:"lgsbiz_test"
      table:"bas_addressee"
      user:"lgsbiz_sf_test"
      password:"aB3lv78AxL5R"
      sentence:"select name,bas_id,phone from  lgsbiz_test.bas_addressee where DATE_FORMAT(creation_time, '%Y-%m-%d') ='2019-05-17'"

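      # fields lists column names from the player table; their values are used as the specified Nebula Graph properties.
      # Entries in fields and nebula.fields must correspond one to one.
      # Separate multiple column names with commas (,).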
      fields: [name,bas_id,phone]
      nebula.fields: [name,c_no,phone]

      # Column whose values are used as the vertex VID in Nebula Graph.
      vertex: {
        field:bas_id
      }

      # Number of records written to Nebula Graph per batch.
      batch: 256

      # Number of Spark partitions
      partition: 32
    }
    # Settings for Tag team.
    {
      name: Phone
      type: {
        source: mysql
        sink: client
      }

      host:10.23.7.1
      port:3306
      database:"lgsbiz_test"
      table:"bas_addressee"
      user:"lgsbiz_sf_test"
      password:"aB3lv78AxL5R"
      sentence:"select phone from  lgsbiz_test.bas_addressee where DATE_FORMAT(creation_time, '%Y-%m-%d') ='2019-05-17'"

      fields: [phone]
      nebula.fields: [phone]
      vertex: {
        field: phone
      }
      batch: 256
      partition: 32
    }

  ]

  # Edge processing
  edges: [
    # Settings for Edge type follow
    {
      # Name of the corresponding Edge type in Nebula Graph.
      name: LIKES

      type: {
        # Data source format; set to MySQL.
        source: mysql

        # How edge data is imported into Nebula Graph: Client or SST.
        sink: client
      }

      host:10.23.7.1
      port:3306
      database:"lgsbiz_test"
      table:"bas_addressee"
      user:"lgsbiz_sf_test"
      password:"aB3lv78AxL5R"
      sentence:"select phone,bas_id ,creation_time from  lgsbiz_test.bas_addressee where DATE_FORMAT(creation_time, '%Y-%m-%d') ='2019-05-17'"

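      # fields lists column names from the follow table; their values are used as the specified Nebula Graph properties.
      # Entries in fields and nebula.fields must correspond one to one.
      # Separate multiple column names with commas (,).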
      fields: [creation_time]
      nebula.fields: [creationDate]

      # In source, a column of the follow table is used as the edge's source vertex.
      # In target, a column of the follow table is used as the edge's destination vertex.
      source: {
        field: bas_id
      }

      target: {
        field: phone
      }

      # Number of records written to Nebula Graph per batch.
      batch: 256

      # Number of Spark partitions
      partition: 32
    }
  ]
}

Import log

21/11/29 17:27:58 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@60a19573{/SQL,null,AVAILABLE,@Spark}
21/11/29 17:27:58 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@44a085e5{/SQL/json,null,AVAILABLE,@Spark}
21/11/29 17:27:58 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4db60246{/SQL/execution,null,AVAILABLE,@Spark}
21/11/29 17:27:58 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2152ab30{/SQL/execution/json,null,AVAILABLE,@Spark}
21/11/29 17:27:58 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@18137eab{/static/sql,null,AVAILABLE,@Spark}
21/11/29 17:27:59 INFO state.StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
Exception in thread "main" com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
        at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
        at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
        at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:835)
        at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:455)
        at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:240)
        at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:207)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:210)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:317)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
        at com.vesoft.nebula.exchange.reader.MySQLReader.read(ServerBaseReader.scala:92)
        at com.vesoft.nebula.exchange.Exchange$.com$vesoft$nebula$exchange$Exchange$$createDataSource(Exchange.scala:275)
        at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:134)
        at com.vesoft.nebula.exchange.Exchange$$anonfun$main$2.apply(Exchange.scala:126)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at com.vesoft.nebula.exchange.Exchange$.main(Exchange.scala:126)
        at com.vesoft.nebula.exchange.Exchange.main(Exchange.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:926)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:935)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
        at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
        at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)
        at com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:167)
        at com.mysql.cj.protocol.a.NativeSocketConnection.connect(NativeSocketConnection.java:91)
        at com.mysql.cj.NativeSession.connect(NativeSession.java:152)
        at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:955)
        at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:825)
        ... 31 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
        at java.net.Socket.connect(Socket.java:606)
        at com.mysql.cj.protocol.StandardSocketFactory.connect(StandardSocketFactory.java:155)
        at com.mysql.cj.protocol.a.NativeSocketConnection.connect(NativeSocketConnection.java:65)
        ... 34 more
21/11/29 17:27:59 INFO spark.SparkContext: Invoking stop() from shutdown hook

Why do you keep saying there is no error message? The MySQL driver did not read any data at all.
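
The root cause in that log is `java.net.ConnectException: Connection refused`, which means the TCP connection to the MySQL host and port never opened. A quick way to confirm this from the machine that runs spark-submit is a plain socket check. This is a minimal Python sketch; `can_connect` is a hypothetical helper and is independent of Exchange and of the MySQL driver.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False
```

For the config above, the call would be `can_connect("10.23.7.1", 3306)`. If it returns False, the things to check are whether MySQL is listening on that address and port and whether a firewall is blocking the connection, before looking at the Exchange config again.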

This problem keeps showing up in the log. Will it affect the data import?

EXCEPTION STACK TRACE:



** BEGIN NESTED EXCEPTION **

javax.net.ssl.SSLException
MESSAGE: closing inbound before receiving peer's close_notify

STACKTRACE:

javax.net.ssl.SSLException: closing inbound before receiving peer's close_notify
