应该是我程序端口的问题,以下提供一个成功测试示例程序(内含批量插入和非批量插入):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2020 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.
import time
import json
import prettytable
from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config
"""
导入数据时对属性
"""
def print_resp(resp):
    """Print a successful result set to stdout as a text table.

    The response's keys become the column headers and every record becomes
    one row.  Empty and null cells are shown as ``__EMPTY__`` / ``__NULL__``;
    every other cell is converted with the ``as_*`` method that matches the
    first ``is_*`` check it passes.

    If a cell matches none of the known types, an error line is printed and
    the function returns immediately without printing the table.

    Args:
        resp: Result set offering ``is_succeeded()``, ``keys()`` and
            iteration over records of typed cells.

    Raises:
        AssertionError: If ``resp.is_succeeded()`` is false.
    """
    assert resp.is_succeeded()

    # (type check, conversion) method names, tried strictly in this order.
    converters = (
        ('is_bool', 'as_bool'),
        ('is_int', 'as_int'),
        ('is_double', 'as_double'),
        ('is_string', 'as_string'),
        ('is_time', 'as_time'),
        ('is_date', 'as_date'),
        ('is_datetime', 'as_datetime'),
        ('is_list', 'as_list'),
        ('is_set', 'as_set'),
        ('is_map', 'as_map'),
        ('is_vertex', 'as_node'),
        ('is_edge', 'as_relationship'),
        ('is_path', 'as_path'),
        ('is_geography', 'as_geography'),
    )

    table = prettytable.PrettyTable()
    table.field_names = resp.keys()
    for record in resp:
        row = []
        for cell in record:
            # Placeholders for cells that carry no value.
            if cell.is_empty():
                row.append('__EMPTY__')
                continue
            if cell.is_null():
                row.append('__NULL__')
                continue
            for check_name, convert_name in converters:
                if getattr(cell, check_name)():
                    row.append(getattr(cell, convert_name)())
                    break
            else:
                # No known type matched: give up on the whole table.
                print('ERROR: Type unsupported')
                return
        table.add_row(row)
    print(table)
"""
读取数据文件,返回参数numbers, relations
"""
def read_datas(number_file=None, relation_file=None):
    """Read the vertex and edge data files as lists of raw lines.

    Args:
        number_file: Path of the vertex file.  Defaults to the module-level
            ``NUMBER_FILE`` when omitted.
        relation_file: Path of the edge file.  Defaults to the module-level
            ``RELATION_FILE`` when omitted.

    Returns:
        tuple: ``(numbers, relations)`` - two lists holding the lines of the
        respective file, line endings included.

    Raises:
        OSError: If either file cannot be opened.
    """
    # Resolve the defaults at call time so the module constants may be
    # defined after this function.
    if number_file is None:
        number_file = NUMBER_FILE
    if relation_file is None:
        relation_file = RELATION_FILE

    with open(number_file, encoding='utf-8') as f:
        numbers = f.readlines()
    with open(relation_file, encoding='utf-8') as f:
        relations = f.readlines()
    return numbers, relations
def create_sql():
    """Build the nGQL statements that set up the demo schema.

    Returns:
        str: A single string of semicolon-terminated statements that create
        the ``demo`` space, switch to it, and create the ``number`` tag and
        the ``relation`` edge type (with a non-null ``beginTime`` datetime
        property).
    """
    statements = (
        'CREATE SPACE IF NOT EXISTS demo'
        '(partition_num = 10, replica_factor = 1, charset = utf8, collate = utf8_bin, vid_type = FIXED_STRING(32)); '
        'USE demo; '
        'CREATE TAG IF NOT EXISTS number();'
        # IF NOT EXISTS (as on the space and tag) keeps a second run of the
        # script from failing on the already-existing edge type.
        'CREATE EDGE IF NOT EXISTS `relation` ( `beginTime` datetime NOT NULL) ttl_duration = 0, ttl_col = "";'
    )
    return statements
# 批量插入node节点函数
def import_node(datas, client, batch_size=20):
    """Insert the vertex ids in ``datas[0]`` as ``number`` vertices, in batches.

    Args:
        datas: Indexable whose first item is an iterable of vertex-id lines.
            Trailing whitespace is stripped from each line and lines that are
            empty afterwards are skipped.
        client: Object whose ``execute(statement)`` returns a response that
            offers ``is_succeeded()``.
        batch_size: Maximum number of vertices per ``INSERT VERTEX``
            statement.  Defaults to 20.

    Returns:
        The response of the last statement executed: the first failed
        response if any batch failed (later batches are not sent), otherwise
        the response of the final batch.  ``None`` if there was nothing to
        insert, so no statement was executed.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError('batch_size must be a positive integer')

    # One "'<vid>' : ()" clause per non-blank input line.
    vids = (line.rstrip() for line in datas[0])
    values = [f"'{vid}' : ()" for vid in vids if vid]

    resp = None
    for start in range(0, len(values), batch_size):
        values_str = ','.join(values[start:start + batch_size])
        nsql = f'INSERT VERTEX number() VALUES {values_str};'
        resp = client.execute(nsql)
        if not resp.is_succeeded():
            # Surface the failure instead of letting a later successful
            # batch overwrite it.
            return resp
    return resp
"""
主函数:
"""
# Input data files, opened as UTF-8 text by read_datas().
NUMBER_FILE = r'C:\Users\叼着零食打架\Desktop\nebual数据处理\测试数据\number.csv'
RELATION_FILE = r'C:\Users\叼着零食打架\Desktop\nebual数据处理\测试数据\relation.csv'
# Nebula graph database connection: ip / port
NEBULA_IP = '192.168.120.173'
NEBULA_PORT = 9669
if __name__ == '__main__':
    # Script entry point: connect, create the schema, then import the
    # vertices (batched) and the edges (one statement per edge).
    client = None
    connection_pool = None
    try:
        config = Config()
        config.max_connection_pool_size = 5
        # init connection pool
        connection_pool = ConnectionPool()
        assert connection_pool.init([(NEBULA_IP, NEBULA_PORT)], config)
        # get session from the pool
        client = connection_pool.get_session('root', 'nebula')
        assert client is not None
        # Smoke-test the session; parsing checks the reply is well-formed JSON.
        resp_json = client.execute_json("yield 1")
        json.loads(resp_json)

        # Time the schema creation (space, tag and edge type).
        start_time1 = time.time()
        resp = client.execute(create_sql())
        assert resp.is_succeeded(), resp.error_msg()
        # The schema needs some time after creation before it can be used:
        # in the author's testing 6 s still failed occasionally, and the
        # data import may then report that the node does not exist.
        time.sleep(10)
        end_time1 = time.time()
        # NOTE(review): this interval covers schema creation plus the wait,
        # yet the message is the same as the data-import one below - confirm
        # the intended wording.
        print("导入数据顶点和关系边花费时间:", (end_time1 - start_time1), "秒")

        # Read the vertex and edge data.
        datas = read_datas()
        start_time2 = time.time()

        # Import the vertices.  A None response means no statement was
        # executed, so there is nothing to verify.
        resp = import_node(datas, client)
        assert resp is None or resp.is_succeeded(), resp.error_msg()

        # Import the edges (non-batched: one INSERT per edge).
        for line in datas[1]:
            fields = line.rstrip().split('|')
            if fields == ['']:
                # Blank line: nothing to insert (indexing would fail below).
                continue
            nsql_edge = 'INSERT EDGE relation(beginTime) VALUES "%s"->"%s":(datetime("%s"));' % (fields[0], fields[1], fields[2])
            resp = client.execute(nsql_edge)
            assert resp.is_succeeded(), resp.error_msg()

        end_time2 = time.time()
        print("导入数据顶点和关系边花费时间:", (end_time2 - start_time2), "秒")
        print("数据导入结束!")
    except Exception:
        import traceback
        print(traceback.format_exc())
        # SystemExit instead of exit(): the latter is only injected by the
        # site module.  The finally clause below still runs.
        raise SystemExit(1) from None
    finally:
        # Give the session back and close the pool on success as well as on
        # failure.
        if client is not None:
            client.release()
        if connection_pool is not None:
            connection_pool.close()