nebula3-python INSERT VERTEX如何类似于动态SQL批量插入数据

请问nebula-python能否像动态SQL那样导入数据呢,应该怎样写呢,自己拼接字符串吗?

#!/usr/bin/env python
# --coding:utf-8--

# Copyright (c) 2020 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.

import time
import json
import prettytable

from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config


def print_resp(resp):
    assert resp.is_succeeded()
    output_table = prettytable.PrettyTable()
    output_table.field_names = resp.keys()
    for recode in resp:
        value_list = []
        for col in recode:
            if col.is_empty():
                value_list.append('__EMPTY__')
            elif col.is_null():
                value_list.append('__NULL__')
            elif col.is_bool():
                value_list.append(col.as_bool())
            elif col.is_int():
                value_list.append(col.as_int())
            elif col.is_double():
                value_list.append(col.as_double())
            elif col.is_string():
                value_list.append(col.as_string())
            elif col.is_time():
                value_list.append(col.as_time())
            elif col.is_date():
                value_list.append(col.as_date())
            elif col.is_datetime():
                value_list.append(col.as_datetime())
            elif col.is_list():
                value_list.append(col.as_list())
            elif col.is_set():
                value_list.append(col.as_set())
            elif col.is_map():
                value_list.append(col.as_map())
            elif col.is_vertex():
                value_list.append(col.as_node())
            elif col.is_edge():
                value_list.append(col.as_relationship())
            elif col.is_path():
                value_list.append(col.as_path())
            elif col.is_geography():
                value_list.append(col.as_geography())
            else:
                print('ERROR: Type unsupported')
                return
        output_table.add_row(value_list)
    print(output_table)


def read_datas():
    number_file = r'C:\Users\叼着零食打架\Desktop\nebual数据处理\测试数据\number.csv'
    relation_file = r'C:\Users\叼着零食打架\Desktop\nebual数据处理\测试数据\relation.csv'
    with open(number_file, encoding='utf-8') as f:
        numbers = f.readlines()
    with open(relation_file, encoding='utf-8') as f:
        relations = f.readlines()
    return numbers, relations


# if __name__ == '__main__':
#     print(read_datas())

if __name__ == '__main__':
    client = None
try:
    config = Config()
    config.max_connection_pool_size = 2
    # init connection pool
    connection_pool = ConnectionPool()
    assert connection_pool.init([('192.168.120.173', 9669)], config)

    # get session from the pool
    client = connection_pool.get_session('root', 'nebula')
    assert client is not None

    # get the result in json format
    resp_json = client.execute_json("yield 1")
    json_obj = json.loads(resp_json)
    print(json.dumps(json_obj, indent=2, sort_keys=True))

    client.execute(
        'CREATE SPACE IF NOT EXISTS demo(vid_type=FIXED_STRING(30)); USE demo;'
        'CREATE TAG IF NOT EXISTS number(number string);'
        'CREATE EDGE relation (relation string, beginTime, datetime);'
    )

    # insert data need to sleep after create schema
    time.sleep(6)

    # insert vertex
    resp = client.execute('INSERT VERTEX number(number) VALUES "number":("11111111111"), "number":("22222222222")')
    assert resp.is_succeeded(), resp.error_msg()

    # insert edges
    resp = client.execute('INSERT EDGE relation(relation, beginTime) VALUES "Bob"->"Lily":(relation, beginTime);')
    assert resp.is_succeeded(), resp.error_msg()

    # resp = client.execute('FETCH PROP ON person "Bob" YIELD vertex as node')
    # assert resp.is_succeeded(), resp.error_msg()
    # print_resp(resp)
    #
    # resp = client.execute('FETCH PROP ON like "Bob"->"Lily" YIELD edge as e')
    # assert resp.is_succeeded(), resp.error_msg()
    # print_resp(resp)

    # drop space
    # resp = client.execute('DROP SPACE test')
    # assert resp.is_succeeded(), resp.error_msg()

    print("Example finished")

except Exception as x:
    import traceback

    print(traceback.format_exc())
    if client is not None:
        client.release()
    exit(1)

直接写nGQL语句就只能拼字符串了,另外有一个python的ORM项目你可以看看有没有帮助:
https://github.com/nebula-contrib/nebula-carina

1 个赞

嗯呢,感谢您的建议,这边使用拼接nsql封装了下方法,贴一下哈。

# 创建图空间nsql语句
def create_sql():
    create_sql = 'CREATE SPACE IF NOT EXISTS demo' \
                 '(partition_num = 10, replica_factor = 1, charset = utf8, collate = utf8_bin, vid_type = FIXED_STRING(32)); ' \
                 'USE demo; ' \
                 'CREATE TAG IF NOT EXISTS number();' \
                 'CREATE EDGE `relation` ( `beginTime` datetime NOT NULL) ttl_duration = 0, ttl_col = "";'
    return create_sql


# 插入node节点函数
def import_node(datas, client):
    values = []
    counter = 0

    for number in datas[0]:
        values.append(f"'{number.rstrip()}' : ()")
        counter += 1

        if counter == 20:
            values_str = ','.join(values)
            nsql_node = f'INSERT VERTEX number() VALUES {values_str};'
            resp = client.execute(nsql_node)

            # 重置计数器和值列表
            counter = 0
            values = []

    # 处理剩余不足 20 个的数据
    if counter > 0:
        values_str = ','.join(values)
        nsql_node = f'INSERT VERTEX number() VALUES {values_str};'
        resp = client.execute(nsql_node)
    return resp


# 插入edge数据
def import_edge(datas, client):
    values = []
    counter = 0

    # 关系边数据导入图空间
    for relation in datas[1]:
        relation = relation.rstrip().split('|')
        values.append(f"'{relation[0]}'-> '{relation[1]}':(datetime('{relation[2]}'))")
        counter += 1

        if counter == 20:
            values_str = ','.join(values)
            nsql_edge = f'INSERT EDGE relation(beginTime) VALUES  {values_str};'
            print(nsql_edge)
            resp = client.execute(nsql_edge)

            # 重置计数器和值列表
            counter = 0
            values = []

    # 处理剩余不足 20 个的数据
    if counter > 0:
        values_str = ','.join(values)
        nsql_edge = f'INSERT EDGE relation(beginTime) VALUES {values_str};'
        resp = client.execute(nsql_edge)
    return resp

此话题已在最后回复的 7 天后被自动关闭。不再允许新回复。