Meta/Storage Client thread safety

I'd like to ask:

  1. Are the C++ Graph, Meta, and Storage clients thread-safe?
  2. Right now each thread has its own separate Meta and Storage Client, and at 50-150 concurrent threads we get this error:
E1112 18:34:58.516834 54496 StorageClient.inl:123] Request to [10.9*.16*.9*:44500] failed: N6apache6thrift9transport19TTransportExceptionE: AsyncSocketException: write time out during connection, type = Timed out

Does this write timeout mean we are using the client incorrectly?

Configuration:
12 Storage machines
The IO thread pool size is 12 for both Meta and Storage

// create the client vectors
    vector<std::shared_ptr<nebula::storage::StorageClient>> clientVector;
    vector<std::shared_ptr<nebula::meta::MetaClient>> metaVector;
    vector<std::shared_ptr<folly::IOThreadPoolExecutor>> ioMetaVector;
    vector<std::shared_ptr<folly::IOThreadPoolExecutor>> ioStorageVector;

    for (int t = 0; t < thNum; ++t) {

        // create the meta IO thread pool
        std::shared_ptr<folly::IOThreadPoolExecutor> tmpIoMeta = std::make_shared<folly::IOThreadPoolExecutor>(
                ioMeta);
        nebula::meta::MetaClientOptions tmpoptions;
        //get meta client
        std::shared_ptr<nebula::meta::MetaClient> tmpmetaClient =
                std::make_shared<nebula::meta::MetaClient>(tmpIoMeta, hostAddrs.value(), tmpoptions);
        tmpmetaClient->waitForMetadReady();

        // create the storage IO thread pool
        std::shared_ptr<folly::IOThreadPoolExecutor> tmpIoStorage = std::make_shared<folly::IOThreadPoolExecutor>(
                ioStorage);

        std::shared_ptr<nebula::storage::StorageClient> tmpstorageClient =
                std::make_shared<nebula::storage::StorageClient>(tmpIoStorage, tmpmetaClient.get());

        metaVector.push_back(tmpmetaClient);
        clientVector.push_back(tmpstorageClient);

        ioMetaVector.push_back(tmpIoMeta);
        ioStorageVector.push_back(tmpIoStorage);
    }

    vector<std::future<void>> future(thNum);
    vector<bool> mGetStatus(thNum);

    for (int t = 0; t < thNum; ++t) {
        future[t] = async(launch::async,
                          [t, thNum, keys, clientVector] {
                              // Get API
                              // Send the request through the StorageClient; the parameters are the spaceId and the keys to fetch
                              std::cout << "get size = " << keys.size() << std::endl;
                              std::shared_ptr<nebula::storage::StorageClient> tmpstorageClient = clientVector.at(t);
                              auto start = std::chrono::system_clock::now();

                              auto futureGet = tmpstorageClient->get(1, std::move(keys), true);
                              // get the result
                              auto respGet = std::move(futureGet).get();
                              auto end = std::chrono::system_clock::now();
                              const auto &hostLatencies = respGet.hostLatency();
                              for (auto &item : hostLatencies) {
                                  std::cout << std::this_thread::get_id() <<
                                            "Host:" << std::get<0>(item).first << ",latency:" << std::get<1>(item)
                                            << ",e2eLatency:" << std::get<2>(item) << std::endl;
                              }
                              auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
                              std::cout << std::this_thread::get_id() << "response use time = " << duration.count()
                                        << " us" << std::endl;
                              std::cout << std::this_thread::get_id() << "response completeness = "
                                        << respGet.completeness() << "%" << std::endl;
                              std::cout << std::this_thread::get_id() << "response max latency = "
                                        << respGet.maxLatency() << " us" << std::endl;
                          });
    }

    for (int t = 0; t < thNum; ++t) {
        future[t].get();
    }

    std::cout << "Operation end " << 1 << std::endl;

Hi, are you on version 1.0 or 2.0? If it's 1.0, none of the clients are currently thread-safe; see https://discuss.nebula-graph.com.cn/t/topic/1489/4 for details.

Also, here is a recent related answer that looks similar to your problem: 关于graphd和storage的线程模型 (on the threading model of graphd and storage).

They're not thread-safe. What latency does your output show? It looks like requests start timing out once concurrency gets high.

E1117 11:16:05.656872 46447 StorageClient.inl:123] Request to [10.97.162.206:44500] failed: N6apache6thrift9transport19TTransportExceptionE: AsyncSocketException: write timed out during connection, type = Timed out
140034575853312Host:174170777,latency:1331,e2eLatency:12999
140034575853312Host:174170798,latency:1118,e2eLatency:62702
140034575853312Host:174170679,latency:1425,e2eLatency:65604
140034575853312Host:174170681,latency:1354,e2eLatency:59307
140034575853312Host:174170834,latency:1635,e2eLatency:66143
140034575853312Host:174170685,latency:1531,e2eLatency:65137
140034575853312Host:174170786,latency:1534,e2eLatency:59990
140034575853312Host:174170662,latency:1705,e2eLatency:60732
140034575853312Host:174173810,latency:1412,e2eLatency:69292
140034575853312Host:174170715,latency:1497,e2eLatency:63439
140034575853312Host:174170246,latency:1511,e2eLatency:64306
140034575853312response use time = 1015379 us
140034575853312response completeness = 91%
140034575853312response max latency = 1705 us

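These are the latencies at 150 concurrency. The request to one of the 12 machines timed out, so only 11 hosts are listed and completeness is 91%, and the e2e latency is much higher than the server-side latency. Each query fetches 1500 records of 3 KB each, each Meta and Storage Client has an IOThreadPool of 12, and the machine has 88 cores.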
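For scale, the numbers in this post work out as follows, taking 1 KB as 1 KiB (1024 bytes) and assuming the keys are spread evenly over the 12 storage hosts (the post does not say how they are distributed):

$$
\begin{aligned}
\text{values per request} &= 1500 \times 3\,\text{KiB} = 4500\,\text{KiB} \approx 4.39\,\text{MiB} \\
\text{per host per request} &\approx 4500\,\text{KiB} / 12 = 375\,\text{KiB} \\
\text{150 concurrent requests} &= 150 \times 4500\,\text{KiB} = 675\,000\,\text{KiB} \approx 659\,\text{MiB}
\end{aligned}
$$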

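  1. My understanding of the post 关于graphd和storage的线程模型 (on the threading model of graphd and storage) is that the Storage Client threading model should be similar to the Graph Client's, with the concurrency limit tied to the IOThreadPool size. Did I set the IOThreadPool size wrong?

  2. Would running clients on several machines increase concurrency? I'm not sure whether the servers underneath can take the load.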

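There is a parameter for changing the timeout; its default is the 60 seconds you see in your output. It looks like a single machine can't sustain this concurrency, so you can spread the load over more machines.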

OK. So should each thread get its own separate Client? And how many IOThreadPool threads should each Client get?

So should each thread get its own separate Client?

Yes, none of the clients are thread-safe.

How many IOThreadPool threads should each Client get?

The IOThreadPool is a pool: each client takes an executor from it. You only need to make sure that the number of clients is less than or equal to the number of executors in the IOThreadPool.
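A standard-library model of that rule, assuming each client is bound to one executor handed out round-robin from the shared pool. `ExecutorPool`, `Client` and `maxClientsPerExecutor` are hypothetical and only mimic the idea; they are not the folly or Nebula API, and the real pool's assignment policy may differ. The function reports the heaviest load on any single executor.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for an IO thread pool: it only knows how many
// executors (IO threads) it owns and hands them out round-robin.
class ExecutorPool {
public:
    explicit ExecutorPool(std::size_t numExecutors) : numExecutors_(numExecutors) {
        if (numExecutors_ == 0) {
            throw std::invalid_argument("pool needs at least one executor");
        }
    }

    // Index of the next executor, wrapping around after the last one.
    std::size_t nextExecutor() { return next_++ % numExecutors_; }

    std::size_t size() const { return numExecutors_; }

private:
    std::size_t numExecutors_;
    std::size_t next_ = 0;
};

// Hypothetical client: remembers which executor it was bound to at creation.
struct Client {
    std::size_t executorId;
};

// Creates numClients clients from one pool and returns the largest number
// of clients that ended up sharing a single executor.
std::size_t maxClientsPerExecutor(std::size_t numClients, std::size_t numExecutors) {
    ExecutorPool pool(numExecutors);
    std::vector<Client> clients;
    std::vector<std::size_t> load(pool.size(), 0);
    for (std::size_t i = 0; i < numClients; ++i) {
        clients.push_back(Client{pool.nextExecutor()});
        ++load[clients.back().executorId];
    }
    return *std::max_element(load.begin(), load.end());
}

// Usage: the two regimes discussed in this thread.
void checkExamples() {
    assert(maxClientsPerExecutor(12, 12) == 1);   // clients <= executors: one client each
    assert(maxClientsPerExecutor(150, 12) == 13); // 150 clients on 12 executors
}
```

With clients ≤ executors no executor is shared; with 150 clients on 12 executors, round-robin puts 13 clients on some of them.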

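  1. So the IOThreadPool should be one large pool defined outside the loop and shared by the clients of all threads?

  2. I initialize the multithreaded clients as below. With ioThreadNum=1 the error rate and latency are fine, but when I increase ioThreadNum, latency and errors go up instead. What is causing this? Could you provide an official example of using the Client from multiple threads?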

    for (int t = 0; t < thNum; ++t) {
        //get ioThreadPool
        std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(
                ioThreadNum);
        nebula::meta::MetaClientOptions tmpoptions;
        //get meta client
        std::shared_ptr<nebula::meta::MetaClient> tmpmetaClient =
                std::make_shared<nebula::meta::MetaClient>(ioThreadPool, hostAddrs.value(), tmpoptions);
        tmpmetaClient->waitForMetadReady();
        //get storage client
        std::shared_ptr<nebula::storage::StorageClient> tmpstorageClient =
                std::make_shared<nebula::storage::StorageClient>(ioThreadPool, tmpmetaClient.get());

        metaVector.push_back(tmpmetaClient);
        clientVector.push_back(tmpstorageClient);
    }

Thanks

One client per thread, with all threads sharing a single ioThreadPool.
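A standard-library sketch of that layout, as I read the reply. `SharedIoPool`, `Client` and `runWorkers` are hypothetical stand-ins for `folly::IOThreadPoolExecutor` and the Nebula clients, not their real APIs. The pool is created once and shared through a `std::shared_ptr`; each worker thread builds and uses its own client, so the client's unsynchronized state is never touched by two threads.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical shared pool. Every worker asks the same pool for an
// executor, so handing them out uses an atomic counter.
class SharedIoPool {
public:
    explicit SharedIoPool(std::size_t numExecutors) : numExecutors_(numExecutors) {
        if (numExecutors_ == 0) {
            throw std::invalid_argument("pool needs at least one executor");
        }
    }
    std::size_t acquireExecutor() { return next_.fetch_add(1) % numExecutors_; }

private:
    const std::size_t numExecutors_;
    std::atomic<std::size_t> next_{0};
};

// Hypothetical client that is NOT thread-safe: requestCount_ is a plain
// counter, so only the thread that owns the client may use it.
class Client {
public:
    explicit Client(std::shared_ptr<SharedIoPool> pool)
        : pool_(std::move(pool)), executorId_(pool_->acquireExecutor()) {}
    void get() { ++requestCount_; }  // stands in for a real RPC
    std::size_t requestCount() const { return requestCount_; }
    std::size_t executorId() const { return executorId_; }

private:
    std::shared_ptr<SharedIoPool> pool_;
    std::size_t executorId_;
    std::size_t requestCount_ = 0;
};

// Starts numThreads workers that all share ONE pool; each worker creates its
// own Client and issues requestsPerThread requests. Returns the total.
std::size_t runWorkers(std::size_t numThreads, std::size_t ioThreads,
                       std::size_t requestsPerThread) {
    auto pool = std::make_shared<SharedIoPool>(ioThreads);  // created once
    std::vector<std::size_t> perThread(numThreads, 0);
    std::vector<std::thread> workers;
    workers.reserve(numThreads);
    for (std::size_t t = 0; t < numThreads; ++t) {
        workers.emplace_back([pool, t, requestsPerThread, &perThread] {
            Client client(pool);  // one client per thread, never shared
            for (std::size_t i = 0; i < requestsPerThread; ++i) {
                client.get();
            }
            perThread[t] = client.requestCount();  // each thread writes only its own slot
        });
    }
    for (auto& w : workers) {
        w.join();
    }
    std::size_t total = 0;
    for (std::size_t n : perThread) {
        total += n;
    }
    return total;
}

// Usage: 8 threads, 8 IO threads (clients <= executors), 1000 requests each.
void checkExample() {
    assert(runWorkers(8, 8, 1000) == 8000);
}
```

Applied to the snippet above, this would mean creating the `folly::IOThreadPoolExecutor` once, before the `for` loop, and passing that same pointer to every `MetaClient` and `StorageClient` instead of building a new pool on each iteration, while keeping the pool's thread count at least as large as the number of clients.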
