nebula-python 多线程慢的小实验

论坛里经常有人问起 nebula-python 多线程慢的问题,用一个小实验说明一下。

nebula-python 主要是 2 个问题:

  1. nebula-python 解码是使用 python 原生代码来解码的,解码效率会比较慢。
  2. 受限于 python 的全局解释锁,nebula-python 多线程时最多只能用到1 个 cpu。

两个问题叠加起来的现象就是,如果查询有比较多的数据,那多线程效率是不会提升的,因为单核的 cpu 已经被打满。甚至有可能因为多线程的竞争,比不用多线程更慢一些。

下面是实验的脚本,使用 LDBC sf30 的数据,分别用普通代码,多线程,多进程运行 10 次语句。
case 1, 语句
go 2 steps from 2608 over KNOWS yield KNOWS._dst, KNOWS.creationDate as creationDate
返回条数 118567,服务端 latency 在 232645 us 左右
单个语句,耗时在 3 秒左右

normalExecutor: running in 29.527837991714478 seconds
threadExecutor: running in 29.018051624298096 seconds
processExecutor: running in 15.147454977035522 seconds

case 2 语句
go 2 steps from 2608 over KNOWS yield KNOWS._dst, KNOWS.creationDate as creationDate | limit 1000
返回条数 1000,服务端 latency 在 103957 us 左右

normalExecutor: running in 1.3806407451629639 seconds
threadExecutor: running in 0.32038378715515137 seconds
processExecutor: running in 0.19157886505126953 seconds

case 3 语句
go 2 steps from 2608 over KNOWS yield KNOWS._dst, KNOWS.creationDate as creationDate | yield count(*)
返回条数 1,服务端 latency 在 182921 us 左右

normalExecutor: running in 1.8793082237243652 seconds
threadExecutor: running in 0.28518009185791016 seconds
processExecutor: running in 0.27771878242492676 seconds

代码如下

import time
import concurrent
from nebula2.Config import Config
from nebula2.gclient.net import ConnectionPool


worker_num = 10
sessions = []
connection_pool = ConnectionPool()


def run(stmt, index):
    session = sessions[index]
    resp = session.execute(stmt)
    return resp


def threadExecutor(stmt):
    with concurrent.futures.ThreadPoolExecutor(max_workers=worker_num) as executor:
        runner = []
        start = time.time()
        for i in range(worker_num):
            future = executor.submit(run, stmt, i)
            runner.append(future)
        for future in concurrent.futures.as_completed(runner):
            if future.exception() is not None:
                raise future.exception()
            else:
                rs = future.result()
                if rs is not None:
                    print("row size is {}".format(rs.row_size()))
                    print("latency is {}".format(rs.latency()))
    print("threadExecutor: running in {} seconds".format(time.time() - start))


def processExecutor(stmt):
    start = time.time()

    with concurrent.futures.ProcessPoolExecutor(max_workers=worker_num) as executor:
        runner = []
        start = time.time()
        for i in range(worker_num):
            future = executor.submit(run, stmt, i)
            runner.append(future)
        for future in concurrent.futures.as_completed(runner):
            if future.exception() is not None:
                raise future.exception()
            else:
                rs = future.result()
                if rs is not None:
                    print("row size is {}".format(rs.row_size()))
                    print("latency is {}".format(rs.latency()))
    print("processExecutor: running in {} seconds".format(time.time() - start))


def normalExecutor(stmt):
    start = time.time()
    for i in range(worker_num):
        rs = run(stmt, i)
        print("row size is {}".format(rs.row_size()))
        print("latency is {}".format(rs.latency()))
    print("normalExecutor: running in {} seconds".format(time.time() - start))


if __name__ == "__main__":
    stmt = "go 2 steps from 2608 over KNOWS yield KNOWS._dst, KNOWS.creationDate as creationDate| yield count(*)"
    config = Config()
    config.max_connection_pool_size = worker_num
    assert connection_pool.init([("192.168.15.13", 1669)], config)
    for _ in range(worker_num):
        sess = connection_pool.get_session("root", "nebula")
        sess.execute("use sf30")
        sessions.append(sess)

    normalExecutor(stmt)
    threadExecutor(stmt)
    processExecutor(stmt)

结论:

  1. 当查询的数据比较多的时候,受限于全局解释性锁,多线程并不会比串行执行好很多,因为解码这个时候是 cpu 密集
  2. 当查询的数据比较少的时候,尤其是 count(*) 比较明显,因为这个时候是 io 等待了,多线程会比非多线程快很多。
9 个赞

太赞了:+1:t2: