fulltext index问题

最近看fulltext index的支持,主要发现以下问题:

  1. Listener要保证高可用,必须部署多个。如果都往同一个 Elasticsearch集群中写入,会导致一个集群写多次数据?
  2. 不支持在已有的space上新增Listener。如果Listener挂了很长时间之后导致需要WAIT_SANPSHOT,Listener也不可用了,而且完全没法恢复Listener服务。

请问2.0里面会对这两个比较严重的问题做优化吗?

  1. Listener要保证高可用,必须部署多个。如果都往同一个 Elasticsearch集群中写入,会导致一个集群写多次数据?
    多个listener的目的不是为了高可用,另外,多个listener的话也不会写多次数据。
  2. 不支持在已有的space上新增Listener。如果Listener挂了很长时间之后导致需要WAIT_SANPSHOT,Listener也不可用了,而且完全没法恢复Listener服务。
    不知道这个理解是源于哪个文档?或是看代码猜测的?listener不会wait snapshot,当listener挂掉后,再重启的话会继续提供服务的。

所以,上边的两个疑惑是不存在的。

  1. 如果只部署一个Listener的话,是部署Listener挂了,就没有leaner往Elasticsearch里面写数据了。所以我才认为需要部署多个Listener。另外每个Listener接收到数据日志之后都会apply,往Elasticsearch集群里面写数据,这样不是写了多次吗?

  2. 对于第二个问题,我对一个已经存在的space 新增一个 Listener,等于是在Raft里面新增加一个learner,必然会导致Raft Leader向learner发送snapshot。但是目前Listener::commitSnapshot里面根本没有实现,导致Listener根本不可用。这个跟Listener挂掉很长时间,WAL里面的日志太老导致要WAIT_SNAPSHOT是一样的逻辑,都是没有实现Listener::commitSnapshot。
    std::pair<int64_t, int64_t> Listener::commitSnapshot(const std::vectorstd::string& rows,
    LogID committedLogId,
    TermID committedLogTerm,
    bool finished) {
    LOG(WARNING) << “Listener snapshot has not implemented yet”;
    UNUSED(committedLogId); UNUSED(committedLogTerm); UNUSED(finished);
    int64_t count = 0;
    int64_t size = 0;
    for (const auto& row : rows) {
    count++;
    size += row.size();
    // todo(doodle): could decode and apply
    }
    return {count, size};
    }

我理解你的疑问在哪里了,
问题1,一个space部署多个listener,这多个listener会hash分布这个space包含的parts,所以每个listener会负责属于自己的一个或多个part,listener和listener之间不会存在操作同一个part的可能。这样的话,多个listener并发写es的时候,不会重复写同一条数据,因为数据也是被hash分配到不同的part上的。
问题2,listener会自己记录lastTermId和lastLogId, 所以这里没有拉snapshot,当listener重启后,会接着上次记录的lastTermId和lastLogId继续解析wal。

如果有一个 Listener 所在的服务器故障,直接起不来了。这种情况是不是没有办法处理,因为part是跟listener绑定的。

假设是服务器故障,只要ip不变,listener重启后会继续提供服务。最坏的场景是,服务器彻底坏掉,更换了一个相同IP的新服务器后,listener启动后会从头开始同步数据,这个过程较慢,但是不会影响集群的正常服务。
如果是IP也需要更换,目前做不到。listener还不支持add host功能。

我实在没有看到全量同步数据的实现啊。 Listener::commitSnapshot里面没有实现 decode and apply.

请分析lastTermId和lastLogId的处理逻辑。

我又去分析了一下代码,我觉得您可能没有理解我说的问题。

我的问题是 listener的lastLogId 已经小于Leader的 firstLogId的情况。Leader怎么将(listener.lastLogId, leader.firstLogId) 之间的数据同步给Listener?

什么是 “Leader的 firstLogId” ? 什么情况下会出现这个问题?能举个例子吗?我对你的疑问不太理解。

我的意思是 Listener实际上就是Raft的Learner,其上的数据都是Raft Part的Leader发送过来的吧。而Raft的同步也是要依赖Leader上的WAL日志的。Leader只能增量同步WAL的(FirstLogId, LastLogId)之间的数据,如果Leader WAL的FirstLogId已经比Listener(Learner) WAL的LastLogId,此时是没法增量同步的。

我尝试理解一下你的想法,不对的话请指正。你是说leader在给listener发送wal的过程中,需要发送firstLogId 至 lastLogId这个区间的wal,但是当前的listener中的lastLogId比leader将要发送的这个区间的wal还要小,这样listener在接收wal的过程中,可能导致wal丢失,或者导致无法增量发送?对吗?

是的,可以这么理解,这种情况目前我在代码里面没有看到是怎么处理的。

listener那里自己记录了lastTermId和lastLogId,如果和leader的哪次wal同步失败了,会重试这次失败的wal,直到成功,然后再继续下一次wal同步。这个过程就确保了listener中的wal永远不会超过leader,即 listener-lastLogId 始终小于 leader 将要发送的 firstLogId。

std::pair<LogID, TermID> ESListener::lastCommittedLogId() {
    if (access(lastApplyLogFile_->c_str(), 0) != 0) {
        VLOG(3) << "Invalid or non-existent file : " << *lastApplyLogFile_;
        return {0, 0};
    }
    int32_t fd = open(lastApplyLogFile_->c_str(), O_RDONLY);
    if (fd < 0) {
        LOG(FATAL) << "Failed to open the file \"" << lastApplyLogFile_->c_str() << "\" ("
                   << errno << "): " << strerror(errno);
    }
    // read last logId from listener wal file.
    LogID logId;
    CHECK_EQ(pread(fd, reinterpret_cast<char*>(&logId), sizeof(LogID), 0),
             static_cast<ssize_t>(sizeof(LogID)));

    // read last termId from listener wal file.
    TermID termId;
    CHECK_EQ(pread(fd, reinterpret_cast<char*>(&termId), sizeof(TermID), sizeof(LogID)),
             static_cast<ssize_t>(sizeof(TermID)));
    close(fd);
    return {logId, termId};
}

LogID ESListener::lastApplyLogId() {
    if (access(lastApplyLogFile_->c_str(), 0) != 0) {
        VLOG(3) << "Invalid or non-existent file : " << *lastApplyLogFile_;
        return 0;
    }
    int32_t fd = open(lastApplyLogFile_->c_str(), O_RDONLY);
    if (fd < 0) {
        LOG(FATAL) << "Failed to open the file \"" << lastApplyLogFile_->c_str() << "\" ("
                   << errno << "): " << strerror(errno);
    }
    // read last applied logId from listener wal file.
    LogID logId;
    auto offset = sizeof(LogID) + sizeof(TermID);
    CHECK_EQ(pread(fd, reinterpret_cast<char*>(&logId), sizeof(LogID), offset),
             static_cast<ssize_t>(sizeof(LogID)));
    close(fd);
    return logId;
}

bool ESListener::writeAppliedId(LogID lastId, TermID lastTerm, LogID lastApplyLogId) {
    int32_t fd = open(
        lastApplyLogFile_->c_str(),
        O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC,
        0644);
    if (fd < 0) {
        VLOG(3) << "Failed to open file \"" << lastApplyLogFile_->c_str()
                << "\" (errno: " << errno << "): "
                << strerror(errno);
        return false;
    }
    auto raw = encodeAppliedId(lastId, lastTerm, lastApplyLogId);
    ssize_t written = write(fd, raw.c_str(), raw.size());
    if (written != (ssize_t)raw.size()) {
        VLOG(3) << idStr_ << "bytesWritten:" << written << ", expected:" << raw.size()
                << ", error:" << strerror(errno);
        close(fd);
        return false;
    }
    close(fd);
    return true;
}

leader的WAL同步失败了,确实会重试。但是这可以确保listener-lastLogId 始终小于 leader 将要发送的 firstLogId吗?

Leader的WAL不是会定时cleanWAL吗?在cleanWAL的时候也没有看到检查当前WAL是否同步到了所有的Learner的。

void NebulaStore::cleanWAL() {
SCOPE_EXIT {
    bgWorkers_->addDelayTask(FLAGS_clean_wal_interval_secs * 1000,
                             &NebulaStore::cleanWAL,
                             this);
};
// 1. 对每个在线的space
for (const auto& spaceEntry : spaces_) {
    if (FLAGS_rocksdb_disable_wal) {
        for (const auto& engine : spaceEntry.second->engines_) {
            engine->flush();
        }
    }
    // 1.1 对每个part
    for (const auto& partEntry : spaceEntry.second->parts_) {
        auto& part = partEntry.second;
        // 当前的Part是否需要clean wal?
        if (part->needToCleanWal()) {
            // clean wal by expired time
            part->wal()->cleanWAL();         // 定时删除老的WAL
        }
    }
}

void FileBasedWal::cleanWAL() {
std::lock_guard<std::mutex> g(walFilesMutex_);
if (walFiles_.empty()) {
    return;
}
auto now = time::WallClock::fastNowInSec();
// In theory we only need to keep the latest wal file because it is beging written now.
// However, sometimes will trigger raft snapshot even only a small amount of logs is missing,
// especially when we reboot all storage, so se keep one more wal.
size_t index = 0;
// 1. 第一个wal
auto it = walFiles_.begin();
auto size = walFiles_.size();
// 2. 如果wal的数量 < 2,不处理
if (size < 2) {
    return;
}
int count = 0;
int walTTL = FLAGS_wal_ttl;
while (it != walFiles_.end()) {
    // 3. 默认最少保留2个wal,并且4个小时的WAL,感觉应该修改成24小时。
    // keep at least two wal
    if (index++ < size - 2 && (now - it->second->mtime() > walTTL)) {
        VLOG(1) << "Clean wals, Remove " << it->second->path() << ", now: " << now
                << ", mtime: " << it->second->mtime();
        unlink(it->second->path());
        it = walFiles_.erase(it);
        count++;
    } else {
        ++it;
    }
}
if (count > 0) {
    LOG(INFO) << idStr_ << "Clean wals number " << count;
}
// 4. 更新当前的firstLogId_
firstLogId_ = walFiles_.begin()->second->firstId();

从代码中可以看出,Leader还是会定期删除WAL,并更新Leader的firstLogId_,这里面根本没有判断firstLogId_是否已经同步到了Learner,就会向前递增,最后肯定会大于Listener的LastLogId。

可以调整参数“ walTTL” ,另外listener的wal如果多次同步失败,主要原因是因为第三方全文索引库挂掉了,目前nebula没有提供第三方库的监听机制,需要用户自己配置监听,确保第三方全文索引的库不会长时间挂掉。