我想咨询一下:
- Cpp 的 Graph、Meta和Storage Client 是线程安全的吗?
- 我们现在每个线程分别对应一个单独的 Meta 和 Storage Client,50-150 并发就会报错
E1112 18:34:58.516834 54496 StorageClient.inl:123] Request to [10.9*.16*.9*:44500] failed: N6apache6thrift9transport19TTransportExceptionE: AsyncSocketException: write time out during connection, type = Timed out
想确认这个 Write Time Out 是不是因为我们使用客户端的方式不对?
配置
12台Storage机器
Meta 和 Storage 的 IO Thread Pool 都是12
//creat client vector
vector<std::shared_ptr<nebula::storage::StorageClient>> clientVector;
vector<std::shared_ptr<nebula::meta::MetaClient>> metaVector;
vector<std::shared_ptr<folly::IOThreadPoolExecutor>> ioMetaVector;
vector<std::shared_ptr<folly::IOThreadPoolExecutor>> ioStorageVector;
for (int t = 0; t < thNum; ++t) {
//creat meta io-thread
std::shared_ptr<folly::IOThreadPoolExecutor> tmpIoMeta = std::make_shared<folly::IOThreadPoolExecutor>(
ioMeta);
nebula::meta::MetaClientOptions tmpoptions;
//get meta client
std::shared_ptr<nebula::meta::MetaClient> tmpmetaClient =
std::make_shared<nebula::meta::MetaClient>(tmpIoMeta, hostAddrs.value(), tmpoptions);
tmpmetaClient->waitForMetadReady();
//creat storage io-thread
std::shared_ptr<folly::IOThreadPoolExecutor> tmpIoStorage = std::make_shared<folly::IOThreadPoolExecutor>(
ioStorage);
std::shared_ptr<nebula::storage::StorageClient> tmpstorageClient =
std::make_shared<nebula::storage::StorageClient>(tmpIoStorage, tmpmetaClient.get());
metaVector.push_back(tmpmetaClient);
clientVector.push_back(tmpstorageClient);
ioMetaVector.push_back(tmpIoMeta);
ioStorageVector.push_back(tmpIoStorage);
}
vector<std::future<void>> future(thNum);
vector<bool> mGetStatus(thNum);
for (int t = 0; t < thNum; ++t) {
    // Capture only this worker's client, not the whole clientVector: copying
    // the vector into every task would bump every shared_ptr refcount
    // (atomic inc/dec) thNum times for no benefit.
    auto client = clientVector.at(t);
    // The lambda is `mutable` so std::move(keys) genuinely moves the task's
    // private copy. In the original the by-value captures were const, so the
    // "move" silently degraded to a copy (performance-move-const-arg).
    future[t] = async(launch::async,
        [keys, client]() mutable {
            // Get API: send the request through the StorageClient with the
            // spaceId (1) and the keys to fetch.
            std::cout << "get size = " << keys.size() << std::endl;
            // steady_clock is monotonic -- correct for measuring an interval,
            // unlike system_clock which can jump with wall-clock adjustments.
            auto start = std::chrono::steady_clock::now();
            auto futureGet = client->get(1, std::move(keys), true);
            // Block until the response arrives.
            auto respGet = std::move(futureGet).get();
            auto end = std::chrono::steady_clock::now();
            // Per-host latency tuples: (host, server latency, e2e latency).
            const auto &hostLatencies = respGet.hostLatency();
            for (const auto &item : hostLatencies) {
                std::cout << std::this_thread::get_id()
                          << "Host:" << std::get<0>(item).first
                          << ",latency:" << std::get<1>(item)
                          << ",e2eLatency:" << std::get<2>(item) << std::endl;
            }
            auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
            std::cout << std::this_thread::get_id() << "response use time = " << duration.count()
                      << " us" << std::endl;
            std::cout << std::this_thread::get_id() << "response completeness = "
                      << respGet.completeness() << "%" << std::endl;
            std::cout << std::this_thread::get_id() << "response max latency = "
                      << respGet.maxLatency() << " us" << std::endl;
        });
}
// Join every worker; future.get() rethrows any exception thrown in the task.
for (int t = 0; t < thNum; ++t) {
    future[t].get();
}
std::cout << "Operation end " << 1 << std::endl;