本系列的上一篇里分析了 PaxosStore 中协议日志的实现,本篇将深入协议过程中的一些细节,这些实现细节集中在 `src/EntityWorker.cpp` 这个庞然大物里。
在对某个 Entity 执行读写前,需要保证该 Entity 在本地的所有 Entry 都已经提交到 DB 了,这样读写才有意义。简而言之,就是要满足 `CommittedEntry == MaxChosenEntry`。
借此机会,过一遍样例的流程。从 `example/CardTool.cpp` 这个 Client 开始看:
// CardTool client entry point (excerpt; "..." marks code omitted by the article).
// NOTE(review): the elided code presumably builds strAddr, strOper, oRequest
// and vecIPList from argv — confirm against example/CardTool.cpp.
int main(int argc, char **argv) {
...
// Send the prepared request to the server through Run().
Run(strAddr, strOper, oRequest, oResponse, &vecIPList);
...
}
// Performs one CardTool operation, selected by the first character of strOper
// (excerpt: only the insert case 'I' is shown; "..." marks omitted code).
// The request is sent through oClient.Call and its status stored in oStatus.
// NOTE(review): the construction of oClient, the use of strAddr/poIPList and
// the final return are in the elided code — confirm before relying on them.
grpc::Status Run(const std::string &strAddr, const std::string &strOper,
example::CardRequest &oRequest, example::CardResponse &oResponse,
std::vector<std::string> *poIPList) {
...
switch (strOper[0]) {
case 'I':
// Insert: derive the entity id from the card id (the server batches
// requests per entity id), stamp the last-modified time with the current
// time, then issue the eInsertCard call.
oRequest.set_entity_id(GetEntityID(oRequest.card_id()));
oRequest.mutable_card_info()->set_last_modified_time(time(0));
oStatus = oClient.Call(oRequest.entity_id(), example::eInsertCard, &oRequest, &oResponse);
break;
}
...
}
该 Client 会调用 gRPC 向 Server 发送请求。直接看 Server 端的处理函数,位于 `example/ServiceImpl.cpp`:
// gRPC handler for card insertion.
//
// Insertion is not special-cased: it goes through the same per-entity batching
// path (BatchFunc) as every other card operation, tagged with eInsertCard.
// @return the status produced by BatchFunc for this request.
int clsServiceImpl::InsertCard(grpc::ServerContext &oContext, const example::CardRequest &oRequest,
                               example::CardResponse &oResponse) {
    const int iOper = example::OperCode::eInsertCard;
    return BatchFunc(iOper, oRequest, oResponse);
}
// Batched request handler shared by the card operations.
//
// Every caller enqueues its request in m_oBatchMap under its entity id and then
// takes the per-entity lock. The thread holding the lock drains every request
// queued for that entity, executes them against a temporary table, and runs a
// single Paxos instance for the combined write batch. A caller whose request
// was already served by another thread finds a non-WAITING status once it gets
// the lock and returns it immediately.
//
// @param iOper     operation code of this request (example::OperCode value).
// @param oRequest  request; referenced by the queue item until this call returns.
// @param oResponse response, filled in by HandleSingleCommand.
// @return the per-request status set by BatchReturn, or the EntityCatchUp
//         error when catch-up fails.
int clsServiceImpl::BatchFunc(int iOper, const example::CardRequest &oRequest,
                              example::CardResponse &oResponse) {
    uint64_t iStartUS = Certain::GetCurrTimeUS();

    // 1. Wrap the request into a QueueItem and push it into the batch map.
    uint64_t iPushStartUS = Certain::GetCurrTimeUS();
    QueueItem_t *poItem = new QueueItem_t;
    Certain::clsAutoDelete<QueueItem_t> oAutoDelete(poItem);
    poItem->iOper = iOper;
    poItem->iEntityID = oRequest.entity_id();
    poItem->poRequest = (void *)&oRequest;
    poItem->poResponse = (void *)&oResponse;
    poItem->iRet = BatchStatus::WAITING;
    {
        Certain::clsThreadLock oLock(&m_poBatchMapMutex);
        m_oBatchMap[poItem->iEntityID].push(poItem);
    }
    uint64_t iPushEndUS = Certain::GetCurrTimeUS();

    // 2. Take the per-entity lock. If a previous lock holder already served
    //    this request, its status is no longer WAITING: return it as-is.
    uint64_t iLockStartUS = Certain::GetCurrTimeUS();
    Certain::clsAutoEntityLock oAuto(poItem->iEntityID);
    if (poItem->iRet != BatchStatus::WAITING) {
        return poItem->iRet;
    }
    clsDBImpl *poDBEngine =
        dynamic_cast<clsDBImpl *>(Certain::clsCertainWrapper::GetInstance()->GetDBEngine());
    dbtype::DB *poDB = poDBEngine->GetDB();
    clsTemporaryTable oTable(poDB);
    uint64_t iLockEndUS = Certain::GetCurrTimeUS();

    // 3. Drain every request queued for this entity id (ours included).
    uint64_t iPopStartUS = Certain::GetCurrTimeUS();
    std::queue<QueueItem_t *> oQueue;
    {
        Certain::clsThreadLock oLock(&m_poBatchMapMutex);
        auto iter = m_oBatchMap.find(poItem->iEntityID);
        assert(iter != m_oBatchMap.end());
        while (!iter->second.empty()) {
            oQueue.push(iter->second.front());
            iter->second.pop();
        }
    }
    uint64_t iPopEndUS = Certain::GetCurrTimeUS();

    // 4. Make sure every locally chosen entry is committed to the DB before
    //    executing; on failure, fail the whole batch with the same code.
    uint64_t iCatchUpStartUS = Certain::GetCurrTimeUS();
    Certain::clsCertainWrapper *poCertain = Certain::clsCertainWrapper::GetInstance();
    uint64_t iEntry = 0, iMaxCommitedEntry = 0;
    int iRet = poCertain->EntityCatchUp(poItem->iEntityID, iMaxCommitedEntry);
    if (iRet != 0) {
        BatchReturn(&oQueue, iRet);
        return iRet;
    }
    uint64_t iCatchUpEndUS = Certain::GetCurrTimeUS();

    // 5. Execute every drained request against the temporary table. Each item
    //    is re-queued because it still needs the RunPaxos result below.
    //    (poQueued is deliberately not named poItem: the outer poItem is this
    //    caller's own request and is used again after the loop.)
    uint64_t iBatchHandleStartUS = Certain::GetCurrTimeUS();
    std::vector<uint64_t> vecUUID;
    uint64_t iQueueSize = oQueue.size();
    uint64_t iRead = 0, iWrite = 0;
    while (iQueueSize > 0) {
        QueueItem_t *poQueued = oQueue.front();
        oQueue.pop();
        --iQueueSize;
        assert(poQueued->iRet == BatchStatus::WAITING);
        HandleSingleCommand(&oTable, poQueued, &vecUUID);
        if (poQueued->iOper == example::OperCode::eSelectCard)
            iRead++;
        else
            iWrite++;
        // Re-push items since they need to RunPaxos.
        oQueue.push(poQueued);
    }
    uint64_t iBatchHandleEndUS = Certain::GetCurrTimeUS();

    // 6. Run Paxos on the next entry for the combined write batch and hand the
    //    result to every request in the batch.
    uint64_t iRunPaxosStartUS = Certain::GetCurrTimeUS();
    iEntry = iMaxCommitedEntry + 1;
    iRet = poCertain->RunPaxos(poItem->iEntityID, iEntry, example::OperCode::eBatchFunc, vecUUID,
                               oTable.GetWriteBatchString());
    BatchReturn(&oQueue, iRet);
    uint64_t iRunPaxosEndUS = Certain::GetCurrTimeUS();
    uint64_t iEndUS = Certain::GetCurrTimeUS();

    // 7. Only after Paxos succeeded, apply the write batch to the data DB.
    //    A failed commit is not fatal (EntityCatchUp replays the entry from the
    //    PLog before the next batch), but it is logged instead of dropped.
    if (iRet == 0) {
        int iCommitRet =
            poDBEngine->Commit(poItem->iEntityID, iEntry, oTable.GetWriteBatchString());
        if (iCommitRet != 0) {
            CertainLogError("E(%lu, %lu) Commit ret %d", poItem->iEntityID, iEntry, iCommitRet);
        }
    }

    CertainLogInfo(
        "iEntityID %lu iPushTime %lu iLockTime %lu iPopTime %lu "
        "iCatchUpTime %lu iBatchHandleTime %lu iRunPaxos %lu iTotalUS %lu "
        "iRead %lu iWrite %lu",
        poItem->iEntityID, iPushEndUS - iPushStartUS, iLockEndUS - iLockStartUS,
        iPopEndUS - iPopStartUS, iCatchUpEndUS - iCatchUpStartUS,
        iBatchHandleEndUS - iBatchHandleStartUS, iRunPaxosEndUS - iRunPaxosStartUS, iEndUS - iStartUS,
        iRead, iWrite);

    // Our own item is part of the batch, so BatchReturn has set its status.
    iRet = poItem->iRet;
    return iRet;
}
注意步骤 4 中的 `EntityCatchUp`:在跑 Paxos 协议前,会先确定本地所有的 Entry 都已经提交到 DB。注意这里并不保证、也不需要保证 Entry 是全局最新的:如果不是,跑协议的过程中会失败,然后触发 CatchUp,之后的博文会分析该过程。
来看 `EntityCatchUp` 的实现,位于 `src/CertainWrapper.cpp`:
// Brings the data DB of one entity up to its locally chosen progress.
//
// Starting from the committed entry recorded in the data DB, repeatedly asks
// CheckDBStatus what to do: while it reports eRetCodeDBLagBehind, the next
// chosen write batch is read from the PLog and committed. At most
// GetMaxCommitNum() entries are committed per call.
//
// @param iEntityID         entity to catch up.
// @param iMaxCommitedEntry out: max entry committed to the data DB on return.
// @return 0 when the DB is up to date; eRetCodeDBCommitLimited when the commit
//         budget ran out first; any other error from the helpers otherwise.
int clsCertainWrapper::EntityCatchUp(uint64_t iEntityID, uint64_t &iMaxCommitedEntry) {
    const uint32_t iCommitLimit = GetConf()->GetMaxCommitNum();
    uint32_t iFlag = 0;
    iMaxCommitedEntry = 0;

    // A missing meta record is fine: the entity simply has nothing committed.
    int iRet = m_poDBEngine->GetEntityMeta(iEntityID, iMaxCommitedEntry, iFlag);
    if (iRet != 0 && iRet != eRetCodeNotFound) {
        CertainLogError("iEntityID %lu GetEntityMeta ret %d", iEntityID, iRet);
        return iRet;
    }

    std::string strWriteBatch;
    uint32_t iCommitted = 0;
    // Each successful iteration commits exactly one entry, so both counters
    // advance together at the end of the body.
    for (; iCommitted < iCommitLimit; ++iCommitted, ++iMaxCommitedEntry) {
        iRet = CheckDBStatus(iEntityID, iMaxCommitedEntry);
        if (iRet == eRetCodeOK) {
            break;
        }
        if (iRet != eRetCodeDBLagBehind) {
            CertainLogError("CheckDBStatus iEntityID %lu ret %d", iEntityID, iRet);
            return iRet;
        }

        const uint64_t iNextEntry = iMaxCommitedEntry + 1;
        OSS::ReportGetAndCommit();

        // Fetch the chosen write batch of the next entry from the PLog ...
        iRet = GetWriteBatch(iEntityID, iNextEntry, strWriteBatch);
        if (iRet != 0) {
            CertainLogError("E(%lu, %lu) GetWriteBatch ret %d", iEntityID, iNextEntry, iRet);
            return iRet;
        }

        // ... and apply it to the data DB.
        TIMERMS_START(iCommitUseTimeMS);
        iRet = m_poDBEngine->Commit(iEntityID, iNextEntry, strWriteBatch);
        TIMERMS_STOP(iCommitUseTimeMS);
        OSS::ReportDBCommit(iRet, iCommitUseTimeMS);
        if (iRet != 0) {
            CertainLogError("E(%lu, %lu) Commit ret %d", iEntityID, iNextEntry, iRet);
            return iRet;
        }
    }

    // Budget exhausted: only an error if the DB is still not up to date.
    if (iCommitted == iCommitLimit && CheckDBStatus(iEntityID, iMaxCommitedEntry) != eRetCodeOK) {
        CertainLogError("iEntityID %lu iCommitCnt == iMaxCommitNum %u", iEntityID, iCommitLimit);
        return Certain::eRetCodeDBCommitLimited;
    }
    return 0;
}
// Compares the entity's committed entry in the data DB with its chosen
// progress and tells the caller what to do next.
//
// @param iEntityID      entity to check.
// @param iCommitedEntry max entry already committed to the data DB.
// @return eRetCodeOK          iCommitedEntry has reached iMaxChosenEntry;
//         eRetCodeDBLagBehind the DB lags behind but the remaining entries are
//                             contiguously chosen locally, so the caller can
//                             commit them from the PLog;
//         eRetCodeCatchUp     local chosen progress must be recovered first
//                             (TriggeRecover is called unless a recover was
//                             just performed synchronously here);
//         other non-zero      error from SyncWaitCmd / GetMaxChosenEntry.
int clsCertainWrapper::CheckDBStatus(uint64_t iEntityID, uint64_t iCommitedEntry) {
uint64_t iMaxContChosenEntry = 0;
uint64_t iMaxChosenEntry = 0;
uint64_t iLeaseTimeoutMS = 0;
bool bTriggleRecovered = false;
// Read the entity's in-memory iMaxContChosenEntry / iMaxChosenEntry.
int iRet = m_poEntityGroupMng->GetMaxChosenEntry(iEntityID, iMaxContChosenEntry, iMaxChosenEntry,
iLeaseTimeoutMS);
if (iRet == eRetCodeNotFound) {
// The entity is not in memory: recover it from the PLog.
bTriggleRecovered = true;
// Build a RecoverCmd starting at the committed entry.
clsRecoverCmd *poCmd = new clsRecoverCmd(iEntityID, iCommitedEntry);
clsAutoDelete<clsRecoverCmd> oAuto(poCmd);
poCmd->SetTimestampUS(GetCurrTimeUS());
// Submit the RecoverCmd and block until it completes.
iRet = SyncWaitCmd(poCmd);
if (iRet != 0) {
CertainLogError("iEntityID %lu SyncWaitCmd ret %d", iEntityID, iRet);
return iRet;
}
// Take the chosen progress reported by the finished RecoverCmd.
iMaxContChosenEntry = poCmd->GetMaxContChosenEntry();
iMaxChosenEntry = poCmd->GetMaxChosenEntry();
} else if (iRet != 0) {
CertainLogError("iEntityID %lu GetMaxChosenEntry iRet %d", iEntityID, iRet);
return iRet;
} else if (iLeaseTimeoutMS > 0) {
// Lease handling (analyzed in a later article): sleep for iLeaseTimeoutMS
// milliseconds, then re-read the chosen progress.
OSS::ReportLeaseWait();
poll(NULL, 0, iLeaseTimeoutMS);
CertainLogError("iEntityID %lu wait iLeaseTimeoutMS %lu", iEntityID, iLeaseTimeoutMS);
iRet = m_poEntityGroupMng->GetMaxChosenEntry(iEntityID, iMaxContChosenEntry, iMaxChosenEntry,
iLeaseTimeoutMS);
if (iRet != 0) {
CertainLogError("iEntityID %lu iLeaseTimeoutMS %lu ret %d", iEntityID, iLeaseTimeoutMS, iRet);
return iRet;
}
}
// iMaxContChosenEntry may update posterior to iCommitedEntry.
if (iMaxContChosenEntry < iCommitedEntry) {
iMaxContChosenEntry = iCommitedEntry;
}
// The DB lags by at least MaxCatchUpNum entries: do not commit one by one here.
if (iCommitedEntry + m_poConf->GetMaxCatchUpNum() <= iMaxContChosenEntry) {
// All entrys of the entity are eliminated, help trigger db catchup.
if (iMaxContChosenEntry == iMaxChosenEntry) {
CertainLogError("notify_db iEntityID %lu entrys: %lu %lu", iEntityID, iCommitedEntry,
iMaxContChosenEntry);
// Ask the DBWorker to catch up asynchronously.
clsDBWorker::NotifyDBWorker(iEntityID);
}
CertainLogError("iEntityID %lu entrys: %lu %lu %lu", iEntityID, iCommitedEntry,
iMaxContChosenEntry, iMaxChosenEntry);
if (!bTriggleRecovered) {
// Trigger a recover (presumably also via a RecoverCmd — TriggeRecover
// is not shown here).
TriggeRecover(iEntityID, iCommitedEntry);
}
return eRetCodeCatchUp;
}
// Local contiguous chosen progress is behind the known max chosen entry:
// catch up with it first.
if (iMaxContChosenEntry < iMaxChosenEntry) {
CertainLogError("iEntityID %lu entrys: %lu %lu %lu", iEntityID, iCommitedEntry,
iMaxContChosenEntry, iMaxChosenEntry);
if (!bTriggleRecovered) {
// Trigger a recover.
TriggeRecover(iEntityID, iCommitedEntry);
}
return eRetCodeCatchUp;
}
if (iCommitedEntry >= iMaxChosenEntry) {
// The DB is locally up to date only once the committed entry has
// reached the max chosen entry.
return eRetCodeOK;
} else {
// Otherwise the committed entry still has to catch up.
return eRetCodeDBLagBehind;
}
}
// Reads the chosen write batch of entry (iEntityID, iEntry) from the PLog.
//
// @param strWriteBatch out: the chosen value of the entry.
// @param piValueID     optional out: value id of the chosen value (may be NULL).
// @return 0 on success; eRetCodeNotFound when the record is missing or not yet
//         chosen; any other PLog error code as-is (logged as a probable bug).
int clsCertainWrapper::GetWriteBatch(uint64_t iEntityID, uint64_t iEntry, string &strWriteBatch,
                                     uint64_t *piValueID) {
    EntryRecord_t tRecord;
    const int iRet = m_poPLogEngine->GetRecord(iEntityID, iEntry, tRecord);

    // Unexpected PLog failure: propagate it.
    if (iRet != 0 && iRet != eRetCodeNotFound) {
        CertainLogFatal("BUG probably E(%lu, %lu) ret %d", iEntityID, iEntry, iRet);
        return iRet;
    }
    // No record for this entry.
    if (iRet != 0) {
        CertainLogInfo("E(%lu, %lu) not found", iEntityID, iEntry);
        return eRetCodeNotFound;
    }
    // A record that is not chosen yet must not be applied to the DB.
    if (!tRecord.bChosen) {
        CertainLogInfo("unchosen: %s", EntryRecordToString(tRecord).c_str());
        return eRetCodeNotFound;
    }

    strWriteBatch = tRecord.tValue.strValue;
    if (piValueID) {
        *piValueID = tRecord.tValue.iValueID;
    }
    return 0;
}
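这里有几个进度:
- `CommittedEntry`:已经提交到 DB 的最大 Entry;
- `MaxContChosenEntry`:本地连续 Chosen 的最大 Entry;
- `MaxChosenEntry`:已知的最大 Chosen Entry。

只有当 `CommittedEntry < MaxContChosenEntry == MaxChosenEntry`,并且 `MaxContChosenEntry - CommittedEntry < MaxCatchUpNum` 时,DB 才可以直接通过读取 PLog 的方式追赶到本地最新的进度;其他情况均需要先进行 Recover。来继续看看这里的 Recover 是怎么实现的。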
// Submits a client command to its entity worker and blocks until it finishes.
//
// A notification pipe is reserved for the command; the command is pushed into
// the IO request queue of the entity worker selected by hashing the entity id,
// and the caller waits on the pipe until the command's pointer comes back.
// The pipe is always returned to the idle pool once the command was queued.
//
// @return the command's own result on success; the GetIdlePipeIdx error,
//         eRetCodeQueueFailed, or eRetCodePipeWaitFailed otherwise.
int clsCertainWrapper::SyncWaitCmd(clsClientCmd *poCmd) {
    const uint64_t iEntityID = poCmd->GetEntityID();
    const uint64_t iEntry = poCmd->GetEntry();

    // Reserve the pipe through which completion will be signalled.
    uint32_t iPipeIdx;
    int iRet = m_poPipeMng->GetIdlePipeIdx(iPipeIdx, iEntityID);
    if (iRet != 0) {
        CertainLogError("E(%lu, %lu) GetIdlePipeIdx ret %d", iEntityID, iEntry, iRet);
        return iRet;
    }
    poCmd->SetPipeIdx(iPipeIdx);

    // Both the IO worker and the entity worker are chosen by entity hash.
    const uint32_t iIOWorkerID = Hash(iEntityID) % m_poConf->GetIOWorkerNum();
    poCmd->SetIOTracker(IOTracker_t(0, 0, iIOWorkerID));
    const uint32_t iEntityWorkerID = Hash(iEntityID) % m_poConf->GetEntityWorkerNum();

    iRet = m_poQueueMng->GetIOReqQueue(iEntityWorkerID)->PushByMultiThread(poCmd);
    if (iRet != 0) {
        m_poPipeMng->PutIdlePipeIdx(iPipeIdx);
        CertainLogError("PushByMultiThread ret %d", iRet);
        return eRetCodeQueueFailed;
    }

    // The command pointer itself is the token expected back on the pipe.
    const uintptr_t iCheck = reinterpret_cast<uintptr_t>(poCmd);
    CertainLogDebug("iPipeIdx %u iPtr %lu E(%lu, %lu) iUUID %lu", iPipeIdx, iCheck, iEntityID, iEntry,
                    poCmd->GetUUID());

    // A stale pointer from an earlier timed-out command may arrive first;
    // tolerate that exactly once.
    bool bOneMoreTry = false;
    iRet = m_poPipeMng->SyncWaitByPipeIdx(iPipeIdx, iCheck);
    if (iRet == eRetCodePipePtrErr) {
        bOneMoreTry = true;
        iRet = m_poPipeMng->SyncWaitByPipeIdx(iPipeIdx, iCheck);
    }

    const bool bWaitOk = (iRet == 0);
    if (!bWaitOk) {
        CertainLogFatal("BUG probably ibOneMoreTry %u ret %d cmd: %s", bOneMoreTry, iRet,
                        poCmd->GetTextCmd().c_str());
    }
    m_poPipeMng->PutIdlePipeIdx(iPipeIdx);
    return bWaitOk ? poCmd->GetResult() : eRetCodePipeWaitFailed;
}
塞入 IO 队列中的 Recover Cmd 会在 `EntityWorker` 中被处理,这里的调用关系非常复杂:
// Main loop of an entity worker (excerpt; "..." marks steps omitted by the article).
void clsEntityWorker::Run() {
while (1) {
...
// 2.Do with IO request.
// Take at most one command from this worker's IO request queue and dispatch
// it. The command is deleted here unless the handler returned
// eRetCodePtrReuse, meaning ownership was passed on (e.g. the command was
// parked on the entity or handed to an IO worker via InvalidClientCmd).
poCmd = NULL;
iRet = m_poIOReqQueue->TakeByOneThread(&poCmd);
if (iRet == 0) {
Assert(poCmd != NULL);
bHasWork = true;
iRet = DoWithIOReq(poCmd);
if (iRet < 0) {
CertainLogError("DoWithIOReq ret %d cmd %s", iRet, poCmd->GetTextCmd().c_str());
}
if (iRet != eRetCodePtrReuse) {
delete poCmd, poCmd = NULL;
}
}
...
}
}
// Dispatches one command taken from the IO request queue to its handler.
//
// In learn-only mode nothing is processed: client commands are answered with
// eRetCodeRejectAll through InvalidClientCmd, and Paxos commands are rejected
// by returning eRetCodeRejectAll (the caller then deletes them).
//
// @param poCmd command popped from this worker's IO request queue (non-NULL).
// @return the handler's status. eRetCodePtrReuse means ownership of poCmd has
//         been passed on, so the caller must not delete it.
int clsEntityWorker::DoWithIOReq(clsCmdBase *poCmd) {
    if (clsCertainWrapper::GetInstance()->GetConf()->GetEnableLearnOnly()) {
        if (poCmd->GetCmdID() != kPaxosCmd) {
            clsClientCmd *poClientCmd = dynamic_cast<clsClientCmd *>(poCmd);
            // Check the cast result, not poCmd (which is already dereferenced
            // above): a non-client command would otherwise reach
            // InvalidClientCmd as NULL, be ignored there, and leak because
            // eRetCodePtrReuse tells the caller not to delete it.
            AssertNotEqual(poClientCmd, NULL);
            InvalidClientCmd(poClientCmd, eRetCodeRejectAll);
            return eRetCodePtrReuse;
        } else {
            return eRetCodeRejectAll;
        }
    }

    uint64_t iEntityID = poCmd->GetEntityID();
    // Commands are routed to entity workers by entity hash; this one must be ours.
    AssertEqual(Hash(iEntityID) % m_poConf->GetEntityWorkerNum(), m_iWorkerID);
    CertainLogInfo("cmd: %s", poCmd->GetTextCmd().c_str());

    clsPaxosCmd *poPaxosCmd = NULL;
    clsClientCmd *poClientCmd = NULL;
    clsRecoverCmd *poRecoverCmd = NULL;
    switch (poCmd->GetCmdID()) {
        case kWriteBatchCmd:
            poClientCmd = dynamic_cast<clsClientCmd *>(poCmd);
            return DoWithClientCmd(poClientCmd);
        case kRecoverCmd:
            poRecoverCmd = dynamic_cast<clsRecoverCmd *>(poCmd);
            return DoWithRecoverCmd(poRecoverCmd);
        case kPaxosCmd:
            poPaxosCmd = dynamic_cast<clsPaxosCmd *>(poCmd);
            return DoWithPaxosCmd(poPaxosCmd);
        default:
            CertainLogError("cmd: %s", poCmd->GetTextCmd().c_str());
            Assert(false);
    }
    return 0;
}
// Handles a RecoverCmd coming from SyncWaitCmd (excerpt; "{...}" marks
// branches omitted by the article).
//
// If the entity is in memory, catch-up is checked and the command is answered
// with the current chosen progress: eRetCodeCatchUp when local progress lags,
// parked via CheckIfWaitRecoverCmd while the PLog is not loaded yet, or
// eRetCodeOK otherwise. If the entity is not in memory, an EntityInfo is
// created, catch-up is started, and the command is parked.
// Always returns eRetCodePtrReuse: the command is answered or parked, never
// deleted by the caller.
int clsEntityWorker::DoWithRecoverCmd(clsRecoverCmd *poCmd) {
uint64_t iEntityID = poCmd->GetEntityID();
uint64_t iMaxCommitedEntry = poCmd->GetMaxCommitedEntry();
CertainLogInfo("cmd: %s", poCmd->GetTextCmd().c_str());
// Entry 0 is the placeholder entry that CheckIfWaitRecoverCmd creates for a
// parked RecoverCmd; none may exist for this entity yet.
EntryInfo_t *ptInfo = m_poEntryMng->FindEntryInfo(iEntityID, 0);
AssertEqual(ptInfo, NULL);
EntityInfo_t *ptEntityInfo = m_poEntityMng->FindEntityInfo(iEntityID);
if (ptEntityInfo != NULL) {
// The EntityInfo is already in memory.
CertainLogError("Failover E(%lu, %lu) Entrys: %lu %lu %lu ref %d %u", iEntityID,
iMaxCommitedEntry, ptEntityInfo->iMaxContChosenEntry,
ptEntityInfo->iMaxChosenEntry, ptEntityInfo->iMaxPLogEntry,
ptEntityInfo->iRefCount, ptEntityInfo->bRangeLoading);
if (poCmd->IsEvictEntity()) {...}
Assert(m_poEntityMng->Refresh(ptEntityInfo));
if (poCmd->IsCheckGetAll()) {...}
// Check whether a catch-up is needed (and start it).
CheckForCatchUp(ptEntityInfo, INVALID_ACCEPTOR_ID, 0);
CheckIfNeedNotifyDB(ptEntityInfo);
poCmd->SetMaxChosenEntry(uint64_t(ptEntityInfo->iMaxChosenEntry));
poCmd->SetMaxContChosenEntry(uint64_t(ptEntityInfo->iMaxContChosenEntry));
AssertEqual(ptEntityInfo->poClientCmd, NULL);
if (ptEntityInfo->iMaxContChosenEntry < ptEntityInfo->iMaxChosenEntry) {
// Local progress lags the known chosen progress: answer eRetCodeCatchUp.
InvalidClientCmd(poCmd, eRetCodeCatchUp);
return eRetCodePtrReuse;
} else if (ptEntityInfo->iMaxPLogEntry == INVALID_ENTRY) {
// The PLog is not loaded yet: park the command with a timeout if a range
// load is in flight, otherwise fail it fast.
CheckIfWaitRecoverCmd(ptEntityInfo, poCmd);
return eRetCodePtrReuse;
}
// Otherwise answer eRetCodeOK.
InvalidClientCmd(poCmd, eRetCodeOK);
} else {
// No EntityInfo in memory.
if (poCmd->IsEvictEntity()) {...}
if (!m_poEntityMng->CheckAndEliminate()) {...}
// Create one.
ptEntityInfo = m_poEntityMng->CreateEntityInfo(iEntityID);
if (ptEntityInfo == NULL) {
CertainLogFatal("CreateEntityInfo failed cmd: %s", poCmd->GetTextCmd().c_str());
InvalidClientCmd(poCmd, eRetCodeRouteErr);
return eRetCodePtrReuse;
}
// Check whether a catch-up is needed; for a fresh entity this is expected
// to start a PLog range load, which the command then waits for.
CheckForCatchUp(ptEntityInfo, INVALID_ACCEPTOR_ID, 0);
CheckIfWaitRecoverCmd(ptEntityInfo, poCmd);
return eRetCodePtrReuse;
}
return eRetCodePtrReuse;
}
// Completes a client command with iResult and hands it back to its IO worker,
// which notifies the waiter.
//
// @param poCmd   command to complete; NULL is ignored.
// @param iResult result stored in the command (non-OK results are logged).
// @return true if the IO worker accepted the command; false for NULL, or when
//         routing failed and the command was appended to m_poWaitingGoList
//         (presumably retried later).
bool clsEntityWorker::InvalidClientCmd(clsClientCmd *poCmd, int iResult) {
    if (!poCmd) {
        return false;
    }

    poCmd->SetResult(iResult);
    if (iResult != eRetCodeOK) {
        CertainLogError("InvalidClientCmd cmd %u uuid %lu result: %d", poCmd->GetCmdID(),
                        poCmd->GetUUID(), iResult);
    }

    const int iGoRet = m_poIOWorkerRouter->Go(poCmd);
    if (iGoRet == 0) {
        return true;
    }
    CertainLogError("BUG probably Go ret %d poCmd: %s", iGoRet, poCmd->GetTextCmd().c_str());
    // sum of list size <= MAX_ASYNC_PIPE_NUM
    m_poWaitingGoList.push_back(poCmd);
    return false;
}
// Parks a RecoverCmd until the entity's PLog range load completes.
//
// With a range load in flight, the command is attached to the entity and a
// placeholder EntryInfo for entry 0 is created and registered with the
// recover timeout. Without one (the load should have been started but failed
// because the queue was full), the command is failed at once with
// eRetCodeQueueFull.
void clsEntityWorker::CheckIfWaitRecoverCmd(EntityInfo_t *ptEntityInfo, clsRecoverCmd *poCmd) {
    if (ptEntityInfo->bRangeLoading) {
        ptEntityInfo->poClientCmd = poCmd;
        EntryInfo_t *ptPlaceholder = m_poEntryMng->CreateEntryInfo(ptEntityInfo, 0);
        m_poEntryMng->AddTimeout(ptPlaceholder, m_poConf->GetRecoverTimeoutMS());
        return;
    }
    // If the cmd is most likely timeout eventually, do fast failover.
    InvalidClientCmd(poCmd, eRetCodeQueueFull);
}
// Notifies the DB worker when iMaxContChosenEntry has moved past the last
// notified entry, so it can catch the data DB up.
//
// iNotifyedEntry only advances when the notification succeeds, so a failed
// notification is attempted again on a later call.
void clsEntityWorker::CheckIfNeedNotifyDB(EntityInfo_t *ptEntityInfo) {
    if (ptEntityInfo->iNotifyedEntry >= ptEntityInfo->iMaxContChosenEntry) {
        return;
    }

    const int iRet = clsDBWorker::NotifyDBWorker(ptEntityInfo->iEntityID);
    if (iRet == 0) {
        ptEntityInfo->iNotifyedEntry = ptEntityInfo->iMaxContChosenEntry;
        return;
    }
    CertainLogError("NotifyDBWorker iEntityID %lu ret %d", ptEntityInfo->iEntityID, iRet);
}
`DoWithRecoverCmd` 里最核心的调用就是 `CheckForCatchUp`,用以检查并实施 CatchUp。该函数在 `EntityWorker` 中频繁出现,涉及的逻辑非常复杂,下一节详述。
`CheckForCatchUp` 有两种行为:

- 一种是 `MaxPLogEntry == INVALID_ENTRY`,即还没有从 PLog 中加载记录,此时调用 `RangeLoadFromPLog` 加载 PLog;
- 另一种是 `MaxContChosenEntry < MaxChosenEntry`,即本地的进度晚于全局的进度,此时调用 `LimitedCatchUp` 做限量的追赶(每次最多推进 `MaxCatchUpNum` 条)。
// Pre-creates catch-up entries, drives LimitedCatchUp, and starts a PLog range
// load when the entity's PLog has not been loaded yet.
// CheckForCatchUp may elimate current EntryInfo_t.
// So it must be called in the end of use of EntryInfo_t.
//
// @param iDestAcceptorID     acceptor to catch up from; INVALID_ACCEPTOR_ID
//                            leaves the entity's active acceptor unchanged.
// @param iDestMaxChosenEntry max chosen entry reported by a peer; raises
//                            ptEntityInfo->iMaxChosenEntry when larger.
void clsEntityWorker::CheckForCatchUp(EntityInfo_t *ptEntityInfo, uint32_t iDestAcceptorID,
uint64_t iDestMaxChosenEntry) {
uint64_t iEntityID = ptEntityInfo->iEntityID;
if (ptEntityInfo->iMaxChosenEntry < iDestMaxChosenEntry) {
CertainLogInfo("iMaxChosenEntry %lu iDestMaxChosenEntry %lu", ptEntityInfo->iMaxChosenEntry,
iDestMaxChosenEntry);
ptEntityInfo->iMaxChosenEntry = iDestMaxChosenEntry;
}
// Pre-create EntryInfos for [MaxPLogEntry + 1, MaxChosenEntry]: at most
// MaxCatchUpNum of them, stopping at the entry-number limit or at the first
// entry that already exists.
if (ptEntityInfo->iMaxPLogEntry != INVALID_ENTRY &&
ptEntityInfo->iMaxPLogEntry < ptEntityInfo->iMaxChosenEntry) {
uint32_t iPreOpenCnt = 0;
for (uint32_t i = 1; i <= m_poConf->GetMaxCatchUpNum(); ++i) {
uint64_t iEntry = ptEntityInfo->iMaxPLogEntry + i;
if (iEntry > ptEntityInfo->iMaxChosenEntry) {
break;
}
if (CheckIfEntryNumLimited(iEntityID, false)) {
break;
}
EntryInfo_t *ptInfo = m_poEntryMng->FindEntryInfo(iEntityID, iEntry);
if (ptInfo == NULL) {
ptInfo = m_poEntryMng->CreateEntryInfo(ptEntityInfo, iEntry);
iPreOpenCnt++;
} else {
break;
}
}
if (iPreOpenCnt > 0) {
CertainLogError("E(%lu, %lu) iPreOpenCnt %u", iEntityID, ptEntityInfo->iMaxChosenEntry,
iPreOpenCnt);
}
}
// A range load is in flight: wait for it to finish.
if (ptEntityInfo->bRangeLoading) {
CertainLogInfo("iEntityID %lu entrys: %lu %lu %lu", iEntityID,
ptEntityInfo->iMaxContChosenEntry, ptEntityInfo->iCatchUpEntry,
ptEntityInfo->iMaxChosenEntry);
return;
}
// A GetAll is pending: no catch-up now.
if (ptEntityInfo->bGetAllPending) {
CertainLogError("EntityID %lu GetAllPending, not need catch up", iEntityID);
return;
}
// RangeLoadFromPLog --> iMaxPLogEntry == INVALID_ENTRY
// LimitedCatchUp --> iMaxContChosenEntry < iMaxChosenEntry
int iLoopCnt = 0, iCatchUpCnt = 0;
while (ptEntityInfo->iMaxContChosenEntry < ptEntityInfo->iMaxChosenEntry) {
int iRet = LimitedCatchUp(ptEntityInfo, iDestAcceptorID);
if (iRet < 0) {
CertainLogError("LimitedCatchUp ret %d", iRet);
break;
}
iLoopCnt++;
iCatchUpCnt += iRet;
// LimitedCatchUp may have started a range load; wait for that
// asynchronous recovery to finish.
if (ptEntityInfo->bRangeLoading) {
break;
}
// LimitedCatchUp stopped before reaching its target; stop here too.
if (ptEntityInfo->iMaxContChosenEntry < ptEntityInfo->iCatchUpEntry) {
break;
}
// Nothing was caught up in this round; stop.
if (iRet == 0) {
break;
}
}
// One remain group and one newly extend group is processed at most.
// But it is possible when db lags badly, and entry increases quickly.
if (iLoopCnt >= 3) {
CertainLogError("Check if db lags badly Cnt(%d, %d) E(%lu, %lu)", iLoopCnt, iCatchUpCnt,
iEntityID, ptEntityInfo->iMaxChosenEntry);
}
// PLog not loaded yet, nothing left to catch up and no load in flight:
// load the entity's records from the PLog.
if (ptEntityInfo->iMaxPLogEntry == INVALID_ENTRY &&
ptEntityInfo->iMaxContChosenEntry == ptEntityInfo->iMaxChosenEntry &&
!ptEntityInfo->bRangeLoading) {
RangeLoadFromPLog(ptEntityInfo);
}
}
`RangeLoadFromPLog` 在其他地方也有调用,将在下一小节分析;而 `LimitedCatchUp` 仅在此处有调用:
// Sets the catch-up target, then walks the entries after iMaxContChosenEntry
// and pushes every contiguously chosen entry to the DB worker.
// When an EntryInfo is missing, tries RangeLoadFromPLog (right after extending
// the target) or LoadFromPLogWorker for that single entry, and stops.
//
// @param iDestAcceptorID acceptor to catch up from; INVALID_ACCEPTOR_ID keeps
//                        the current iActiveAcceptorID.
// @return number of entries pushed to the DB worker in this call (>= 0).
int clsEntityWorker::LimitedCatchUp(EntityInfo_t *ptEntityInfo, uint32_t iDestAcceptorID) {
// MaxContChosenEntry <= CatchUpEntry <= MaxChosenEntry
AssertNotMore(ptEntityInfo->iMaxContChosenEntry, ptEntityInfo->iCatchUpEntry);
AssertNotMore(ptEntityInfo->iCatchUpEntry, ptEntityInfo->iMaxChosenEntry);
// Used to make LimitedCatchUp amortized complexity O(1).
bool bExtend = false;
uint64_t iEntityID = ptEntityInfo->iEntityID;
// The previous target has been reached: extend the catch-up target.
if (ptEntityInfo->iMaxContChosenEntry == ptEntityInfo->iCatchUpEntry) {
uint64_t iOldCatchUpEntry = ptEntityInfo->iCatchUpEntry;
CertainLogInfo("extend iEntityID %lu entrys: %lu %lu %lu %lu", iEntityID,
ptEntityInfo->iMaxContChosenEntry, ptEntityInfo->iCatchUpEntry,
ptEntityInfo->iMaxChosenEntry, ptEntityInfo->iMaxPLogEntry);
// Extend by at most MaxCatchUpNum entries, capped at iMaxChosenEntry.
ptEntityInfo->iCatchUpEntry = min(ptEntityInfo->iCatchUpEntry + m_poConf->GetMaxCatchUpNum(),
uint64_t(ptEntityInfo->iMaxChosenEntry));
bExtend = true;
OSS::ReportBatchCatchUp(ptEntityInfo->iCatchUpEntry - iOldCatchUpEntry);
}
if (iDestAcceptorID != INVALID_ACCEPTOR_ID) {
ptEntityInfo->iActiveAcceptorID = iDestAcceptorID;
}
int iReadyCnt = 0, iCatchUpCnt = 0;
for (uint64_t iEntry = ptEntityInfo->iMaxContChosenEntry + 1;
iEntry <= ptEntityInfo->iCatchUpEntry; ++iEntry) {
AssertEqual(ptEntityInfo->bRangeLoading, false);
EntryInfo_t *ptInfo = m_poEntryMng->FindEntryInfo(iEntityID, iEntry);
if (ptInfo == NULL) {
// The EntryInfo is not in memory.
// Having just extended the target, try a range load and stop.
if (bExtend) {
RangeLoadFromPLog(ptEntityInfo);
break;
}
if (CheckIfEntryNumLimited(iEntityID, false)) {
CertainLogError("E(%lu, %lu) catchup limited by entry", iEntityID, iEntry);
break;
}
// It may also be missing because it was evicted from memory
// (presumably by the LRU); reload just this entry with
// LoadFromPLogWorker when it is known to be in the PLog.
if (ptEntityInfo->iMaxPLogEntry == INVALID_ENTRY || iEntry <= ptEntityInfo->iMaxPLogEntry) {
ptInfo = m_poEntryMng->CreateEntryInfo(ptEntityInfo, iEntry);
if (LoadFromPLogWorker(ptInfo) != 0) {
m_poEntryMng->DestroyEntryInfo(ptInfo);
break;
}
} else {
ptInfo = m_poEntryMng->CreateEntryInfo(ptEntityInfo, iEntry);
}
CertainLogError("Check it E(%lu, %lu)", iEntityID, iEntry);
}
// The entry is certain and chosen.
if (!ptInfo->bUncertain && ptInfo->poMachine->GetEntryState() == kEntryStateChosen) {
// It is exactly the next entry to commit.
if (ptEntityInfo->iMaxContChosenEntry + 1 == iEntry) {
// Hand it to the DB worker queue.
PushCmdToDBWorker(ptInfo);
ptEntityInfo->iMaxContChosenEntry++;
// One more entry caught up.
iCatchUpCnt++;
}
// Keep scanning past chosen entries; compare with the break at the end
// of the loop body, which stops at the first unchosen entry unless the
// target was just extended.
continue;
}
// Certain but not chosen, and not in the timeout list (i.e. not waiting
// for an asynchronous result): activate it.
if (!ptInfo->bUncertain && !m_poEntryMng->WaitForTimeout(ptInfo)) {
// Activate the EntryInfo; stop if that fails.
if (!ActivateEntry(ptInfo)) {
break;
}
iReadyCnt++;
}
// Reached an entry that is not chosen: unless the target was just
// extended (then every entry of the new window is visited), stop here.
if (!bExtend) {
break;
}
}
// Log a summary when more than one entry was caught up or any was activated.
if (iCatchUpCnt > 1 || iReadyCnt > 0) {
CertainLogInfo(
"iEntityID %lu Entrys: %lu <= %lu <= %lu "
"iCatchUpCnt %d iReadyCnt %d",
ptEntityInfo->iEntityID, ptEntityInfo->iMaxContChosenEntry, ptEntityInfo->iCatchUpEntry,
ptEntityInfo->iMaxChosenEntry, iCatchUpCnt, iReadyCnt);
}
return iCatchUpCnt;
}
`CheckForCatchUp`(包括它调用的 `LimitedCatchUp`)和 `DoWithPaxosCmd` 中会调用 `RangeLoadFromPLog`,用来批量地将 PLog 中的 Record 恢复到内存中。它通过构造一条 `RecoverCmd` 启动该流程:
// Starts an asynchronous batch load of the entity's records from the PLog.
//
// Marks the entity as bRangeLoading and queues a RecoverCmd to the PLog
// worker, carrying the current iMaxContChosenEntry, bRangeLoaded,
// iMaxPLogEntry and the per-load record limit (MaxCatchUpNum).
//
// @return 0 when the load was queued or deliberately skipped because of the
//         entry / catch-up limits; -2 when the PLog request queue rejected the
//         command (the loading flag is reset and the command freed).
int clsEntityWorker::RangeLoadFromPLog(EntityInfo_t *ptEntityInfo) {
    const uint64_t iEntityID = ptEntityInfo->iEntityID;

    if (CheckIfEntryNumLimited(iEntityID)) {
        CertainLogError("iEntityID %lu stop loading by entry limit", iEntityID);
        return 0;
    }
    // The catch-up limit only applies while local progress lags.
    const bool bLagging = ptEntityInfo->iMaxContChosenEntry < ptEntityInfo->iMaxChosenEntry;
    if (bLagging && m_poEntryMng->CheckIfCatchUpLimited(NULL)) {
        CertainLogError("iEntityID %lu stop loading by catchup limit", iEntityID);
        return 0;
    }

    AssertEqual(ptEntityInfo->bRangeLoading, false);
    ptEntityInfo->bRangeLoading = true;

    clsRecoverCmd *poCmd = new clsRecoverCmd(iEntityID, ptEntityInfo->iMaxContChosenEntry);
    poCmd->SetRangeLoaded(ptEntityInfo->bRangeLoaded);
    poCmd->SetMaxPLogEntry(static_cast<uint64_t>(ptEntityInfo->iMaxPLogEntry));
    poCmd->SetMaxNum(m_poConf->GetMaxCatchUpNum());
    CertainLogInfo("cmd: %s", poCmd->GetTextCmd().c_str());

    const int iRet = clsPLogWorker::EnterPLogReqQueue(poCmd);
    if (iRet == 0) {
        return 0;
    }
    // Not queued: undo the loading flag and free the command.
    ptEntityInfo->bRangeLoading = false;
    delete poCmd;
    CertainLogError("iEntityID %lu EnterPLogReqQueue ret %d", iEntityID, iRet);
    return -2;
}
`PLogWorker` 拿到 `RecoverCmd` 后,调用 `FillRecoverCmd`:
// Fills a RecoverCmd with the entity's recovery data from the data DB and the
// PLog (excerpt; "{...}" marks a branch omitted by the article):
// 1. reads MaxCommitedEntry from the data DB;
// 2. if MaxPLogEntry is enabled and the entity has not been range-loaded yet,
//    only reads MaxPLogEntry from the PLog DB and returns with HasMore set;
// 3. otherwise loads the PLog records after MaxCommitedEntry, up to
//    MaxCommitedEntry + GetMaxNum(), including their values.
// @return eRetCodeOK on success; -1..-5 identify the step that failed.
int clsPLogWorker::FillRecoverCmd(clsRecoverCmd *poRecoverCmd) {
CertainLogDebug("cmd: %s", poRecoverCmd->GetTextCmd().c_str());
uint64_t iEntityID = poRecoverCmd->GetEntityID();
// Defaults to 0 (e.g. when the entity has no meta in the data DB).
uint64_t iMaxCommitedEntry = 0;
TIMERUS_START(iGetEntityMetaUseTimeUS);
uint32_t iFlag = 0;
// Read MaxCommitedEntry from the data DB.
int iRet = m_poDBEngine->GetEntityMeta(iEntityID, iMaxCommitedEntry, iFlag);
TIMERUS_STOP(iGetEntityMetaUseTimeUS);
s_poGetEntityMetaTimeStat->Update(iGetEntityMetaUseTimeUS);
if (iRet != 0 && iRet != eRetCodeNotFound) {
CertainLogFatal("GetEntityMeta iFlag %u cmd: %s ret %d", iFlag,
poRecoverCmd->GetTextCmd().c_str(), iRet);
return -1;
}
// GetAll check path (elided). Per the article, nothing in the code base sets
// kDBFlagCheckGetAll, so it is ignored here.
if (iFlag == kDBFlagCheckGetAll) {...}
typedef vector<pair<uint64_t, EntryRecord_t> > EntryRecordList_t;
EntryRecordList_t tEntryRecordList;
// When MaxPLogEntry is enabled (per the article, the default) and the entity
// has not been range-loaded yet, only read MaxPLogEntry from the PLog DB,
// mark HasMore and return.
if (m_poConf->GetEnableMaxPLogEntry() > 0 && !poRecoverCmd->IsRangeLoaded()) {
AssertEqual(poRecoverCmd->GetMaxPLogEntry(), INVALID_ENTRY);
PLogEntityMeta_t tMeta = {0};
TIMERUS_START(iGetPLogMetaUseTimeUS);
// Read the entity's MaxPLogEntry from the PLog DB.
iRet = m_poPLogEngine->GetPLogEntityMeta(iEntityID, tMeta);
TIMERUS_STOP(iGetPLogMetaUseTimeUS);
OSS::ReportPLogGetMetaKeyTimeMS(iRet, iGetPLogMetaUseTimeUS / 1000);
if (iRet != 0 && iRet != Certain::eRetCodeNotFound) {
CertainLogFatal("iEntityID %lu GetPLogEntityMeta ret %d", iEntityID, iRet);
return -5;
} else {
// Either the value read from the PLog DB, or 0 from the
// zero-initialized tMeta when it was not found.
uint64_t iMaxPLogEntry = tMeta.iMaxPLogEntry;
// Never report a MaxPLogEntry below the committed entry (the article
// attributes this case to GetAll).
if (iMaxPLogEntry < iMaxCommitedEntry) {
iMaxPLogEntry = iMaxCommitedEntry;
}
// Report MaxPLogEntry / MaxCommitedEntry with an empty record list.
poRecoverCmd->SetMaxPLogEntry(iMaxPLogEntry);
poRecoverCmd->SetMaxCommitedEntry(iMaxCommitedEntry);
poRecoverCmd->SetMaxLoadingEntry(iMaxCommitedEntry);
poRecoverCmd->SetEntryRecordList(tEntryRecordList);
// HasMore: only the progress counters were read; records still
// have to be loaded by a later request.
poRecoverCmd->SetHasMore(true);
// Return without loading any records.
return eRetCodeOK;
}
}
bool bHasMore = false;
vector<pair<uint64_t, string> > vecRecord;
uint64_t iMaxLoadingEntry = iMaxCommitedEntry + poRecoverCmd->GetMaxNum();
TIMERUS_START(iRangeLoadUseTimeUS);
// Load the uncommitted records between MaxCommitedEntry and MaxLoadingEntry
// from the PLog DB.
iRet = m_poPLogEngine->LoadUncommitedEntrys(iEntityID, iMaxCommitedEntry, iMaxLoadingEntry,
vecRecord, bHasMore);
TIMERUS_STOP(iRangeLoadUseTimeUS);
s_poLoadUncommitedEntrysTimeStat->Update(iRangeLoadUseTimeUS);
OSS::ReportPLogRangeLoadTimeMS(iRet, iRangeLoadUseTimeUS / 1000);
// NOTE(review): iPLogCommitedEntry is not declared anywhere in this excerpt;
// presumably it is declared in code the article omitted — confirm, otherwise
// this log statement does not compile.
if (iRangeLoadUseTimeUS > 100000) {
CertainLogError("E(%lu, %lu) entrys: %lu %lu iRangeLoadUseTimeUS %lu", iEntityID,
iMaxCommitedEntry, iPLogCommitedEntry, iMaxLoadingEntry, iRangeLoadUseTimeUS);
}
if (iRet != 0) {
CertainLogFatal("LoadUncommitedEntrys E(%lu, %lu) ret %d", iEntityID, iMaxCommitedEntry, iRet);
return -2;
}
// Decode each loaded record.
for (uint32_t i = 0; i < vecRecord.size(); ++i) {
uint64_t iEntry = vecRecord[i].first;
// Deserialize the record.
EntryRecord_t tRecord;
iRet = StringToEntryRecord(vecRecord[i].second, tRecord);
if (iRet != 0) {
CertainLogFatal("E(%lu, %lu) StringToEntryRecord ret %d", iEntityID, iEntry, iRet);
return -3;
}
// The record has a value id but no inline value: fetch the value from
// the PLog engine.
if (tRecord.tValue.iValueID > 0 && !tRecord.tValue.bHasValue) {
TIMERUS_START(iGetValueUseTimeUS);
iRet = m_poPLogEngine->GetValue(iEntityID, iEntry, tRecord.tValue.iValueID,
tRecord.tValue.strValue);
TIMERUS_STOP(iGetValueUseTimeUS);
s_poGetTimeStat->Update(iGetValueUseTimeUS);
OSS::ReportPLogGetValueTimeMS(iRet, iGetValueUseTimeUS / 1000);
if (iGetValueUseTimeUS > 100000) {
CertainLogError("E(%lu, %lu) iGetValueUseTimeUS %lu", iEntityID, iEntry,
iGetValueUseTimeUS);
}
if (iRet != 0) {
CertainLogFatal("GetValue ret %d E(%lu, %lu) record: %s", iRet, iEntityID, iEntry,
EntryRecordToString(tRecord).c_str());
return -4;
}
tRecord.tValue.bHasValue = true;
}
tEntryRecordList.push_back(pair<uint64_t, EntryRecord_t>(iEntry, tRecord));
}
// Write the results into the RecoverCmd.
poRecoverCmd->SetMaxCommitedEntry(iMaxCommitedEntry);
// Upper bound of the requested range: fewer records may actually have been
// loaded, so this is not necessarily the last loaded entry.
poRecoverCmd->SetMaxLoadingEntry(iMaxLoadingEntry);
// The records actually loaded.
poRecoverCmd->SetEntryRecordList(tEntryRecordList);
poRecoverCmd->SetHasMore(bHasMore);
return eRetCodeOK;
}
而后 `PLogWorker` 会把这条 `RecoverCmd` 塞回回复队列里,`EntityWorker` 会调用 `RangeRecoverFromPLog` 处理:
// 从 PLog 回复中确认是否已经完成 RangeLoaded
// 若出错,处理错误后直接返回
// 从读取到的 EntryRecordList 恢复 EntityInfo 信息,更新 MaxChosenEntry
// 处理 WaitingMsg,检查 CatchUp,回复 RecoverCmd
int clsEntityWorker::RangeRecoverFromPLog(clsRecoverCmd *poCmd) {
uint64_t iEntityID = poCmd->GetEntityID();
uint64_t iEntry = 0;
EntityInfo_t *ptEntityInfo = m_poEntityMng->FindEntityInfo(iEntityID);
AssertNotEqual(ptEntityInfo, NULL);
// 完成 RangeLoad,关闭标记
AssertEqual(ptEntityInfo->bRangeLoading, true);
ptEntityInfo->bRangeLoading = false;
if (poCmd->GetResult() != eRetCodeOK) {
// Recover Cmd 失败,处理错误后返回
// 回复 ClientCmd 错误
if (ptEntityInfo->poClientCmd != NULL && ptEntityInfo->poClientCmd->GetCmdID() == kRecoverCmd) {
InvalidClientCmd(ptEntityInfo, poCmd->GetResult());
}
// 将失败的 IsCheckGetAll Cmd 重新塞入 GetAllWorker 中,暂时忽略
if (!ptEntityInfo->bGetAllPending && poCmd->IsCheckGetAll() &&
GetCurrTimeMS() >= ptEntityInfo->iGetAllFinishTimeMS + 1000) {
clsPaxosCmd *po = new clsPaxosCmd(GetPeerAcceptorID(ptEntityInfo), iEntityID, 0, NULL, NULL);
int iRet = clsGetAllWorker::EnterReqQueue(po);
if (iRet != 0) {
delete po, po = NULL;
}
CertainLogError("iEntityID %lu triggle getall by peer ret %d", iEntityID, iRet);
ptEntityInfo->bGetAllPending = true;
}
return 0;
}
// <Entry, Record>
typedef vector<pair<uint64_t, EntryRecord_t> > EntryRecordList_t;
const EntryRecordList_t &tEntryRecordList = poCmd->GetEntryRecordList();
AssertNotMore(tEntryRecordList.size(), m_poConf->GetMaxCatchUpNum());
// 根据 Cmd 返回的信息,更新已知的全局 Chosen 进度,这里的更新有些是冗余的
if (ptEntityInfo->iMaxChosenEntry < poCmd->GetMaxCommitedEntry()) {
ptEntityInfo->iMaxChosenEntry = poCmd->GetMaxCommitedEntry();
}
if (poCmd->IsHasMore()) {
if (ptEntityInfo->iMaxChosenEntry < poCmd->GetMaxLoadingEntry()) {
ptEntityInfo->iMaxChosenEntry = poCmd->GetMaxLoadingEntry();
}
}
if (tEntryRecordList.size() > 0) {
uint64_t iEntry = tEntryRecordList.rbegin()->first;
if (tEntryRecordList.rbegin()->second.bChosen) {
if (ptEntityInfo->iMaxChosenEntry < iEntry) {
ptEntityInfo->iMaxChosenEntry = iEntry;
}
} else {
if (ptEntityInfo->iMaxChosenEntry < iEntry - 1) {
ptEntityInfo->iMaxChosenEntry = iEntry - 1;
}
}
}
AssertNotMore(ptEntityInfo->iMaxContChosenEntry, ptEntityInfo->iCatchUpEntry);
if (ptEntityInfo->iMaxContChosenEntry < poCmd->GetMaxCommitedEntry()) {
// 本地的 Committed,一定是已知的
ptEntityInfo->iMaxContChosenEntry = poCmd->GetMaxCommitedEntry();
}
if (ptEntityInfo->iCatchUpEntry < ptEntityInfo->iMaxContChosenEntry) {
ptEntityInfo->iCatchUpEntry = ptEntityInfo->iMaxContChosenEntry;
}
if (ptEntityInfo->iNotifyedEntry < poCmd->GetMaxCommitedEntry()) {
// 已经 Committed,一定也是不需要 Notify 的
ptEntityInfo->iNotifyedEntry = poCmd->GetMaxCommitedEntry();
}
// 遍历读到的 EntryRecordList,恢复 EntryInfo
for (uint32_t i = 0; i < tEntryRecordList.size(); ++i) {
iEntry = tEntryRecordList[i].first;
AssertLess(poCmd->GetMaxCommitedEntry(), iEntry);
AssertNotMore(iEntry, poCmd->GetMaxLoadingEntry());
EntryInfo_t *ptInfo = m_poEntryMng->FindEntryInfo(iEntityID, iEntry);
if (ptInfo == NULL) {
const EntryRecord_t &tRecord = tEntryRecordList[i].second;
// 除了当前 Chosen 且 MaxContChosenEntry + 1 == iEntry 这种情况
// 其他的均需要检查数量限制,超过则直接跳过
if (!(tRecord.bChosen && ptEntityInfo->iMaxContChosenEntry + 1 == iEntry)) {
if (CheckIfEntryNumLimited(iEntityID)) {
CertainLogError("E(%lu, %lu) catchup limited by entry", iEntityID, iEntry);
continue;
}
}
ptInfo = m_poEntryMng->CreateEntryInfo(ptEntityInfo, iEntry);
// 创建并恢复 EntryInfo
AssertNotMore(0, RecoverEntry(ptInfo, tRecord));
// 上面已经更新过了,这里应该是多余的
UpdateMaxChosenEntry(ptEntityInfo, ptInfo);
} else {
CertainLogError("E(%lu, %lu) st %d maybe loaded by plog", iEntityID, iEntry,
ptInfo->poMachine->GetEntryState());
}
// Speed up applying DB.
if (!ptInfo->bUncertain && ptInfo->poMachine->GetEntryState() == kEntryStateChosen) {
// 仅 MaxContChosenEntry + 1 == iEntry 时,推入 DB Worker
if (ptEntityInfo->iMaxContChosenEntry + 1 == iEntry) {
PushCmdToDBWorker(ptInfo);
ptEntityInfo->iMaxContChosenEntry++;
if (ptEntityInfo->iCatchUpEntry < ptEntityInfo->iMaxContChosenEntry) {
AssertEqual(ptEntityInfo->iCatchUpEntry + 1, ptEntityInfo->iMaxContChosenEntry);
ptEntityInfo->iCatchUpEntry++;
}
AssertNotMore(ptEntityInfo->iCatchUpEntry, ptEntityInfo->iMaxChosenEntry);
}
}
}
if (!poCmd->IsHasMore()) {
// 没有更多可以读取的记录了
if (tEntryRecordList.size() > 0) {
// 可以知道 MaxPLogEntry 可以到最后一个 Entry
ptEntityInfo->iMaxPLogEntry = tEntryRecordList.rbegin()->first;
} else {
// 列表为空,MaxPLogEntry 可以到 Cmd->MaxCommittedEntry
ptEntityInfo->iMaxPLogEntry = poCmd->GetMaxCommitedEntry();
}
} else if (!ptEntityInfo->bRangeLoaded && poCmd->GetMaxPLogEntry() != INVALID_ENTRY) {
// 还没有完成 RangeLoaded,且 Cmd->MaxPLogEntry 有数值
CertainLogImpt("iEntityID %lu iMaxPLogEntry %lu -> %lu", iEntityID, ptEntityInfo->iMaxPLogEntry,
poCmd->GetMaxPLogEntry());
Assert(!poCmd->IsRangeLoaded());
if (ptEntityInfo->iMaxPLogEntry == INVALID_ENTRY ||
ptEntityInfo->iMaxPLogEntry < poCmd->GetMaxPLogEntry()) {
// 使用 Cmd->MaxPLogEntry 作为 MaxPLogEntry
ptEntityInfo->iMaxPLogEntry = poCmd->GetMaxPLogEntry();
} else {
CertainLogFatal("iEntityID %lu iMaxPLogEntry %lu -> %lu", iEntityID,
ptEntityInfo->iMaxPLogEntry, poCmd->GetMaxPLogEntry());
}
}
// 标记完成 Range Loaded
ptEntityInfo->bRangeLoaded = true;
CertainLogInfo("iEntityID %lu entrys: %lu %lu %lu iEmptyEntryCnt %u", iEntityID,
poCmd->GetMaxCommitedEntry(), poCmd->GetMaxLoadingEntry(),
ptEntityInfo->iMaxPLogEntry, iEmptyEntryCnt);
// 处理 WaitingMsg,以后再分析等待的 Msg 是啥
uint32_t iWaitingMsgPtrSize = sizeof(clsPaxosCmd *) * m_iAcceptorNum;
clsPaxosCmd **apWaitingMsg = (clsPaxosCmd **)malloc(iWaitingMsgPtrSize);
std::unique_ptr<char> oAutoFreeWaitingMsgPtr((char *)apWaitingMsg);
memcpy(apWaitingMsg, ptEntityInfo->apWaitingMsg, iWaitingMsgPtrSize);
memset(ptEntityInfo->apWaitingMsg, 0, iWaitingMsgPtrSize);
int iRet = DoWithWaitingMsg(apWaitingMsg, m_iAcceptorNum);
m_poMemCacheCtrl->UpdateTotalSize(ptEntityInfo);
if (iRet != 0) {
CertainLogError("DoWithWaitingMsg ret %d", iRet);
}
// 检查是否进行 CatchUp
CheckForCatchUp(ptEntityInfo, INVALID_ACCEPTOR_ID, 0);
if (ptEntityInfo->poClientCmd != NULL && ptEntityInfo->poClientCmd->GetCmdID() == kRecoverCmd) {
// ClientCmd 是 RecoverCmd,设定 MaxChosenEntry
clsRecoverCmd *po = dynamic_cast<clsRecoverCmd *>(ptEntityInfo->poClientCmd);
po->SetMaxChosenEntry(uint64_t(ptEntityInfo->iMaxChosenEntry));
if (ptEntityInfo->iMaxContChosenEntry < ptEntityInfo->iMaxChosenEntry) {
// MaxContChosenEntry < MaxChosenEntry, 返回 CatchUp 状态码
InvalidClientCmd(ptEntityInfo, eRetCodeCatchUp);
} else {
// 否则返回 OK
InvalidClientCmd(ptEntityInfo, eRetCodeOK);
}
}
return 0;
}
总的来说,RangeLoadFromPLog 会异步地启动恢复流程:先经过 FillRecoverCmd 读取 Record 记录,再回到 RangeRecoverFromPLog 恢复成 EntryInfo,并更新各项进度信息。
EntityWorker 的主循环中也需要处理各类超时情况,包括 Entry 的超时和 Cmd 的超时:
// Timeout handler for a single EntryInfo whose timer has fired.
// The situations below are checked in priority order and exactly one action
// is taken:
//   1. entry 0 (paired with a RecoverCmd): fail the client command;
//   2. the client command is waiting on this entry: fail it, re-arm the timer;
//   3. the entry is still bUncertain: only log;
//   4. a GetAll is pending for the entity: fail the client command and
//      destroy the entry;
//   5. otherwise: re-activate the entry.
// Whichever path is taken, eRetCodePtrReuse is returned to the caller.
int clsEntityWorker::DoWithTimeout(EntryInfo_t *ptInfo) {
    EntityInfo_t *ptEntityInfo = ptInfo->ptEntityInfo;
    if (NULL == ptEntityInfo) {
        CertainLogFatal("iEntry %lu", ptInfo->iEntry);
    }
    AssertNotEqual(ptEntityInfo, NULL);

    uint64_t iEntityID = ptEntityInfo->iEntityID;
    uint64_t iEntry = ptInfo->iEntry;
    // The entity must be routed to this worker.
    AssertEqual(Hash(iEntityID) % m_poConf->GetEntityWorkerNum(), m_iWorkerID);

    if (0 == iEntry) {
        // Entry 0 is only used to pair with a RecoverCmd: any pending client
        // command must be one, and it is failed with a timeout right away.
        if (ptEntityInfo->poClientCmd != NULL) {
            AssertEqual(ptEntityInfo->poClientCmd->GetCmdID(), kRecoverCmd);
        }
        InvalidClientCmd(ptEntityInfo, eRetCodeTimeout);
    } else if (ptEntityInfo->poClientCmd != NULL &&
               ptEntityInfo->poClientCmd->GetEntry() == iEntry) {
        // The client command bound to this entry timed out: fail it and give
        // the entry another 10000 (presumably ms) before it is looked at again.
        InvalidClientCmd(ptEntityInfo, eRetCodeTimeout);
        m_poEntryMng->AddTimeout(ptInfo, 10000);
    } else if (ptInfo->bUncertain) {
        // Entry is still uncertain (the log text points at a slow disk write);
        // only report it, the timer is not re-armed here.
        CertainLogError("Check if disk busy E(%lu, %lu) st %d", iEntityID, iEntry,
                        ptInfo->poMachine->GetEntryState());
    } else if (ptEntityInfo->bGetAllPending) {
        // A GetAll is in flight for this entity: fail the client command and
        // drop this entry.
        InvalidClientCmd(ptEntityInfo, eRetCodeTimeout);
        m_poEntryMng->DestroyEntryInfo(ptInfo);
        CertainLogError("E(%lu, %lu) GetAllPending, timeout and destroy entry", iEntityID, iEntry);
    } else {
        // Nothing special applies: try to drive the entry forward again.
        ActivateEntry(ptInfo);
    }

    return eRetCodePtrReuse;
}
ActivateEntry 的激活操作大致分为以下几种情况:刚新建且本地没有任何信息的,或已经送到 DB Worker 的,直接清理返回;本地已确定 Chosen 的,提高其淘汰优先级后返回;本地未 Chosen 但全局已经 Chosen 的,找对端 Acceptor 同步;还没有确定 Chosen 的,广播一下。
// Try to make progress on an entry that has gone quiet (it is called, for
// example, from the timeout handler).
//
// Returns true when a new action was started for the entry (a no-op proposal,
// a sync with a peer acceptor, or a broadcast) and the entry stays alive;
// returns false when the entry was cleaned up, had its elimination priority
// raised, or was left untouched because catch-up is limited.
bool clsEntityWorker::ActivateEntry(EntryInfo_t *ptInfo) {
    EntityInfo_t *ptEntityInfo = ptInfo->ptEntityInfo;
    uint64_t iEntityID = ptEntityInfo->iEntityID;
    uint64_t iEntry = ptInfo->iEntry;
    uint32_t iLocalAcceptorID = ptEntityInfo->iLocalAcceptorID;
    clsEntryStateMachine *poMachine = ptInfo->poMachine;
    // Preconditions: the entry is not uncertain and no timer is pending on it.
    AssertEqual(ptInfo->bUncertain, false);
    AssertEqual(m_poEntryMng->WaitForTimeout(ptInfo), false);

    // The entry is newly open and has no activity.
    // It lies beyond the known max chosen entry and its local state is empty,
    // so there is nothing useful in it: clean it up and return.
    if (ptEntityInfo->iMaxChosenEntry < iEntry && poMachine->IsLocalEmpty()) {
        CertainLogInfo("iEntityID %lu entrys: %lu %lu %lu ref %d", iEntityID,
                       ptEntityInfo->iMaxContChosenEntry, ptEntityInfo->iCatchUpEntry,
                       ptEntityInfo->iMaxChosenEntry, ptEntityInfo->iRefCount);
        CleanUpEntry(ptInfo);
        return false;
    }

    // The entry has been sent to the DB worker.
    // It is inside the contiguous chosen prefix, so clean it up and return.
    if (ptEntityInfo->iMaxContChosenEntry >= iEntry) {
        CleanUpEntry(ptInfo);
        return false;
    }

    // Not need to broadcast, wait for being elimated or commited.
    // The entry is already chosen locally: raise its elimination priority so
    // it is evicted sooner, and return.
    if (poMachine->GetEntryState() == kEntryStateChosen) {
        CertainLogError("E(%lu, %lu) st %d", iEntityID, iEntry, poMachine->GetEntryState());
        m_poEntryMng->IncreaseEliminatePriority(ptInfo);
        return false;
    }

    // For catch up.
    // Not chosen locally, but iMaxChosenEntry says it has been chosen
    // elsewhere: sync this entry with a peer acceptor.
    if (iEntry <= ptEntityInfo->iMaxChosenEntry) {
        if (m_poEntryMng->CheckIfCatchUpLimited(ptInfo)) {
            CertainLogError("E(%lu, %lu) catchup limited", iEntityID, iEntry);
            return false;
        }

        // Machine A fix the entry only.
        // More than two interaction rounds (iInteractCnt > 2) without success:
        // report that the entry may need fixing, and if auto-fix is enabled,
        // propose a no-op for it. Otherwise fall through to the checks below.
        if (ptInfo->iInteractCnt > 2) {
            CertainLogError("May need fix E(%lu, %lu) st %d", iEntityID, iEntry,
                            ptInfo->poMachine->GetEntryState());
            OSS::ReportMayNeedFix();
            if (m_poConf->GetEnableAutoFixEntry()) {
                CertainLogZero("ProposeNoop for Fix E(%lu, %lu) st %d", iEntityID, iEntry,
                               ptInfo->poMachine->GetEntryState());
                // Propose a no-op value for this entry.
                ProposeNoop(ptEntityInfo, ptInfo);
                return true;
            }
        }

        // More than three interaction rounds (iInteractCnt > 3): give up,
        // clean the entry up and return.
        if (ptInfo->iInteractCnt > 3) {
            CleanUpEntry(ptInfo);
            return false;
        }

        // Count this round of interaction.
        ptInfo->iInteractCnt++;

        uint32_t iActiveAcceptorID = GetPeerAcceptorID(ptEntityInfo);

        // bRemoteUpdated is raised only for the duration of SyncEntryRecord.
        AssertEqual(ptInfo->bRemoteUpdated, false);
        ptInfo->bRemoteUpdated = true;
        // Sync this entry's record with the peer acceptor.
        SyncEntryRecord(ptInfo, iActiveAcceptorID, 0);
        ptInfo->bRemoteUpdated = false;

        OSS::ReportSingleCatchUp();

        // Re-arm the timer with 15000 (read as 15 s by the article).
        m_poEntryMng->AddTimeout(ptInfo, 15000);

        CertainLogError("sync acceptor %u E(%lu, %lu) st %d", iActiveAcceptorID, iEntityID, iEntry,
                        ptInfo->poMachine->GetEntryState());
        return true;
    }

    // Not chosen, and not known to be chosen anywhere either.
    // After more than three interaction rounds, give up and clean up.
    if (ptInfo->iInteractCnt > 3) {
        CleanUpEntry(ptInfo);
        return false;
    }

    const EntryRecord_t &tRecord = poMachine->GetRecord(iLocalAcceptorID);
    CertainLogError("Broadcast E(%lu, %lu) st %u iMaxChosenEntry %lu r[%u] %s", iEntityID, iEntry,
                    poMachine->GetEntryState(), ptEntityInfo->iMaxChosenEntry, iLocalAcceptorID,
                    EntryRecordToString(tRecord).c_str());

    ptInfo->iInteractCnt++;

    // Broadcast the local state machine to the remote acceptors.
    BroadcastToRemote(ptInfo, ptInfo->poMachine);
    OSS::ReportTimeoutBroadcast();

    // Wait longer, as broadcast is heavy, and many candidates to reply.
    // Re-arm the timer with 30000 (read as 30 s by the article).
    m_poEntryMng->AddTimeout(ptInfo, 30000);

    return true;
}
PaxosStore 的代码大多是异步的,一个长的处理流程会拆成多个小块,梳理起来确实会有些复杂。本篇把这些细碎的逻辑拎出来阅读,下一步将会把这些小流程串起来,看看完整的 Paxos / CatchUp 流程。