在应用层实现简单的批量插入与失败后回滚功能

原子性问题

随着 NebulaGraph 产品的不断发展,我们越来越重视整个 NebulaGraph 分布式图数据库向分布式的图事务能力的演进,即将传统 OTLP 数据库的 ACID 事务性约束(原子性,一致性,隔离性,持久性)引入分布式的 NebulaGraph 及其所支持的图查询语言,如 nGQL。为了实现这个目标,我们开始有针对性地面向广大 NebulaGraph 社区伙伴和商业合作伙伴调研大家对 ACID 事务性的实际需求,用来指导我们对分布式图事务的技术方案设计和工程实现。欢迎感兴趣的伙伴在社区中一起讨论事务相关的问题。

在当前版本尚不具备完备的分布式图事务功能的情况下,如果实际应用需要一定的事务能力,也许可以考虑在应用层做开发来应对。下文使用 python 实现了一个简单的批量插入和回滚的功能,即在批量插入时,如果有一条插入失败了,整个批次被回滚。

使用 python 实现简单的批量插入及回滚

如下的 python 脚本使用了两个 list:todo 和 undo。在一个批次中每一条等待执行的语句都会被插入 todo list,同时将这条语句对应的回滚语句插入到 undo list 中(参考 genBatch() 方法)。在下面的脚本中,我们以插入一个批次的点为例,todo list 中是插入点的语句,undo list 中是将相应的点删除的语句。我们通过向预先定义的语句模版(参考 insertVertexTemplate 和 rollbackTemplate)中填入从 csv 文件中读入的数据来构造这些语句。

脚本启动后,先从 csv 文件中读取数据,按上述逻辑填充当前批次中相应的语句并插入相应的 list,然后调用 exeBatch() 方法执行该批次。若这一批次中有某一条语句未执行成功,客户端返回了错误,那么这条语句在这个批次中的标号将被返回并传入 rollback() 方法。rollback() 根据标号确定已经被正确执行的语句范围,调用它们相应的 undo 语句,将它们已经插入的点删除,完成回滚。

在执行语句的过程中,脚本以客户端返回的状态确定一条语句是否执行成功,若失败,exeQueryWithRetries() 方法会进行一定次数的重试(参考 retryTimes 全局变量)。若所有重试均失败,则放弃执行该条语句。如果是 todo 语句执行失败,回滚流程会被出发。如果是执行回滚的 undo 语句执行失败,脚本会记录相应的 error 日志。我们可以根据日志了解当前批次的执行和回滚情况。

具体的代码和 csv 例子详见:https://github.com/xtcyclist/demo/tree/main/atomic_batch_insert

from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config
import csv
import logging

# Query template for todo queries
insertVertexTemplate = "INSERT VERTEX player(name, age) VALUES \"%s\":(\"%s\", %s)"
# Query template for undo queries that delete the inserted vertices
rollbackTemplate = "DELETE VERTEX \"%s\""
# Retry each query execution for the following times
retryTimes = 10

logging.basicConfig(level=logging.INFO)

def readCSV(filename):
  data = []
  with open(filename, mode = 'r') as f:
    csvFile = csv.reader(f, delimiter = ',')
    for line in csvFile:
      data.append([line[0], line[1]])
  return data

# Generate two lists of queries: todo and undo.
# (1) Append each query waiting to be executed into the todo list.
# (2) For each such query, prepare an counterpart that undos its changes in the database.
#     Append this undo counterpart into the undo list.
# (3) The undo counterpart of the i-th todo query is the i-th query in the undo list.
def genBatch(data):
  todo = []
  undo = []
  for player in data:
    insert = insertVertexTemplate % (player[0], player[0], player[1])
    rollback = rollbackTemplate % (player[0])
    todo.append(insert)
    undo.append(rollback)
  # Ingest some errors for testing:
  # undo[0] = insertVertexTemplate
  # todo[4] = insertVertexTemplate
  return todo, undo

# Execute the given query multiple times.
# Return a None as the result, if all attempted executions failed.
# The error code and message of the last attempt is logged.
def exeQueryWithRetries(query, session):
  result = session.execute(query)
  if result.is_succeeded():
    return result
  i = 0
  while i < retryTimes:
    logging.info("Executing %s." % query)
    result = session.execute(query)
    if not result.is_succeeded():
      i = i + 1
    else:
      break
  if i == retryTimes:
    logging.error("Error %s (%d), while executing query %s." % (result.error_msg(), result.error_code(), query))
    return None
  elif result.is_succeeded():
    return result
  else:
    raise Exception("Failed at trying to execute query %s." % (query))

# Execute queries in a batch.
def exeBatch(space, batch, session):
  counter = 0
  session.execute("use " + space)
  for query in batch:
    result = exeQueryWithRetries(query, session)
    if result == None:
      return counter
    else:
      counter = counter + 1
  return counter

# Rollback the batch execution by executing the undo counterparts of all successfully executed queries.
def rollback(undo, progress, session):
  count = 0
  while (count < progress):
    result = exeQueryWithRetries(undo[count], session)
    if result == None:
      logging.error("Rollback failed while executing the %d-th undo statement \"%s\"." % (count, undo[count]))
      return False
    else:
      count = count + 1
  if count == progress:
    return True
  else:
    return False

# Main entry
if __name__ == "__main__":
  # Load data to fill query templates from a csv
  csvfile = "players.csv"
  data = readCSV(csvfile)
  todo, undo = genBatch(data)
  config = Config()
  config.max_connection_pool_size = 10
  conn = ConnectionPool()
  # IP and port of the nebula-graphd service
  addr = "127.0.0.1"
  port = 18588
  # The default login
  usr = "root"
  pwd = "nebula"
  # Initialize the connection pool
  status = conn.init([(addr, port)], config)
  if status:
    with conn.session_context(usr, pwd) as session:
      progress = exeBatch('nba', todo, session)
      if (progress != len(todo)):
        if rollback(undo, progress, session) == False:
          logging.error("Rollback failed.")
        else:
          logging.warning("Bacth insert failed, with all inserted vertices rolled back.")
      else:
        logging.info("Batch insert succeeded.")
  else:
    logging.error("Connection pool initialization failed.")
  conn.close()

未来的事务支持

上述 python 脚本并不能真正保证批量执行语句的原子性,比如 undo 语句的执行可能失败,客户端和数据库服务端的连接可能断开等等。同时,它也需要用户手动生成相应的语句和其回滚语句,仅仅适用于比较简单的场景。在未来的迭代中,NebulaGraph 中将在内核中逐渐支持分布式图事务能力。我们会在 NUC 2022 大会中向大家进一步地报告我们在分布式图事务上的工作。

4 个赞

很棒的例子:+1:t2:

请问一下哈,咱们的nebula为啥会出现连接超时这种问题呢?我们批量入一批数据有时候会报连接超时,虽然可以通过上述方案的重发解决,但是我理解连接超时和数据库事务没有直接联系,作为一个数据库不应该会出现连接超时这种错误

你的理解不对,数据库会关闭超时的连接,这是一个很传统的功能。可以在论坛里搜搜超时,看看改进办法。