Problem running GraphX against a standalone Nebula Graph 2.0 GA deployment

I installed Nebula Graph on the same machine that originally had TigerGraph on it :grinning:

The StorageClient didn't connect. In your previous post you said you were running the program on the machine where the Nebula services are installed; could you use the Java client's MetaClient to test what listHosts returns?

Do you mean this link? GitHub - vesoft-inc/nebula-java: Client API and data importer of Nebula Graph in Java

Yes.

Test case:

import com.vesoft.nebula.client.meta.MetaClient;
import junit.framework.TestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestMetaClient extends TestCase {

    private static final Logger LOGGER = LoggerFactory.getLogger(TestMetaClient.class);

    private final String address = "10.27.20.112";
    private final int port = 9559;

    private MetaClient metaClient;

    public void testListHosts() {
        if (metaClient == null) {
            metaClient = new MetaClient(address, port);
        }
        metaClient.listHosts();
    }
}

Running the testListHosts() method throws an error:

com.vesoft.nebula.client.graph.exception.IOErrorException: All servers are broken.
	at com.vesoft.nebula.client.graph.net.ConnObjectPool.create(ConnObjectPool.java:24)
	at com.vesoft.nebula.client.graph.net.ConnObjectPool.create(ConnObjectPool.java:10)
	at org.apache.commons.pool2.BasePooledObjectFactory.makeObject(BasePooledObjectFactory.java:60)
	at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:836)
	at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:434)
	at com.vesoft.nebula.client.graph.net.NebulaPool.getSession(NebulaPool.java:96)
	at com.vesoft.nebula.client.meta.MockNebulaGraph.initGraph(MockNebulaGraph.java:37)
	at com.vesoft.nebula.client.meta.TestMetaClient.setUp(TestMetaClient.java:29)
	at junit.framework.TestCase.runBare(TestCase.java:140)
	at junit.framework.TestResult$1.protect(TestResult.java:122)
	at junit.framework.TestResult.runProtected(TestResult.java:142)
	at junit.framework.TestResult.run(TestResult.java:125)
	at junit.framework.TestCase.run(TestCase.java:130)
	at junit.framework.TestSuite.runTest(TestSuite.java:241)
	at junit.framework.TestSuite.run(TestSuite.java:236)
	at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:90)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)

It looks like you ran the TestMetaClient class from the source code. The stack trace shows the TestMetaClient test suite being executed, and its setUp first uses the graph client to create the space, tags, and edges. Run only the testListHosts method and see whether you can get the storage host information.
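
If you run the tests through Maven, Surefire can execute just that single method with a command along these lines (assuming a reasonably recent Surefire version):

mvn test -Dtest=TestMetaClient#testListHosts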

It works now:

import java.util.Set;

import com.facebook.thrift.TException;
import com.vesoft.nebula.HostAddr;
import com.vesoft.nebula.client.meta.MetaClient;
import junit.framework.TestCase;

public class TestMetaClient extends TestCase {

    private final String address = "10.27.20.112";
    private final int port = 9559;

    private MetaClient metaClient;

    public void testListHosts() throws TException {
        if (metaClient == null) {
            metaClient = new MetaClient(address, port);
        }
        metaClient.connect();
        Set<HostAddr> hostAddrs = metaClient.listHosts();
        System.out.println(hostAddrs);
    }
}

But how do I fix the error from the Spark job that I posted at the very top?

You can get the storaged service address here, so you should be able to connect to it as well. Run the StorageClient test directly; if it can connect, then Spark should also be able to reach storaged.

But Spark is the thing that's failing; I already pasted the error message at the very top :rofl:

Try running the StorageClient test against the address 10.27.20.112:9559 and see whether it can connect normally.
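
A minimal standalone check could look like the sketch below (Scala against the nebula-java 2.0 storage client; the space and tag names are the ones used in this thread, and scanVertex/getVertexTableView are the accessors as I understand this client version, so treat it as a sketch rather than a verified program):

import java.util.Arrays
import com.vesoft.nebula.client.storage.StorageClient

object StorageClientCheck {
  def main(args: Array[String]): Unit = {
    // The storage client takes the metad address and discovers storaged hosts through it.
    val client = new StorageClient("10.27.20.112", 9559)
    client.connect()

    // Scan vertices of tag "user" in space "csv" purely as a connectivity check.
    val iter = client.scanVertex("csv", "user", Arrays.asList("id"))
    while (iter.hasNext) {
      val result = iter.next()
      println(result.getVertexTableView) // assumed accessor on ScanVertexResult
    }
    client.close()
  }
}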

Tested it; it connects normally.

I simplified the program:

import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory


val sparkConf = new SparkConf
sparkConf
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
val spark = SparkSession
  .builder()
  .master("local")
  .config(sparkConf)
  .getOrCreate()

println("start to read nebula vertices")
val config =
  NebulaConnectionConfig
    .builder()
    .withMetaAddress("10.27.20.112:9559")
    .withTimeout(3000)
    .withConenctionRetry(2)
    .build()
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("csv")
  .withLabel("user")
  .withNoColumn(false)
  .withLimit(10)
  .build()
val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
vertex.printSchema()
vertex.show(20)
println("vertex count: " + vertex.count())

//readVertex(spark)
//readEdges(spark)
//readVertexGraph(spark)
//readEdgeGraph(spark)

spark.close()
sys.exit()

The error:

# spark-shell -i NebulaSparkReaderExample.scala
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data1/spark/spark-2.4.7-bin-hadoop2.7/jars/nebula-spark-connector-2.0.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data1/spark/spark-2.4.7-bin-hadoop2.7/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
21/04/06 09:06:13 WARN Utils: Your hostname, tigergraph9001 resolves to a loopback address: 127.0.0.1; using 10.27.20.112 instead (on interface eth0)
21/04/06 09:06:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/04/06 09:06:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://tigergraph9001:4040
Spark context available as 'sc' (master = local[*], app id = local-1617699988425).
Spark session available as 'spark'.
21/04/06 09:06:36 WARN SparkSession$Builder: Using an existing SparkSession; some spark core configurations may not take effect.
start to read nebula vertices
21/04/06 09:06:36 WARN ReadNebulaConfig$: returnCols is empty and your result will contain all properties for user
root
 |-- _vertexId: string (nullable = false)
 |-- id: string (nullable = true)

[Stage 0:>                                                          (0 + 1) / 1]21/04/06 09:06:58 ERROR ScanVertexResultIterator: get storage client error,
java.util.NoSuchElementException: Unable to activate object
        at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:400)
        at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:277)
        at com.vesoft.nebula.client.storage.StorageConnPool.getStorageConnection(StorageConnPool.java:42)
        at com.vesoft.nebula.client.storage.scan.ScanVertexResultIterator.lambda$next$0(ScanVertexResultIterator.java:81)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: com.facebook.thrift.transport.TTransportException: java.net.ConnectException: Cannot assign requested address
        at com.facebook.thrift.transport.TSocket.open(TSocket.java:204)
        at com.vesoft.nebula.client.storage.GraphStorageConnection.open(GraphStorageConnection.java:40)
        at com.vesoft.nebula.client.storage.StorageConnPoolFactory.activateObject(StorageConnPoolFactory.java:59)
        at com.vesoft.nebula.client.storage.StorageConnPoolFactory.activateObject(StorageConnPoolFactory.java:16)
        at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:391)
        ... 8 more
Caused by: java.net.ConnectException: Cannot assign requested address
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at com.facebook.thrift.transport.TSocket.open(TSocket.java:199)
        ... 12 more
21/04/06 09:06:58 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object
        at com.vesoft.nebula.client.storage.scan.ScanResultIterator.throwExceptions(ScanResultIterator.java:99)
        at com.vesoft.nebula.client.storage.scan.ScanVertexResultIterator.next(ScanVertexResultIterator.java:133)
        at com.vesoft.nebula.connector.reader.NebulaVertexPartitionReader.next(NebulaVertexPartitionReader.scala:67)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
21/04/06 09:06:58 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): com.vesoft.nebula.client.meta.exception.ExecuteFailedException:Execute failed: no parts succeed, error message: Unable to activate object

val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("csv")
.withLabel("user")

Here withLabel("user") takes the tag name user under the csv space. That's right, isn't it?

Right, the configuration is fine, but your StorageClient still fails to connect, which is strange. I can't reproduce the problem on my side. Can you debug into StorageConnPool.java:42 over there?

Is there an example of connecting to Nebula Graph directly from Java and then running a GraphX job?

Yes, it's the code you're already using: nebula-spark-utils/NebulaSparkReaderExample.scala at master · vesoft-inc/nebula-spark-utils · GitHub

But that's Scala; isn't there a Java version?

I wrote a program based on the example in nebula-spark-utils, and it fails when connecting to Nebula:

import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object NebulaPageRank {

  private val LOG = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf
    sparkConf
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .config(sparkConf)
      .getOrCreate()

    LOG.info("start to read nebula vertices")

    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress("10.27.20.112:9559")
        .withTimeout(6000)
        .withConenctionRetry(2)
        .build()

    val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("csv")
      .withLabel("user")
      .withNoColumn(false)
      .withReturnCols(List("id"))
      .withLimit(10)
      .withPartitionNum(2)
      .build()

    val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("csv")
      .withLabel("action")
      .withNoColumn(false)
      .build()

    val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
    LOG.info("vertex rdd first record: " + vertex.first())
    LOG.info("vertex rdd count: {}", vertex.count())

    //    val vertexRDD = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToGraphx()
    //    val edgeRDD = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToGraphx()
    //    val graph = Graph(vertexRDD, edgeRDD)
    //
    //    val ranks = graph.pageRank(0.01).vertices
    //
    //    LOG.info(ranks.top(10).mkString("\n"))

    spark.close()
    sys.exit()
  }
}

The error:

INFO [main] - Started o.s.j.s.ServletContextHandler@79d9214d{/stages/stage/kill,null,AVAILABLE,@Spark}
 INFO [main] - Bound SparkUI to 0.0.0.0, and started at http://10.232.4.56:4040
 INFO [main] - Starting executor ID driver on host localhost
 INFO [main] - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 60357.
 INFO [main] - Server created on 10.232.4.56:60357
 INFO [main] - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
 INFO [main] - Registering BlockManager BlockManagerId(driver, 10.232.4.56, 60357, None)
 INFO [dispatcher-event-loop-0] - Registering block manager 10.232.4.56:60357 with 2004.6 MB RAM, BlockManagerId(driver, 10.232.4.56, 60357, None)
 INFO [main] - Registered BlockManager BlockManagerId(driver, 10.232.4.56, 60357, None)
 INFO [main] - Initialized BlockManager: BlockManagerId(driver, 10.232.4.56, 60357, None)
 INFO [main] - Started o.s.j.s.ServletContextHandler@59b65dce{/metrics/json,null,AVAILABLE,@Spark}
 INFO [main] - start to read nebula vertices
 INFO [main] - NebulaReadConfig={space=csv,label=user,returnCols=List(id),noColumn=false,partitionNum=2}
 WARN [main] - returnCols is empty and your result will contain all properties for action
 INFO [main] - NebulaReadConfig={space=csv,label=action,returnCols=List(),noColumn=false,partitionNum=100}
 INFO [main] - Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/mvpzhao/tools/java-workspace/nebula-spark-utils/spark-warehouse/').
 INFO [main] - Warehouse path is 'file:/Users/mvpzhao/tools/java-workspace/nebula-spark-utils/spark-warehouse/'.
 INFO [main] - Started o.s.j.s.ServletContextHandler@2935fd2c{/SQL,null,AVAILABLE,@Spark}
 INFO [main] - Started o.s.j.s.ServletContextHandler@3f018494{/SQL/json,null,AVAILABLE,@Spark}
 INFO [main] - Started o.s.j.s.ServletContextHandler@62b790a5{/SQL/execution,null,AVAILABLE,@Spark}
 INFO [main] - Started o.s.j.s.ServletContextHandler@7c52fc81{/SQL/execution/json,null,AVAILABLE,@Spark}
 INFO [main] - Started o.s.j.s.ServletContextHandler@6bccd036{/static/sql,null,AVAILABLE,@Spark}
 INFO [main] - Registered StateStoreCoordinator endpoint
 INFO [main] - create reader
 INFO [main] - options {executionretry=1, spacename=csv, nocolumn=false, limit=10, returncols=id, metaaddress=10.27.20.112:9559, label=user, type=vertex, partitionnumber=2, connectionretry=2, timeout=6000}
 INFO [main] - dataset's schema: StructType(StructField(_vertexId,StringType,false), StructField(id,StringType,true))
 INFO [main] - Code generated in 194.618011 ms
 INFO [main] - Code generated in 12.265808 ms
 INFO [Thread-1] - Invoking stop() from shutdown hook
Exception in thread "main" java.lang.AbstractMethodError: com.vesoft.nebula.connector.reader.NebulaDataSourceVertexReader.createDataReaderFactories()Ljava/util/List;
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.readerFactories$lzycompute(DataSourceV2ScanExec.scala:55)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.readerFactories(DataSourceV2ScanExec.scala:52)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.inputRDD$lzycompute(DataSourceV2ScanExec.scala:76)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.inputRDD(DataSourceV2ScanExec.scala:60)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.inputRDDs(DataSourceV2ScanExec.scala:79)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2496)
	at org.apache.spark.sql.Dataset.first(Dataset.scala:2503)
	at com.vesoft.nebula.examples.connector.NebulaPageRank$.main(NebulaPageRank.scala:59)
	at com.vesoft.nebula.examples.connector.NebulaPageRank.main(NebulaPageRank.scala)
 INFO [Thread-1] - Stopped Spark@4926fbc6{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
 INFO [Thread-1] - Stopped Spark web UI at http://10.232.4.56:4040
 INFO [dispatcher-event-loop-1] - MapOutputTrackerMasterEndpoint stopped!
 INFO [Thread-1] - MemoryStore cleared
 INFO [Thread-1] - BlockManager stopped
 INFO [Thread-1] - BlockManagerMaster stopped
 INFO [dispatcher-event-loop-0] - OutputCommitCoordinator stopped!
 INFO [Thread-1] - Successfully stopped SparkContext
 INFO [Thread-1] - Shutdown hook called
 INFO [Thread-1] - Deleting directory /private/var/folders/v0/mbqbj_m161n4ll7pdlldlx540000gn/T/spark-be35c966-3cb4-4360-81e4-d413a2c6434c

I looked, and NebulaDataSourceVertexReader doesn't have this createDataReaderFactories() method!

There is no createDataReaderFactories method defined in the Spark Connector. What version of Spark are you using? It must be version 2.3 or later.
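
For context: createDataReaderFactories() was the reader entry point in Spark 2.3's DataSourceV2 API and was replaced by planInputPartitions() in Spark 2.4, so an AbstractMethodError on it is the usual symptom of the runtime Spark version differing from the one the connector was built against. A trivial way to confirm which version a running shell or session actually uses:

println(spark.version) // prints the runtime Spark version, e.g. 2.4.7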