People on the forum often ask why nebula-python gets slow with multiple threads; here is a small experiment that illustrates what is going on.
nebula-python has two main problems:
- nebula-python decodes query results in pure Python, so decoding is relatively slow.
- Because of Python's Global Interpreter Lock (GIL), nebula-python can use at most one CPU core, no matter how many threads it runs.
The two problems combined produce this symptom: if a query returns a lot of data, multithreading brings no speedup, because a single CPU core is already saturated; thread contention can even make it slightly slower than not using threads at all. The GIL effect can be reproduced without nebula at all, as the sketch below shows.
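Here is a minimal sketch (plain CPython, no external dependencies): a CPU-bound pure-Python function, which is exactly the situation result decoding is in, gains nothing from a thread pool.

import time
import concurrent.futures

def cpu_bound(n=5_000_000):
    # A pure-Python loop holds the GIL the whole time, just like decoding.
    total = 0
    for i in range(n):
        total += i
    return total

start = time.time()
for _ in range(4):
    cpu_bound()
print("serial:  {:.2f}s".format(time.time() - start))

start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    list(executor.map(lambda _: cpu_bound(), range(4)))
print("threads: {:.2f}s".format(time.time() - start))
# On CPython both timings come out roughly the same: the GIL lets only
# one thread execute Python bytecode at a time.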
Below is the experiment script. It uses the LDBC sf30 dataset and runs each statement 10 times in three ways: plain sequential code, a thread pool, and a process pool.
Case 1, statement:
go 2 steps from 2608 over KNOWS yield KNOWS._dst, KNOWS.creationDate as creationDate
It returns 118567 rows, with a server-side latency of around 232645 us.
A single statement takes about 3 seconds, so roughly 2.7 s of that is network transfer plus client-side decoding rather than server time.
normalExecutor: running in 29.527837991714478 seconds
threadExecutor: running in 29.018051624298096 seconds
processExecutor: running in 15.147454977035522 seconds
Case 2, statement:
go 2 steps from 2608 over KNOWS yield KNOWS._dst, KNOWS.creationDate as creationDate | limit 1000
It returns 1000 rows, with a server-side latency of around 103957 us.
normalExecutor: running in 1.3806407451629639 seconds
threadExecutor: running in 0.32038378715515137 seconds
processExecutor: running in 0.19157886505126953 seconds
Case 3, statement:
go 2 steps from 2608 over KNOWS yield KNOWS._dst, KNOWS.creationDate as creationDate | yield count(*)
It returns 1 row, with a server-side latency of around 182921 us.
normalExecutor: running in 1.8793082237243652 seconds
threadExecutor: running in 0.28518009185791016 seconds
processExecutor: running in 0.27771878242492676 seconds
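To separate server time from client-side cost in cases like these, you can compare the wall-clock time of session.execute() with the server-reported rs.latency() (in microseconds). A minimal sketch, assuming a session set up as in the script below:

import time

def profile(session, stmt):
    # Wall clock covers network transfer plus client-side decoding;
    # rs.latency() only covers time spent inside the server.
    start = time.time()
    rs = session.execute(stmt)
    wall_us = (time.time() - start) * 1_000_000
    print("wall: {:.0f} us, server: {} us, client overhead: {:.0f} us".format(
        wall_us, rs.latency(), wall_us - rs.latency()))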
The code is as follows:
import time
import concurrent.futures

from nebula2.Config import Config
from nebula2.gclient.net import ConnectionPool

worker_num = 10
sessions = []
connection_pool = ConnectionPool()


def run(stmt, index):
    # Each worker uses its own session: nebula2 sessions are not thread-safe.
    session = sessions[index]
    resp = session.execute(stmt)
    return resp


def threadExecutor(stmt):
    with concurrent.futures.ThreadPoolExecutor(max_workers=worker_num) as executor:
        runner = []
        start = time.time()
        for i in range(worker_num):
            future = executor.submit(run, stmt, i)
            runner.append(future)
        for future in concurrent.futures.as_completed(runner):
            if future.exception() is not None:
                raise future.exception()
            else:
                rs = future.result()
                if rs is not None:
                    print("row size is {}".format(rs.row_size()))
                    print("latency is {}".format(rs.latency()))
        print("threadExecutor: running in {} seconds".format(time.time() - start))


def processExecutor(stmt):
    # Relies on the fork start method (the Linux default): each child process
    # inherits the global sessions list created in the parent.
    with concurrent.futures.ProcessPoolExecutor(max_workers=worker_num) as executor:
        runner = []
        start = time.time()
        for i in range(worker_num):
            future = executor.submit(run, stmt, i)
            runner.append(future)
        for future in concurrent.futures.as_completed(runner):
            if future.exception() is not None:
                raise future.exception()
            else:
                rs = future.result()
                if rs is not None:
                    print("row size is {}".format(rs.row_size()))
                    print("latency is {}".format(rs.latency()))
        print("processExecutor: running in {} seconds".format(time.time() - start))


def normalExecutor(stmt):
    start = time.time()
    for i in range(worker_num):
        rs = run(stmt, i)
        print("row size is {}".format(rs.row_size()))
        print("latency is {}".format(rs.latency()))
    print("normalExecutor: running in {} seconds".format(time.time() - start))


if __name__ == "__main__":
    stmt = "go 2 steps from 2608 over KNOWS yield KNOWS._dst, KNOWS.creationDate as creationDate | yield count(*)"
    config = Config()
    config.max_connection_pool_size = worker_num
    assert connection_pool.init([("192.168.15.13", 1669)], config)
    # One session per worker, all on the same space.
    for _ in range(worker_num):
        sess = connection_pool.get_session("root", "nebula")
        sess.execute("use sf30")
        sessions.append(sess)
    normalExecutor(stmt)
    threadExecutor(stmt)
    processExecutor(stmt)
Conclusions:
- When a query returns a lot of data, the Global Interpreter Lock means multithreading is not much better than serial execution, because at that point decoding makes the workload CPU-bound.
- When a query returns little data (most visible in the count(*) case), the time is mostly spent waiting on I/O, so multithreading is much faster than the serial version.
If you do need to fetch large result sets in parallel, a process pool is the way around the GIL; see the sketch below.
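A minimal sketch of that pattern. Unlike the script above, each worker process lazily creates its own ConnectionPool instead of inheriting sessions via fork, so it also works with the spawn start method; the host, port, credentials, and space name are placeholders for your own deployment, and only row_size() is returned because results sent back to the parent must be picklable.

import concurrent.futures

from nebula2.Config import Config
from nebula2.gclient.net import ConnectionPool

HOST, PORT = "192.168.15.13", 1669  # placeholder: your graphd address
_pool = None  # one pool per worker process, created lazily


def _worker(stmt):
    global _pool
    if _pool is None:
        _pool = ConnectionPool()
        assert _pool.init([(HOST, PORT)], Config())
    session = _pool.get_session("root", "nebula")
    try:
        session.execute("use sf30")
        rs = session.execute(stmt)
        # Return a plain int rather than the ResultSet: values crossing
        # the process boundary must be picklable.
        return rs.row_size()
    finally:
        session.release()


if __name__ == "__main__":
    stmt = "go 2 steps from 2608 over KNOWS yield KNOWS._dst"
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        for rows in executor.map(_worker, [stmt] * 4):
            print("row size is {}".format(rows))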