Nebula Algorithm: problem writing computation results

  • Nebula version: 3.1.0
  • Deployment: single host
  • Installation method: RPM
  • Production environment: N

I submitted the jar with spark-submit to run the degreestatic algorithm (a sketch of the command is shown below). The data is read from nebula; for the output I tried the csv, text, and nebula sinks in turn. Only the csv sink wrote the results successfully; the other two both failed.
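For reference, a minimal submit command of the kind described above, assuming the entry point visible in the stack traces below; the jar file name and master setting are placeholders, not taken from the original post:

spark-submit --master "local" \
    --class com.vesoft.nebula.algorithm.Main \
    nebula-algorithm-3.0.0.jar -p application.conf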
1. With the text sink, the relevant parts of application.conf are:

  data: {
    source: nebula
    sink: text
    hasWeight: false
  }

  local: {
    read: {
        ......
    }
    write: {
        resultPath: /home/.../degreestatic
    }
  }

Writing the results with the text sink fails with "Text data source does not support bigint data type":

Exception in thread "main" org.apache.spark.sql.AnalysisException: Text data source does not support bigint data type.;
	at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema$1.apply(DataSourceUtils.scala:69)
	at org.apache.spark.sql.execution.datasources.DataSourceUtils$$anonfun$verifySchema$1.apply(DataSourceUtils.scala:67)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
	at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifySchema(DataSourceUtils.scala:67)
	at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifyWriteSchema(DataSourceUtils.scala:34)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:100)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:136)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:160)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:157)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:305)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:291)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:249)
	at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:634)
	at com.vesoft.nebula.algorithm.writer.TextWriter.write(AlgoWriter.scala:60)
	at com.vesoft.nebula.algorithm.Main$.saveAlgoResult(Main.scala:230)
	at com.vesoft.nebula.algorithm.Main$.main(Main.scala:92)
	at com.vesoft.nebula.algorithm.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:855)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:930)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:939)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
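Spark's text data source only supports writing a single string column, which is why the bigint columns in the result are rejected. A minimal workaround sketch in Scala, if you patch the write path yourself (writeAsText is a hypothetical helper, not part of nebula-algorithm): cast and join all columns into one string column before calling write.text.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat_ws}

// Hypothetical helper: flatten every column of the result DataFrame into a
// single comma-separated string column, the only schema the text source accepts.
def writeAsText(df: DataFrame, path: String): Unit = {
  val line = concat_ws(",", df.columns.map(c => col(c).cast("string")): _*)
  df.select(line.as("value")).write.text(path)
}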

2. With the nebula sink, the relevant parts of application.conf are:

  data: {
    source: nebula
    sink: nebula
    hasWeight: false
  }

  nebula: {
    read: {
        ......
    }
    write: {
        graphAddress: "192.168.XXX.XXX:9669"
        metaAddress: "192.168.XXX.XXX:9559"
        user: root
        pswd: 123456
        space: XXXX000003
        tag: degreestatic
        type: insert
    }
  }

Writing the results back to nebula fails with E_TAG_NOT_FOUND:

com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: Get tag execute failed, errorCode: E_TAG_NOT_FOUND
	at com.vesoft.nebula.client.meta.MetaClient.getTag(MetaClient.java:319)
	at com.vesoft.nebula.connector.nebula.MetaProvider.getTagSchema(MetaProvider.scala:103)
	at com.vesoft.nebula.connector.writer.NebulaVertexWriter.<init>(NebulaVertexWriter.scala:26)
	at com.vesoft.nebula.connector.writer.NebulaVertexWriterFactory.createDataWriter(NebulaSourceWriter.scala:28)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:113)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

After changing the write type from insert to update and creating a tag named degreestatic in the graph space beforehand, the error becomes "the maximum number of statements for Nebula is 512":

Exception in thread "main" java.lang.AssertionError: assertion failed: the maximum number of statements for Nebula is 512
	at scala.Predef$.assert(Predef.scala:170)
	at com.vesoft.nebula.connector.WriteNebulaVertexConfig$WriteVertexConfigBuilder.check(NebulaConfig.scala:382)
	at com.vesoft.nebula.connector.WriteNebulaVertexConfig$WriteVertexConfigBuilder.build(NebulaConfig.scala:350)
	at com.vesoft.nebula.algorithm.writer.NebulaWriter.write(AlgoWriter.scala:45)
	at com.vesoft.nebula.algorithm.Main$.saveAlgoResult(Main.scala:222)
	at com.vesoft.nebula.algorithm.Main$.main(Main.scala:92)
	at com.vesoft.nebula.algorithm.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:855)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:930)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:939)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Could you please look into the write failures above, especially the ones when writing back to nebula? Thanks!

If you want to write the algorithm results into Nebula Graph, make sure the Tag in the target graph space has property names and data types matching the algorithm's output schema (see the table in the docs). Reference: https://docs.nebula-graph.com.cn/3.2.0/graph-computing/nebula-algorithm/
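For degreestatic, that means creating a Tag with the three result properties before running the job (the _id column is written as the vertex ID, not as a property). The statement below mirrors the one that resolves this thread, using the tag name from the write config:

CREATE TAG IF NOT EXISTS degreestatic(degree int, inDegree int, outDegree int);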


This looks like a bug.

This is most likely because your vertex IDs are very large.

The type is currently set to a 10-digit int, but in real production the IDs may be longer.
Fortunately, this problem only shows up when writing to txt.

My understanding of type:insert was that the specified tag (degreestatic below) would be created automatically before writing into the space, and that its properties would be generated automatically as well (again taking degreestatic as the example, the output has four columns: _id, degree, inDegree, outDegree). But from what you said, even with insert I still need to create the tag and those four properties manually?

  nebula: {
    read: {
        ......
    }
    write: {
        ......
        tag: degreestatic
        type: insert
    }
  }

So far, whenever I try to write the results into nebula, I get the "maximum … 512" error as soon as the tag exists beforehand, no matter whether the write type is insert or update.
I have tried a tag without properties and one with properties, with the same outcome. The tag with properties was created as follows (the properties mirror the earlier csv output):

create tag degreestatic(degree int, inDegree int, outDegree int, `_id` int)

The Tag involved needs the properties that the algorithm writes back into the vertices. Adding them to the corresponding TAG with ALTER TAG should be enough.

https://docs.nebula-graph.com.cn/3.2.0/3.ngql-guide/10.tag-statements/3.alter-tag/


Hi, I used the following two statements to add new properties to every tag in the space:

alter tag XX add (degreestatic string) 

alter tag XX add (degree int, inDegree int, outDegree int)

With both of them the job still returns E_TAG_NOT_FOUND. Could you take a look at how the statements above should be changed?

One more question: is there a statement that adds the same property to multiple tags at once?

Sorry, I misled you 🧎🏻. Your original understanding was right, except that the tag is not created automatically; we have to create it manually:

CREATE TAG IF NOT EXISTS DegreeStatic(degree int, inDegree int, outDegree int);
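A quick sanity check before rerunning the job, assuming the space name from the write config above; note that schema changes only take effect after the next heartbeat cycle (heartbeat_interval_secs, 10 seconds by default), and that tag names are case-sensitive, so the name here must match the tag in the write config:

USE XXXX000003;
DESCRIBE TAG DegreeStatic;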

I have now written the computation results into nebula successfully. Thanks a lot!

