Nebula3.6内核源码探究

作者简介:

阿旺(奇富科技)
本人是一名 java 老菜鸟,工作内容并非 C++ 方向,平时也很少接触 C++ 语言,不懂的代码都是使用 GPT 作代码辅助解释,故难免存在准确性问题,但大体思路应该没太大问题,如有错误问题还望评论区指出回复,相互学习共同成长。以下内容源自我在使用 NebulaGraph 过程中遇到的一些问题和思考。我通过查阅源码、动手实践,解决了一些实际生产中的疑难问题。也希望借此与大家分享经验,探讨更多技术细节。同时,也想鼓励每一位技术人——不要给自己轻易设限,勇于突破边界,探索新的领域。

nebula源码分支: release-3.6

内容大纲

  1. NebulaGraph 目录结构说明
  2. NebulaGraph 启动会运行多少个rocksdb实例
  3. NebulaGraph 是如何查询一个点的数据
  4. NebulaGraph 中 value 是如何编码的
  5. NebulaGraph 中 TTL 是如何实现的
  6. NebulaGraph 中如何管理 session
  7. NebulaGraph 错误日志中未知错误码如何定位原因

一、NebulaGraph 源码 src 目录结构

src
|-clients : 内部服务客户端,主要用于 graphd、metad、storaged 内部服务 RPC 调用
|-codec :nebula 数据的 key-value 编码和解码
|-daemons :启动入口,graphd、metad、storaged 启动
|-graph :graphd 相关业务实现,graphd 主要负责处理客户端请求,属于 nebula 的计算层
|-interface : thrift 接口定义,nebula 采用 thrift 做异构语言 rpc 调用,nebula 客户端与 nebula 交互接口皆定义于此
|-kvstore :键值存储引擎,storaged 和 metad 都会用到,比如:rocksdb 和 hbase,nebula 默认采用 rocksdb 作为内嵌存储
|-meta :元数据相关,比如:session 会话、用户信息、分片信息等等,属于nebula元数据管理层
|-parser : 语法解析
|-storage : nebula 存储相关,提供一些简单的kv存储和查询的接口服务给graphd调用

温馨提示: nebula 通过 CMake 进行编译打包,如果想了解打包过程和依赖信息见每个目录的 CMakeLists.txt 文件

二、单个 nebula-storaged 启动时会运行多少个 rocksdb 实例

结论: 数据存储磁盘目录数 *space总数
数据存储磁盘目录: etc/nebula-storaged.conf 中 data_path
比如:有2个 space,3个数据目录,则 rocksdb 实例数=2*3=6

相关知识点:
space 和 part 分区数越多 nebula-storaged 启动越慢

大体过程:

1.StorageDaemon::main() 程序启动入口
2.StorageServer::start() 启动存储引擎
3.StorageServer::getStoreInstance() 初始化存储引擎
4.NebulaStore::init() nebula存储引擎初始化
5.NebulaStore::loadPartFromDataPath() 通过获取data_path存储目录路径,为每个目录下space启动一个rocksdb实例,并且建立了space和kvEngine的映射关系,以及partId和part的映射关系,后面查询或者更新实则是定位数据partId,然后通过partId获取到part进行操作,part中保留了kvEngine的引用

最终构建spaces_,里面包含了SpaceId和spacePart分片的映射关系,后面所有的读写都需要此映射关系操作对应的rocksdb,这里的映射关系类似于java中SpringBean
std::unordered_map<GraphSpaceID, std::shared_ptr<SpacePartInfo>> spaces_;
struct SpacePartInfo {
  //Part中保留了engin的引用,当前版本engine其实就是rocksdb
  std::unordered_map<PartitionID, std::shared_ptr<Part>> parts_;
  std::vector<std::unique_ptr<KVEngine>> engines_;
}

关键代码

// 创建storageServer并启动
StorageDaemon::main(){
    auto storageServer = std::make_unique<nebula::storage::StorageServer>(
      localhost, metaAddrsRet.value(), paths, FLAGS_wal_path, FLAGS_listener_path);
    storageServer->start()  
}

// 初始化kv存储引擎
StorageServer::start(){
  LOG(INFO) << "Init kvstore";
  kvstore_ = getStoreInstance();
}

// 实例化nebula存储引擎并初始化
StorageServer::getStoreInstance(){
    auto nbStore = std::make_unique<kvstore::NebulaStore>(
        std::move(options), ioThreadPool_, localHost_, workers_);
    nbStore->init()        
}

// 从data_path中载入part分区数据
NebulaStore::init(){
    loadPartFromDataPath()
}

//遍历data_path目录为每个space创建一个rocksdb引擎
NebulaStore::loadPartFromDataPath(){
    std::vector<folly::Future<std::pair<GraphSpaceID, std::unique_ptr<KVEngine>>>> futures;
  std::vector<std::string> enginesPath;
  for (auto& path : options_.dataPaths_) {
    //这里其实就是data_path,比如:/usr/local/nebula/data
    auto rootPath = folly::stringPrintf("%s/nebula", path.c_str());
    //获取每个data_pah子目录,其实就是space
    // 比如 /usr/local/nebula/data/1、/usr/local/nebula/data/2
    auto dirs = fs::FileUtils::listAllDirsInDir(rootPath.c_str());
    //遍历space目录,为每个space目录创建一个rocksdb
    for (auto& dir : dirs) {
      //目录名称就是spaceId,比如/usr/local/nebula/data/1中的1就是spaceId
      GraphSpaceID spaceId = folly::to<GraphSpaceID>(dir);
      // 目录为0的跳过
      if (spaceId == 0) {
        // skip the system space, only handle data space here.
        continue;
      }
      enginesPath.emplace_back(rootPath + "/" + dir);
      //异步创建rocksdb存储引擎
      futures.emplace_back(newEngineAsync(spaceId, path, options_.walPath_));
    }
    // space和kvEngine列表的映射关系
    std::unordered_map<GraphSpaceID, std::vector<std::unique_ptr<KVEngine>>> spaceEngines;
    // 等待并获取前面的异步创建引擎列表
    auto tries = folly::collectAll(futures).get();
    for (auto& t : tries) {
        //p就是个二元组,第一个元素为space,第二个元素为kvEngine
        auto&& p = t.value();
        auto spaceIt = spaceEngines.find(p.first);
        if (spaceIt == spaceEngines.end()) {
        spaceIt = spaceEngines.emplace(p.first, std::vector<std::unique_ptr<KVEngine>>()).first;
      }
        spaceIt->second.emplace_back(std::move(p.second));
    }
    // 遍历所有space存储引擎
    for (auto& spaceEngine : spaceEngines) {
        GraphSpaceID spaceId = spaceEngine.first;
        for (auto& engine : spaceEngine.second) {
          // part分片与part副本节点集合,后面raft需要用到
          std::map<PartitionID, Peers> partRaftPeers;
          partRaftPeers.emplace(partId, raftPeers);
          //建立起partId和part映射关系,后面操作某partId的数据时就是通过part进行的,part中其实保留了kvEngine的引用
          for (auto& it : partRaftPeers) {
                auto part = newPart(spaceId, partId, enginePtr, isLearner, addrs);
                auto ret = iter->second->parts_.emplace(partId, part);
          }
        }
    }
  }
}
// 创建rocksdb引擎
NebulaStore::newEngineAsync() {
      engine = std::make_unique<RocksEngine>(
          spaceId, vIdLen, dataPath, walPath, options_.mergeOp_, cfFactory);
}

三、NebulaGraph 是如何查询一个点的数据

graphd负责接受client客户端请求,并解析 nGQL,生成查询计划,再经过优化器,最终执行executor,为了突出重点省去从接受请求开始到调用executor,而直接定位executor,这边以GetVerticesExecutor.cpp为例来探究如何查找vertex点的数据,主要探究vertexId如何定位到数据

相关知识点

  1. part数和storaged查询并发数成正比
  2. vid和partId映射计算公式: partId = hash(vid) % numParts + 1

大体过程

//nebula-graphd
1.GetVerticesExecutor::getVertices() 获取多个点属性列表
2.StorageClient::getProps() 计算vertexId对应的part,然后请求对应part分片数据

//nebula-storaged
3.GraphStorageServiceHandler::future_getProps() storagd收到graphd来的请求进行处理
4.GetPropProcessor::doProcess()  构建tag属性查询计划并调用
5.TagNode::doExecute() 通过vid和tagId构建rocksdb中key,访问rocksdb获取value属性
6.GetTagPropNode::doExecute()  收集vid对应多个tag value属性列表,并过滤出查询返回结果保留的prop属性

关键源码
nebula-graphd部分

// 通过上下文获取到storageClient然后调用获取属性的方法
GetVerticesExecutor::getVertices() {
    StorageClient *storageClient = qctx()->getStorageClient();
    storageClient->getProps(param,
                 std::move(vertices),
                 gv->props(),
                 nullptr,
                 gv->exprs(),
                 gv->dedup(),
                 gv->orderBy(),
                 gv->getValidLimit(),
                 gv->filter())
}

// 通过vertexId获取到part所在节点信息
// 请求对应节点的vertexId信息
StorageRpcRespFuture<cpp2::GetPropResponse> StorageClient::getProps(){
    // 建立vertexId和part节点映射关系
    auto status = clusterIdsToHosts(param.space, input.rows, std::move(cbStatus).value());
  // 为不同的节点构建对应的request
  cpp2::GetPropRequest> requests;
  for (auto& c : clusters) {
    auto& host = c.first;
    auto& req = requests[host];
    req.space_id_ref() = param.space;
    req.parts_ref() = std::move(c.second);
  }
  //调用rpc请求storaged获取点的属性信息
  return collectResponse(
      param.evb, std::move(requests), [](ThriftClientType* client, const cpp2::GetPropRequest& r) {
        return client->future_getProps(r);
      });
}

//通过(hash(vid) % numParts + 1)计算出对应的part
// 这种方式也预示着space设定的分区数量一旦创建不可更改,否则数据就会映射不上
StorageClientBase::clusterIdsToHosts(){
    //获取space空间有多少个part
  auto status = metaClient_->partsNum(spaceId);
  std::unordered_map<PartitionID, HostAddr> leaders;
    //建立起partId和leader节点的映射关系
  for (int32_t partId = 1; partId <= numParts; ++partId) {
    auto leader = getLeader(spaceId, partId);
    leaders[partId] = std::move(leader).value();
  }
  //
   for (auto& id : ids) {
        // 计算vid对应的partId
        PartitionID part = vid % numParts + 1;
        const auto& leader = leaders[part];
        clusters[leader][part].emplace_back(std::move(id));
  }
  return clusters;
}

nebula-storaged部分

folly::Future<cpp2::GetPropResponse> GraphStorageServiceHandler::future_getProps(
    const cpp2::GetPropRequest& req) {
  auto* processor = GetPropProcessor::instance(env_, &kGetPropCounters, readerPool_.get());
  RETURN_FUTURE(processor);
}

GetPropProcessor::doProcess(){
    if (!FLAGS_query_concurrently) {
     //为了方便理解,这里直接选择单线程
      runInSingleThread(req);
    } else {
      runInMultipleThread(req);
    }
}

GetPropProcessor::runInSingleThread(){
    // 构建tag查询计划,这个计划相当于构建一个DAG
    // 里面有完整的依赖信息
    auto plan = buildTagPlan(&contexts_.front(), &resultDataSet_);
    // 遍历待查询的part分区
    for (const auto& partEntry : req.get_parts()) {
      auto partId = partEntry.first;
      for (const auto& row : partEntry.second) {
        auto vId = row.values[0].getStr();
        // 执行查询计划
        plan.go(partId, vId);
      }
    }    
}

// 构建执行计划DAG
GetPropProcessor::buildTagPlan(RuntimeContext* context,
                                                     nebula::DataSet* result) {
  StoragePlan<VertexID> plan;
  std::vector<TagNode*> tags;
  for (const auto& tc : tagContext_.propContexts_) {
    // TagNode获取vid指定tag的属性列表,TagNode will return a DataSet of specified props of tagId
    // 这里面就有从rocksdb中读取tag的value属性
    // 查询语句指定多少个tag,这里就会有多少个TagNode
    // 比如:fetch prop on tag_1,tag_2,tag_3 idxxx,那这里就有3个TagNode
    auto tag = std::make_unique<TagNode>(context, &tagContext_, tc.first, &tc.second);
    tags.emplace_back(tag.get());
    plan.addNode(std::move(tag));
  }
  // GetTagPropNode用于收集vid对应多个tag的属性
  auto output = std::make_unique<GetTagPropNode>(
      context, tags, result, filter_ == nullptr ? nullptr : filter_->clone(), limit_, &tagContext_);
  for (auto* tag : tags) {
    output->addDependency(tag);
  }
  plan.addNode(std::move(output));
  return plan;
}

TagNode::doExecute(){
    // 根据vid和tagId构建rocksdb中key
    key_ = NebulaKeyUtils::tagKey(context_->vIdLen(), partId, vId, tagId_);
    // 从响应分片的rocksdb中获取value,此value为经过RowWriter编码的value
    ret = context_->env()->kvstore_->get(context_->spaceId(), partId, key_, &value_);
    // RowReaderWrapper设置schema和对应的value,后面用于GetTagPropNode收集prop做准备
    reader_.reset(*schemas_, value_);
}


// 收集vid对应所有tag的属性值列表
GetTagPropNode::doExecute(){
    // 这个集合用于保存查询到的vid对应不同tag的属性集合
    List row;
    // vertexId is the first column
    if (context_->isIntId()) {
      row.emplace_back(*reinterpret_cast<const int64_t*>(vId.data()));
    } else {
      row.emplace_back(vId);
    }
    // 遍历本次要查询的所有tag
    for (auto* tagNode : tagNodes_) {
      ret = tagNode->collectTagPropsIfValid(
          // 收集tagNode中属性到row中
          [&row, vIdLen, isIntId, tagNode, this](){
            auto status = QueryUtils::collectVertexProps(
                key, vIdLen, isIntId, reader, props, row, expCtx_.get(), tagNode->getTagName());
          });
    }
}

QueryUtils::collectVertexProps(){
  // 遍历tag中的属性列表,挨个的取出来放入到list中,这个list就是GetTagPropNode::doExecute中的row
  for (const auto& prop : *props) {
      auto value = QueryUtils::readVertexProp(key, vIdLen, isIntId, reader, prop);
      list.emplace_back(value.value());
    }
}

// 读取属性值
QueryUtils::readVertexProp(){
  switch (prop.propInKeyType_) {
      // prop in value
      case PropContext::PropInKeyType::NONE: {
        // ddl中的属性,这里的readValue就是RowReader根据schame获取字段的offset,然后获取对应的field值
        return readValue(reader, prop.name_, prop.field_);
      }
      case PropContext::PropInKeyType::VID: {
        // 属性类型为vid
        auto vId = NebulaKeyUtils::getVertexId(vIdLen, key);
        if (isIntId) {
          return *reinterpret_cast<const int64_t*>(vId.data());
        } else {
          return vId.subpiece(0, vId.find_first_of('\0')).toString();
        }
      }
      case PropContext::PropInKeyType::TAG: {
        // 属性类型为tagId
        auto tag = NebulaKeyUtils::getTagId(vIdLen, key);
        return tag;
      }
    }
}

四、NebulaGraph 中 value 是如何编码的

从2.x开始起nebula使用新的编码方式version 2,由5个部分组成:

    <header> <schema version> <NULL flags> <all properties> <string content>
       |             |             |              |
     1 byte     0 - 7 bytes     0+ bytes       N bytes

header: 占用1个字节,用来标识schema version字节长度,目前取值0~7
schema version: 0~7个字节,标识schema版本
NULL flags: NULL字段标识,跟mysql设计类似,为NULL的字段就在对应的bit位设置为1
all properties: 固定长度的属性值,根据schema进行排列,由于固定长度所以想获取某个字段的值可以直接计算offset+length就能拿到, 除 STRING 类型属性外,所有属性都就地存储。STRING 属性将字符串内容的偏移量存储在前 4 个字节中,将字符串的长度存储在后 4 个字节中。字符串内容将附加到编码字符串的末尾,相当于多了一层指针
string content: 存储非固定长度的string字符串,获取string属性内容得多次计算,先计算string字符串指针位置,然后在根据指针报错的offset+length得到完整的string内容

各类型占用内存空间大小:

        BOOL            (1 byte)
        INT8            (1 byte)
        INT16           (2 bytes)
        INT32           (4 bytes)
        INT64           (8 bytes)
        FLOAT           (4 bytes)
        DOUBLE          (8 bytes)
        STRING          (8 bytes) *
        FIXED_STRING    (Length defined in the schema)
        TIMESTAMP       (8 bytes)
        DATE            (4 bytes)
        DATETIME        (15 bytes)
        GEOGRAPHY       (8 bytes) *

关键代码:
RowWriterV2.cpp和RowWriterV2.h

五、NebulaGraph 中 TTL 是如何实现的

nebula中TTL是通过rocksdb中KVFilter实现的,KVFilter是rocksdb对外保留的扩展接口,查询和compact时用来做数据的过滤,数据每次搜索时都会经过此Filter,nebula实现此接口用来过滤TTL过期的数据,以及rocksdb做compact时筛选出过期的数据让其删除。

相关知识点:
TTL到期的数据可能并未删除,仅仅只是查询不出来,如果此时加大TTL或者删除TTL,原来查询不到的数据又会查询出来

// KVFilter在启动的rocksdb的时候就需要设置进去,所以这里直接看rocksdb启动的代码即可
StorageServer::getStoreInstance(){
	kvstore::KVOptions options;
	// 这个StorageCompactionFilter就是nebula给予VKFilter实现的
	options.compaction_filter_factory = std::make_unique<StorageCompactionFilterFactoryBuilder>(schemaMan_.get(), indexMan_.get());
	status = rocksdb::DB::Open(options, path, &db);
}

class StorageCompactionFilter final : public kvstore::KVFilter {

  //这里是filter的入口
  bool filter(int level,
              GraphSpaceID spaceId,
              const folly::StringPiece& key,
              const folly::StringPiece& val) const override {
    if (level < FLAGS_min_level_for_custom_filter) {
      // for upper level such as L0/L1, we don't go through the custom
      // validation to achieve better performance
      return false;
    }
    if (NebulaKeyUtils::isTag(vIdLen_, key)) {
      return !tagValid(spaceId, key, val);
    } else if (NebulaKeyUtils::isEdge(vIdLen_, key)) {
      return !edgeValid(spaceId, key, val);
    } else if (IndexKeyUtils::isIndexKey(key)) {
      return !indexValid(spaceId, key, val);
    } else if (!FLAGS_use_vertex_key && NebulaKeyUtils::isVertex(key)) {
      return true;
    } else if (NebulaKeyUtils::isLock(vIdLen_, key)) {
      return !lockValid(spaceId, key);
    } else {
      // skip uuid/system/operation
      VLOG(3) << "Skip the system key inside, key " << key;
    }
    return false;
  }
 
 //判断tag是否有效,这里其实就是比对ttl字段的值对比当前时间是否超过ttl
 private:
  bool tagValid(GraphSpaceID spaceId,
                const folly::StringPiece& key,
                const folly::StringPiece& val) const {
    ......
    if (ttlExpired(schema.get(), reader.get())) {
      VLOG(3) << "Ttl expired";
      return false;
    }
    return true;
  }
 
  //判断边是否过期
  bool edgeValid(GraphSpaceID spaceId,
                 const folly::StringPiece& key,
                 const folly::StringPiece& val) const {
    ....
    if (ttlExpired(schema.get(), reader.get())) {
      VLOG(3) << "Ttl expired";
      return false;
    }
    return true;
  }

 
  // TODO(panda) Optimize the method in the future
  bool ttlExpired(const meta::NebulaSchemaProvider* schema,
                  nebula::RowReaderWrapper* reader) const {
    if (schema == nullptr) {
      return true;
    }
    auto ttl = CommonUtils::ttlProps(schema);
    // Only support the specified ttl_col mode
    // Not specifying or non-positive ttl_duration behaves like ttl_duration =
    // infinity
    if (!ttl.first) {
      return false;
    }
    //这里比对数据的ttl字段的值和当前时间,如果过期就返回true
    return CommonUtils::checkDataExpiredForTTL(schema, reader, ttl.second.second, ttl.second.first);
  }

  bool ttlExpired(const meta::NebulaSchemaProvider* schema, const Value& v) const {
    if (schema == nullptr) {
      return true;
    }
    auto ttl = CommonUtils::ttlProps(schema);
    if (!ttl.first) {
      return false;
    }
    return CommonUtils::checkDataExpiredForTTL(schema, v, ttl.second.second, ttl.second.first);
  }
};

//判断ttl是否过期
bool CommonUtils::checkDataExpiredForTTL(const meta::NebulaSchemaProvider* schema,
                                         const Value& v,
                                         const std::string& ttlCol,
                                         int64_t ttlDuration) {
  const auto& ftype = schema->getFieldType(ttlCol);
  if (ftype != nebula::cpp2::PropertyType::TIMESTAMP &&
      ftype != nebula::cpp2::PropertyType::INT64) {
    return false;
  }

  int64_t now;
  // The unit of ttl expiration unit is controlled by user, we just use a gflag here.
  if (!FLAGS_ttl_use_ms) {
    now = std::time(nullptr);
  } else {
    auto t = std::chrono::system_clock::now();
    now = std::chrono::duration_cast<std::chrono::milliseconds>(t.time_since_epoch()).count();
  }

  // if the value is not INT type (sush as NULL), it will never expire.
  // TODO (sky) : DateTime
  if (v.isInt() && (now > (v.getInt() + ttlDuration))) {
    VLOG(2) << "ttl expired";
    return true;
  }
  return false;
}

//从ddl的schema里面读取ttl_duration和ttl_col字段
std::pair<bool, std::pair<int64_t, std::string>> CommonUtils::ttlProps(
    const meta::NebulaSchemaProvider* schema) {
  DCHECK(schema != nullptr);
  const auto* ns = dynamic_cast<const meta::NebulaSchemaProvider*>(schema);
  const auto sp = ns->getProp();
  int64_t duration = 0;
  if (sp.get_ttl_duration()) {
    duration = *sp.get_ttl_duration();
  }
  std::string col;
  if (sp.get_ttl_col()) {
    col = *sp.get_ttl_col();
  }
  return std::make_pair(!(duration <= 0 || col.empty()), std::make_pair(duration, col));
}

//获取ttl_col字段的值
StatusOr<Value> CommonUtils::ttlValue(const meta::NebulaSchemaProvider* schema,
                                      RowReaderWrapper* reader) {
  DCHECK(schema != nullptr);
  const auto* ns = dynamic_cast<const meta::NebulaSchemaProvider*>(schema);
  auto ttlProp = ttlProps(ns);
  if (!ttlProp.first) {
    return Status::Error();
  }
  return reader->getValueByName(std::move(ttlProp).second.second);
}


六、NebulaGraph 中是如何管理 session 的

session 是客户端和服务端 nebula-graphd 交互的凭证,session 保存在 nebula-metad 中,session 的创建和销毁都需要跟 nebula-metad 交互,后面将分析session的创建和销毁。
相关知识点

  1. client客户端通过sessionId来标识session
  2. nebula-graphd重启不会影响已创建的session
  3. nebula-metad将session保存到rocksdb中

总体步骤

  1. nebula-client创建连接并调用用户验证创建session
  2. nebula-graphd接收到client的请求验证,并调用nebula-metad进行session创建
  3. nebula-metad生成sessionId并通过raft协议进行保存

session的创建
nebula-client-3.6 客户端代码:创建nebula连接并认证

// nebula客户端创建连接都需要先进行用户名和密码验证,nebula服务端验证成功后将返回sessionId,客户端使用sessionId作为凭证操作nebula
private NebulaSession createSessionObject(SessionState state){
   SyncConnection connection = new SyncConnection();
   //建立连接
   connection.open(getAddress(), sessionPoolConfig.getTimeout());
   //用户名密码认证
   AuthResult authResult = connection.authenticate(sessionPoolConfig.getUsername(),
                    sessionPoolConfig.getPassword());
   //将认证结果sessionId放到NebulaSession中
   NebulaSession nebulaSession = new NebulaSession(connection, authResult.getSessionId(),
                authResult.getTimezoneOffset(), state);
   return nebulaSession;
}

// NebulaSession执行语句
public ResultSet execute(String stmt) throws IOErrorException {
    //传入sessionId进行RPC请求
    return new ResultSet(connection.execute(sessionID, stmt), timezoneOffset);
}

// 调用thrift接口,传入sessionId和语句进行nebula服务端请求
public ExecutionResponse executeWithParameter(long sessionId, byte[] stmt, Map parameterMap) throws TException
{
  ContextStack ctx = getContextStack("GraphService.executeWithParameter", null);
  this.setContextStack(ctx);
  send_executeWithParameter(sessionId, stmt, parameterMap);
  return recv_executeWithParameter();
}

nebula-graphd接收client的授权请求,并返回sessionId

GraphService::future_authenticate(const std::string& username,const std::string& password){
  // 请求nebula-metad校验用户名密码
  auto authResult = auth(username, password);
  if (!authResult.ok()) {
    //用户密码错误
    ctx->resp().errorCode = ErrorCode::E_BAD_USERNAME_PASSWORD;
    return future;
  }
  if (sessionManager_->isOutOfConnections()) {
    //session创建过多
    ctx->resp().errorCode = ErrorCode::E_TOO_MANY_CONNECTIONS;
    return future;
  }
  //请求nebula-metad创建会话
  sessionManager_->createSession(username, clientIp, getThreadManager())
}

GraphSessionManager::createSession(){
  std::string key = userName + clientIp;
  //nebula-graphd.conf中配置的max_sessions_per_ip_per_user参数值
  auto maxSessions = FLAGS_max_sessions_per_ip_per_user;
  //获取相同用户IP创建session数量
  auto uiscFindPtr = sessionCnt(key);
  if (uiscFindPtr->get() > maxSessions - 1) {
    return Status::Error("Create Session failed: Too many sessions created");
  }
  //通过metaClient创建会话
  return metaClient_->createSession(userName, myAddr_, clientIp)
}

MetaClient::createSession(){
  //像nebula-metad发起RPC请求创建session
  client->future_createSession(request)
}

nebula-metad接收到nebula-graphd请求进行session创建

//接收到session创建请求并进行处理
MetaServiceHandler::future_createSession(
    const cpp2::CreateSessionReq& req) {
  auto* processor = CreateSessionProcessor::instance(kvstore_);
  RETURN_FUTURE(processor);
}

CreateSessionProcessor::process(){
  cpp2::Session session;
  // 创建sessionId,以当前时间微妙作为值
  session.session_id_ref() = time::WallClock::fastNowInMicroSec();
  session.create_time_ref() = session.get_session_id();
  session.update_time_ref() = session.get_create_time();
  session.user_name_ref() = user;
  session.graph_addr_ref() = req.get_graph_addr();
  session.client_ip_ref() = req.get_client_ip();
  std::vector<kvstore::KV> data;
  //构建kv引擎键值对,key为sessionId,value为会话信息
  data.emplace_back(MetaKeyUtils::sessionKey(session.get_session_id()),
                    MetaKeyUtils::sessionVal(session));
  resp_.session_ref() = session;
  //调用kv引擎的put方法
  ret = doSyncPut(std::move(data));
}

static const PartitionID kDefaultPartId = 0;
static const GraphSpaceID kDefaultSpaceId = 0;

//调用NebulaStore的put方法
BaseProcessor<RESP>::doSyncPut(){
  kvstore_->asyncMultiPut(kDefaultSpaceId,kDefaultPartId)
}

NebulaStore::asyncMultiPut(){
  //获取session会话分片信息,spaceId和 partId都为0
  auto ret = part(spaceId, partId);
  auto part = nebula::value(ret);
  //分片数据写入
  part->asyncMultiPut(std::move(keyValues), std::move(cb));
}

Part::asyncMultiPut(){
  //简直编码
  std::string log = encodeMultiValues(OP_MULTI_PUT, keyValues);
  //通过raft协议进行数据写入
  appendLogAsync(source, LogType::NORMAL, std::move(log))
}

session的销毁
session的销毁通过两种途径:

  1. client客户端主动signout
  2. 空闲时间超过session_idle_time自动被清除。

方式一、client客户端主动quit

---------nebula-client------------------
//NebulaSession中release方法退出session
public void release() {
  connection.signout(sessionID);
  connection.close();
}
//通过thrift协议调用GraphService.signout方法,这里向nebula-graphd发起退出请求
public void signout(long sessionId)
{
  ContextStack ctx = getContextStack("GraphService.signout", null);
  this.setContextStack(ctx);
  send_signout(sessionId);
}

---------nebula-graphd------------------
//接收到nebula-client的请求进行会话清除
GraphService::signout(int64_t sessionId) {
  VLOG(2) << "Sign out session " << sessionId;
  sessionManager_->removeSession(sessionId);
}
//调用metad服务进行session的移除
 GraphSessionManager::removeMultiSessions(){
  metaClient_->removeSessions(ids)
 }

方式二、nebula-graphd定时任务清除过期session

GraphSessionManager::threadFunc() {
  //回收过期会话
  reclaimExpiredSessions();
  //启动下次回收任务
  scavenger_->addDelayTask(
      FLAGS_session_reclaim_interval_secs * 1000, &GraphSessionManager::threadFunc, this);
}

GraphSessionManager::reclaimExpiredSessions() {
  //过期session列表
  std::vector<SessionID> expiredSessions;
  // activeSessions_为当前有效session列表,在创建session的时候进行填充
  for (const auto& iter : activeSessions_) {
    int32_t idleSecs = iter.second->idleSeconds();
    if (idleSecs < FLAGS_session_idle_timeout_secs) {
      continue;
    }
    //空闲时间大于session_idle_timeout_secs就会加入到过期列表中
    expiredSessions.emplace_back(iter.first);
  }
}
//请求nebula-metad进行会话清除,根据之前创建session可知其实就是通过raft协议从rocksdb中删除sessionId为key的值
metaClient_->removeSessions(std::move(expiredSessions))

七、NebulaGraph 错误日志中未知错误码如何定位原因

nebula错误输出有时只输出错误码,未输出错误原因,此时可通过源码src/interface/common.thrift文件进行搜索即可,我这里贴一些常见错误码:

E_DISCONNECTED                    = -1,     // Lost connection
E_FAIL_TO_CONNECT                 = -2,     // Unable to establish connection
E_RPC_FAILURE                     = -3,     // RPC failure
E_LEADER_CHANGED                  = -4,     // Raft leader has been changed

// 1xxx for graphd
E_BAD_USERNAME_PASSWORD           = -1001,  // Authentication failed
E_SESSION_INVALID                 = -1002,  // Invalid session
E_SESSION_TIMEOUT                 = -1003,  // Session timeout
E_SYNTAX_ERROR                    = -1004,  // Syntax error
E_EXECUTION_ERROR                 = -1005,  // Execution error
E_STATEMENT_EMPTY                 = -1006,  // Statement is empty
E_SEMANTIC_ERROR                  = -1009,  // Semantic error
E_TOO_MANY_CONNECTIONS            = -1010,  // Maximum number of connections exceeded
E_PARTIAL_SUCCEEDED               = -1011,  // Access to storage failed (only some requests succeeded)

// 2xxx for metad
E_NO_HOSTS                        = -2001,  // Host does not exist
E_EXISTED                         = -2002,  // Host already exists
E_INVALID_HOST                    = -2003,  // Invalid host
E_UNSUPPORTED                     = -2004,  // The current command, statement, or function is not supported
E_NOT_DROP                        = -2005,  // Not allowed to drop
E_BALANCER_RUNNING                = -2006,  // The balancer is running
E_CONFIG_IMMUTABLE                = -2007,  // Configuration items cannot be changed
E_CONFLICT                        = -2008,  // Parameters conflict with meta data
E_SESSION_NOT_FOUND               = -2069,  // Session does not exist

// 3xxx for storaged
E_CONSENSUS_ERROR                 = -3001,  // Consensus cannot be reached during an election
E_KEY_HAS_EXISTS                  = -3002,  // Key already exists
E_DATA_TYPE_MISMATCH              = -3003,  // Data type mismatch
E_INVALID_FIELD_VALUE             = -3004,  // Invalid field value
E_INVALID_OPERATION               = -3005,  // Invalid operation
E_NOT_NULLABLE                    = -3006,  // Current value is not allowed to be empty
E_FIELD_UNSET                     = -3007,  // Field value must be set if the field value is NOT NULL or has no default value
E_OUT_OF_RANGE                    = -3008,  // The value is out of the range of the current type
E_DATA_CONFLICT_ERROR             = -3010,  // Data conflict, for index write without toss.

通过深入分析 NebulaGraph 的源码,我们可以看到它在高效存储、查询、数据编码和 TTL 管理方面的设计细节。希望本文能为开发者们提供一些实用的参考,帮助大家更好地理解 NebulaGraph 的工作原理,也希望通过共同学习,推动技术进步和创新。

10 个赞

某个温暖的周六下午,阳光透过窗帘洒在俺的手机上,正巧收到了阿旺发来的这篇文章。 :saluting_face:

1 个赞

什么时候再出一个insert源码探究

3 个赞

阿旺~什么时候出一系列源码探究