最近看fulltext index的支持,主要发现以下问题:
- Listener要保证高可用,必须部署多个。如果都往同一个 Elasticsearch集群中写入,会导致一个集群写多次数据?
- 不支持在已有的space上新增Listener。如果Listener挂了很长时间之后导致需要WAIT_SANPSHOT,Listener也不可用了,而且完全没法恢复Listener服务。
请问2.0里面会对这两个比较严重的问题做优化吗?
最近看fulltext index的支持,主要发现以下问题:
请问2.0里面会对这两个比较严重的问题做优化吗?
所以,上边的两个疑惑是不存在的。
如果只部署一个Listener的话,是部署Listener挂了,就没有leaner往Elasticsearch里面写数据了。所以我才认为需要部署多个Listener。另外每个Listener接收到数据日志之后都会apply,往Elasticsearch集群里面写数据,这样不是写了多次吗?
对于第二个问题,我对一个已经存在的space 新增一个 Listener,等于是在Raft里面新增加一个learner,必然会导致Raft Leader向learner发送snapshot。但是目前Listener::commitSnapshot里面根本没有实现,导致Listener根本不可用。这个跟Listener挂掉很长时间,WAL里面的日志太老导致要WAIT_SNAPSHOT是一样的逻辑,都是没有实现Listener::commitSnapshot。
std::pair<int64_t, int64_t> Listener::commitSnapshot(const std::vectorstd::string& rows,
LogID committedLogId,
TermID committedLogTerm,
bool finished) {
LOG(WARNING) << “Listener snapshot has not implemented yet”;
UNUSED(committedLogId); UNUSED(committedLogTerm); UNUSED(finished);
int64_t count = 0;
int64_t size = 0;
for (const auto& row : rows) {
count++;
size += row.size();
// todo(doodle): could decode and apply
}
return {count, size};
}
我理解你的疑问在哪里了,
问题1,一个space部署多个listener,这多个listener会hash分布这个space包含的parts,所以每个listener会负责属于自己的一个或多个part,listener和listener之间不会存在操作同一个part的可能。这样的话,多个listener并发写es的时候,不会重复写同一条数据,因为数据也是被hash分配到不同的part上的。
问题2,listener会自己记录lastTermId和lastLogId, 所以这里没有拉snapshot,当listener重启后,会接着上次记录的lastTermId和lastLogId继续解析wal。
如果有一个 Listener 所在的服务器故障,直接起不来了。这种情况是不是没有办法处理,因为part是跟listener绑定的。
假设是服务器故障,只要ip不变,listener重启后会继续提供服务。最坏的场景是,服务器彻底坏掉,更换了一个相同IP的新服务器后,listener启动后会从头开始同步数据,这个过程较慢,但是不会影响集群的正常服务。
如果是IP也需要更换,目前做不到。listener还不支持add host功能。
我实在没有看到全量同步数据的实现啊。 Listener::commitSnapshot里面没有实现 decode and apply.
请分析lastTermId和lastLogId的处理逻辑。
我又去分析了一下代码,我觉得您可能没有理解我说的问题。
我的问题是 listener的lastLogId 已经小于Leader的 firstLogId的情况。Leader怎么将(listener.lastLogId, leader.firstLogId) 之间的数据同步给Listener?
什么是 “Leader的 firstLogId” ? 什么情况下会出现这个问题?能举个例子吗?我对你的疑问不太理解。
我的意思是 Listener实际上就是Raft的Learner,其上的数据都是Raft Part的Leader发送过来的吧。而Raft的同步也是要依赖Leader上的WAL日志的。Leader只能增量同步WAL的(FirstLogId, LastLogId)之间的数据,如果Leader WAL的FirstLogId已经比Listener(Learner) WAL的LastLogId,此时是没法增量同步的。
我尝试理解一下你的想法,不对的话请指正。你是说leader在给listener发送wal的过程中,需要发送firstLogId 至 lastLogId这个区间的wal,但是当前的listener中的lastLogId比leader将要发送的这个区间的wal还要小,这样listener在接收wal的过程中,可能导致wal丢失,或者导致无法增量发送?对吗?
是的,可以这么理解,这种情况目前我在代码里面没有看到是怎么处理的。
listener那里自己记录了lastTermId和lastLogId,如果和leader的哪次wal同步失败了,会重试这次失败的wal,直到成功,然后再继续下一次wal同步。这个过程就确保了listener中的wal永远不会超过leader,即 listener-lastLogId 始终小于 leader 将要发送的 firstLogId。
std::pair<LogID, TermID> ESListener::lastCommittedLogId() {
if (access(lastApplyLogFile_->c_str(), 0) != 0) {
VLOG(3) << "Invalid or non-existent file : " << *lastApplyLogFile_;
return {0, 0};
}
int32_t fd = open(lastApplyLogFile_->c_str(), O_RDONLY);
if (fd < 0) {
LOG(FATAL) << "Failed to open the file \"" << lastApplyLogFile_->c_str() << "\" ("
<< errno << "): " << strerror(errno);
}
// read last logId from listener wal file.
LogID logId;
CHECK_EQ(pread(fd, reinterpret_cast<char*>(&logId), sizeof(LogID), 0),
static_cast<ssize_t>(sizeof(LogID)));
// read last termId from listener wal file.
TermID termId;
CHECK_EQ(pread(fd, reinterpret_cast<char*>(&termId), sizeof(TermID), sizeof(LogID)),
static_cast<ssize_t>(sizeof(TermID)));
close(fd);
return {logId, termId};
}
LogID ESListener::lastApplyLogId() {
if (access(lastApplyLogFile_->c_str(), 0) != 0) {
VLOG(3) << "Invalid or non-existent file : " << *lastApplyLogFile_;
return 0;
}
int32_t fd = open(lastApplyLogFile_->c_str(), O_RDONLY);
if (fd < 0) {
LOG(FATAL) << "Failed to open the file \"" << lastApplyLogFile_->c_str() << "\" ("
<< errno << "): " << strerror(errno);
}
// read last applied logId from listener wal file.
LogID logId;
auto offset = sizeof(LogID) + sizeof(TermID);
CHECK_EQ(pread(fd, reinterpret_cast<char*>(&logId), sizeof(LogID), offset),
static_cast<ssize_t>(sizeof(LogID)));
close(fd);
return logId;
}
bool ESListener::writeAppliedId(LogID lastId, TermID lastTerm, LogID lastApplyLogId) {
int32_t fd = open(
lastApplyLogFile_->c_str(),
O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC,
0644);
if (fd < 0) {
VLOG(3) << "Failed to open file \"" << lastApplyLogFile_->c_str()
<< "\" (errno: " << errno << "): "
<< strerror(errno);
return false;
}
auto raw = encodeAppliedId(lastId, lastTerm, lastApplyLogId);
ssize_t written = write(fd, raw.c_str(), raw.size());
if (written != (ssize_t)raw.size()) {
VLOG(3) << idStr_ << "bytesWritten:" << written << ", expected:" << raw.size()
<< ", error:" << strerror(errno);
close(fd);
return false;
}
close(fd);
return true;
}
leader的WAL同步失败了,确实会重试。但是这可以确保listener-lastLogId 始终小于 leader 将要发送的 firstLogId吗?
Leader的WAL不是会定时cleanWAL吗?在cleanWAL的时候也没有看到检查当前WAL是否同步到了所有的Learner的。
void NebulaStore::cleanWAL() {
SCOPE_EXIT {
bgWorkers_->addDelayTask(FLAGS_clean_wal_interval_secs * 1000,
&NebulaStore::cleanWAL,
this);
};
// 1. 对每个在线的space
for (const auto& spaceEntry : spaces_) {
if (FLAGS_rocksdb_disable_wal) {
for (const auto& engine : spaceEntry.second->engines_) {
engine->flush();
}
}
// 1.1 对每个part
for (const auto& partEntry : spaceEntry.second->parts_) {
auto& part = partEntry.second;
// 当前的Part是否需要clean wal?
if (part->needToCleanWal()) {
// clean wal by expired time
part->wal()->cleanWAL(); // 定时删除老的WAL
}
}
}
void FileBasedWal::cleanWAL() {
std::lock_guard<std::mutex> g(walFilesMutex_);
if (walFiles_.empty()) {
return;
}
auto now = time::WallClock::fastNowInSec();
// In theory we only need to keep the latest wal file because it is beging written now.
// However, sometimes will trigger raft snapshot even only a small amount of logs is missing,
// especially when we reboot all storage, so se keep one more wal.
size_t index = 0;
// 1. 第一个wal
auto it = walFiles_.begin();
auto size = walFiles_.size();
// 2. 如果wal的数量 < 2,不处理
if (size < 2) {
return;
}
int count = 0;
int walTTL = FLAGS_wal_ttl;
while (it != walFiles_.end()) {
// 3. 默认最少保留2个wal,并且4个小时的WAL,感觉应该修改成24小时。
// keep at least two wal
if (index++ < size - 2 && (now - it->second->mtime() > walTTL)) {
VLOG(1) << "Clean wals, Remove " << it->second->path() << ", now: " << now
<< ", mtime: " << it->second->mtime();
unlink(it->second->path());
it = walFiles_.erase(it);
count++;
} else {
++it;
}
}
if (count > 0) {
LOG(INFO) << idStr_ << "Clean wals number " << count;
}
// 4. 更新当前的firstLogId_
firstLogId_ = walFiles_.begin()->second->firstId();
从代码中可以看出,Leader还是会定期删除WAL,并更新Leader的firstLogId_,这里面根本没有判断firstLogId_是否已经同步到了Learner,就会向前递增,最后肯定会大于Listener的LastLogId。
可以调整参数“ walTTL” ,另外listener的wal如果多次同步失败,主要原因是因为第三方全文索引库挂掉了,目前nebula没有提供第三方库的监听机制,需要用户自己配置监听,确保第三方全文索引的库不会长时间挂掉。