nebula-algorithm runtime error

  • Nebula version: v2.0.1
  • Deployment (distributed / standalone / Docker / DBaaS): distributed
  • Production environment: N

Spark version: 2.3.2.3.1.0.0-78
Scala version: 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_232)

https://github.com/vesoft-inc/nebula-spark-utils/tree/master/nebula-algorithm

I'm not familiar with Scala or Spark, but I wanted to try the nebula-algorithm demo.
After compiling nebula-algorithm and running it, I got the following error:

21/05/20 21:03:17 INFO NebulaDataSourceEdgeReader: prop name start_year, type 2
21/05/20 21:03:17 INFO NebulaDataSourceEdgeReader: prop name end_year, type 2
21/05/20 21:03:17 INFO NebulaDataSourceEdgeReader: dataset's schema: StructType(StructField(_srcId,StringType,false), StructField(_dstId,StringType,false), StructField(_rank,LongType,false), StructField(start_year,LongType,true), StructField(end_year,LongType,true))
21/05/20 21:03:20 INFO CodeGenerator: Code generated in 396.44616 ms
Exception in thread "main" java.lang.AbstractMethodError: com.vesoft.nebula.connector.reader.NebulaDataSourceEdgeReader.createDataReaderFactories()Ljava/util/List;
        at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.readerFactories$lzycompute(DataSourceV2ScanExec.scala:55)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.readerFactories(DataSourceV2ScanExec.scala:52)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.inputRDD$lzycompute(DataSourceV2ScanExec.scala:76)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.inputRDD(DataSourceV2ScanExec.scala:60)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.inputRDDs(DataSourceV2ScanExec.scala:79)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.MapElementsExec.doExecute(objects.scala:233)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
        at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2980)
        at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2978)
        at com.vesoft.nebula.algorithm.utils.NebulaUtil$.loadInitGraph(NebulaUtil.scala:33)
        at com.vesoft.nebula.algorithm.lib.PageRankAlgo$.apply(PageRankAlgo.scala:37)
        at com.vesoft.nebula.algorithm.Main$.executeAlgorithm(Main.scala:127)
        at com.vesoft.nebula.algorithm.Main$.main(Main.scala:75)
        at com.vesoft.nebula.algorithm.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
21/05/20 21:03:20 INFO SparkContext: Invoking stop() from shutdown hook
21/05/20 21:03:20 INFO AbstractConnector: Stopped Spark@32c0915e{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}

Configuration file:

  data: {
    # data source, one of: nebula, csv, json
    source: nebula
    # data sink: the algorithm result will be written into this sink, one of: nebula, csv, text
    sink: nebula
    # set to true if your algorithm needs edge weights
    hasWeight: false
  }

  # Nebula Graph relation config
  nebula: {
    # algo's data source from Nebula. This nebula.read config takes effect only when data.source is nebula.
    read: {
        # Nebula metad server address; separate multiple addresses with commas
        metaAddress: "XXX"
        # Nebula space
        space: nb
        # Nebula edge types; with multiple labels, data from the corresponding edges will be unioned together
        labels: ["serve"]
        # Nebula edge property name for each edge type; this property is used as the weight column for the algorithm.
        # Make sure the weightCols correspond one-to-one to labels.
        weightCols: ["start_year"]
    }

    # algo result sink into Nebula. This nebula.write config takes effect only when data.sink is nebula.
    write: {
        # Nebula graphd server address; separate multiple addresses with commas
        graphAddress: "XX"
        # Nebula metad server address; separate multiple addresses with commas
        metaAddress: "XXX"
        user: root
        pswd: "XXX"
        # Nebula space name
        space: nb
        # Nebula tag name; the algorithm result will be written into this tag
        tag: pagerank
    }
  }
  algorithm: {
    # the algorithm that you are going to execute, pick one from [pagerank, louvain, connectedcomponent,
    # labelpropagation, shortestpaths, degreestatic, kcore, stronglyconnectedcomponent, trianglecount,
    # betweenness]
    executeAlgo: pagerank

    # PageRank parameter
    pagerank: {
        maxIter: 10
        resetProb: 0.15  # default 0.15
    }

    # Louvain parameter
    louvain: {
        maxIter: 20
        internalIter: 10
        tol: 0.5
    }
  }
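
For orientation, the pagerank section above maps to the parameters that nebula-algorithm hands to its PageRank implementation. The sketch below calls PageRankAlgo directly as a library with the same parameters; treat it as a rough illustration only: the edge CSV path and column layout are assumptions, and the class and method names follow the nebula-algorithm 2.0 README, so verify them against the version you built.

import org.apache.spark.sql.{DataFrame, SparkSession}
import com.vesoft.nebula.algorithm.config.PRConfig
import com.vesoft.nebula.algorithm.lib.PageRankAlgo

object PageRankSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("pagerank-sketch").getOrCreate()

    // Hypothetical edge data: the first two columns are treated as the source and
    // destination vertex ids (file path and schema are assumptions for this sketch).
    val edgeDf: DataFrame = spark.read.option("header", "true").csv("file:///tmp/edges.csv")

    // Same values as the pagerank block in the config: maxIter = 10, resetProb = 0.15.
    val prConfig = new PRConfig(10, 0.15)

    // hasWeight = false, matching data.hasWeight in the config above.
    val prResult = PageRankAlgo.apply(spark, edgeDf, prConfig, false)
    prResult.show()
  }
}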

Graph space:

CREATE SPACE nb;
CREATE TAG player (name string, age int);
CREATE TAG team (name string);
CREATE EDGE serve (start_year int, end_year int);
CREATE EDGE follow (degree int);

I can't reproduce this issue on my side. After the schema is printed, the SourceReader logs are printed and the data is read:

2021-05-25 03:53:23 INFO [nebula.connector.NebulaDataSource:createReader:45] create reader
2021-05-25 03:53:23 INFO [nebula.connector.NebulaDataSource:createReader:46] options {spacename=test, nocolumn=false, metaaddress=192.168.8.94:9559, label=serve, type=edge, connectionretry=2, timeout=6000, executionretry=1, paths=[], limit=1000, returncols=, partitionnumber=100}
2021-05-25 03:53:23 INFO [connector.reader.NebulaSourceReader$$anonfun$getSchema$1:apply:78] prop name start_year, type INT64
2021-05-25 03:53:23 INFO [connector.reader.NebulaSourceReader$$anonfun$getSchema$1:apply:78] prop name end_year, type INT64
2021-05-25 03:53:23 INFO [connector.reader.NebulaSourceReader:readSchema:32] dataset's schema: StructType(StructField(_srcId,StringType,false), StructField(_dstId,StringType,false), StructField(_rank,LongType,false), StructField(start_year,LongType,true), StructField(end_year,LongType,true))
2021-05-25 03:53:25 INFO [nebula.connector.NebulaDataSource:createReader:45] create reader
2021-05-25 03:53:25 INFO [nebula.connector.NebulaDataSource:createReader:46] options {spacename=test, nocolumn=false, metaaddress=192.168.8.94:9559, label=serve, type=edge, connectionretry=2, timeout=6000, executionretry=1, paths=[], limit=1000, returncols=, partitionnumber=100}
2021-05-25 03:53:25 INFO [spark.internal.Logging$class:logInfo:54]
Pushing operators to class com.vesoft.nebula.connector.NebulaDataSource
Pushed Filters:
Post-Scan Filters:
Output: _srcId#0, _dstId#1, _rank#2L, start_year#3L, end_year#4L

Try running the example from the nebula-spark-utils repo in your local IDE: nebula-spark-utils/NebulaSparkReaderExample.scala at master · vesoft-inc/nebula-spark-utils · GitHub
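
For reference, the core of that example looks roughly like the sketch below. The address, space, edge type and property here are placeholders taken from the config above, and the builder method names follow the nebula-spark-connector 2.0 example, so check them against the linked file before running.

import org.apache.spark.sql.SparkSession
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader

object NebulaSparkReaderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("nebula-reader-sketch").getOrCreate()

    // Connection to the metad service; replace with your own metaAddress.
    val connConfig = NebulaConnectionConfig
      .builder()
      .withMetaAddress("127.0.0.1:9559")
      .withTimeout(6000)
      .build()

    // Read the serve edges from space nb, returning start_year as a property column.
    val readEdgeConfig = ReadNebulaConfig
      .builder()
      .withSpace("nb")
      .withLabel("serve")
      .withNoColumn(false)
      .withReturnCols(List("start_year"))
      .withLimit(1000)
      .withPartitionNum(100)
      .build()

    val edgeDf = spark.read.nebula(connConfig, readEdgeConfig).loadEdgesToDF()
    edgeDf.printSchema()
    edgeDf.show()
  }
}

If this reads the edges successfully in your local IDE, the schema and connection are fine, and the problem is more likely in how the job jar is packaged or submitted.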

It looks like you haven't included the required packages. You need to bundle the corresponding dependencies into the jar you submit.
