nebula-python connection keeps being refused

I found this topic in the community, but the symptoms differ from mine:

https://discuss.nebula-graph.com.cn/t/topic/6736

Connecting with Python to NebulaGraph running in Docker on a VM fails with the error below. The program:

from nebula3.mclient import MetaCache, HostAddr
from nebula3.sclient.GraphStorageClient import GraphStorageClient

# the metad servers' address
meta_cache = MetaCache([('192.168.120.173', 9559)],50000)

# option 1 metad usually discover the storage address automatically
graph_storage_client = GraphStorageClient(meta_cache)

# option 2 manually specify the storage address
storage_addrs = [HostAddr(host='192.168.120.173', port=9779)]
graph_storage_client = GraphStorageClient(meta_cache, storage_addrs)

resp = graph_storage_client.scan_vertex(
    space_name='test',
    tag_name='test')
while resp.has_next():
    result = resp.next()
    for vertex_data in result:
        print(vertex_data)
C:\Users\叼着零食打架\Desktop\nebual数据处理\nebual数据处理\Scripts\python.exe C:\Users\叼着零食打架\Desktop\nebual数据处理\nebula.py 
Traceback (most recent call last):
  File "C:\Users\叼着零食打架\Desktop\nebual数据处理\nebual数据处理\lib\site-packages\nebula3\fbthrift\transport\TSocket.py", line 286, in open
    handle.connect(address)
ConnectionRefusedError: [WinError 10061] 由于目标计算机积极拒绝,无法连接。

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\叼着零食打架\Desktop\nebual数据处理\nebula.py", line 5, in <module>
    meta_cache = MetaCache([('192.168.120.173', 9559)],50000)
  File "C:\Users\叼着零食打架\Desktop\nebual数据处理\nebual数据处理\lib\site-packages\nebula3\mclient\__init__.py", line 280, in __init__
    self._meta_client.open()
  File "C:\Users\叼着零食打架\Desktop\nebual数据处理\nebual数据处理\lib\site-packages\nebula3\mclient\__init__.py", line 64, in open
    transport.open()
  File "C:\Users\叼着零食打架\Desktop\nebual数据处理\nebual数据处理\lib\site-packages\nebula3\fbthrift\transport\TTransport.py", line 173, in open
    return self.__trans.open()
  File "C:\Users\叼着零食打架\Desktop\nebual数据处理\nebual数据处理\lib\site-packages\nebula3\fbthrift\transport\TSocket.py", line 301, in open
    raise TTransportException(TTransportException.NOT_OPEN, msg)
nebula3.fbthrift.transport.TTransport.TTransportException: socket error connecting to host 192.168.120.173, port 9559 (('192.168.120.173', 9559)): ConnectionRefusedError(10061, '由于目标计算机积极拒绝,无法连接。', None, 10061, None)

The firewall is off; the NebulaGraph version is 3.4.0, single-host, deployed with Docker; the IP in the test code above is the host machine's IP.

1. The Docker deployment of NebulaGraph itself runs normally, and nebula-graph-studio on my local machine can access the NebulaGraph instance on the VM without problems.
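Before pointing the meta/storage clients at those ports, it can help to verify from the Windows host which of the three default service ports are actually reachable. A minimal probe sketch (the host IP below is the one from the question):

```python
# Quick TCP reachability probe for the default NebulaGraph ports
# (9559 metad, 9669 graphd, 9779 storaged), run from the client machine.
import socket

def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == '__main__':
    for port in (9559, 9669, 9779):
        status = 'open' if port_open('192.168.120.173', port, timeout=1.0) else 'refused/filtered'
        print(port, status)
```

If 9669 is open but 9559/9779 are not, the Docker deployment is exposing only graphd to the host, which would match the symptoms: Studio and `ConnectionPool` work while `MetaCache`/`GraphStorageClient` are refused.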


Failing program 1:

from nebula3.mclient import MetaCache, HostAddr
from nebula3.sclient.GraphStorageClient import GraphStorageClient

# the metad servers' address
meta_cache = MetaCache([('192.168.120.173', 9559)],50000)

# option 1 metad usually discover the storage address automatically
graph_storage_client = GraphStorageClient(meta_cache)

# option 2 manually specify the storage address
storage_addrs = [HostAddr(host='192.168.120.173', port=9779)]
graph_storage_client = GraphStorageClient(meta_cache, storage_addrs)

resp = graph_storage_client.scan_vertex(
    space_name='test',
    tag_name='demo')
while resp.has_next():
    result = resp.next()
    for vertex_data in result:
        print(vertex_data)

Failing program 2:

from nebula3.mclient import MetaCache, HostAddr
from nebula3.sclient.GraphStorageClient import GraphStorageClient

# the metad servers' address
meta_cache = MetaCache([('192.168.120.173', 49191),
                        ('192.168.120.173', 49158),
                        ('192.168.120.173', 49188)],50000)

# option 1 metad usually discover the storage address automatically
graph_storage_client = GraphStorageClient(meta_cache)

# option 2 manually specify the storage address
storage_addrs = [HostAddr(host='192.168.120.173', port=49200),
                 HostAddr(host='192.168.120.173', port=49194),
                 HostAddr(host='192.168.120.173', port=49197)]
graph_storage_client = GraphStorageClient(meta_cache, storage_addrs)

resp = graph_storage_client.scan_vertex(
    space_name='test',
    tag_name='demo')
while resp.has_next():
    result = resp.next()
    for vertex_data in result:
        print(vertex_data)

Failing program 3:

from nebula3.mclient import MetaCache, HostAddr
from nebula3.sclient.GraphStorageClient import GraphStorageClient

# the metad servers' address
meta_cache = MetaCache([('192.168.120.173', 9560),
                        ('192.168.120.173', 9560),
                        ('192.168.120.173', 9560)],50000)

# option 1 metad usually discover the storage address automatically
graph_storage_client = GraphStorageClient(meta_cache)

# option 2 manually specify the storage address
storage_addrs = [HostAddr(host='192.168.120.173', port=9780),
                 HostAddr(host='192.168.120.173', port=9780),
                 HostAddr(host='192.168.120.173', port=9780)]
graph_storage_client = GraphStorageClient(meta_cache, storage_addrs)

resp = graph_storage_client.scan_vertex(
    space_name='test',
    tag_name='demo')
while resp.has_next():
    result = resp.next()
    for vertex_data in result:
        print(vertex_data)

All of the above fail with the same error:


So when accessing a space's vertices and edges through the storage service of a Docker-deployed NebulaGraph, which port should the program connect to?

2. The database can be accessed normally with the following program:

from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config

# define a config
config = Config()
config.max_connection_pool_size = 10
# init connection pool
connection_pool = ConnectionPool()
# if the given servers are ok, return true, else return false
ok = connection_pool.init([('192.168.120.173', 49178)], config)

# option 1 control the connection release yourself
# get session from the pool
session = connection_pool.get_session('root', '123')

# select space
session.execute('USE test')

# show tags
result = session.execute('SHOW TAGS')
print(result)

# release session
session.release()

# close the pool
connection_pool.close()

For the cause of the refused connection and how to fix it, see the post:

It turned out to be a port problem in my program. Below is a successfully tested example program (with both batch and non-batch inserts):

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2020 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.

import time
import json
import prettytable

from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config

"""
导入数据时对属性
"""

def print_resp(resp):
    assert resp.is_succeeded()
    output_table = prettytable.PrettyTable()
    output_table.field_names = resp.keys()
    for record in resp:
        value_list = []
        for col in record:
            if col.is_empty():
                value_list.append('__EMPTY__')
            elif col.is_null():
                value_list.append('__NULL__')
            elif col.is_bool():
                value_list.append(col.as_bool())
            elif col.is_int():
                value_list.append(col.as_int())
            elif col.is_double():
                value_list.append(col.as_double())
            elif col.is_string():
                value_list.append(col.as_string())
            elif col.is_time():
                value_list.append(col.as_time())
            elif col.is_date():
                value_list.append(col.as_date())
            elif col.is_datetime():
                value_list.append(col.as_datetime())
            elif col.is_list():
                value_list.append(col.as_list())
            elif col.is_set():
                value_list.append(col.as_set())
            elif col.is_map():
                value_list.append(col.as_map())
            elif col.is_vertex():
                value_list.append(col.as_node())
            elif col.is_edge():
                value_list.append(col.as_relationship())
            elif col.is_path():
                value_list.append(col.as_path())
            elif col.is_geography():
                value_list.append(col.as_geography())
            else:
                print('ERROR: Type unsupported')
                return
        output_table.add_row(value_list)
    print(output_table)


"""
读取数据文件,返回参数numbers, relations
"""


def read_datas():
    with open(NUMBER_FILE, encoding='utf-8') as f:
        numbers = f.readlines()
    with open(RELATION_FILE, encoding='utf-8') as f:
        relations = f.readlines()
    return numbers, relations

def create_sql():
    create_sql = 'CREATE SPACE IF NOT EXISTS demo' \
                 '(partition_num = 10, replica_factor = 1, charset = utf8, collate = utf8_bin, vid_type = FIXED_STRING(32)); ' \
                 'USE demo; ' \
                 'CREATE TAG IF NOT EXISTS number();' \
                 'CREATE EDGE `relation` ( `beginTime` datetime NOT NULL) ttl_duration = 0, ttl_col = "";'
    return create_sql
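If a compound statement like the one built by `create_sql()` ever fails partway, it can be easier to debug by splitting it into single statements and executing them one at a time. A naive splitter sketch (`split_statements` is an illustrative helper, not part of nebula3; it assumes no `;` inside string literals, which holds for the schema above):

```python
def split_statements(ngql: str) -> list:
    """Naively split a compound nGQL string on ';' into single statements."""
    return [part.strip() + ';' for part in ngql.split(';') if part.strip()]
```

Each returned statement can then be passed to `client.execute()` individually, so the failing one is easy to identify.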


# batch-insert the vertex (node) data
def import_node(datas, client):
    values = []
    counter = 0

    for number in datas[0]:
        values.append(f"'{number.rstrip()}' : ()")
        counter += 1

        if counter == 20:
            values_str = ','.join(values)
            nsql = f'INSERT VERTEX number() VALUES {values_str};'
            resp = client.execute(nsql)

            # reset the counter and the value list
            counter = 0
            values = []

    # flush the remaining rows (fewer than 20)
    if counter > 0:
        values_str = ','.join(values)
        nsql = f'INSERT VERTEX number() VALUES {values_str};'
        resp = client.execute(nsql)
    return resp
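The batching logic in `import_node` can also be factored into a pure helper that only builds the INSERT statements, which makes the batch size easy to test in isolation (a sketch; `build_vertex_inserts` is an illustrative name, not part of nebula3):

```python
def build_vertex_inserts(numbers, batch_size=20):
    """Group vertex IDs into batches and build one INSERT VERTEX per batch."""
    statements, batch = [], []
    for number in numbers:
        batch.append(f"'{number.rstrip()}' : ()")
        if len(batch) == batch_size:
            statements.append(f'INSERT VERTEX number() VALUES {",".join(batch)};')
            batch = []
    if batch:  # flush the remainder (fewer than batch_size rows)
        statements.append(f'INSERT VERTEX number() VALUES {",".join(batch)};')
    return statements
```

`import_node` then reduces to executing each returned statement with `client.execute` and checking the response.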


"""
主函数:
"""
NUMBER_FILE = r'C:\Users\叼着零食打架\Desktop\nebual数据处理\测试数据\number.csv'
RELATION_FILE = r'C:\Users\叼着零食打架\Desktop\nebual数据处理\测试数据\relation.csv'
# nebula图数据库连接: ip / port
NEBULA_IP = '192.168.120.173'
NEBULA_PORT = 9669

if __name__ == '__main__':
    client = None
    try:
        config = Config()
        config.max_connection_pool_size = 5
        # init connection pool
        connection_pool = ConnectionPool()
        assert connection_pool.init([(NEBULA_IP, NEBULA_PORT)], config)

        # get session from the pool
        client = connection_pool.get_session('root', 'nebula')
        assert client is not None

        # get the result in json format
        resp_json = client.execute_json("yield 1")
        json_obj = json.loads(resp_json)
        # print(json.dumps(json_obj, indent=2, sort_keys=True))

        # start time: create the space, tag, and edge type
        start_time1 = time.time()
        # create the space
        resp = client.execute(create_sql())
        assert resp.is_succeeded(), resp.error_msg()
        # Wait a while after creating the schema; in testing, 6 s was sometimes
        # not enough and the import could fail because the schema did not exist yet.
        time.sleep(10)
        # end time
        end_time1 = time.time()
        print("Time to create the space, tag, and edge type:", (end_time1 - start_time1), "s")

        """
        ----------------------------------------------------------------------------
        """
        # read the vertex and edge data
        datas = read_datas()

        # start time of the data import
        start_time2 = time.time()
        # import the vertex data into the space
        resp = import_node(datas, client)
        assert resp.is_succeeded(), resp.error_msg()


        # import the edge data into the space (non-batch insert)
        for relation in datas[1]:
            relation = relation.rstrip().split('|')
            nsql_edge = 'INSERT EDGE relation(beginTime) VALUES "%s"->"%s":(datetime("%s"));' % (relation[0], relation[1], relation[2])
            resp = client.execute(nsql_edge)
            assert resp.is_succeeded(), resp.error_msg()
        # end time of the data import
        end_time2 = time.time()
        print("Time to import vertices and edges:", (end_time2 - start_time2), "s")

        # resp = client.execute('FETCH PROP ON person "Bob" YIELD vertex as 源测试数据')
        # assert resp.is_succeeded(), resp.error_msg()
        # print_resp(resp)
        #
        # resp = client.execute('FETCH PROP ON like "Bob"->"Lily" YIELD edge as e')
        # assert resp.is_succeeded(), resp.error_msg()
        # print_resp(resp)

        # drop space
        # resp = client.execute('DROP SPACE test')
        # assert resp.is_succeeded(), resp.error_msg()
        print("Data import finished!")

    except Exception:
        import traceback

        print(traceback.format_exc())
        if client is not None:
            client.release()
        exit(1)

