以下是我在项目中编写的代码:通过 SPARQL 从其他图谱库读取本体和关系,并导入 NebulaGraph,供各位参考:
from SPARQLWrapper import SPARQLWrapper, JSON
from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config
from nebula3.common import *
from nebula3.mclient import MetaCache, HostAddr
from nebula3.sclient.GraphStorageClient import GraphStorageClient
# SPARQL query endpoint of the TDB dataset (host masked in this sample).
QueryService = SPARQLWrapper("http://XXX.XXX.XXX.XXX:3030/hhp20240119/query")


def QueryTDBData(Sql):
    """Send the SPARQL query *Sql* to ``QueryService`` and return its rows.

    The request is sent with HTTP POST and a JSON result format. The return
    value is the ``results.bindings`` list of the JSON response: one dict per
    solution, mapping each selected variable name to a dict whose ``"value"``
    key holds the bound term.
    """
    service = QueryService
    service.setQuery(Sql)
    service.setReturnFormat(JSON)
    service.setMethod('POST')
    converted = service.query().convert()
    bindings = converted["results"]["bindings"]
    return bindings
# Seconds to wait after CREATE TAG/EDGE before writing data. NebulaGraph
# creates schema asynchronously; its docs recommend waiting two heartbeat
# cycles (20 s with the default heartbeat_interval_secs) before using it.
SCHEMA_SYNC_WAIT_SECONDS = 20

# Predicates meaning "subject is a sub-concept of object". Each matching
# triple becomes an `include` edge from the parent (object) to the child.
INCLUDE_PREDICATES = frozenset({
    "http://www.w3.org/2000/01/rdf-schema#subClassOf",
    "https://piesat.cn/Knowledge_map/relation#隶属",
})

# Labels of all rdfs:Class resources; each one becomes a weather_concept
# vertex whose only property is the label itself.
CONCEPT_QUERY = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>
prefix owl: <http://www.w3.org/2002/07/owl#>
prefix xsd: <http://www.w3.org/2001/XMLSchema#>
SELECT distinct ?s_name
WHERE {
?subject ?predicate ?object .
?subject rdfs:label ?s_name .
?subject rdf:type rdfs:Class }"""

# Every (subject label, predicate IRI, object label) combination between two
# rdfs:Class resources; only INCLUDE_PREDICATES are turned into edges.
CONCEPT_RELATION_QUERY = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>
prefix owl: <http://www.w3.org/2002/07/owl#>
prefix xsd: <http://www.w3.org/2001/XMLSchema#>
SELECT distinct ?s_name ?predicate ?o_name
WHERE {
?subject ?predicate ?object .
?subject rdfs:label ?s_name .
?object rdfs:label ?o_name .
?subject rdf:type rdfs:Class .
?object rdf:type rdfs:Class .}"""


def quote_ngql(value):
    """Return *value* as a double-quoted nGQL string literal.

    Backslashes and double quotes are escaped, so a label read from the
    SPARQL endpoint cannot end the literal early and break (or rewrite) the
    generated statement.
    """
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def build_insert_edge(edge_type, src_vid, dst_vid, name):
    """Return an ``INSERT EDGE`` statement that sets the edge's ``name``.

    *edge_type* is used verbatim (callers add backticks when needed). The two
    vertex IDs and *name* are emitted as escaped string literals.
    """
    return "INSERT EDGE {}(name) VALUES {}->{}:({})".format(
        edge_type, quote_ngql(src_vid), quote_ngql(dst_vid), quote_ngql(name))


def execute_ngql(session, statement, required=False):
    """Print *statement*, run it on *session*, and check the result.

    nebula3's ``Session.execute`` reports nGQL errors through the returned
    ResultSet instead of raising, so success is checked explicitly. Returns
    True on success. On failure, the error is printed and False is returned.
    If *required* is true, RuntimeError is raised instead; this is used for
    USE/CREATE statements that every later statement depends on.
    """
    print(statement)
    resp = session.execute(statement)
    if resp.is_succeeded():
        return True
    message = "nGQL failed: {} -> {}".format(statement, resp.error_msg())
    if required:
        raise RuntimeError(message)
    print(message)
    return False


if __name__ == "__main__":
    import sys
    import time
    import traceback

    # Bound before `try` so the cleanup below never hits an unbound name.
    session = None
    connection_pool = None
    try:
        results = QueryTDBData(CONCEPT_QUERY)

        config = Config()
        config.max_connection_pool_size = 10
        connection_pool = ConnectionPool()
        if not connection_pool.init([('XXX.XXX.XXX.XXX', 9669)], config):
            raise RuntimeError("failed to connect to NebulaGraph")
        session = connection_pool.get_session('root', '123456')

        execute_ngql(session, 'USE demospace', required=True)
        execute_ngql(session, 'CREATE TAG IF NOT EXISTS weather_concept(name string)',
                     required=True)
        # IF NOT EXISTS keeps re-runs from failing once the edge type exists.
        execute_ngql(session, 'CREATE EDGE IF NOT EXISTS include(name string)',
                     required=True)
        time.sleep(SCHEMA_SYNC_WAIT_SECONDS)

        # One weather_concept vertex per concept; the VID is the label.
        for cur in results:
            literal = quote_ngql(cur["s_name"]["value"])
            execute_ngql(
                session,
                "INSERT VERTEX weather_concept(name) VALUES {0}:({0})".format(literal))

        # Hierarchy triples become `include` edges from parent to child.
        for cur in QueryTDBData(CONCEPT_RELATION_QUERY):
            if cur["predicate"]["value"] in INCLUDE_PREDICATES:
                execute_ngql(session, build_insert_edge(
                    "include", cur["o_name"]["value"], cur["s_name"]["value"], "包含"))
    except Exception:
        print(traceback.format_exc())
        sys.exit(1)
    finally:
        # Runs on success and on failure (including the sys.exit above).
        if session is not None:
            session.release()
        if connection_pool is not None:
            connection_pool.close()
创建索引(执行以下 nGQL 语句):
# Index the `name` property of the weather_concept tag; LOOKUP on a tag
# property requires an index. A string property must be given an index
# length, here 100.
CREATE tag INDEX weather_concept_index ON weather_concept(name(100));
# A new index does not cover data written before it was created; rebuild it
# so the weather_concept vertices that were already imported get indexed.
REBUILD TAG INDEX weather_concept_index;
创建多属性顶点示例代码:
from SPARQLWrapper import SPARQLWrapper, JSON
from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config
from nebula3.common import *
from FormatResp import print_resp
from nebula3.mclient import MetaCache, HostAddr
from nebula3.sclient.GraphStorageClient import GraphStorageClient
# SPARQL query endpoint of the TDB dataset (host masked in this sample).
QueryService = SPARQLWrapper("http://XXX.XXX.XXX.XXX:3030/hhp20240119/query")


def QueryTDBData(Sql):
    """Send the SPARQL query *Sql* to ``QueryService`` and return its rows.

    The request is sent with HTTP POST and a JSON result format. The return
    value is the ``results.bindings`` list of the JSON response: one dict per
    solution, mapping each selected variable name to a dict whose ``"value"``
    key holds the bound term.
    """
    service = QueryService
    service.setQuery(Sql)
    service.setReturnFormat(JSON)
    service.setMethod('POST')
    converted = service.query().convert()
    bindings = converted["results"]["bindings"]
    return bindings
# Seconds to wait after CREATE TAG before inserting vertices. NebulaGraph
# creates schema asynchronously; its docs recommend waiting two heartbeat
# cycles (20 s with the default heartbeat_interval_secs) before using it.
SCHEMA_SYNC_WAIT_SECONDS = 20

# Tag created for the instances. Backticks are needed for the Chinese name,
# and it must match the class label '站点' used in the SPARQL queries below.
LABEL_NAME = "`站点`"

# Distinct labels of rdf:Property predicates used by instances of the class
# labelled '站点'; each becomes a string property of the tag.
ATTRIBUTE_NAME_QUERY = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>
prefix owl: <http://www.w3.org/2002/07/owl#>
prefix xsd: <http://www.w3.org/2001/XMLSchema#>
select distinct ?attr_name
WHERE {
?subject ?predicate ?object .
?subject rdfs:label ?s_name .
?subject rdf:type ?ddd .
?ddd rdf:type rdfs:Class .
?ddd rdfs:label '站点' .
?predicate rdf:type rdf:Property .
?predicate rdfs:label ?attr_name } order by ?attr_name"""

# One row per (instance label, property label, value) of the same instances.
ATTRIBUTE_VALUE_QUERY = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>
prefix owl: <http://www.w3.org/2002/07/owl#>
prefix xsd: <http://www.w3.org/2001/XMLSchema#>
select distinct ?s_name ?attr_name ?object
WHERE {
?subject ?predicate ?object .
?subject rdfs:label ?s_name .
?subject rdf:type ?ddd .
?ddd rdf:type rdfs:Class .
?ddd rdfs:label '站点' .
?predicate rdf:type rdf:Property .
?predicate rdfs:label ?attr_name } order by ?s_name ?attr_name"""


def quote_ngql(value):
    """Return *value* as a double-quoted nGQL string literal.

    Backslashes and double quotes are escaped, so a label or property value
    read from the SPARQL endpoint cannot end the literal early and break (or
    rewrite) the generated statement.
    """
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def build_create_tag(label, attr_names):
    """Return ``CREATE TAG IF NOT EXISTS`` for *label* with string properties.

    The tag always has a ``name`` property. It also gets one backtick-quoted
    ``string`` property per entry of *attr_names*; backticks are required for
    Chinese property names. An empty *attr_names* yields only ``name``.
    """
    columns = ["name string"] + ["`" + attr + "` string" for attr in attr_names]
    return "CREATE TAG IF NOT EXISTS {}({})".format(label, ",".join(columns))


def build_insert_vertex(label, attr_names, vid, values):
    """Return ``INSERT VERTEX IF NOT EXISTS`` for one vertex of tag *label*.

    Both the vertex ID and its ``name`` property are *vid*. The other
    properties follow the order of *attr_names*, with values taken from
    *values*; a missing value is written as "".
    """
    columns = ["name"] + ["`" + attr + "`" for attr in attr_names]
    literals = [quote_ngql(vid)] + [quote_ngql(values.get(attr, "")) for attr in attr_names]
    return "INSERT VERTEX IF NOT EXISTS {}({}) VALUES {}:({})".format(
        label, ",".join(columns), quote_ngql(vid), ",".join(literals))


def group_attribute_rows(rows, attr_names):
    """Group (s_name, attr_name, object) SPARQL rows into one dict per vertex.

    Returns a dict, in order of first appearance, mapping each subject label
    to a dict with a value for every name in *attr_names*. Attributes that a
    subject has no row for are "" (empty string). If a subject has several
    rows for the same attribute, the last row wins. Every subject is
    returned, including the last one and subjects with a single row, and the
    rows do not need to be sorted.
    """
    grouped = {}
    for row in rows:
        name = row["s_name"]["value"]
        if name not in grouped:
            grouped[name] = dict.fromkeys(attr_names, "")
        grouped[name][row["attr_name"]["value"]] = row["object"]["value"]
    return grouped


def execute_ngql(session, statement, required=False):
    """Print *statement*, run it on *session*, and check the result.

    nebula3's ``Session.execute`` reports nGQL errors through the returned
    ResultSet instead of raising, so success is checked explicitly. Returns
    True on success. On failure, the error is printed and False is returned.
    If *required* is true, RuntimeError is raised instead; this is used for
    USE/CREATE statements that every later statement depends on.
    """
    print(statement)
    resp = session.execute(statement)
    if resp.is_succeeded():
        return True
    message = "nGQL failed: {} -> {}".format(statement, resp.error_msg())
    if required:
        raise RuntimeError(message)
    print(message)
    return False


if __name__ == "__main__":
    import sys
    import time
    import traceback

    # Bound before `try` so the cleanup below never hits an unbound name.
    session = None
    connection_pool = None
    try:
        attr_array = [cur["attr_name"]["value"]
                      for cur in QueryTDBData(ATTRIBUTE_NAME_QUERY)]

        config = Config()
        config.max_connection_pool_size = 10
        connection_pool = ConnectionPool()
        if not connection_pool.init([('XXX.XXX.XXX.XXX', 9669)], config):
            raise RuntimeError("failed to connect to NebulaGraph")
        session = connection_pool.get_session('root', '123456')

        execute_ngql(session, 'USE demospace', required=True)
        execute_ngql(session, build_create_tag(LABEL_NAME, attr_array), required=True)
        time.sleep(SCHEMA_SYNC_WAIT_SECONDS)

        rows = QueryTDBData(ATTRIBUTE_VALUE_QUERY)
        for vid, values in group_attribute_rows(rows, attr_array).items():
            execute_ngql(session, build_insert_vertex(LABEL_NAME, attr_array, vid, values))
    except Exception:
        print(traceback.format_exc())
        sys.exit(1)
    finally:
        # Runs on success and on failure (including the sys.exit above).
        if session is not None:
            session.release()
        if connection_pool is not None:
            connection_pool.close()
创建通用关系(包括本体与类(class)之间、本体与本体之间的关系):
from SPARQLWrapper import SPARQLWrapper, JSON
from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config
from nebula3.common import *
from FormatResp import print_resp
from nebula3.mclient import MetaCache, HostAddr
from nebula3.sclient.GraphStorageClient import GraphStorageClient
# SPARQL query endpoint of the TDB dataset (host masked in this sample).
QueryService = SPARQLWrapper("http://XXX.XXX.XXX.XXX:3030/hhp20240119/query")


def QueryTDBData(Sql):
    """Send the SPARQL query *Sql* to ``QueryService`` and return its rows.

    The request is sent with HTTP POST and a JSON result format. The return
    value is the ``results.bindings`` list of the JSON response: one dict per
    solution, mapping each selected variable name to a dict whose ``"value"``
    key holds the bound term.
    """
    service = QueryService
    service.setQuery(Sql)
    service.setReturnFormat(JSON)
    service.setMethod('POST')
    converted = service.query().convert()
    bindings = converted["results"]["bindings"]
    return bindings
# Seconds to wait after CREATE EDGE before inserting edges. NebulaGraph
# creates schema asynchronously; its docs recommend waiting two heartbeat
# cycles (20 s with the default heartbeat_interval_secs) before using it.
SCHEMA_SYNC_WAIT_SECONDS = 20

# When True, one edge type per relation label is created (IF NOT EXISTS)
# before edges are inserted. By default the edge types are assumed to exist.
CREATE_EDGE_TYPES = False

# Distinct labels of predicates typed rdf:Relation that connect an instance
# of some rdfs:Class to a labelled object; each label is an edge type name.
RELATION_NAME_QUERY = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>
prefix owl: <http://www.w3.org/2002/07/owl#>
prefix xsd: <http://www.w3.org/2001/XMLSchema#>
SELECT distinct ?r_name
WHERE {
?s_class rdf:type rdfs:Class .
?s rdf:type ?s_class .
?s ?relation_uri ?object .
?s rdfs:label ?s_name .
?object rdfs:label ?o_name .
?relation_uri rdfs:label ?r_name .
?relation_uri rdf:type <http://www.w3.org/1999/02/22-rdf-syntax-ns#Relation>}"""

# Same pattern, returning (subject label, relation label, object label).
RELATION_QUERY = """prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>
prefix owl: <http://www.w3.org/2002/07/owl#>
prefix xsd: <http://www.w3.org/2001/XMLSchema#>
SELECT distinct ?s_name ?r_name ?o_name
WHERE {
?s_class rdf:type rdfs:Class .
?s rdf:type ?s_class .
?s ?relation_uri ?object .
?s rdfs:label ?s_name .
?object rdfs:label ?o_name .
?relation_uri rdfs:label ?r_name .
?relation_uri rdf:type <http://www.w3.org/1999/02/22-rdf-syntax-ns#Relation>}"""


def quote_ngql(value):
    """Return *value* as a double-quoted nGQL string literal.

    Backslashes and double quotes are escaped, so a label read from the
    SPARQL endpoint cannot end the literal early and break (or rewrite) the
    generated statement.
    """
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def build_insert_edge(edge_type, src_vid, dst_vid, name):
    """Return an ``INSERT EDGE`` statement that sets the edge's ``name``.

    *edge_type* is used verbatim (callers add backticks when needed). The two
    vertex IDs and *name* are emitted as escaped string literals.
    """
    return "INSERT EDGE {}(name) VALUES {}->{}:({})".format(
        edge_type, quote_ngql(src_vid), quote_ngql(dst_vid), quote_ngql(name))


def execute_ngql(session, statement, required=False):
    """Print *statement*, run it on *session*, and check the result.

    nebula3's ``Session.execute`` reports nGQL errors through the returned
    ResultSet instead of raising, so success is checked explicitly. Returns
    True on success. On failure, the error is printed and False is returned.
    If *required* is true, RuntimeError is raised instead; this is used for
    USE/CREATE statements that every later statement depends on.
    """
    print(statement)
    resp = session.execute(statement)
    if resp.is_succeeded():
        return True
    message = "nGQL failed: {} -> {}".format(statement, resp.error_msg())
    if required:
        raise RuntimeError(message)
    print(message)
    return False


if __name__ == "__main__":
    import sys
    import time
    import traceback

    # Bound before `try` so the cleanup below never hits an unbound name.
    session = None
    connection_pool = None
    try:
        config = Config()
        config.max_connection_pool_size = 10
        connection_pool = ConnectionPool()
        if not connection_pool.init([('XXX.XXX.XXX.XXX', 9669)], config):
            raise RuntimeError("failed to connect to NebulaGraph")
        session = connection_pool.get_session('root', '123456')

        execute_ngql(session, 'USE demospace', required=True)

        if CREATE_EDGE_TYPES:
            for cur in QueryTDBData(RELATION_NAME_QUERY):
                execute_ngql(
                    session,
                    "CREATE EDGE IF NOT EXISTS `{}`(name string)".format(cur["r_name"]["value"]),
                    required=True)
            time.sleep(SCHEMA_SYNC_WAIT_SECONDS)

        # One edge per triple. The edge type and its `name` property are both
        # the relation label; backticks allow Chinese edge type names.
        for cur in QueryTDBData(RELATION_QUERY):
            r_name = cur["r_name"]["value"]
            execute_ngql(session, build_insert_edge(
                "`" + r_name + "`", cur["s_name"]["value"], cur["o_name"]["value"], r_name))
    except Exception:
        print(traceback.format_exc())
        sys.exit(1)
    finally:
        # Runs on success and on failure (including the sys.exit above).
        if session is not None:
            session.release()
        if connection_pool is not None:
            connection_pool.close()
小技巧:属性名如果是中文,必须用反引号(`)括起来,否则语句会执行失败;顶点名称和标签(表)名称加不加双引号均可,查询出来的结果是一样的。