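Nebula Graph was installed on the same machine that originally had TigerGraph installed.
The StorageClient is not connecting. In your previous post you said you ran the program on the machine where the Nebula services are installed. Could you use the MetaClient from the Java client to test what listHosts returns?
Yes.
Test case: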
import com.vesoft.nebula.client.meta.MetaClient;
import junit.framework.TestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestMetaClient extends TestCase {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestMetaClient.class);
    private final String address = "10.27.20.112";
    private final int port = 9559;
    private MetaClient metaClient;

    public void testListHosts() {
        if (metaClient == null) {
            metaClient = new MetaClient(address, port);
        }
        metaClient.listHosts();
    }
}
Running the testListHosts() method fails with:
com.vesoft.nebula.client.graph.exception.IOErrorException: All servers are broken.
at com.vesoft.nebula.client.graph.net.ConnObjectPool.create(ConnObjectPool.java:24)
at com.vesoft.nebula.client.graph.net.ConnObjectPool.create(ConnObjectPool.java:10)
at org.apache.commons.pool2.BasePooledObjectFactory.makeObject(BasePooledObjectFactory.java:60)
at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:836)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:434)
at com.vesoft.nebula.client.graph.net.NebulaPool.getSession(NebulaPool.java:96)
at com.vesoft.nebula.client.meta.MockNebulaGraph.initGraph(MockNebulaGraph.java:37)
at com.vesoft.nebula.client.meta.TestMetaClient.setUp(TestMetaClient.java:29)
at junit.framework.TestCase.runBare(TestCase.java:140)
at junit.framework.TestResult$1.protect(TestResult.java:122)
at junit.framework.TestResult.runProtected(TestResult.java:142)
at junit.framework.TestResult.run(TestResult.java:125)
at junit.framework.TestCase.run(TestCase.java:130)
at junit.framework.TestSuite.runTest(TestSuite.java:241)
at junit.framework.TestSuite.run(TestSuite.java:236)
at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:90)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
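Did you run the TestMetaClient class from the source code? The stack trace shows the TestMetaClient test is running, and its setUp first uses the graph client to create a space, a tag, and an edge. Run only the testListHosts method and see whether you can get the storage host information.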
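The stack trace shows why: the failure comes from TestMetaClient.setUp (via MockNebulaGraph.initGraph), not from testListHosts itself. In JUnit 3, setUp() runs before every test method, so a setUp that needs a reachable graphd fails the test before its body starts. Below is a hypothetical, dependency-free model of that order (setUp, then the test, then tearDown only if setUp succeeded); it is not JUnit's code, and all class and method names in it are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free model of the JUnit 3 TestCase.runBare() order.
// It is NOT JUnit: it only shows that setUp() runs first and that a failing
// setUp() stops the test before its body (and before tearDown()) runs.
final class LifecycleDemo {
    /** Order in which the lifecycle steps ran during the last runBare() call. */
    static final List<String> trace = new ArrayList<>();

    /** Stand-in for a setUp() that needs graphd, like MockNebulaGraph.initGraph(). */
    static void setUp(boolean graphReachable) {
        trace.add("setUp");
        if (!graphReachable) {
            throw new IllegalStateException("All servers are broken.");
        }
    }

    static void testListHosts() {
        trace.add("testListHosts");
    }

    static void tearDown() {
        trace.add("tearDown");
    }

    /** setUp, then the test, then tearDown in a finally block, as in JUnit 3. */
    static String runBare(boolean graphReachable) {
        trace.clear();
        try {
            setUp(graphReachable);
            try {
                testListHosts();
            } finally {
                tearDown();
            }
            return "passed";
        } catch (RuntimeException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(runBare(false) + " " + trace);
        System.out.println(runBare(true) + " " + trace);
    }
}
```

With graphReachable set to false only "setUp" appears in the trace, which is the same situation as the stack trace above: the test body never ran.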
That works now:
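import com.facebook.thrift.TException;
import com.vesoft.nebula.HostAddr;
import com.vesoft.nebula.client.meta.MetaClient;
import java.util.Set;
import junit.framework.TestCase;

public class TestMetaClient extends TestCase {
    private final String address = "10.27.20.112";
    private final int port = 9559;
    private MetaClient metaClient;

    public void testListHosts() throws TException {
        if (metaClient == null) {
            metaClient = new MetaClient(address, port);
        }
        // Open the connection to metad before issuing RPCs such as listHosts()
        metaClient.connect();
        Set<HostAddr> hostAddrs = metaClient.listHosts();
        System.out.println(hostAddrs);
    }
}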
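But how do I fix the error from the Spark job I posted at the very top?
This returns the address of the storaged service, so the connection should work too. Run the StorageClient test directly; if it can connect, Spark should also be able to connect to storaged.
But Spark is throwing an error; I already posted the error message at the top.
Run the StorageClient test using the address 10.27.20.112:9559 and see whether it connects normally.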
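To separate plain network reachability from client-level problems, a raw TCP probe can be run on the Spark machine first. Below is a minimal sketch that only uses java.net.Socket to try a connection with a timeout. The host is the one from this thread; port 9779 is assumed to be the Nebula 2.x default storaged port and should be replaced with whatever listHosts() actually printed. A successful probe only shows that the port accepts TCP connections, not that the storage RPC works.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Minimal TCP reachability probe using only java.net. It checks whether a
// connection can be opened; it does not speak Nebula's storage protocol.
final class PortProbe {
    /** Returns true if a TCP connection to host:port opens within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // ConnectException (refused, "Cannot assign requested address"),
            // SocketTimeoutException and UnknownHostException all end up here.
            System.err.println(host + ":" + port + " -> " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed values: the metad host from this thread, its meta port 9559,
        // and 9779 as the Nebula 2.x default storaged port. Replace them with
        // the host:port pairs that MetaClient.listHosts() printed.
        String host = args.length > 0 ? args[0] : "10.27.20.112";
        for (int port : new int[] {9559, 9779}) {
            System.out.println(host + ":" + port + " reachable = " + canConnect(host, port, 3000));
        }
    }
}
```

The exception printed on failure is the same kind of java.net exception that appears at the bottom of a StorageClient stack trace, so the two can be compared directly.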
I simplified the program:
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
val sparkConf = new SparkConf
sparkConf
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
val spark = SparkSession
  .builder()
  .master("local")
  .config(sparkConf)
  .getOrCreate()
println("start to read nebula vertices")
val config =
  NebulaConnectionConfig
    .builder()
    .withMetaAddress("10.27.20.112:9559")
    .withTimeout(3000)
    .withConenctionRetry(2)
    .build()
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("csv")
  .withLabel("user")
  .withNoColumn(false)
  .withLimit(10)
  .build()
val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
vertex.printSchema()
vertex.show(20)
println("vertex count: " + vertex.count())
//readVertex(spark)
//readEdges(spark)
//readVertexGraph(spark)
//readEdgeGraph(spark)
spark.close()
sys.exit()
Error:
# spark-shell -i NebulaSparkReaderExample.scala
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data1/spark/spark-2.4.7-bin-hadoop2.7/jars/nebula-spark-connector-2.0.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data1/spark/spark-2.4.7-bin-hadoop2.7/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
21/04/06 09:06:13 WARN Utils: Your hostname, tigergraph9001 resolves to a loopback address: 127.0.0.1; using 10.27.20.112 instead (on interface eth0)
21/04/06 09:06:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/04/06 09:06:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://tigergraph9001:4040
Spark context available as 'sc' (master = local[*], app id = local-1617699988425).
Spark session available as 'spark'.
21/04/06 09:06:36 WARN SparkSession$Builder: Using an existing SparkSession; some spark core configurations may not take effect.
start to read nebula vertices
21/04/06 09:06:36 WARN ReadNebulaConfig$: returnCols is empty and your result will contain all properties for user
root
|-- _vertexId: string (nullable = false)
|-- id: string (nullable = true)
[Stage 0:> (0 + 1) / 1]
21/04/06 09:06:58 ERROR ScanVertexResultIterator: get storage client error,
java.util.NoSuchElementException: Unable to activate object
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:400)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:277)
at com.vesoft.nebula.client.storage.StorageConnPool.getStorageConnection(StorageConnPool.java:42)
at com.vesoft.nebula.client.storage.scan.ScanVertexResultIterator.lambda$next$0(ScanVertexResultIterator.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.facebook.thrift.transport.TTransportException: java.net.ConnectException: Cannot assign requested address
at com.facebook.thrift.transport.TSocket.open(TSocket.java:204)
at com.vesoft.nebula.client.storage.GraphStorageConnection.open(GraphStorageConnection.java:40)
at com.vesoft.nebula.client.storage.StorageConnPoolFactory.activateObject(StorageConnPoolFactory.java:59)
at com.vesoft.nebula.client.storage.StorageConnPoolFactory.activateObject(StorageConnPoolFactory.java:16)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:391)
... 8 more
Caused by: java.net.ConnectException: Cannot assign requested address
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at com.facebook.thrift.transport.TSocket.open(TSocket.java:199)
... 12 more
21/04/06 09:06:58 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object
at com.vesoft.nebula.client.storage.scan.ScanResultIterator.throwExceptions(ScanResultIterator.java:99)
at com.vesoft.nebula.client.storage.scan.ScanVertexResultIterator.next(ScanVertexResultIterator.java:133)
at com.vesoft.nebula.connector.reader.NebulaVertexPartitionReader.next(NebulaVertexPartitionReader.scala:67)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
21/04/06 09:06:58 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): com.vesoft.nebula.client.meta.exception.ExecuteFailedException:Execute failed: no parts succeed, error message: Unable to activate object
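val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("csv")
  .withLabel("user")
For withLabel("user") I filled in user, the vertex name under the csv space. That part is correct, right?
Yes, the configuration is fine, but your StorageClient still can't connect, which is strange. I couldn't reproduce the problem. Can you debug into StorageConnPool.java:42 on your side?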
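When debugging around StorageConnPool.java:42, note that "Unable to activate object" is only the commons-pool2 wrapper thrown from borrowObject; the actionable error is the innermost cause, here java.net.ConnectException: Cannot assign requested address. A small helper that walks getCause() makes that visible in a catch block or log line. This is a stdlib-only sketch: the exception chain in main is rebuilt by hand from the log, with a plain RuntimeException standing in for Thrift's TTransportException.

```java
import java.net.ConnectException;
import java.util.NoSuchElementException;

// Stdlib-only helper: walk an exception's cause chain to the innermost cause.
final class RootCause {
    /** Follows getCause() to the deepest cause; the depth bound guards against cycles. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        for (int depth = 0; depth < 100 && current.getCause() != null; depth++) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Chain rebuilt by hand from the log above. A RuntimeException stands in
        // for Thrift's TTransportException so the sketch has no Thrift dependency.
        ConnectException connect = new ConnectException("Cannot assign requested address");
        RuntimeException transport = new RuntimeException(connect.toString(), connect);
        NoSuchElementException borrow = new NoSuchElementException("Unable to activate object");
        borrow.initCause(transport);

        Throwable root = rootCause(borrow);
        System.out.println(root.getClass().getName() + ": " + root.getMessage());
    }
}
```

Logging rootCause(e) instead of e.getMessage() in the code that calls the storage scan keeps the socket-level reason in the output instead of the generic pool message.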
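Is there an example of connecting to Nebula Graph directly from Java and then running a GraphX job?
Yes, it's the code you are already using: nebula-spark-utils/NebulaSparkReaderExample.scala at master · vesoft-inc/nebula-spark-utils · GitHub
Isn't that Scala? Is there no Java version?
A program I wrote based on the example in nebula-spark-utils throws an error when connecting to Nebula: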
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
object NebulaPageRank {
  private val LOG = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf
    sparkConf
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .config(sparkConf)
      .getOrCreate()

    LOG.info("start to read nebula vertices")
    val config =
      NebulaConnectionConfig
        .builder()
        .withMetaAddress("10.27.20.112:9559")
        .withTimeout(6000)
        .withConenctionRetry(2)
        .build()
    val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("csv")
      .withLabel("user")
      .withNoColumn(false)
      .withReturnCols(List("id"))
      .withLimit(10)
      .withPartitionNum(2)
      .build()
    val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
      .builder()
      .withSpace("csv")
      .withLabel("action")
      .withNoColumn(false)
      .build()

    val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
    LOG.info("vertex rdd first record: " + vertex.first())
    LOG.info("vertex rdd count: {}", vertex.count())

    // val vertexRDD = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToGraphx()
    // val edgeRDD = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToGraphx()
    // val graph = Graph(vertexRDD, edgeRDD)
    //
    // val ranks = graph.pageRank(0.01).vertices
    //
    // LOG.info(ranks.top(10).mkString("\n"))

    spark.close()
    sys.exit()
  }
}
Error:
INFO [main] - Started o.s.j.s.ServletContextHandler@79d9214d{/stages/stage/kill,null,AVAILABLE,@Spark}
INFO [main] - Bound SparkUI to 0.0.0.0, and started at http://10.232.4.56:4040
INFO [main] - Starting executor ID driver on host localhost
INFO [main] - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 60357.
INFO [main] - Server created on 10.232.4.56:60357
INFO [main] - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
INFO [main] - Registering BlockManager BlockManagerId(driver, 10.232.4.56, 60357, None)
INFO [dispatcher-event-loop-0] - Registering block manager 10.232.4.56:60357 with 2004.6 MB RAM, BlockManagerId(driver, 10.232.4.56, 60357, None)
INFO [main] - Registered BlockManager BlockManagerId(driver, 10.232.4.56, 60357, None)
INFO [main] - Initialized BlockManager: BlockManagerId(driver, 10.232.4.56, 60357, None)
INFO [main] - Started o.s.j.s.ServletContextHandler@59b65dce{/metrics/json,null,AVAILABLE,@Spark}
INFO [main] - start to read nebula vertices
INFO [main] - NebulaReadConfig={space=csv,label=user,returnCols=List(id),noColumn=false,partitionNum=2}
WARN [main] - returnCols is empty and your result will contain all properties for action
INFO [main] - NebulaReadConfig={space=csv,label=action,returnCols=List(),noColumn=false,partitionNum=100}
INFO [main] - Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/mvpzhao/tools/java-workspace/nebula-spark-utils/spark-warehouse/').
INFO [main] - Warehouse path is 'file:/Users/mvpzhao/tools/java-workspace/nebula-spark-utils/spark-warehouse/'.
INFO [main] - Started o.s.j.s.ServletContextHandler@2935fd2c{/SQL,null,AVAILABLE,@Spark}
INFO [main] - Started o.s.j.s.ServletContextHandler@3f018494{/SQL/json,null,AVAILABLE,@Spark}
INFO [main] - Started o.s.j.s.ServletContextHandler@62b790a5{/SQL/execution,null,AVAILABLE,@Spark}
INFO [main] - Started o.s.j.s.ServletContextHandler@7c52fc81{/SQL/execution/json,null,AVAILABLE,@Spark}
INFO [main] - Started o.s.j.s.ServletContextHandler@6bccd036{/static/sql,null,AVAILABLE,@Spark}
INFO [main] - Registered StateStoreCoordinator endpoint
INFO [main] - create reader
INFO [main] - options {executionretry=1, spacename=csv, nocolumn=false, limit=10, returncols=id, metaaddress=10.27.20.112:9559, label=user, type=vertex, partitionnumber=2, connectionretry=2, timeout=6000}
INFO [main] - dataset's schema: StructType(StructField(_vertexId,StringType,false), StructField(id,StringType,true))
INFO [main] - Code generated in 194.618011 ms
INFO [main] - Code generated in 12.265808 ms
INFO [Thread-1] - Invoking stop() from shutdown hook
Exception in thread "main" java.lang.AbstractMethodError: com.vesoft.nebula.connector.reader.NebulaDataSourceVertexReader.createDataReaderFactories()Ljava/util/List;
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.readerFactories$lzycompute(DataSourceV2ScanExec.scala:55)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.readerFactories(DataSourceV2ScanExec.scala:52)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.inputRDD$lzycompute(DataSourceV2ScanExec.scala:76)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.inputRDD(DataSourceV2ScanExec.scala:60)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.inputRDDs(DataSourceV2ScanExec.scala:79)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2496)
at org.apache.spark.sql.Dataset.first(Dataset.scala:2503)
at com.vesoft.nebula.examples.connector.NebulaPageRank$.main(NebulaPageRank.scala:59)
at com.vesoft.nebula.examples.connector.NebulaPageRank.main(NebulaPageRank.scala)
INFO [Thread-1] - Stopped Spark@4926fbc6{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
INFO [Thread-1] - Stopped Spark web UI at http://10.232.4.56:4040
INFO [dispatcher-event-loop-1] - MapOutputTrackerMasterEndpoint stopped!
INFO [Thread-1] - MemoryStore cleared
INFO [Thread-1] - BlockManager stopped
INFO [Thread-1] - BlockManagerMaster stopped
INFO [dispatcher-event-loop-0] - OutputCommitCoordinator stopped!
INFO [Thread-1] - Successfully stopped SparkContext
INFO [Thread-1] - Shutdown hook called
INFO [Thread-1] - Deleting directory /private/var/folders/v0/mbqbj_m161n4ll7pdlldlx540000gn/T/spark-be35c966-3cb4-4360-81e4-d413a2c6434c
I checked, and NebulaDataSourceVertexReader does not have this createDataReaderFactories() method!
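A java.lang.AbstractMethodError means the running code calls an abstract method that the loaded class does not implement, which typically happens when a class was compiled against a different version of an interface than the one on the runtime classpath. Here Spark's DataSourceV2ScanExec calls createDataReaderFactories() on the connector's reader. One way to see which reader API the Spark on the classpath expects is to inspect the interface reflectively. The sketch assumes the interface is org.apache.spark.sql.sources.v2.reader.DataSourceReader and that Spark 2.3 declares createDataReaderFactories() while Spark 2.4 replaced it with planInputPartitions(); verify both against the Spark version in use.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

// Reflective check of which reader API the Spark on the classpath exposes.
final class ApiCheck {
    /** True if the type exposes a public method (declared or inherited) with this name. */
    static boolean hasPublicMethod(Class<?> type, String name) {
        return Arrays.stream(type.getMethods()).map(Method::getName).anyMatch(name::equals);
    }

    /** Loads a class without initializing it; returns null if it is not on the classpath. */
    static Class<?> tryLoad(String className) {
        try {
            return Class.forName(className, false, ApiCheck.class.getClassLoader());
        } catch (ClassNotFoundException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Assumption: Spark 2.3 declares createDataReaderFactories() on this
        // interface and Spark 2.4 replaced it with planInputPartitions().
        Class<?> reader = tryLoad("org.apache.spark.sql.sources.v2.reader.DataSourceReader");
        if (reader == null) {
            System.out.println("DataSourceReader is not on the classpath");
            return;
        }
        System.out.println("createDataReaderFactories: " + hasPublicMethod(reader, "createDataReaderFactories"));
        System.out.println("planInputPartitions: " + hasPublicMethod(reader, "planInputPartitions"));
    }
}
```

Running it with the same classpath the IDE uses for NebulaPageRank shows which of the two reader APIs is actually loaded.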
The Spark Connector does not define a createDataReaderFactories method. Which Spark version are you using? We require a Spark version above 2.3.
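To fail fast on an incompatible Spark instead of hitting an AbstractMethodError in the middle of a job, the version can be checked at startup. In a real job the string would come from spark.version() on the SparkSession; the sketch below takes it as a plain string so it has no Spark dependency. The minimum of 2.4 is an assumption based on the stack trace above, which goes through the Spark 2.3 reader API; adjust it to the connector's documented requirement.

```java
// Dependency-free check that a Spark version string meets a minimum major.minor.
final class SparkVersionCheck {
    /**
     * True if versionString (e.g. "2.4.7") is at least major.minor. Only the first
     * two components are compared; a non-numeric suffix such as "-SNAPSHOT" is ignored.
     */
    static boolean isAtLeast(String versionString, int major, int minor) {
        String[] parts = versionString.trim().split("\\.");
        int actualMajor = leadingInt(parts[0]);
        int actualMinor = parts.length > 1 ? leadingInt(parts[1]) : 0;
        if (actualMajor != major) {
            return actualMajor > major;
        }
        return actualMinor >= minor;
    }

    /** Parses the leading decimal digits of s, or returns 0 if there are none. */
    private static int leadingInt(String s) {
        int end = 0;
        while (end < s.length() && s.charAt(end) >= '0' && s.charAt(end) <= '9') {
            end++;
        }
        return end == 0 ? 0 : Integer.parseInt(s.substring(0, end));
    }

    public static void main(String[] args) {
        // In a job this would be spark.version(); a literal keeps the sketch Spark-free.
        // The 2.4 minimum is an assumption, not a documented requirement.
        String version = args.length > 0 ? args[0] : "2.3.2";
        if (isAtLeast(version, 2, 4)) {
            System.out.println("Spark " + version + " meets the assumed minimum 2.4");
        } else {
            System.out.println("Spark " + version + " is older than the assumed minimum 2.4");
        }
    }
}
```

Only major and minor are compared because reader API changes of this kind land in minor releases; patch versions are ignored on purpose.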