- nebula 版本:1.2.0
- 部署方式(分布式 / 单机 / Docker / DBaaS):单机
- 硬件信息
- 磁盘( 必须为 SSD ,不支持 HDD):SSD
- CPU、内存信息:Xeon ,128G
- 出问题的 Space 的创建方式:执行
describe space xxx
;
- 问题的具体描述
想要通过三步分别的操作获取到路径的描述,第一步是通过go语句获取到所有源点的id并保存为$clients;
$clients = go from hash(‘XX’) over ideal_app_clients yield ideal_app_clients._dst as id;
第二步获取到所有目标id $var2
$services = go from hash(‘XX’) over ideal_app_services yield ideal_app_services._dst as id;
第三步通过find all path 查看源到目的的路由路径
执行含有中间变量的find path语句:
find all path from $clients.id to $services.id over *;
后的列表显示结果为:
但是导出到子图显示时提示:
1644286874340902143, <access_ha,-2939029372456479421>, 678455751252824753, -5744189721757906653, <access_service,-2939029372456479421>, 2684703134607705562, -6619449238566780350, 7321322133804549396, 7817039153486962458不存在
但是手动探索有实际上他们之间的关系是存在的……
ps. 查看了下graphd的日志,发现有错误输出:
这是不是因为着我的Edge名称有问题?
access_ha和access_service是有什么schema吗,大概率是有bug,因为1.x版本vid只能是个int,先插个眼,感谢反馈。
另外可以看下$clients和$services的值是否正确
另外看下能否给个最小复现的操作,我们这边也验证下是不是studio本身解析的问题,按你的描述来说应该是find path查询出结果后,点击导入子图到图探索会报节点不存在?
是的,在控制台上能够拿到路径结果,显示正常,但是导出到探索后就报错;另外我试了试你们的示例数据,find path的导出显示是没问题的,难道是我的schema设计中的定义涉及到了studio内部的关键字?导出到studio的报错信息为:
ps. 报完节点找不到后整个studio会白屏。。。
尝试找到最小验证数据,但是发现由于导入的数据量大可以复现,但是小量数据无法复现
代表的是访问HA服务器的连接,使用app_name的hash值作为边rank,这样做有风险吗?是不是不加rank会好点
实际关联的边信息, rank为app_name对应的hash
"CREATE EDGE access_f5(app_name string, access_timestamp int, relation_type string default "
"'access_f5');"
"CREATE EDGE access_ha(app_name string , access_timestamp int, relation_type string default "
"'access_ha');"
"CREATE EDGE access_service(app_name string, relation_type string default 'access_service');"
如果说这个schema有问题那为啥少量数据插入的时候不存在这个bug,当大量数据比较混杂的时候这个问题就有可能出现
clients和services的值都验证过,在console下正确,在探索下也正确,手动探索这些节点以及边都存在,就是从console下的find path结果导出到探索后有问题且会把studio搞崩溃
可否方便把console的csv结果集私下发我们看看,便于复现看下studio为何会崩溃?数据量是否太大?
关系可以不复杂,但是数据的边的rank可劲儿造,试试崩溃不,我猜测是同样两个节点上的关联边的rank太多导致的,我这个数据也是测试数据,问题应该在自己伪造的数据上也可以复现
你是说两个点之间关联了多条边么?因为一条边不是只能有一个rank么,这里可劲儿造是指造多条边?
rank默认值是0,所以我给rank进行了赋值,来区分相同两个节点间同一类型关系下不同时刻的关系;当时考虑的是两个节点间可能存在不同时刻的交互关联,因此使用rank来区分了下
这个我验证下,不一定能复现,不知道两个相同点之间你的边数量有多少?10条以上?
数据涉及到客户的信息所以不太方便提供,处理的脚本倒是可以提供给你们试试,主要是schema的设计部分:
def run_in_debug_data():
“”" 接受原始access日志来构建图谱关联
:return:
"""
# 初始化配置
import json
from tools.loguru_tools import logger
keep_fresh = True
g_ip = '192.168.4.1'
g_port = 3699
space_name = "XXXX"
user = "user"
password = "password"
# init connection pool
connection_pool = ConnectionPool(g_ip, g_port, 10, 10000)
# execute queries
client = None
start_time = time.time()
try:
logger.info(f'using space_name : {space_name}')
# Get one client
client = GraphClient(connection_pool)
# when connection is broken use the following space to set the current session's space
client.set_space(space_name)
auth_resp = client.authenticate(user, password)
if auth_resp.error_code:
raise AuthException("Auth failed")
# do_simple_execute(client,f"UPDATE CONFIGS storage:wal_ttl=3600;")
# do_simple_execute(client,"UPDATE CONFIGS storage:rocksdb_column_family_options = { disable_auto_compactions = true };")
query_resp = client.execute_query('SHOW SPACES')
recreat = False
if has_space(query_resp.rows, space_name):
if keep_fresh:
print('has %s, drop it' % space_name)
do_simple_execute(client, 'DROP SPACE %s' % space_name)
recreat = True
else:
recreat = True
if recreat:
# Create space mySpace
do_simple_execute(client, f'CREATE SPACE {space_name}')
do_simple_execute(client, f'USE {space_name}')
time.sleep(1)
# Create tag and edge
do_simple_execute(
client,
############################# 节点 ################################
# 实际概念,带有IP的资源
"CREATE TAG UserClient(ip string, node_type string default '客户端'); "
"CREATE TAG F5Server(ip string, node_type string default 'F5服务器'); "
"CREATE TAG HAServer(ip string, node_type string default 'HA服务器'); "
"CREATE TAG AppServer(ip string,app_name string, node_type string default '应用服务器'); "
"CREATE TAG DBServer(ip string, node_type string default 'DB服务器', db_type string default 'Oracle'); "
"CREATE TAG AppService(ip_port string,app_name string, node_type string default '应用服务实例'); "
# 抽象概念 APP
"CREATE TAG APP(app_name string, node_type string default '应用'); "
# 抽象概念 HA、F5、DB、开始 等
"CREATE TAG ResourceManager(resource_name string, node_type string default '资源管理'); "
############################ 边 ################################
# 实际关联的边信息, rank为app_name对应的hash
"CREATE EDGE access_f5(app_name string, access_timestamp int, relation_type string default "
"'access_f5');"
"CREATE EDGE access_ha(app_name string , access_timestamp int, relation_type string default "
"'access_ha');"
"CREATE EDGE access_service(app_name string, relation_type string default 'access_service');"
# rank为访问服务的端口号
"CREATE EDGE access_server(target_port int, app_name string, access_timestamp int, "
"relation_type string default 'access_server');"
# 暂无rank
"CREATE EDGE access_db(access_timestamp int, relation_type string default 'access_db');"
# 抽象关联的边信息, rank为app_name对应的hash
"CREATE EDGE ideal_app_f5ha(app_name string, relation_type string default 'app_related_f5ha'); "
"CREATE EDGE ideal_app_clients(app_name string, relation_type string default 'app_related_clients'); "
"CREATE EDGE ideal_app_servers(app_name string, relation_type string default 'app_related_servers'); "
"CREATE EDGE ideal_app_services(app_name string, relation_type string default 'app_related_services'); "
"CREATE EDGE ideal_start_includes(relation_type string default '包含应用'); " # 开始节点关联的所有app
# 抽象实例关系,rank为app_name对应的hash
"CREATE EDGE ideal_f5_instances(app_name string, relation_type string default 'F5资源实例'); "
"CREATE EDGE ideal_ha_instances(app_name string, relation_type string default 'HA资源实例'); "
"CREATE EDGE ideal_db_instances(app_name string, relation_type string default 'DB资源实例'); "
# 访问统计信息,rank为win_start值
"CREATE EDGE ideal_access_count(freq_count int, win_start int, win_end int, "
"relation_type string default '访问统计'); "
)
# # It should large than the cycle of loading the schema
time.sleep(6)
else:
do_simple_execute(client, f"USE {space_name}")
time.sleep(1)
# 插入共有节点
for name in ["开始", "F5", "HA", "DB"]:
do_simple_execute(client, f"INSERT VERTEX ResourceManager(resource_name) VALUES "
f"hash('{name}'):('{name}'); ")
# 首先导入access中的信息
source_file = f"{basedir}/data/ceshi-sample.csv"
# # Insert vertex and edge
i = 1
for line in open(source_file, encoding='utf8'):
print(f"Processing : {i}")
itm = json.loads(line)
# "source_ip": "10.147.148.42", UserClient
# "remote_addr": "10.147.148.42", F5Server (有可能为空)
# "host": "10.153.200.40", HAServer
# "upstream_addr": "10.233.85.199:8080", AppServer
# "app_name": "xxxx",
# "systimestamp": 1610535812388,
app_name = itm.get("app_name", "default")
app_hash_id = get_nebula_hash(client,app_name)
client_ip = itm.get("source_ip", "")
f5_ip = itm.get("remote_addr", "")
ha_ip = itm.get("host", "")
access_timestamp = itm.get("systemstamp", -1)
# 插入必选节点
do_simple_execute(client, f"INSERT VERTEX APP(app_name) VALUES "
f"hash('{app_name}'):('{app_name}'); ")
do_simple_execute(client, f"INSERT EDGE ideal_start_includes() VALUES "
f"hash('开始')->hash('{app_name}'):(); ")
# 插入Client相关节点以及抽象关系
if client_ip:
do_simple_execute(client, f"INSERT VERTEX UserClient(ip) VALUES "
f"hash('{client_ip}'):('{client_ip}'); "
)
do_simple_execute(client, f"INSERT EDGE ideal_app_clients(app_name) VALUES "
f"hash('{app_name}')->hash('{client_ip}')@{app_hash_id}:('{app_name}'); ")
# 插入HA相关节点以及抽象关系
if ha_ip:
do_simple_execute(client, f"INSERT VERTEX HAServer(ip) VALUES "
f"hash('{ha_ip}'):('{ha_ip}'); ")
do_simple_execute(client, f"INSERT EDGE ideal_ha_instances (app_name) VALUES "
f"hash('HA')->hash('{ha_ip}')@{app_hash_id}:('{app_name}'); "
)
do_simple_execute(client, f"INSERT EDGE ideal_app_f5ha(app_name) VALUES "
f"hash('{app_name}')->hash('{ha_ip}')@{app_hash_id}:('ideal_app_f5ha'); ")
# 插入F5相关节点并整理client->f5->ha的关系
if f5_ip:
do_simple_execute(client, f"INSERT VERTEX F5Server(ip) VALUES hash('{f5_ip}'):('{f5_ip}'); ")
do_simple_execute(client, f"INSERT EDGE ideal_app_f5ha(app_name) VALUES "
f"hash('{app_name}')->hash('{f5_ip}')@{app_hash_id}:('{app_name}'); ")
do_simple_execute(client, f"INSERT EDGE ideal_f5_instances(app_name) VALUES "
f"hash('F5')->hash('{f5_ip}')@{app_hash_id}:('{app_name}'); ")
if client_ip and f5_ip:
do_simple_execute(client,
f"INSERT EDGE access_f5(app_name, access_timestamp) VALUES "
f"hash('{client_ip}')->hash('{f5_ip}')@{app_hash_id}:('{app_name}', {access_timestamp}); ")
if f5_ip and ha_ip:
do_simple_execute(client,
f"INSERT EDGE access_ha(app_name, access_timestamp) "
f"VALUES hash('{f5_ip}')->hash('{ha_ip}')@{app_hash_id}:('{app_name}', {access_timestamp}); "
f"INSERT EDGE ideal_app_f5ha(app_name) VALUES hash('{app_name}')->hash('{ha_ip}')@{app_hash_id}:('{app_name}'); ")
else: # 没有F5的情形,client直达HA
if client_ip and ha_ip:
do_simple_execute(client, f"INSERT EDGE access_ha(app_name, access_timestamp) "
f"VALUES hash('{client_ip}')->hash('{ha_ip}')@{app_hash_id}:('{app_name}', {access_timestamp}); ")
# 以下开始整理ha->appserver的关系,涉及到多服务器的情况
app_server_ip_ports = itm.get('upstream_addr', '')
for app_server_ip_port in app_server_ip_ports.split(','):
if ":" in app_server_ip_port:
app_server_ip, app_server_port = app_server_ip_port.split(":")
app_server_port = -1 if '-' in app_server_port else int(app_server_port)
else:
app_server_ip = app_server_ip_port
app_server_port = -1
if app_server_ip:
# appserver,必有
do_simple_execute(client, f"INSERT VERTEX AppServer(ip, app_name) VALUES "
f"hash('{app_server_ip}'):('{app_server_ip}','{app_name}'); ")
# 插入app->appserver的连接
do_simple_execute(client,
f"INSERT EDGE ideal_app_servers(app_name) VALUES "
f"hash('{app_name}')->hash('{app_server_ip}')@{app_hash_id}:('{app_name}'); ")
# 插入ha->appserver的连接
do_simple_execute(client,
f"INSERT EDGE access_server(target_port, app_name, access_timestamp ) VALUES "
f"hash('{ha_ip}')->hash('{app_server_ip}')@{app_server_port}:({app_server_port},'{app_name}', {access_timestamp}); "
)
# appservice,必有
do_simple_execute(client, f"INSERT VERTEX AppService(ip_port, app_name) VALUES "
f"hash('{app_server_ip_port}'):('{app_server_ip_port}','{app_name}'); ")
# 插入app->appservice的连接
do_simple_execute(client, f"INSERT EDGE ideal_app_services(app_name) VALUES "
f"hash('{app_name}')->hash('{app_server_ip_port}')@{app_hash_id}:('{app_name}'); ")
# 插入appserver->appservice的连接
do_simple_execute(client, f"INSERT EDGE access_service(app_name) VALUES "
f"hash('{app_server_ip}')->hash('{app_server_ip_port}')@{app_hash_id}:("
f"'{app_name}');")
i += 1
except Exception as e:
logger.exception(e)
finally:
# do_simple_execute(client,"UPDATE CONFIGS storage:rocksdb_column_family_options = { disable_auto_compactions = false };"
# "UPDATE CONFIGS storage:wal_ttl=86400;")
client.sign_out()
print(f">>>> 结束数据导入操作,整体用时:{time.time() - start_time} s")
数据的格式为一行一条json数据,json中只需包含 这几个字段以及这一个字段即可
好的,谢谢
不知道问题能够复现了吗?有没有好的解决办法呀?有需要我配合的还可以继续说哈
可以把导入图探索的点 直接图探索那边添加起点试试