nebula-python连接被接连拒接,出现问题

应该是我程序端口的问题,以下提供一个成功测试示例程序(内含批量插入和非批量插入):

#!/usr/bin/env python
# --coding:utf-8--

# Copyright (c) 2020 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.

import time
import json
import prettytable

from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config

"""
导入数据时对属性
"""

def print_resp(resp):
    assert resp.is_succeeded()
    output_table = prettytable.PrettyTable()
    output_table.field_names = resp.keys()
    for recode in resp:
        value_list = []
        for col in recode:
            if col.is_empty():
                value_list.append('__EMPTY__')
            elif col.is_null():
                value_list.append('__NULL__')
            elif col.is_bool():
                value_list.append(col.as_bool())
            elif col.is_int():
                value_list.append(col.as_int())
            elif col.is_double():
                value_list.append(col.as_double())
            elif col.is_string():
                value_list.append(col.as_string())
            elif col.is_time():
                value_list.append(col.as_time())
            elif col.is_date():
                value_list.append(col.as_date())
            elif col.is_datetime():
                value_list.append(col.as_datetime())
            elif col.is_list():
                value_list.append(col.as_list())
            elif col.is_set():
                value_list.append(col.as_set())
            elif col.is_map():
                value_list.append(col.as_map())
            elif col.is_vertex():
                value_list.append(col.as_node())
            elif col.is_edge():
                value_list.append(col.as_relationship())
            elif col.is_path():
                value_list.append(col.as_path())
            elif col.is_geography():
                value_list.append(col.as_geography())
            else:
                print('ERROR: Type unsupported')
                return
        output_table.add_row(value_list)
    print(output_table)


"""
读取数据文件,返回参数numbers, relations
"""


def read_datas():
    with open(NUMBER_FILE, encoding='utf-8') as f:
        numbers = f.readlines()
    with open(RELATION_FILE, encoding='utf-8') as f:
        relations = f.readlines()
    return numbers, relations

def create_sql():
    create_sql = 'CREATE SPACE IF NOT EXISTS demo' \
                 '(partition_num = 10, replica_factor = 1, charset = utf8, collate = utf8_bin, vid_type = FIXED_STRING(32)); ' \
                 'USE demo; ' \
                 'CREATE TAG IF NOT EXISTS number();' \
                 'CREATE EDGE `relation` ( `beginTime` datetime NOT NULL) ttl_duration = 0, ttl_col = "";'
    return create_sql


# 批量插入node节点函数
def import_node(datas, client):
    values = []
    counter = 0

    for number in datas[0]:
        values.append(f"'{number.rstrip()}' : ()")
        counter += 1

        if counter == 20:
            values_str = ','.join(values)
            nsql = f'INSERT VERTEX number() VALUES {values_str};'
            resp = client.execute(nsql)

            # 重置计数器和值列表
            counter = 0
            values = []

    # 处理剩余不足 20 个的数据
    if counter > 0:
        values_str = ','.join(values)
        nsql = f'INSERT VERTEX number() VALUES {values_str};'
        resp = client.execute(nsql)
    return resp


"""
主函数:
"""
NUMBER_FILE = r'C:\Users\叼着零食打架\Desktop\nebual数据处理\测试数据\number.csv'
RELATION_FILE = r'C:\Users\叼着零食打架\Desktop\nebual数据处理\测试数据\relation.csv'
# nebula图数据库连接: ip / port
NEBULA_IP = '192.168.120.173'
NEBULA_PORT = 9669

if __name__ == '__main__':
    client = None
try:
    config = Config()
    config.max_connection_pool_size = 5
    # init connection pool
    connection_pool = ConnectionPool()
    assert connection_pool.init([(NEBULA_IP, NEBULA_PORT)], config)

    # get session from the pool
    client = connection_pool.get_session('root', 'nebula')
    assert client is not None

    # get the result in json format
    resp_json = client.execute_json("yield 1")
    json_obj = json.loads(resp_json)
    # print(json.dumps(json_obj, indent=2, sort_keys=True))

    # 获取创建图空间,顶点,关系边开始时间
    start_time1 = time.time()
    # 创建spcae图空间
    resp = client.execute(create_sql())
    assert resp.is_succeeded(), resp.error_msg()
    # 需要创建后等待一会时间,经测试6s有时还是会报错,导入数据时可能会报节点不存在。
    time.sleep(10)
    # 获取程序结束时间
    end_time1 = time.time()
    print("导入数据顶点和关系边花费时间:", (end_time1 - start_time1), "秒")

    """
    ----------------------------------------------------------------------------
    """
    # 读取结点和关系边数据。
    datas = read_datas()

    # 获取开始导入数据时间
    start_time2 = time.time()
    # 节点数据导入图空间
    resp = import_node(datas, client)
    assert resp.is_succeeded(), resp.error_msg()


    # 关系边数据导入图空间(非批量插入数据)
    for relation in datas[1]:
        relation = relation.rstrip().split('|')
        nsql_edge = 'INSERT EDGE relation(beginTime) VALUES "%s"->"%s":(datetime("%s"));' % (relation[0], relation[1], relation[2])
        resp = client.execute(nsql_edge)
        assert resp.is_succeeded(), resp.error_msg()
    # 获取数据导入数据结束时间
    end_time2 = time.time()
    print("导入数据顶点和关系边花费时间:", (end_time2 - start_time2), "秒")

    # resp = client.execute('FETCH PROP ON person "Bob" YIELD vertex as 源测试数据')
    # assert resp.is_succeeded(), resp.error_msg()
    # print_resp(resp)
    #
    # resp = client.execute('FETCH PROP ON like "Bob"->"Lily" YIELD edge as e')
    # assert resp.is_succeeded(), resp.error_msg()
    # print_resp(resp)

    # drop space
    # resp = client.execute('DROP SPACE test')
    # assert resp.is_succeeded(), resp.error_msg()
    print("数据导入结束!")

except Exception as x:
    import traceback

    print(traceback.format_exc())
    if client is not None:
        client.release()
    exit(1)

1 个赞