Star

find path 获取路径后的显示问题

  • nebula 版本:1.2.0
  • 部署方式(分布式 / 单机 / Docker / DBaaS):单机
  • 硬件信息
    • 磁盘( 必须为 SSD ,不支持 HDD):SSD
    • CPU、内存信息:Xeon ,128G
  • 出问题的 Space 的创建方式:执行 describe space xxx;
  • 问题的具体描述
    想要通过三步分别的操作获取到路径的描述,第一步是通过go语句获取到所有源点的id并保存为$clients;
    $clients = go from hash(‘XX’) over ideal_app_clients yield ideal_app_clients._dst as id;
    第二步获取到所有目标id $var2
    $services = go from hash(‘XX’) over ideal_app_services yield ideal_app_services._dst as id;
    第三步通过find all path 查看源到目的的路由路径
    执行含有中间变量的find path语句:
    find all path from $clients.id to $services.id over *;
    后的列表显示结果为:

    但是导出到子图显示时提示:
    1644286874340902143, <access_ha,-2939029372456479421>, 678455751252824753, -5744189721757906653, <access_service,-2939029372456479421>, 2684703134607705562, -6619449238566780350, 7321322133804549396, 7817039153486962458不存在
    但是手动探索有实际上他们之间的关系是存在的……
    ps. 查看了下graphd的日志,发现有错误输出:

    这是不是因为着我的Edge名称有问题?

access_ha和access_service是有什么schema吗,大概率是有bug,因为1.x版本vid只能是个int,先插个眼,感谢反馈。

另外可以看下$clients和$services的值是否正确

1赞

另外看下能否给个最小复现的操作,我们这边也验证下是不是studio本身解析的问题,按你的描述来说应该是find path查询出结果后,点击导入子图到图探索会报节点不存在?

是的,在控制台上能够拿到路径结果,显示正常,但是导出到探索后就报错;另外我试了试你们的示例数据,find path的导出显示是没问题的,难道是我的schema设计中的定义涉及到了studio内部的关键字?导出到studio的报错信息为:


ps. 报完节点找不到后整个studio会白屏。。。

尝试找到最小验证数据,但是发现由于导入的数据量大可以复现,但是小量数据无法复现

代表的是访问HA服务器的连接,使用app_name的hash值作为边rank,这样做有风险吗?是不是不加rank会好点

实际关联的边信息, rank为app_name对应的hash

            "CREATE EDGE access_f5(app_name string, access_timestamp int, relation_type string default "
            "'access_f5');"
            "CREATE EDGE access_ha(app_name string , access_timestamp int, relation_type string default "
            "'access_ha');"
            "CREATE EDGE access_service(app_name string, relation_type string default 'access_service');"

如果说这个schema有问题那为啥少量数据插入的时候不存在这个bug,当大量数据比较混杂的时候这个问题就有可能出现

clients和services的值都验证过,在console下正确,在探索下也正确,手动探索这些节点以及边都存在,就是从console下的find path结果导出到探索后有问题且会把studio搞崩溃

1赞

可否方便把console的csv结果集私下发我们看看,便于复现看下studio为何会崩溃?数据量是否太大?

关系可以不复杂,但是数据的边的rank可劲儿造,试试崩溃不,我猜测是同样两个节点上的关联边的rank太多导致的,我这个数据也是测试数据,问题应该在自己伪造的数据上也可以复现

你是说两个点之间关联了多条边么?因为一条边不是只能有一个rank么,这里可劲儿造是指造多条边?

rank默认值是0,所以我给rank进行了赋值,来区分相同两个节点间同一类型关系下不同时刻的关系;当时考虑的是两个节点间可能存在不同时刻的交互关联,因此使用rank来区分了下

这个我验证下,不一定能复现,不知道两个相同点之间你的边数量有多少?10条以上?

数据涉及到客户的信息所以不太方便提供,处理的脚本倒是可以提供给你们试试,主要是schema的设计部分:
def run_in_debug_data():
“”" 接受原始access日志来构建图谱关联

:return:
"""

# 初始化配置
import json
from tools.loguru_tools import logger
keep_fresh = True
g_ip = '192.168.4.1'
g_port = 3699
space_name = "XXXX"
user = "user"
password = "password"
# init connection pool
connection_pool = ConnectionPool(g_ip, g_port, 10, 10000)

# execute queries
client = None
start_time = time.time()
try:
    logger.info(f'using space_name : {space_name}')
    # Get one client
    client = GraphClient(connection_pool)
    # when connection is broken use the following space to set the current session's space
    client.set_space(space_name)
    auth_resp = client.authenticate(user, password)
    if auth_resp.error_code:
        raise AuthException("Auth failed")

    # do_simple_execute(client,f"UPDATE CONFIGS storage:wal_ttl=3600;")
    # do_simple_execute(client,"UPDATE CONFIGS storage:rocksdb_column_family_options = { disable_auto_compactions = true };")

    query_resp = client.execute_query('SHOW SPACES')
    recreat = False
    if has_space(query_resp.rows, space_name):
        if keep_fresh:
            print('has %s, drop it' % space_name)
            do_simple_execute(client, 'DROP SPACE %s' % space_name)
            recreat = True
    else:
        recreat = True

    if recreat:
        # Create space mySpace
        do_simple_execute(client, f'CREATE SPACE {space_name}')
        do_simple_execute(client, f'USE {space_name}')
        time.sleep(1)
        
        # Create tag and edge
        do_simple_execute(
            client,
            ############################# 节点 ################################
            # 实际概念,带有IP的资源
            "CREATE TAG UserClient(ip string, node_type string default '客户端'); "
            "CREATE TAG F5Server(ip string, node_type string default 'F5服务器'); "
            "CREATE TAG HAServer(ip string, node_type string default 'HA服务器'); "
            "CREATE TAG AppServer(ip string,app_name string, node_type string default '应用服务器'); "
            "CREATE TAG DBServer(ip string, node_type string default 'DB服务器', db_type string default 'Oracle'); "
            "CREATE TAG AppService(ip_port string,app_name string, node_type string default '应用服务实例'); "

            # 抽象概念 APP
            "CREATE TAG APP(app_name string, node_type string default '应用'); "
            # 抽象概念 HA、F5、DB、开始 等
            "CREATE TAG ResourceManager(resource_name string, node_type string default '资源管理'); "

            ############################   边  ################################
            # 实际关联的边信息, rank为app_name对应的hash
            "CREATE EDGE access_f5(app_name string, access_timestamp int, relation_type string default "
            "'access_f5');"
            "CREATE EDGE access_ha(app_name string , access_timestamp int, relation_type string default "
            "'access_ha');"
            "CREATE EDGE access_service(app_name string, relation_type string default 'access_service');"  
            # rank为访问服务的端口号
            "CREATE EDGE access_server(target_port int, app_name string, access_timestamp int, "
            "relation_type string default 'access_server');"
            # 暂无rank
            "CREATE EDGE access_db(access_timestamp int, relation_type string default 'access_db');"

            # 抽象关联的边信息, rank为app_name对应的hash
            "CREATE EDGE ideal_app_f5ha(app_name string, relation_type string default 'app_related_f5ha'); "
            "CREATE EDGE ideal_app_clients(app_name string, relation_type string default 'app_related_clients'); "
            "CREATE EDGE ideal_app_servers(app_name string, relation_type string default 'app_related_servers'); "
            "CREATE EDGE ideal_app_services(app_name string, relation_type string default 'app_related_services'); "
            "CREATE EDGE ideal_start_includes(relation_type string default '包含应用'); "  # 开始节点关联的所有app

            # 抽象实例关系,rank为app_name对应的hash
            "CREATE EDGE ideal_f5_instances(app_name string, relation_type string default 'F5资源实例'); "
            "CREATE EDGE ideal_ha_instances(app_name string, relation_type string default 'HA资源实例'); "
            "CREATE EDGE ideal_db_instances(app_name string, relation_type string default 'DB资源实例'); "
            
            # 访问统计信息,rank为win_start值
            "CREATE EDGE ideal_access_count(freq_count int, win_start int, win_end int, "
            "relation_type string default '访问统计'); "

        )
        # # It should large than the cycle of loading the schema
        time.sleep(6)
    else:
        do_simple_execute(client, f"USE {space_name}")
        time.sleep(1)

    # 插入共有节点
    for name in ["开始", "F5", "HA", "DB"]:
        do_simple_execute(client, f"INSERT VERTEX ResourceManager(resource_name) VALUES "
        f"hash('{name}'):('{name}'); ")

    # 首先导入access中的信息
    source_file = f"{basedir}/data/ceshi-sample.csv"
    # # Insert vertex and edge
    i = 1
    for line in open(source_file, encoding='utf8'):
        print(f"Processing : {i}")
        itm = json.loads(line)
        # "source_ip": "10.147.148.42",					    UserClient
        # "remote_addr": "10.147.148.42",					F5Server (有可能为空)
        # "host": "10.153.200.40",						    HAServer
        # "upstream_addr": "10.233.85.199:8080",			AppServer
        # "app_name": "xxxx",
        # "systimestamp": 1610535812388,            
        app_name = itm.get("app_name", "default")
        app_hash_id = get_nebula_hash(client,app_name)
        client_ip = itm.get("source_ip", "")
        f5_ip = itm.get("remote_addr", "")
        ha_ip = itm.get("host", "")

        access_timestamp = itm.get("systemstamp", -1)

        # 插入必选节点
        do_simple_execute(client, f"INSERT VERTEX APP(app_name) VALUES "
        f"hash('{app_name}'):('{app_name}'); ")
        do_simple_execute(client, f"INSERT EDGE ideal_start_includes() VALUES "
        f"hash('开始')->hash('{app_name}'):(); ")

        # 插入Client相关节点以及抽象关系
        if client_ip:
            do_simple_execute(client, f"INSERT VERTEX UserClient(ip) VALUES "
            f"hash('{client_ip}'):('{client_ip}'); "
                              )
            do_simple_execute(client, f"INSERT EDGE ideal_app_clients(app_name) VALUES "
            f"hash('{app_name}')->hash('{client_ip}')@{app_hash_id}:('{app_name}'); ")

        # 插入HA相关节点以及抽象关系
        if ha_ip:
            do_simple_execute(client, f"INSERT VERTEX HAServer(ip) VALUES "
            f"hash('{ha_ip}'):('{ha_ip}'); ")
            do_simple_execute(client, f"INSERT EDGE ideal_ha_instances (app_name) VALUES "
            f"hash('HA')->hash('{ha_ip}')@{app_hash_id}:('{app_name}'); "
                              )
            do_simple_execute(client, f"INSERT EDGE ideal_app_f5ha(app_name) VALUES "
            f"hash('{app_name}')->hash('{ha_ip}')@{app_hash_id}:('ideal_app_f5ha'); ")

        # 插入F5相关节点并整理client->f5->ha的关系
        if f5_ip:
            do_simple_execute(client, f"INSERT VERTEX F5Server(ip) VALUES hash('{f5_ip}'):('{f5_ip}'); ")
            do_simple_execute(client, f"INSERT EDGE ideal_app_f5ha(app_name) VALUES "
            f"hash('{app_name}')->hash('{f5_ip}')@{app_hash_id}:('{app_name}'); ")
            do_simple_execute(client, f"INSERT EDGE ideal_f5_instances(app_name) VALUES "
            f"hash('F5')->hash('{f5_ip}')@{app_hash_id}:('{app_name}'); ")

            if client_ip and f5_ip:
                do_simple_execute(client,
                                  f"INSERT EDGE access_f5(app_name, access_timestamp) VALUES "
                                  f"hash('{client_ip}')->hash('{f5_ip}')@{app_hash_id}:('{app_name}', {access_timestamp}); ")

            if f5_ip and ha_ip:
                do_simple_execute(client,
                                  f"INSERT EDGE access_ha(app_name, access_timestamp) "
                                  f"VALUES hash('{f5_ip}')->hash('{ha_ip}')@{app_hash_id}:('{app_name}', {access_timestamp}); "
                                  f"INSERT EDGE ideal_app_f5ha(app_name) VALUES hash('{app_name}')->hash('{ha_ip}')@{app_hash_id}:('{app_name}'); ")

        else:  # 没有F5的情形,client直达HA
            if client_ip and ha_ip:
                do_simple_execute(client, f"INSERT EDGE access_ha(app_name, access_timestamp) "
                f"VALUES hash('{client_ip}')->hash('{ha_ip}')@{app_hash_id}:('{app_name}', {access_timestamp}); ")

        # 以下开始整理ha->appserver的关系,涉及到多服务器的情况
        app_server_ip_ports = itm.get('upstream_addr', '')
        for app_server_ip_port in app_server_ip_ports.split(','):

            if ":" in app_server_ip_port:
                app_server_ip, app_server_port = app_server_ip_port.split(":")
                app_server_port = -1 if '-' in app_server_port else int(app_server_port)
            else:
                app_server_ip = app_server_ip_port
                app_server_port = -1
            if app_server_ip:
                # appserver,必有
                do_simple_execute(client, f"INSERT VERTEX AppServer(ip, app_name) VALUES "
                f"hash('{app_server_ip}'):('{app_server_ip}','{app_name}'); ")
                # 插入app->appserver的连接
                do_simple_execute(client,
                                  f"INSERT EDGE ideal_app_servers(app_name) VALUES "
                                  f"hash('{app_name}')->hash('{app_server_ip}')@{app_hash_id}:('{app_name}'); ")
                # 插入ha->appserver的连接
                do_simple_execute(client,
                                  f"INSERT EDGE access_server(target_port, app_name, access_timestamp ) VALUES "
                                  f"hash('{ha_ip}')->hash('{app_server_ip}')@{app_server_port}:({app_server_port},'{app_name}', {access_timestamp}); "
                                  )

                # appservice,必有
                do_simple_execute(client, f"INSERT VERTEX AppService(ip_port, app_name) VALUES "
                f"hash('{app_server_ip_port}'):('{app_server_ip_port}','{app_name}'); ")
                # 插入app->appservice的连接
                do_simple_execute(client, f"INSERT EDGE ideal_app_services(app_name) VALUES "
                f"hash('{app_name}')->hash('{app_server_ip_port}')@{app_hash_id}:('{app_name}'); ")
                # 插入appserver->appservice的连接
                do_simple_execute(client, f"INSERT EDGE access_service(app_name) VALUES "
                f"hash('{app_server_ip}')->hash('{app_server_ip_port}')@{app_hash_id}:("
                f"'{app_name}');")

        i += 1


except Exception as e:
    logger.exception(e)
finally:
    # do_simple_execute(client,"UPDATE CONFIGS storage:rocksdb_column_family_options = { disable_auto_compactions = false };"
    #                          "UPDATE CONFIGS storage:wal_ttl=86400;")
    client.sign_out()
    print(f">>>> 结束数据导入操作,整体用时:{time.time() - start_time} s")

数据的格式为一行一条json数据,json中只需包含image 这几个字段以及这一个字段即可image

好的,谢谢:handshake:

不知道问题能够复现了吗?有没有好的解决办法呀?有需要我配合的还可以继续说哈 :grin:

可以把导入图探索的点 直接图探索那边添加起点试试

浙ICP备20010487号