通过 SPARQL 将本体从其他类型的图谱库(TDB、Neo4j)导入 NebulaGraph 的示例代码

下面给出我在项目中编写的代码:通过 SPARQL 将本体和关系从其他图谱库导入 NebulaGraph,供各位参考:

from SPARQLWrapper import SPARQLWrapper, JSON
from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config

from nebula3.common import *

from nebula3.mclient import MetaCache, HostAddr
from nebula3.sclient.GraphStorageClient import GraphStorageClient

# Shared SPARQL client bound to the TDB query endpoint.
QueryService = SPARQLWrapper("http://XXX.XXX.XXX.XXX:3030/hhp20240119/query")


def QueryTDBData(Sql):
    """Run a SPARQL query against the TDB endpoint and return its bindings.

    Args:
        Sql: The SPARQL query text (SPARQL, despite the parameter name).

    Returns:
        The ``["results"]["bindings"]`` part of the converted JSON response.
    """
    # Configure the shared client for this request: query text, JSON, POST.
    QueryService.setQuery(Sql)
    QueryService.setReturnFormat(JSON)
    QueryService.setMethod('POST')
    response = QueryService.query().convert()
    return response["results"]["bindings"]

# Namespace prefixes shared by both SPARQL queries of this script.
SPARQL_PREFIXES = (
    "prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> "
    "prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> "
    "prefix owl: <http://www.w3.org/2002/07/owl#> "
    "prefix xsd: <http://www.w3.org/2001/XMLSchema#> "
)

# Labels of every resource typed rdfs:Class; each label becomes one vertex
# carrying a single ``name`` property.
CLASS_LABEL_QUERY = SPARQL_PREFIXES + (
    "SELECT distinct ?s_name "
    "WHERE { "
    "?subject ?predicate ?object . "
    "?subject rdfs:label ?s_name . "
    "?subject rdf:type rdfs:Class }"
)

# (subject label, predicate URI, object label) for triples between two classes.
CLASS_TRIPLE_QUERY = SPARQL_PREFIXES + (
    "SELECT distinct ?s_name ?predicate ?o_name "
    "WHERE { "
    "?subject ?predicate ?object . "
    "?subject rdfs:label ?s_name . "
    "?object rdfs:label ?o_name . "
    "?subject rdf:type rdfs:Class . "
    "?object rdf:type rdfs:Class .} "
)

# Predicates that are imported as an ``include`` edge from object to subject.
INCLUDE_PREDICATES = frozenset((
    "http://www.w3.org/2000/01/rdf-schema#subClassOf",
    "https://piesat.cn/Knowledge_map/relation#隶属",
))


def quote_ngql(text):
    """Return *text* as a double-quoted nGQL string literal.

    Backslashes and double quotes are backslash-escaped so that a label
    containing them cannot terminate the literal early.
    """
    escaped = str(text).replace("\\", "\\\\").replace('"', '\\"')
    return '"' + escaped + '"'


def concept_vertex_statement(name):
    """Build the INSERT VERTEX statement for one weather_concept vertex.

    The label is used both as the vertex ID and as the ``name`` property.
    """
    quoted = quote_ngql(name)
    return 'INSERT VERTEX weather_concept(name) VALUES ' + quoted + ':(' + quoted + ')'


def include_edge_statement(parent, child):
    """Build the INSERT EDGE statement for ``parent -[include]-> child``."""
    return ('INSERT EDGE include(name) VALUES ' + quote_ngql(parent) + '->'
            + quote_ngql(child) + ':("包含")')


def run_statement(session, statement, required=False):
    """Execute *statement* and report a failure instead of ignoring it.

    Args:
        session: An open nebula3 session.
        statement: The nGQL text to execute.
        required: When true, a failed statement raises ``RuntimeError``;
            otherwise the error is printed and the import continues.

    Returns:
        True when the statement succeeded, False otherwise.
    """
    result = session.execute(statement)
    if result.is_succeeded():
        return True
    message = "nGQL failed: " + statement + " -> " + str(result.error_msg())
    if required:
        raise RuntimeError(message)
    print(message)
    return False


def main():
    """Import rdfs:Class labels as vertices and their hierarchy as edges."""
    # Bound before the try block so the cleanup below never hits an unbound
    # name when the failure happens before the session exists.
    session = None
    connection_pool = None
    try:
        class_rows = QueryTDBData(CLASS_LABEL_QUERY)

        config = Config()
        config.max_connection_pool_size = 10
        connection_pool = ConnectionPool()
        if not connection_pool.init([('XXX.XXX.XXX.XXX', 9669)], config):
            raise RuntimeError("could not initialise the NebulaGraph connection pool")
        session = connection_pool.get_session('root', '123456')

        run_statement(session, 'USE demospace', required=True)
        # IF NOT EXISTS on both statements keeps the script re-runnable.
        # NOTE(review): NebulaGraph applies schema changes asynchronously, so
        # on a first run the inserts below may fail until the new tag/edge
        # type is visible; such failures are printed by run_statement.
        run_statement(session, 'CREATE TAG IF NOT EXISTS weather_concept(name string)',
                      required=True)
        run_statement(session, 'CREATE EDGE IF NOT EXISTS include (name string)',
                      required=True)

        for row in class_rows:
            statement = concept_vertex_statement(row["s_name"]["value"])
            print(statement)
            run_statement(session, statement)

        # Fetch class-to-class triples to build the hierarchy edges.
        for row in QueryTDBData(CLASS_TRIPLE_QUERY):
            if row["predicate"]["value"] not in INCLUDE_PREDICATES:
                continue
            # The object is the parent: parent -[include]-> subject.
            statement = include_edge_statement(row["o_name"]["value"],
                                               row["s_name"]["value"])
            run_statement(session, statement)
            print(statement)
    except Exception:
        import traceback

        print(traceback.format_exc())
        raise SystemExit(1)
    finally:
        # Runs on success and on failure, so neither the session nor the
        # pool is leaked.
        if session is not None:
            session.release()
        if connection_pool is not None:
            connection_pool.close()


if __name__ == "__main__":
    main()

创建索引:

CREATE tag INDEX weather_concept_index ON weather_concept(name(100));
REBUILD TAG INDEX weather_concept_index;

创建多属性顶点示例代码:

from SPARQLWrapper import SPARQLWrapper, JSON
from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config

from nebula3.common import *
from FormatResp import print_resp

from nebula3.mclient import MetaCache, HostAddr
from nebula3.sclient.GraphStorageClient import GraphStorageClient

# Shared SPARQL client bound to the TDB query endpoint.
QueryService = SPARQLWrapper("http://XXX.XXX.XXX.XXX:3030/hhp20240119/query")


def QueryTDBData(Sql):
    """Run a SPARQL query against the TDB endpoint and return its bindings.

    Args:
        Sql: The SPARQL query text (SPARQL, despite the parameter name).

    Returns:
        The ``["results"]["bindings"]`` part of the converted JSON response.
    """
    # Configure the shared client for this request: query text, JSON, POST.
    QueryService.setQuery(Sql)
    QueryService.setReturnFormat(JSON)
    QueryService.setMethod('POST')
    converted = QueryService.query().convert()
    bindings = converted["results"]["bindings"]
    return bindings

# Namespace prefixes shared by both SPARQL queries of this script.
SPARQL_PREFIXES = (
    "prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> "
    "prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> "
    "prefix owl: <http://www.w3.org/2002/07/owl#> "
    "prefix xsd: <http://www.w3.org/2001/XMLSchema#> "
)

# Graph pattern shared by both queries: property triples of every instance
# whose class is labelled '站点'.
STATION_ATTRIBUTE_PATTERN = (
    "WHERE { "
    "?subject ?predicate ?object . "
    "?subject rdfs:label ?s_name . "
    "?subject rdf:type ?ddd . "
    "?ddd rdf:type rdfs:Class . "
    "?ddd rdfs:label '站点' . "
    "?predicate rdf:type rdf:Property . "
    "?predicate rdfs:label ?attr_name } "
)

# Distinct property labels; they become the columns of the tag.
ATTRIBUTE_NAME_QUERY = (SPARQL_PREFIXES + "select distinct ?attr_name "
                        + STATION_ATTRIBUTE_PATTERN + "order by ?attr_name")

# One row per (instance label, property label, value).
ATTRIBUTE_VALUE_QUERY = (SPARQL_PREFIXES + "select distinct ?s_name ?attr_name ?object "
                         + STATION_ATTRIBUTE_PATTERN + "order by ?s_name ?attr_name")

# Tag name, already backtick-quoted because it is not ASCII.
STATION_TAG = "`站点`"


def quote_ngql(text):
    """Return *text* as a double-quoted nGQL string literal.

    Backslashes and double quotes are backslash-escaped so that a value
    containing them cannot terminate the literal early.
    """
    escaped = str(text).replace("\\", "\\\\").replace('"', '\\"')
    return '"' + escaped + '"'


def create_tag_statement(label_name, attr_names):
    """Build CREATE TAG with a ``name`` column plus one string column per attribute.

    Works for an empty *attr_names*, in which case only ``name`` is declared.
    """
    columns = ["name string"] + ["`" + attr + "` string" for attr in attr_names]
    return "CREATE TAG IF NOT EXISTS " + label_name + "(" + ",".join(columns) + ")"


def group_rows_by_subject(rows):
    """Collect SPARQL binding rows into ``{subject label: {attribute: value}}``.

    Subjects keep the order of their first appearance. When a subject has
    several values for one attribute, the last row wins.
    """
    grouped = {}
    for row in rows:
        values = grouped.setdefault(row["s_name"]["value"], {})
        values[row["attr_name"]["value"]] = row["object"]["value"]
    return grouped


def insert_vertex_statement(label_name, attr_names, vertex_name, values):
    """Build INSERT VERTEX for one subject.

    Args:
        label_name: Tag name, already quoted if necessary.
        attr_names: Attribute columns, in the order they are written.
        vertex_name: Used as the vertex ID and as the ``name`` property.
        values: Mapping of attribute name to value; a missing attribute is
            written as the empty string.
    """
    columns = ["name"] + ["`" + attr + "`" for attr in attr_names]
    cells = [quote_ngql(vertex_name)]
    cells.extend(quote_ngql(values.get(attr, "")) for attr in attr_names)
    return ("INSERT VERTEX IF NOT EXISTS " + label_name + "(" + ",".join(columns)
            + ") VALUES " + quote_ngql(vertex_name) + ":(" + ",".join(cells) + ")")


def run_statement(session, statement, required=False):
    """Execute *statement* and report a failure instead of ignoring it.

    Args:
        session: An open nebula3 session.
        statement: The nGQL text to execute.
        required: When true, a failed statement raises ``RuntimeError``;
            otherwise the error is printed and the import continues.

    Returns:
        True when the statement succeeded, False otherwise.
    """
    result = session.execute(statement)
    if result.is_succeeded():
        return True
    message = "nGQL failed: " + statement + " -> " + str(result.error_msg())
    if required:
        raise RuntimeError(message)
    print(message)
    return False


def main():
    """Create the multi-attribute tag and insert one vertex per instance."""
    # Bound before the try block so the cleanup below never hits an unbound
    # name when the failure happens before the session exists.
    session = None
    connection_pool = None
    try:
        attr_rows = QueryTDBData(ATTRIBUTE_NAME_QUERY)
        attr_names = [row["attr_name"]["value"] for row in attr_rows]

        config = Config()
        config.max_connection_pool_size = 10
        connection_pool = ConnectionPool()
        if not connection_pool.init([('XXX.XXX.XXX.XXX', 9669)], config):
            raise RuntimeError("could not initialise the NebulaGraph connection pool")
        session = connection_pool.get_session('root', '123456')

        run_statement(session, 'USE demospace', required=True)

        # NOTE(review): NebulaGraph applies schema changes asynchronously, so
        # on a first run the inserts below may fail until the new tag is
        # visible; such failures are printed by run_statement.
        create_tag = create_tag_statement(STATION_TAG, attr_names)
        print(create_tag)
        run_statement(session, create_tag, required=True)

        # Group first, then write: every subject is written exactly once,
        # including the last one and a subject that has a single row.
        value_rows = QueryTDBData(ATTRIBUTE_VALUE_QUERY)
        for vertex_name, values in group_rows_by_subject(value_rows).items():
            statement = insert_vertex_statement(STATION_TAG, attr_names,
                                                vertex_name, values)
            print(statement)
            run_statement(session, statement)
    except Exception:
        import traceback

        print(traceback.format_exc())
        raise SystemExit(1)
    finally:
        # Runs on success and on failure, so neither the session nor the
        # pool is leaked.
        if session is not None:
            session.release()
        if connection_pool is not None:
            connection_pool.close()


if __name__ == "__main__":
    main()

创建通用关系(包括本体与 class 之间、本体与本体之间的关系)的示例代码:

from SPARQLWrapper import SPARQLWrapper, JSON
from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config

from nebula3.common import *
from FormatResp import print_resp

from nebula3.mclient import MetaCache, HostAddr
from nebula3.sclient.GraphStorageClient import GraphStorageClient

# Shared SPARQL client bound to the TDB query endpoint.
QueryService = SPARQLWrapper("http://XXX.XXX.XXX.XXX:3030/hhp20240119/query")


def QueryTDBData(Sql):
    """Run a SPARQL query against the TDB endpoint and return its bindings.

    Args:
        Sql: The SPARQL query text (SPARQL, despite the parameter name).

    Returns:
        The ``["results"]["bindings"]`` part of the converted JSON response.
    """
    # Configure the shared client for this request: query text, JSON, POST.
    QueryService.setQuery(Sql)
    QueryService.setReturnFormat(JSON)
    QueryService.setMethod('POST')
    payload = QueryService.query().convert()
    return payload["results"]["bindings"]

# Namespace prefixes shared by both SPARQL queries of this script.
SPARQL_PREFIXES = (
    "prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> "
    "prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> "
    "prefix owl: <http://www.w3.org/2002/07/owl#> "
    "prefix xsd: <http://www.w3.org/2001/XMLSchema#> "
)

# Graph pattern shared by both queries: labelled relations (typed rdf:Relation)
# going from an instance of some rdfs:Class to a labelled object.
RELATION_PATTERN = (
    "WHERE { "
    "?s_class rdf:type rdfs:Class . "
    "?s rdf:type ?s_class . "
    "?s ?relation_uri ?object . "
    "?s rdfs:label ?s_name . "
    "?object rdfs:label ?o_name . "
    "?relation_uri rdfs:label ?r_name . "
    "?relation_uri rdf:type <http://www.w3.org/1999/02/22-rdf-syntax-ns#Relation>}"
)

# Distinct relation labels; each one is an edge type.
RELATION_NAME_QUERY = SPARQL_PREFIXES + "SELECT distinct ?r_name " + RELATION_PATTERN

# One row per (subject label, relation label, object label).
RELATION_TRIPLE_QUERY = (SPARQL_PREFIXES + "SELECT distinct ?s_name ?r_name ?o_name "
                         + RELATION_PATTERN)

# The original script kept the edge-type creation step disabled (the edge
# types were expected to exist already). Set to True to create them first.
CREATE_EDGE_TYPES = False


def quote_ngql(text):
    """Return *text* as a double-quoted nGQL string literal.

    Backslashes and double quotes are backslash-escaped so that a label
    containing them cannot terminate the literal early.
    """
    escaped = str(text).replace("\\", "\\\\").replace('"', '\\"')
    return '"' + escaped + '"'


def edge_type_statement(relation):
    """Build CREATE EDGE for the edge type named after *relation*."""
    return 'CREATE EDGE IF NOT EXISTS `' + relation + '`(name string)'


def relation_edge_statement(relation, source, target):
    """Build INSERT EDGE for ``source -[relation]-> target``.

    The relation label is used as the edge type (backtick-quoted) and is
    also stored in the edge's ``name`` property.
    """
    return ('INSERT EDGE `' + relation + '`(name) VALUES ' + quote_ngql(source)
            + '->' + quote_ngql(target) + ':(' + quote_ngql(relation) + ')')


def run_statement(session, statement, required=False):
    """Execute *statement* and report a failure instead of ignoring it.

    Args:
        session: An open nebula3 session.
        statement: The nGQL text to execute.
        required: When true, a failed statement raises ``RuntimeError``;
            otherwise the error is printed and the import continues.

    Returns:
        True when the statement succeeded, False otherwise.
    """
    result = session.execute(statement)
    if result.is_succeeded():
        return True
    message = "nGQL failed: " + statement + " -> " + str(result.error_msg())
    if required:
        raise RuntimeError(message)
    print(message)
    return False


def main():
    """Import every labelled relation between labelled resources as an edge."""
    # Bound before the try block so the cleanup below never hits an unbound
    # name when the failure happens before the session exists.
    session = None
    connection_pool = None
    try:
        config = Config()
        config.max_connection_pool_size = 10
        connection_pool = ConnectionPool()
        if not connection_pool.init([('XXX.XXX.XXX.XXX', 9669)], config):
            raise RuntimeError("could not initialise the NebulaGraph connection pool")
        session = connection_pool.get_session('root', '123456')

        run_statement(session, 'USE demospace', required=True)

        if CREATE_EDGE_TYPES:
            # NOTE(review): NebulaGraph applies schema changes asynchronously,
            # so inserts issued right after this step may fail until the new
            # edge types are visible; such failures are printed below.
            for row in QueryTDBData(RELATION_NAME_QUERY):
                statement = edge_type_statement(row["r_name"]["value"])
                print(statement)
                run_statement(session, statement, required=True)

        for row in QueryTDBData(RELATION_TRIPLE_QUERY):
            statement = relation_edge_statement(row["r_name"]["value"],
                                                row["s_name"]["value"],
                                                row["o_name"]["value"])
            print(statement)
            run_statement(session, statement)
    except Exception:
        import traceback

        print(traceback.format_exc())
        raise SystemExit(1)
    finally:
        # Runs on success and on failure, so neither the session nor the
        # pool is leaked.
        if session is not None:
            session.release()
        if connection_pool is not None:
            connection_pool.close()


if __name__ == "__main__":
    main()

小技巧:属性名如果是中文,必须用反引号(`)括起来,否则语句会执行失败;顶点名称和表名称带不带双引号均可,查询出来的结果是一样的。

2 个赞