Atomicity
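As NebulaGraph continues to evolve, we are putting more and more emphasis on extending the distributed graph database with distributed graph transactions. The goal is to bring the ACID guarantees of traditional OLTP databases (atomicity, consistency, isolation, durability) to distributed NebulaGraph and to the graph query languages it supports, such as nGQL. To this end, we have begun surveying NebulaGraph community members and commercial partners about their actual ACID requirements. The results will guide the technical design and engineering of distributed graph transactions. Anyone interested is welcome to join the discussion of transaction-related questions in the community.
The current release does not yet provide complete distributed graph transactions. If an application needs some transactional behavior in the meantime, one option is to build it at the application layer. The rest of this article uses Python to implement simple batch insertion with rollback: if any single insert in a batch fails, the whole batch is rolled back.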
Simple batch insertion and rollback in Python
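The Python script below uses two lists: todo and undo. Every statement in a batch that is waiting to be executed is appended to the todo list, and the statement that reverses it is appended to the undo list at the same position (see genBatch()). In this script the batch inserts vertices: the todo list holds statements that insert vertices, and the undo list holds statements that delete the same vertices. Both kinds of statement are built by filling predefined templates (insertVertexTemplate and rollbackTemplate) with data read from a CSV file.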
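To see how the two lists line up, here is a minimal sketch of the list-building step on its own. It reuses the two templates from the script; the sample rows are hypothetical and are read from an in-memory string instead of players.csv, and `gen_batch` is an illustrative rename of genBatch():

```python
import csv
import io

# The same statement templates as in the script.
insertVertexTemplate = "INSERT VERTEX player(name, age) VALUES \"%s\":(\"%s\", %s)"
rollbackTemplate = "DELETE VERTEX \"%s\""

def gen_batch(rows):
    """Build parallel todo/undo lists so that undo[i] reverses todo[i]."""
    todo, undo = [], []
    for name, age in rows:
        # The name doubles as the vertex ID, as in the script.
        todo.append(insertVertexTemplate % (name, name, age))
        undo.append(rollbackTemplate % name)
    return todo, undo

# Hypothetical sample data standing in for players.csv.
sample = io.StringIO("Tim Duncan,42\nTony Parker,36\n")
rows = [(line[0], line[1]) for line in csv.reader(sample)]
todo, undo = gen_batch(rows)

for do, un in zip(todo, undo):
    print(f"{do}  <->  {un}")
```

As in the script, values are substituted without escaping, so a name containing a double quote would produce a malformed statement.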
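When the script starts, it reads the CSV file, builds the statements for the batch as described above, and calls exeBatch() to execute the batch. If a statement in the batch fails and the client returns an error, exeBatch() stops and returns the index of that statement within the batch. Because execution stops at the first failure, this index also equals the number of statements that succeeded. The index is passed to rollback(), which uses it to identify the statements that were executed successfully, runs their undo statements to delete the vertices already inserted, and so completes the rollback.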
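The index-based rollback can be exercised without a NebulaGraph cluster. In this minimal sketch, which is a stand-in rather than the script itself, a Python dict plays the graph space and each "statement" is a hypothetical closure that returns True on success. `exe_batch` returns the index of the first failing statement, and `rollback` undoes everything before that index:

```python
# Hypothetical stand-ins: a dict plays the graph space, closures play nGQL statements.
store = {}

def insert(vid, age, fail=False):
    """Stand-in for one INSERT VERTEX statement."""
    def op():
        if fail:
            return False  # simulate an error returned by the client
        store[vid] = age
        return True
    return op

def delete(vid):
    """Stand-in for the matching DELETE VERTEX statement."""
    def op():
        store.pop(vid, None)
        return True
    return op

def exe_batch(todo):
    """Run statements in order; return the index of the first failure (= number of successes)."""
    for i, op in enumerate(todo):
        if not op():
            return i
    return len(todo)

def rollback(undo, progress):
    """Undo statements 0..progress-1, newest first; stop and return False on a failed undo."""
    for i in reversed(range(progress)):
        if not undo[i]():
            return False
    return True

todo = [insert("a", 1), insert("b", 2), insert("c", 3, fail=True), insert("d", 4)]
undo = [delete("a"), delete("b"), delete("c"), delete("d")]

progress = exe_batch(todo)
print("succeeded before the failure:", progress, dict(store))
if progress != len(todo):
    rolled_back = rollback(undo, progress)
    print("rolled back:", rolled_back, "remaining:", store)
```

Unlike the script, which walks the undo list forward, this sketch undoes the newest change first. For independent vertex inserts the order makes no difference, but reverse order is the usual convention for compensating actions once later statements depend on earlier ones.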
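While executing statements, the script relies on the status returned by the client to decide whether a statement succeeded. On failure, exeQueryWithRetries() retries the statement a limited number of times (see the global variable retryTimes). If every retry fails, the script gives up on that statement. If the abandoned statement is a todo statement, the rollback process is triggered. If it is an undo statement being executed during rollback, the script logs an error and stops the rollback. The logs show how far the current batch got in execution and in rollback.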
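The retry contract (one initial attempt plus up to retryTimes retries, then give up and return None) can be checked with fake objects. `FakeResult` and `FlakySession` below are hypothetical stand-ins that model only `is_succeeded()` and `execute()`, and `exe_query_with_retries` is a condensed restatement of exeQueryWithRetries(), not part of the nebula3 API:

```python
import logging

retryTimes = 3  # smaller than the script's 10 to keep the demo short

class FakeResult:
    """Hypothetical stand-in for a query result; only is_succeeded() is modelled."""
    def __init__(self, ok):
        self.ok = ok

    def is_succeeded(self):
        return self.ok

class FlakySession:
    """Hypothetical session whose execute() fails for the first `failures` calls."""
    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def execute(self, query):
        self.calls += 1
        return FakeResult(self.calls > self.failures)

def exe_query_with_retries(query, session):
    """One initial attempt plus up to retryTimes retries; None if every attempt fails."""
    result = session.execute(query)
    attempt = 0
    while not result.is_succeeded() and attempt < retryTimes:
        attempt += 1
        logging.info("Retry %d/%d: %s", attempt, retryTimes, query)
        result = session.execute(query)
    if result.is_succeeded():
        return result
    logging.error("Giving up on %s after %d attempts.", query, attempt + 1)
    return None

transient = FlakySession(failures=2)    # recovers on the third call
permanent = FlakySession(failures=100)  # never recovers
print(exe_query_with_retries("q", transient) is not None, transient.calls)
print(exe_query_with_retries("q", permanent) is None, permanent.calls)
```

A statement that keeps failing is therefore executed 1 + retryTimes times before the caller sees None and starts the rollback.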
The complete code and a sample CSV file are available at: demo/atomic_batch_insert at main · xtcyclist/demo · GitHub
from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config
import csv
import logging
# Query template for todo queries
insertVertexTemplate = "INSERT VERTEX player(name, age) VALUES \"%s\":(\"%s\", %s)"
# Query template for undo queries that delete the inserted vertices
rollbackTemplate = "DELETE VERTEX \"%s\""
# Number of retries after a failed first attempt of each query
retryTimes = 10
logging.basicConfig(level=logging.INFO)
def readCSV(filename):
    data = []
    with open(filename, mode='r') as f:
        csvFile = csv.reader(f, delimiter=',')
        for line in csvFile:
            data.append([line[0], line[1]])
    return data
# Generate two lists of queries: todo and undo.
# (1) Append each query waiting to be executed to the todo list.
# (2) For each such query, prepare a counterpart that undoes its changes in the database,
#     and append this undo counterpart to the undo list.
# (3) The undo counterpart of the i-th todo query is the i-th query in the undo list.
def genBatch(data):
    todo = []
    undo = []
    for player in data:
        # player[0] is both the vertex ID and the name property; player[1] is the age.
        insert = insertVertexTemplate % (player[0], player[0], player[1])
        rollback = rollbackTemplate % (player[0])
        todo.append(insert)
        undo.append(rollback)
    # Inject some errors for testing:
    # undo[0] = insertVertexTemplate
    # todo[4] = insertVertexTemplate
    return todo, undo
# Execute the given query; on failure, retry it up to retryTimes more times.
# Return None if all attempts fail, logging the error code and message of the last attempt.
def exeQueryWithRetries(query, session):
    result = session.execute(query)
    retries = 0
    while not result.is_succeeded() and retries < retryTimes:
        retries = retries + 1
        logging.info("Retrying (%d/%d) %s." % (retries, retryTimes, query))
        result = session.execute(query)
    if result.is_succeeded():
        return result
    logging.error("Error %s (%d), while executing query %s." % (result.error_msg(), result.error_code(), query))
    return None
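# Execute queries in a batch, stopping at the first query that still fails after retries.
# Return the number of successful queries, which is also the index of the failed query.
def exeBatch(space, batch, session):
    counter = 0
    session.execute("use " + space)
    for query in batch:
        result = exeQueryWithRetries(query, session)
        if result is None:
            return counter
        counter = counter + 1
    return counter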
# Roll back the batch by executing the undo counterparts of all successfully executed queries.
# Return True if every undo query succeeded; stop and return False at the first one that fails.
def rollback(undo, progress, session):
    for count in range(progress):
        result = exeQueryWithRetries(undo[count], session)
        if result is None:
            logging.error("Rollback failed while executing the %d-th undo statement \"%s\"." % (count, undo[count]))
            return False
    return True
# Main entry
if __name__ == "__main__":
    # Load data to fill the query templates from a CSV file
    csvfile = "players.csv"
    data = readCSV(csvfile)
    todo, undo = genBatch(data)
    config = Config()
    config.max_connection_pool_size = 10
    conn = ConnectionPool()
    # IP and port of the nebula-graphd service (adjust to your deployment)
    addr = "127.0.0.1"
    port = 18588
    # The default login
    usr = "root"
    pwd = "nebula"
    # Initialize the connection pool
    status = conn.init([(addr, port)], config)
    if status:
        with conn.session_context(usr, pwd) as session:
            progress = exeBatch('nba', todo, session)
            if progress != len(todo):
                if not rollback(undo, progress, session):
                    logging.error("Rollback failed.")
                else:
                    logging.warning("Batch insert failed, with all inserted vertices rolled back.")
            else:
                logging.info("Batch insert succeeded.")
    else:
        logging.error("Connection pool initialization failed.")
    conn.close()
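Future transaction support
The Python script above cannot truly guarantee atomicity for a batch of statements: an undo statement can itself fail, the connection between the client and the database server can drop, and so on. It also requires users to write each statement and its rollback statement by hand, so it only suits fairly simple scenarios. In future iterations, NebulaGraph will gradually add distributed graph transaction support to its kernel. We will report further on our distributed graph transaction work at NUC 2022.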
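The first limitation is easy to reproduce. In this minimal in-memory sketch, a dict stands in for the graph space, and both the failing insert and the failing DELETE are simulated and hypothetical. Even though it tries every undo instead of stopping at the first failure as the script does, the batch ends up neither fully applied nor fully rolled back:

```python
# Hypothetical in-memory demo: a dict stands in for the graph space.
store = {}
todo = ["a", "b", "c"]   # vertex IDs to insert; the insert of "c" is made to fail
undo_fails_on = {"a"}    # simulate a DELETE that keeps failing, e.g. after a dropped connection

def insert(vid):
    """Stand-in for INSERT VERTEX: fails for "c", otherwise records the vertex."""
    if vid == "c":
        return False
    store[vid] = True
    return True

def delete(vid):
    """Stand-in for DELETE VERTEX: fails for IDs in undo_fails_on."""
    if vid in undo_fails_on:
        return False
    store.pop(vid, None)
    return True

# Execute the batch until the first failure.
progress = 0
while progress < len(todo) and insert(todo[progress]):
    progress += 1

# Roll back every inserted vertex, collecting the ones whose undo failed.
leftover = [vid for vid in todo[:progress] if not delete(vid)]

print("inserted before the failure:", progress)
print("undo failed for:", leftover)
print("vertices left in the space:", sorted(store))
```

Retrying and logging, as the script does, can only report what was left behind; they cannot make the batch atomic.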