import json
from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config
def uploadJson():
# 从包含嵌套JSON数据的文件中读取数据
with open('data.json') as file:
data = json.load(file)
# 遍历JSON数据
for key in data:
if key['tag'] == "player":
sqlCreateTag = 'CREATE TAG IF NOT EXISTS `player`(`name` string, `age` int)'
elif key['tag'] == "team":
sqlCreateTag = 'CREATE TAG IF NOT EXISTS `team`(`name` string)'
print(sqlCreateTag)
print(session.execute(sqlCreateTag))
propsName = '('
propsData = '('
# 遍历JSON数据中的每个元素
for item in key:
if item == "tag" or item == "id":
continue
if type(key[item]) == str:
propsData += ('"' + key[item] + '", ')
propsName += (item + ', ')
else:
propsData += (str(key[item]) + ', ')
propsName += (item + ', ')
propsName = propsName[:len(propsName) - 2]
propsData = propsData[:len(propsData) - 2]
propsName += ')'
propsData += ')'
sqlInsertVertex = 'INSERT VERTEX {} {} VALUES "{}":{}'.format(key['tag'], propsName, key['id'], propsData)
print(session.execute(sqlInsertVertex))
print(sqlInsertVertex)
if __name__ == '__main__':
# 定义配置
config = Config()
config.max_connection_pool_size = 10
config.timeout = 500
# 初始化连接池
connection_pool = ConnectionPool()
# 如果给定的服务器正常,则返回true,否则返回false。
ok = connection_pool.init([('192.168.161.128', 9669)], config)
# 方法1:控制连接自行释放。
# 从连接池中获取会话
session = connection_pool.get_session('root', 'nebula')
# 创建Test空间
session.execute("CREATE SPACE IF NOT EXISTS `Test` (vid_type = FIXED_STRING(32))")
# 使用Test空间
session.execute("USE Test")
uploadJson()
# 释放会话
session.release()
# 关闭连接池
connection_pool.close()
修改完 schema 后,建议 sleep 20s(即两个心跳),可以确保 schema 肯定生效(背后原因是因为有异步流程)
此话题已在最后回复的 30 天后被自动关闭。不再允许新回复。