# 想问下该如何实现。
from config import Config
import json
import random
import asyncio
from get_schema import get_schema
async def define_schema(session, graph_id):
    """Define the knowledge schema of a graph space.

    POSTs a fixed schema payload (one entity schema, one event schema and
    one relation schema) to ``{Config.BASE_URL}/space/<graph>/tag-schemas/save``
    and prints the full request and response for debugging.

    Args:
        session: Client session object exposing an async-context-manager
            ``post(url, json=..., headers=..., ssl=...)`` (presumably an
            ``aiohttp.ClientSession`` -- confirm against the caller).
        graph_id: Graph identifier. An ``int`` is turned into ``"pkg<id>"``;
            any other value is used in the URL as-is.

    Returns:
        ``True`` when the server answers HTTP 200 with a JSON body whose
        ``code`` is ``'200'`` (after a 5 second pause), ``False`` on any
        other response or on any exception.
    """
    graph_id_str = f"pkg{graph_id}" if isinstance(graph_id, int) else graph_id
    url = f'{Config.BASE_URL}/space/{graph_id_str}/tag-schemas/save'
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Basic {Config.AUTH_STRING}",
        "Cookie": f"Satoken={Config.HEADERS['satoken']}",
        "satoken": Config.HEADERS['satoken'],
    }
    data = {
        "saveSchemas": {
            "entitySchemas": [
                {
                    "entityType": "测试性能",
                    "properties": [
                        {"propertyName": "CHECK", "type": "STRING"},
                        {"propertyName": "PRO_1", "type": "STRING"},
                        {"propertyName": "NAME", "type": "STRING"},
                    ],
                    "styleId": 1,
                },
            ],
            "eventSchemas": [
                {
                    "id": -1,
                    "entityType": "测试事件",
                    "relations": [
                        {
                            "edgeType": 1,
                            "relationType": "测试",
                            "tailEntityType": "测试性能",
                        },
                        {
                            "edgeType": 2,
                            "relationType": "测试",
                            "tailEntityType": "测试性能",
                        },
                    ],
                },
            ],
            "relationSchemas": [
                {
                    "headEntityType": "测试性能",
                    "relationType": "对应",
                    "tailEntityType": "测试性能",
                },
            ],
        },
        # Nothing is dropped or updated; only new schemas are saved.
        "dropSchemas": {
            "entitySchemas": [],
            "eventSchemas": [],
            "relationSchemas": [],
        },
        "updateSchemas": {
            "entitySchemas": [],
            "eventSchemas": [],
            "relationSchemas": [],
        },
    }

    # NOTE(review): this dumps the Authorization / satoken headers to stdout;
    # consider masking them before this code is used outside of debugging.
    print("\n=== 知识模式定义请求信息 ===")
    print(f"URL: {url}")
    print(f"Headers: {json.dumps(headers, indent=2, ensure_ascii=False)}")
    print(f"Request Data: {json.dumps(data, indent=2, ensure_ascii=False)}")

    try:
        async with session.post(url, json=data, headers=headers, ssl=False) as response:
            response_text = await response.text()
            print("\n=== 知识模式定义响应信息 ===")
            print(f"Status Code: {response.status}")
            print(f"Response Headers: {dict(response.headers)}")
            print(f"Response Body: {response_text}")

            if response.status != 200:
                print(f"知识模式定义请求失败: HTTP {response.status}")
                print(f"响应内容: {response_text}")
                return False

            result = json.loads(response_text)
            if result.get('code') == '200':
                print("知识模式定义成功")
                await asyncio.sleep(5)  # wait for the schema to be synchronised
                return True

            print(f"知识模式定义失败: {result.get('message')}")
            error_data = result.get('data')
            print(f"错误详情: {error_data}")
            # 'data' may be present but null (or not a mapping) on failures;
            # only look for 'logMsg' when it really is a dict.
            if isinstance(error_data, dict) and error_data.get('logMsg'):
                print(f"错误日志: {error_data['logMsg']}")
            return False
    except Exception as e:
        # Best-effort boundary: report the failure and signal it via the
        # return value instead of propagating.
        print(f"知识模式定义错误: {e}")
        print(f"错误类型: {type(e).__name__}")
        import traceback
        print(f"错误堆栈: {traceback.format_exc()}")
        return False
async def get_entity_list(session, graph_id):
    """Fetch the entity list of a graph space.

    Issues a GET to ``{Config.BASE_URL}/space/<graph>/entity/list`` and
    parses the response body as JSON.

    Args:
        session: Client session object exposing an async-context-manager
            ``get(url, ssl=...)`` (presumably an ``aiohttp.ClientSession``
            -- confirm against the caller).
        graph_id: Graph identifier. An ``int`` is turned into ``"pkg<id>"``;
            any other value is used in the URL as-is.

    Returns:
        The value stored under ``data`` in the JSON response (``[]`` when
        the key is absent) if the HTTP status is 200 and the JSON ``code``
        is ``'200'``; ``None`` on any other response or on any exception.
    """
    graph_id_str = f"pkg{graph_id}" if isinstance(graph_id, int) else graph_id
    url = f'{Config.BASE_URL}/space/{graph_id_str}/entity/list'
    try:
        async with session.get(url, ssl=False) as response:
            text = await response.text()
            result = json.loads(text)
            if response.status == 200 and result.get('code') == '200':
                return result.get('data', [])
            return None
    except Exception as e:
        # Best-effort: report the problem and let the caller see None.
        print(f"获取实体列表错误: {e}")
        return None
async def add_relation(session, graph_id):
    """Create a relation between two randomly chosen entities of a graph.

    Fetches the entity list via ``get_entity_list``, picks two distinct
    entries at random and POSTs them as head/tail of a new relation to
    ``{Config.BASE_URL}/space/<graph>/view/relation/add``.

    Args:
        session: Client session object exposing an async-context-manager
            ``post(url, json=..., ssl=...)`` (presumably an
            ``aiohttp.ClientSession`` -- confirm against the caller).
        graph_id: Graph identifier. An ``int`` is turned into ``"pkg<id>"``;
            any other value is used in the URL as-is.

    Returns:
        ``True`` when the server answers HTTP 200 with JSON ``code``
        ``'200'``; ``False`` when fewer than two entities are available,
        on any other response, or when the request raises.
    """
    graph_id_str = f"pkg{graph_id}" if isinstance(graph_id, int) else graph_id
    url = f'{Config.BASE_URL}/space/{graph_id_str}/view/relation/add'

    # Fetch the entity list first; a relation needs two endpoints.
    entities = await get_entity_list(session, graph_id)
    if not entities or len(entities) < 2:
        print("没有足够的实体来创建关系")
        return False

    # Pick two different entities at random (sampling without replacement).
    head, tail = random.sample(entities, 2)
    # Each entity is read as a mapping with 'id' and 'vid' keys.
    data = {
        "headSchemaId": head['id'],
        "headVid": head['vid'],
        "id": 0,
        "tailSchemaId": tail['id'],
        "tailVid": tail['vid'],
    }

    try:
        async with session.post(url, json=data, ssl=False) as response:
            text = await response.text()
            result = json.loads(text)
            if response.status == 200 and result.get('code') == '200':
                print("关系添加成功!")
                return True
            print(f"关系添加失败: {text}")
            return False
    except Exception as e:
        # Best-effort: report the problem and signal failure to the caller.
        print(f"添加关系错误: {e}")
        return False