作者简介:
阿旺(奇富科技)
本人是一名 java 老菜鸟,工作内容并非 C++ 方向,平时也很少接触 C++ 语言,不懂的代码都是使用 GPT 作代码辅助解释,故难免存在准确性问题,但大体思路应该没太大问题,如有错误问题还望评论区指出回复,相互学习共同成长。以下内容源自我在使用 NebulaGraph 过程中遇到的一些问题和思考。我通过查阅源码、动手实践,解决了一些实际生产中的疑难问题。也希望借此与大家分享经验,探讨更多技术细节。同时,也想鼓励每一位技术人——不要给自己轻易设限,勇于突破边界,探索新的领域。
nebula源码分支: release-3.6
内容大纲
- NebulaGraph 目录结构说明
- NebulaGraph 启动会运行多少个rocksdb实例
- NebulaGraph 是如何查询一个点的数据
- NebulaGraph 中 value 是如何编码的
- NebulaGraph 中 TTL 是如何实现的
- NebulaGraph 中如何管理 session
- NebulaGraph 错误日志中未知错误码如何定位原因
一、NebulaGraph 源码 src 目录结构
src
|-clients : 内部服务客户端,主要用于 graphd、metad、storaged 内部服务 RPC 调用
|-codec :nebula 数据的 key-value 编码和解码
|-daemons :启动入口,graphd、metad、storaged 启动
|-graph :graphd 相关业务实现,graphd 主要负责处理客户端请求,属于 nebula 的计算层
|-interface : thrift 接口定义,nebula 采用 thrift 做异构语言 rpc 调用,nebula 客户端与 nebula 交互接口皆定义于此
|-kvstore :键值存储引擎,storaged 和 metad 都会用到,比如:rocksdb 和 hbase,nebula 默认采用 rocksdb 作为内嵌存储
|-meta :元数据相关,比如:session 会话、用户信息、分片信息等等,属于nebula元数据管理层
|-parser : 语法解析
|-storage : nebula 存储相关,提供一些简单的kv存储和查询的接口服务给graphd调用
温馨提示: nebula 通过 CMake 进行编译打包,如果想了解打包过程和依赖信息见每个目录的 CMakeLists.txt 文件
二、单个 nebula-storaged 启动时会运行多少个 rocksdb 实例
结论: 数据存储磁盘目录数 *space总数
数据存储磁盘目录: etc/nebula-storaged.conf 中 data_path
比如:有2个 space,3个数据目录,则 rocksdb 实例数=2*3=6
相关知识点:
space 和 part 分区数越多 nebula-storaged 启动越慢
大体过程:
1.StorageDaemon::main() 程序启动入口
2.StorageServer::start() 启动存储引擎
3.StorageServer::getStoreInstance() 初始化存储引擎
4.NebulaStore::init() nebula存储引擎初始化
5.NebulaStore::loadPartFromDataPath() 通过获取data_path存储目录路径,为每个目录下space启动一个rocksdb实例,并且建立了space和kvEngine的映射关系,以及partId和part的映射关系,后面查询或者更新实则是定位数据partId,然后通过partId获取到part进行操作,part中保留了kvEngine的引用
最终构建spaces_,里面包含了SpaceId和spacePart分片的映射关系,后面所有的读写都需要此映射关系操作对应的rocksdb,这里的映射关系类似于java中SpringBean
std::unordered_map<GraphSpaceID, std::shared_ptr<SpacePartInfo>> spaces_;
struct SpacePartInfo {
//Part中保留了engin的引用,当前版本engine其实就是rocksdb
std::unordered_map<PartitionID, std::shared_ptr<Part>> parts_;
std::vector<std::unique_ptr<KVEngine>> engines_;
}
关键代码
// 创建storageServer并启动
StorageDaemon::main(){
auto storageServer = std::make_unique<nebula::storage::StorageServer>(
localhost, metaAddrsRet.value(), paths, FLAGS_wal_path, FLAGS_listener_path);
storageServer->start()
}
// 初始化kv存储引擎
StorageServer::start(){
LOG(INFO) << "Init kvstore";
kvstore_ = getStoreInstance();
}
// 实例化nebula存储引擎并初始化
StorageServer::getStoreInstance(){
auto nbStore = std::make_unique<kvstore::NebulaStore>(
std::move(options), ioThreadPool_, localHost_, workers_);
nbStore->init()
}
// 从data_path中载入part分区数据
NebulaStore::init(){
loadPartFromDataPath()
}
//遍历data_path目录为每个space创建一个rocksdb引擎
NebulaStore::loadPartFromDataPath(){
std::vector<folly::Future<std::pair<GraphSpaceID, std::unique_ptr<KVEngine>>>> futures;
std::vector<std::string> enginesPath;
for (auto& path : options_.dataPaths_) {
//这里其实就是data_path,比如:/usr/local/nebula/data
auto rootPath = folly::stringPrintf("%s/nebula", path.c_str());
//获取每个data_pah子目录,其实就是space
// 比如 /usr/local/nebula/data/1、/usr/local/nebula/data/2
auto dirs = fs::FileUtils::listAllDirsInDir(rootPath.c_str());
//遍历space目录,为每个space目录创建一个rocksdb
for (auto& dir : dirs) {
//目录名称就是spaceId,比如/usr/local/nebula/data/1中的1就是spaceId
GraphSpaceID spaceId = folly::to<GraphSpaceID>(dir);
// 目录为0的跳过
if (spaceId == 0) {
// skip the system space, only handle data space here.
continue;
}
enginesPath.emplace_back(rootPath + "/" + dir);
//异步创建rocksdb存储引擎
futures.emplace_back(newEngineAsync(spaceId, path, options_.walPath_));
}
// space和kvEngine列表的映射关系
std::unordered_map<GraphSpaceID, std::vector<std::unique_ptr<KVEngine>>> spaceEngines;
// 等待并获取前面的异步创建引擎列表
auto tries = folly::collectAll(futures).get();
for (auto& t : tries) {
//p就是个二元组,第一个元素为space,第二个元素为kvEngine
auto&& p = t.value();
auto spaceIt = spaceEngines.find(p.first);
if (spaceIt == spaceEngines.end()) {
spaceIt = spaceEngines.emplace(p.first, std::vector<std::unique_ptr<KVEngine>>()).first;
}
spaceIt->second.emplace_back(std::move(p.second));
}
// 遍历所有space存储引擎
for (auto& spaceEngine : spaceEngines) {
GraphSpaceID spaceId = spaceEngine.first;
for (auto& engine : spaceEngine.second) {
// part分片与part副本节点集合,后面raft需要用到
std::map<PartitionID, Peers> partRaftPeers;
partRaftPeers.emplace(partId, raftPeers);
//建立起partId和part映射关系,后面操作某partId的数据时就是通过part进行的,part中其实保留了kvEngine的引用
for (auto& it : partRaftPeers) {
auto part = newPart(spaceId, partId, enginePtr, isLearner, addrs);
auto ret = iter->second->parts_.emplace(partId, part);
}
}
}
}
}
// 创建rocksdb引擎
NebulaStore::newEngineAsync() {
engine = std::make_unique<RocksEngine>(
spaceId, vIdLen, dataPath, walPath, options_.mergeOp_, cfFactory);
}
三、NebulaGraph 是如何查询一个点的数据
graphd负责接受client客户端请求,并解析 nGQL,生成查询计划,再经过优化器,最终执行executor,为了突出重点省去从接受请求开始到调用executor,而直接定位executor,这边以GetVerticesExecutor.cpp为例来探究如何查找vertex点的数据,主要探究vertexId如何定位到数据
相关知识点
- part数和storaged查询并发数成正比
- vid和partId映射计算公式: partId = hash(vid) % numParts + 1
大体过程
//nebula-graphd
1.GetVerticesExecutor::getVertices() 获取多个点属性列表
2.StorageClient::getProps() 计算vertexId对应的part,然后请求对应part分片数据
//nebula-storaged
3.GraphStorageServiceHandler::future_getProps() storagd收到graphd来的请求进行处理
4.GetPropProcessor::doProcess() 构建tag属性查询计划并调用
5.TagNode::doExecute() 通过vid和tagId构建rocksdb中key,访问rocksdb获取value属性
6.GetTagPropNode::doExecute() 收集vid对应多个tag value属性列表,并过滤出查询返回结果保留的prop属性
关键源码
nebula-graphd部分
// 通过上下文获取到storageClient然后调用获取属性的方法
GetVerticesExecutor::getVertices() {
StorageClient *storageClient = qctx()->getStorageClient();
storageClient->getProps(param,
std::move(vertices),
gv->props(),
nullptr,
gv->exprs(),
gv->dedup(),
gv->orderBy(),
gv->getValidLimit(),
gv->filter())
}
// 通过vertexId获取到part所在节点信息
// 请求对应节点的vertexId信息
StorageRpcRespFuture<cpp2::GetPropResponse> StorageClient::getProps(){
// 建立vertexId和part节点映射关系
auto status = clusterIdsToHosts(param.space, input.rows, std::move(cbStatus).value());
// 为不同的节点构建对应的request
cpp2::GetPropRequest> requests;
for (auto& c : clusters) {
auto& host = c.first;
auto& req = requests[host];
req.space_id_ref() = param.space;
req.parts_ref() = std::move(c.second);
}
//调用rpc请求storaged获取点的属性信息
return collectResponse(
param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::GetPropRequest& r) {
return client->future_getProps(r);
});
}
//通过(hash(vid) % numParts + 1)计算出对应的part
// 这种方式也预示着space设定的分区数量一旦创建不可更改,否则数据就会映射不上
StorageClientBase::clusterIdsToHosts(){
//获取space空间有多少个part
auto status = metaClient_->partsNum(spaceId);
std::unordered_map<PartitionID, HostAddr> leaders;
//建立起partId和leader节点的映射关系
for (int32_t partId = 1; partId <= numParts; ++partId) {
auto leader = getLeader(spaceId, partId);
leaders[partId] = std::move(leader).value();
}
//
for (auto& id : ids) {
// 计算vid对应的partId
PartitionID part = vid % numParts + 1;
const auto& leader = leaders[part];
clusters[leader][part].emplace_back(std::move(id));
}
return clusters;
}
nebula-storaged部分
folly::Future<cpp2::GetPropResponse> GraphStorageServiceHandler::future_getProps(
const cpp2::GetPropRequest& req) {
auto* processor = GetPropProcessor::instance(env_, &kGetPropCounters, readerPool_.get());
RETURN_FUTURE(processor);
}
GetPropProcessor::doProcess(){
if (!FLAGS_query_concurrently) {
//为了方便理解,这里直接选择单线程
runInSingleThread(req);
} else {
runInMultipleThread(req);
}
}
GetPropProcessor::runInSingleThread(){
// 构建tag查询计划,这个计划相当于构建一个DAG
// 里面有完整的依赖信息
auto plan = buildTagPlan(&contexts_.front(), &resultDataSet_);
// 遍历待查询的part分区
for (const auto& partEntry : req.get_parts()) {
auto partId = partEntry.first;
for (const auto& row : partEntry.second) {
auto vId = row.values[0].getStr();
// 执行查询计划
plan.go(partId, vId);
}
}
}
// 构建执行计划DAG
GetPropProcessor::buildTagPlan(RuntimeContext* context,
nebula::DataSet* result) {
StoragePlan<VertexID> plan;
std::vector<TagNode*> tags;
for (const auto& tc : tagContext_.propContexts_) {
// TagNode获取vid指定tag的属性列表,TagNode will return a DataSet of specified props of tagId
// 这里面就有从rocksdb中读取tag的value属性
// 查询语句指定多少个tag,这里就会有多少个TagNode
// 比如:fetch prop on tag_1,tag_2,tag_3 idxxx,那这里就有3个TagNode
auto tag = std::make_unique<TagNode>(context, &tagContext_, tc.first, &tc.second);
tags.emplace_back(tag.get());
plan.addNode(std::move(tag));
}
// GetTagPropNode用于收集vid对应多个tag的属性
auto output = std::make_unique<GetTagPropNode>(
context, tags, result, filter_ == nullptr ? nullptr : filter_->clone(), limit_, &tagContext_);
for (auto* tag : tags) {
output->addDependency(tag);
}
plan.addNode(std::move(output));
return plan;
}
TagNode::doExecute(){
// 根据vid和tagId构建rocksdb中key
key_ = NebulaKeyUtils::tagKey(context_->vIdLen(), partId, vId, tagId_);
// 从响应分片的rocksdb中获取value,此value为经过RowWriter编码的value
ret = context_->env()->kvstore_->get(context_->spaceId(), partId, key_, &value_);
// RowReaderWrapper设置schema和对应的value,后面用于GetTagPropNode收集prop做准备
reader_.reset(*schemas_, value_);
}
// 收集vid对应所有tag的属性值列表
GetTagPropNode::doExecute(){
// 这个集合用于保存查询到的vid对应不同tag的属性集合
List row;
// vertexId is the first column
if (context_->isIntId()) {
row.emplace_back(*reinterpret_cast<const int64_t*>(vId.data()));
} else {
row.emplace_back(vId);
}
// 遍历本次要查询的所有tag
for (auto* tagNode : tagNodes_) {
ret = tagNode->collectTagPropsIfValid(
// 收集tagNode中属性到row中
[&row, vIdLen, isIntId, tagNode, this](){
auto status = QueryUtils::collectVertexProps(
key, vIdLen, isIntId, reader, props, row, expCtx_.get(), tagNode->getTagName());
});
}
}
QueryUtils::collectVertexProps(){
// 遍历tag中的属性列表,挨个的取出来放入到list中,这个list就是GetTagPropNode::doExecute中的row
for (const auto& prop : *props) {
auto value = QueryUtils::readVertexProp(key, vIdLen, isIntId, reader, prop);
list.emplace_back(value.value());
}
}
// 读取属性值
QueryUtils::readVertexProp(){
switch (prop.propInKeyType_) {
// prop in value
case PropContext::PropInKeyType::NONE: {
// ddl中的属性,这里的readValue就是RowReader根据schame获取字段的offset,然后获取对应的field值
return readValue(reader, prop.name_, prop.field_);
}
case PropContext::PropInKeyType::VID: {
// 属性类型为vid
auto vId = NebulaKeyUtils::getVertexId(vIdLen, key);
if (isIntId) {
return *reinterpret_cast<const int64_t*>(vId.data());
} else {
return vId.subpiece(0, vId.find_first_of('\0')).toString();
}
}
case PropContext::PropInKeyType::TAG: {
// 属性类型为tagId
auto tag = NebulaKeyUtils::getTagId(vIdLen, key);
return tag;
}
}
}
四、NebulaGraph 中 value 是如何编码的
从2.x开始起nebula使用新的编码方式version 2,由5个部分组成:
<header> <schema version> <NULL flags> <all properties> <string content>
| | | |
1 byte 0 - 7 bytes 0+ bytes N bytes
header: 占用1个字节,用来标识schema version字节长度,目前取值0~7
schema version: 0~7个字节,标识schema版本
NULL flags: NULL字段标识,跟mysql设计类似,为NULL的字段就在对应的bit位设置为1
all properties: 固定长度的属性值,根据schema进行排列,由于固定长度所以想获取某个字段的值可以直接计算offset+length就能拿到, 除 STRING 类型属性外,所有属性都就地存储。STRING 属性将字符串内容的偏移量存储在前 4 个字节中,将字符串的长度存储在后 4 个字节中。字符串内容将附加到编码字符串的末尾,相当于多了一层指针
string content: 存储非固定长度的string字符串,获取string属性内容得多次计算,先计算string字符串指针位置,然后在根据指针报错的offset+length得到完整的string内容
各类型占用内存空间大小:
BOOL (1 byte)
INT8 (1 byte)
INT16 (2 bytes)
INT32 (4 bytes)
INT64 (8 bytes)
FLOAT (4 bytes)
DOUBLE (8 bytes)
STRING (8 bytes) *
FIXED_STRING (Length defined in the schema)
TIMESTAMP (8 bytes)
DATE (4 bytes)
DATETIME (15 bytes)
GEOGRAPHY (8 bytes) *
关键代码:
RowWriterV2.cpp和RowWriterV2.h
五、NebulaGraph 中 TTL 是如何实现的
nebula中TTL是通过rocksdb中KVFilter实现的,KVFilter是rocksdb对外保留的扩展接口,查询和compact时用来做数据的过滤,数据每次搜索时都会经过此Filter,nebula实现此接口用来过滤TTL过期的数据,以及rocksdb做compact时筛选出过期的数据让其删除。
相关知识点:
TTL到期的数据可能并未删除,仅仅只是查询不出来,如果此时加大TTL或者删除TTL,原来查询不到的数据又会查询出来
// KVFilter在启动的rocksdb的时候就需要设置进去,所以这里直接看rocksdb启动的代码即可
StorageServer::getStoreInstance(){
kvstore::KVOptions options;
// 这个StorageCompactionFilter就是nebula给予VKFilter实现的
options.compaction_filter_factory = std::make_unique<StorageCompactionFilterFactoryBuilder>(schemaMan_.get(), indexMan_.get());
status = rocksdb::DB::Open(options, path, &db);
}
class StorageCompactionFilter final : public kvstore::KVFilter {
//这里是filter的入口
bool filter(int level,
GraphSpaceID spaceId,
const folly::StringPiece& key,
const folly::StringPiece& val) const override {
if (level < FLAGS_min_level_for_custom_filter) {
// for upper level such as L0/L1, we don't go through the custom
// validation to achieve better performance
return false;
}
if (NebulaKeyUtils::isTag(vIdLen_, key)) {
return !tagValid(spaceId, key, val);
} else if (NebulaKeyUtils::isEdge(vIdLen_, key)) {
return !edgeValid(spaceId, key, val);
} else if (IndexKeyUtils::isIndexKey(key)) {
return !indexValid(spaceId, key, val);
} else if (!FLAGS_use_vertex_key && NebulaKeyUtils::isVertex(key)) {
return true;
} else if (NebulaKeyUtils::isLock(vIdLen_, key)) {
return !lockValid(spaceId, key);
} else {
// skip uuid/system/operation
VLOG(3) << "Skip the system key inside, key " << key;
}
return false;
}
//判断tag是否有效,这里其实就是比对ttl字段的值对比当前时间是否超过ttl
private:
bool tagValid(GraphSpaceID spaceId,
const folly::StringPiece& key,
const folly::StringPiece& val) const {
......
if (ttlExpired(schema.get(), reader.get())) {
VLOG(3) << "Ttl expired";
return false;
}
return true;
}
//判断边是否过期
bool edgeValid(GraphSpaceID spaceId,
const folly::StringPiece& key,
const folly::StringPiece& val) const {
....
if (ttlExpired(schema.get(), reader.get())) {
VLOG(3) << "Ttl expired";
return false;
}
return true;
}
// TODO(panda) Optimize the method in the future
bool ttlExpired(const meta::NebulaSchemaProvider* schema,
nebula::RowReaderWrapper* reader) const {
if (schema == nullptr) {
return true;
}
auto ttl = CommonUtils::ttlProps(schema);
// Only support the specified ttl_col mode
// Not specifying or non-positive ttl_duration behaves like ttl_duration =
// infinity
if (!ttl.first) {
return false;
}
//这里比对数据的ttl字段的值和当前时间,如果过期就返回true
return CommonUtils::checkDataExpiredForTTL(schema, reader, ttl.second.second, ttl.second.first);
}
bool ttlExpired(const meta::NebulaSchemaProvider* schema, const Value& v) const {
if (schema == nullptr) {
return true;
}
auto ttl = CommonUtils::ttlProps(schema);
if (!ttl.first) {
return false;
}
return CommonUtils::checkDataExpiredForTTL(schema, v, ttl.second.second, ttl.second.first);
}
};
//判断ttl是否过期
bool CommonUtils::checkDataExpiredForTTL(const meta::NebulaSchemaProvider* schema,
const Value& v,
const std::string& ttlCol,
int64_t ttlDuration) {
const auto& ftype = schema->getFieldType(ttlCol);
if (ftype != nebula::cpp2::PropertyType::TIMESTAMP &&
ftype != nebula::cpp2::PropertyType::INT64) {
return false;
}
int64_t now;
// The unit of ttl expiration unit is controlled by user, we just use a gflag here.
if (!FLAGS_ttl_use_ms) {
now = std::time(nullptr);
} else {
auto t = std::chrono::system_clock::now();
now = std::chrono::duration_cast<std::chrono::milliseconds>(t.time_since_epoch()).count();
}
// if the value is not INT type (sush as NULL), it will never expire.
// TODO (sky) : DateTime
if (v.isInt() && (now > (v.getInt() + ttlDuration))) {
VLOG(2) << "ttl expired";
return true;
}
return false;
}
//从ddl的schema里面读取ttl_duration和ttl_col字段
std::pair<bool, std::pair<int64_t, std::string>> CommonUtils::ttlProps(
const meta::NebulaSchemaProvider* schema) {
DCHECK(schema != nullptr);
const auto* ns = dynamic_cast<const meta::NebulaSchemaProvider*>(schema);
const auto sp = ns->getProp();
int64_t duration = 0;
if (sp.get_ttl_duration()) {
duration = *sp.get_ttl_duration();
}
std::string col;
if (sp.get_ttl_col()) {
col = *sp.get_ttl_col();
}
return std::make_pair(!(duration <= 0 || col.empty()), std::make_pair(duration, col));
}
//获取ttl_col字段的值
StatusOr<Value> CommonUtils::ttlValue(const meta::NebulaSchemaProvider* schema,
RowReaderWrapper* reader) {
DCHECK(schema != nullptr);
const auto* ns = dynamic_cast<const meta::NebulaSchemaProvider*>(schema);
auto ttlProp = ttlProps(ns);
if (!ttlProp.first) {
return Status::Error();
}
return reader->getValueByName(std::move(ttlProp).second.second);
}
六、NebulaGraph 中是如何管理 session 的
session 是客户端和服务端 nebula-graphd 交互的凭证,session 保存在 nebula-metad 中,session 的创建和销毁都需要跟 nebula-metad 交互,后面将分析session的创建和销毁。
相关知识点
- client客户端通过sessionId来标识session
- nebula-graphd重启不会影响已创建的session
- nebula-metad将session保存到rocksdb中
总体步骤
- nebula-client创建连接并调用用户验证创建session
- nebula-graphd接收到client的请求验证,并调用nebula-metad进行session创建
- nebula-metad生成sessionId并通过raft协议进行保存
session的创建
nebula-client-3.6 客户端代码:创建nebula连接并认证
// nebula客户端创建连接都需要先进行用户名和密码验证,nebula服务端验证成功后将返回sessionId,客户端使用sessionId作为凭证操作nebula
private NebulaSession createSessionObject(SessionState state){
SyncConnection connection = new SyncConnection();
//建立连接
connection.open(getAddress(), sessionPoolConfig.getTimeout());
//用户名密码认证
AuthResult authResult = connection.authenticate(sessionPoolConfig.getUsername(),
sessionPoolConfig.getPassword());
//将认证结果sessionId放到NebulaSession中
NebulaSession nebulaSession = new NebulaSession(connection, authResult.getSessionId(),
authResult.getTimezoneOffset(), state);
return nebulaSession;
}
// NebulaSession执行语句
public ResultSet execute(String stmt) throws IOErrorException {
//传入sessionId进行RPC请求
return new ResultSet(connection.execute(sessionID, stmt), timezoneOffset);
}
// 调用thrift接口,传入sessionId和语句进行nebula服务端请求
public ExecutionResponse executeWithParameter(long sessionId, byte[] stmt, Map parameterMap) throws TException
{
ContextStack ctx = getContextStack("GraphService.executeWithParameter", null);
this.setContextStack(ctx);
send_executeWithParameter(sessionId, stmt, parameterMap);
return recv_executeWithParameter();
}
nebula-graphd接收client的授权请求,并返回sessionId
GraphService::future_authenticate(const std::string& username,const std::string& password){
// 请求nebula-metad校验用户名密码
auto authResult = auth(username, password);
if (!authResult.ok()) {
//用户密码错误
ctx->resp().errorCode = ErrorCode::E_BAD_USERNAME_PASSWORD;
return future;
}
if (sessionManager_->isOutOfConnections()) {
//session创建过多
ctx->resp().errorCode = ErrorCode::E_TOO_MANY_CONNECTIONS;
return future;
}
//请求nebula-metad创建会话
sessionManager_->createSession(username, clientIp, getThreadManager())
}
GraphSessionManager::createSession(){
std::string key = userName + clientIp;
//nebula-graphd.conf中配置的max_sessions_per_ip_per_user参数值
auto maxSessions = FLAGS_max_sessions_per_ip_per_user;
//获取相同用户IP创建session数量
auto uiscFindPtr = sessionCnt(key);
if (uiscFindPtr->get() > maxSessions - 1) {
return Status::Error("Create Session failed: Too many sessions created");
}
//通过metaClient创建会话
return metaClient_->createSession(userName, myAddr_, clientIp)
}
MetaClient::createSession(){
//像nebula-metad发起RPC请求创建session
client->future_createSession(request)
}
nebula-metad接收到nebula-graphd请求进行session创建
//接收到session创建请求并进行处理
MetaServiceHandler::future_createSession(
const cpp2::CreateSessionReq& req) {
auto* processor = CreateSessionProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
}
CreateSessionProcessor::process(){
cpp2::Session session;
// 创建sessionId,以当前时间微妙作为值
session.session_id_ref() = time::WallClock::fastNowInMicroSec();
session.create_time_ref() = session.get_session_id();
session.update_time_ref() = session.get_create_time();
session.user_name_ref() = user;
session.graph_addr_ref() = req.get_graph_addr();
session.client_ip_ref() = req.get_client_ip();
std::vector<kvstore::KV> data;
//构建kv引擎键值对,key为sessionId,value为会话信息
data.emplace_back(MetaKeyUtils::sessionKey(session.get_session_id()),
MetaKeyUtils::sessionVal(session));
resp_.session_ref() = session;
//调用kv引擎的put方法
ret = doSyncPut(std::move(data));
}
static const PartitionID kDefaultPartId = 0;
static const GraphSpaceID kDefaultSpaceId = 0;
//调用NebulaStore的put方法
BaseProcessor<RESP>::doSyncPut(){
kvstore_->asyncMultiPut(kDefaultSpaceId,kDefaultPartId)
}
NebulaStore::asyncMultiPut(){
//获取session会话分片信息,spaceId和 partId都为0
auto ret = part(spaceId, partId);
auto part = nebula::value(ret);
//分片数据写入
part->asyncMultiPut(std::move(keyValues), std::move(cb));
}
Part::asyncMultiPut(){
//简直编码
std::string log = encodeMultiValues(OP_MULTI_PUT, keyValues);
//通过raft协议进行数据写入
appendLogAsync(source, LogType::NORMAL, std::move(log))
}
session的销毁
session的销毁通过两种途径:
- client客户端主动signout
- 空闲时间超过session_idle_time自动被清除。
方式一、client客户端主动quit
---------nebula-client------------------
//NebulaSession中release方法退出session
public void release() {
connection.signout(sessionID);
connection.close();
}
//通过thrift协议调用GraphService.signout方法,这里向nebula-graphd发起退出请求
public void signout(long sessionId)
{
ContextStack ctx = getContextStack("GraphService.signout", null);
this.setContextStack(ctx);
send_signout(sessionId);
}
---------nebula-graphd------------------
//接收到nebula-client的请求进行会话清除
GraphService::signout(int64_t sessionId) {
VLOG(2) << "Sign out session " << sessionId;
sessionManager_->removeSession(sessionId);
}
//调用metad服务进行session的移除
GraphSessionManager::removeMultiSessions(){
metaClient_->removeSessions(ids)
}
方式二、nebula-graphd定时任务清除过期session
GraphSessionManager::threadFunc() {
//回收过期会话
reclaimExpiredSessions();
//启动下次回收任务
scavenger_->addDelayTask(
FLAGS_session_reclaim_interval_secs * 1000, &GraphSessionManager::threadFunc, this);
}
GraphSessionManager::reclaimExpiredSessions() {
//过期session列表
std::vector<SessionID> expiredSessions;
// activeSessions_为当前有效session列表,在创建session的时候进行填充
for (const auto& iter : activeSessions_) {
int32_t idleSecs = iter.second->idleSeconds();
if (idleSecs < FLAGS_session_idle_timeout_secs) {
continue;
}
//空闲时间大于session_idle_timeout_secs就会加入到过期列表中
expiredSessions.emplace_back(iter.first);
}
}
//请求nebula-metad进行会话清除,根据之前创建session可知其实就是通过raft协议从rocksdb中删除sessionId为key的值
metaClient_->removeSessions(std::move(expiredSessions))
七、NebulaGraph 错误日志中未知错误码如何定位原因
nebula错误输出有时只输出错误码,未输出错误原因,此时可通过源码src/interface/common.thrift文件进行搜索即可,我这里贴一些常见错误码:
E_DISCONNECTED = -1, // Lost connection
E_FAIL_TO_CONNECT = -2, // Unable to establish connection
E_RPC_FAILURE = -3, // RPC failure
E_LEADER_CHANGED = -4, // Raft leader has been changed
// 1xxx for graphd
E_BAD_USERNAME_PASSWORD = -1001, // Authentication failed
E_SESSION_INVALID = -1002, // Invalid session
E_SESSION_TIMEOUT = -1003, // Session timeout
E_SYNTAX_ERROR = -1004, // Syntax error
E_EXECUTION_ERROR = -1005, // Execution error
E_STATEMENT_EMPTY = -1006, // Statement is empty
E_SEMANTIC_ERROR = -1009, // Semantic error
E_TOO_MANY_CONNECTIONS = -1010, // Maximum number of connections exceeded
E_PARTIAL_SUCCEEDED = -1011, // Access to storage failed (only some requests succeeded)
// 2xxx for metad
E_NO_HOSTS = -2001, // Host does not exist
E_EXISTED = -2002, // Host already exists
E_INVALID_HOST = -2003, // Invalid host
E_UNSUPPORTED = -2004, // The current command, statement, or function is not supported
E_NOT_DROP = -2005, // Not allowed to drop
E_BALANCER_RUNNING = -2006, // The balancer is running
E_CONFIG_IMMUTABLE = -2007, // Configuration items cannot be changed
E_CONFLICT = -2008, // Parameters conflict with meta data
E_SESSION_NOT_FOUND = -2069, // Session does not exist
// 3xxx for storaged
E_CONSENSUS_ERROR = -3001, // Consensus cannot be reached during an election
E_KEY_HAS_EXISTS = -3002, // Key already exists
E_DATA_TYPE_MISMATCH = -3003, // Data type mismatch
E_INVALID_FIELD_VALUE = -3004, // Invalid field value
E_INVALID_OPERATION = -3005, // Invalid operation
E_NOT_NULLABLE = -3006, // Current value is not allowed to be empty
E_FIELD_UNSET = -3007, // Field value must be set if the field value is NOT NULL or has no default value
E_OUT_OF_RANGE = -3008, // The value is out of the range of the current type
E_DATA_CONFLICT_ERROR = -3010, // Data conflict, for index write without toss.
通过深入分析 NebulaGraph 的源码,我们可以看到它在高效存储、查询、数据编码和 TTL 管理方面的设计细节。希望本文能为开发者们提供一些实用的参考,帮助大家更好地理解 NebulaGraph 的工作原理,也希望通过共同学习,推动技术进步和创新。