PaxosStore 源码分析「五、实现细节」

2020.03.26
SF-Zhou

本系列的上一篇里分析了 PaxosStore 中的协议日志的实现,本篇将深入协议过程中的一些细节,这些实现细节集中在 src/EntityWorker.cpp 这个庞然大物里。

1. EntityCatchUp

在对某个 Entity 执行读写前,需要保证该 Entity 在本地的所有 Entry 都已经提交到 DB 了,这样读写才会有意义。简而言之就是 CommittedEntry = MaxChosenEntry

借此机会,过一遍样例的流程。从 example/CardTool.cpp 这个 Client 开始看:

// Client entry point (excerpt; `...` marks code elided by the article).
// Passes the address, operation string, request/response objects and the IP list to Run.
int main(int argc, char **argv) {
  ...
  Run(strAddr, strOper, oRequest, oResponse, &vecIPList);
  ...
}

// Dispatches a client operation on the first character of strOper
// (excerpt; `...` marks code elided by the article, only the 'I' case is shown).
grpc::Status Run(const std::string &strAddr, const std::string &strOper,
                 example::CardRequest &oRequest, example::CardResponse &oResponse,
                 std::vector<std::string> *poIPList) {
  ...
  switch (strOper[0]) {
    case 'I':
      // Insert: fill in the entity id derived from the card id and stamp the
      // last-modified time, then issue the eInsertCard call through oClient.
      oRequest.set_entity_id(GetEntityID(oRequest.card_id()));
      oRequest.mutable_card_info()->set_last_modified_time(time(0));
      oStatus = oClient.Call(oRequest.entity_id(), example::eInsertCard, &oRequest, &oResponse);
      break;
  }
  ...
}

该 Client 会调用 gRPC 向 Server 发送请求,直接看 Server 端的处理函数 example/ServiceImpl.cpp

// Server-side handler for the insert-card request: forwards the request and
// response to BatchFunc tagged with the eInsertCard operation code.
int clsServiceImpl::InsertCard(grpc::ServerContext &oContext, const example::CardRequest &oRequest,
                               example::CardResponse &oResponse) {
  const int iOperCode = example::OperCode::eInsertCard;
  return BatchFunc(iOperCode, oRequest, oResponse);
}

// Handles one card request by batching it with the other pending requests of
// the same entity, running one Paxos round for the whole batch and committing
// the resulting write batch when that round succeeds.
//
// iOper     - operation code stored on the queue item; compared against
//             example::OperCode::eSelectCard below to count reads vs. writes.
// oRequest  - incoming request; only its address is stored on the queue item.
// oResponse - response object; only its address is stored on the queue item.
// Returns poItem->iRet, or the EntityCatchUp error code when catch-up fails.
// NOTE(review): poItem->iRet is presumably filled in by BatchReturn — confirm.
int clsServiceImpl::BatchFunc(int iOper, const example::CardRequest &oRequest,
                              example::CardResponse &oResponse) {
  uint64_t iStartUS = Certain::GetCurrTimeUS();

  // 1. Wrap the request in a QueueItem and push it onto this entity's queue in the batch map.
  uint64_t iPushStartUS = Certain::GetCurrTimeUS();
  QueueItem_t *poItem = new QueueItem_t;
  Certain::clsAutoDelete<QueueItem_t> oAutoDelete(poItem);
  poItem->iOper = iOper;
  poItem->iEntityID = oRequest.entity_id();
  poItem->poRequest = (void *)&oRequest;
  poItem->poResponse = (void *)&oResponse;
  poItem->iRet = BatchStatus::WAITING;

  {
    Certain::clsThreadLock oLock(&m_poBatchMapMutex);
    m_oBatchMap[poItem->iEntityID].push(poItem);
  }
  uint64_t iPushEndUS = Certain::GetCurrTimeUS();

  // 2. Take the entity lock; requests for the same EntityID are all sitting in the queue.
  uint64_t iLockStartUS = Certain::GetCurrTimeUS();
  Certain::clsAutoEntityLock oAuto(poItem->iEntityID);

  // The item is no longer WAITING: return its result as is.
  // NOTE(review): presumably another thread already handled it as part of its batch — confirm.
  if (poItem->iRet != BatchStatus::WAITING) {
    return poItem->iRet;
  }

  clsDBImpl *poDBEngine =
      dynamic_cast<clsDBImpl *>(Certain::clsCertainWrapper::GetInstance()->GetDBEngine());
  dbtype::DB *poDB = poDBEngine->GetDB();
  clsTemporaryTable oTable(poDB);
  uint64_t iLockEndUS = Certain::GetCurrTimeUS();

  // 3. Drain every queued request of this EntityID into a local queue.
  uint64_t iPopStartUS = Certain::GetCurrTimeUS();
  std::queue<QueueItem_t *> oQueue;
  {
    Certain::clsThreadLock oLock(&m_poBatchMapMutex);
    auto iter = m_oBatchMap.find(poItem->iEntityID);
    assert(iter != m_oBatchMap.end());
    while (!iter->second.empty()) {
      oQueue.push(iter->second.front());
      iter->second.pop();
    }
  }
  uint64_t iPopEndUS = Certain::GetCurrTimeUS();

  // 4. Run EntityCatchUp for this EntityID; on failure the whole batch gets the error.
  uint64_t iCatchUpStartUS = Certain::GetCurrTimeUS();
  Certain::clsCertainWrapper *poCertain = Certain::clsCertainWrapper::GetInstance();
  uint64_t iEntry = 0, iMaxCommitedEntry = 0;
  int iRet = poCertain->EntityCatchUp(poItem->iEntityID, iMaxCommitedEntry);
  if (iRet != 0) {
    BatchReturn(&oQueue, iRet);
    return iRet;
  }
  uint64_t iCatchUpEndUS = Certain::GetCurrTimeUS();

  // 5. Walk over all drained requests and run HandleSingleCommand on each.
  uint64_t iBatchHandleStartUS = Certain::GetCurrTimeUS();
  std::vector<uint64_t> vecUUID;
  uint64_t iQueueSize = oQueue.size();
  uint64_t iRead = 0, iWrite = 0;
  while (iQueueSize > 0) {
    // NOTE: this poItem shadows the outer poItem for the duration of the loop body.
    QueueItem_t *poItem = oQueue.front();
    oQueue.pop();
    --iQueueSize;

    assert(poItem->iRet == BatchStatus::WAITING);
    HandleSingleCommand(&oTable, poItem, &vecUUID);
    if (poItem->iOper == example::OperCode::eSelectCard)
      iRead++;
    else
      iWrite++;
    // Re-push items since they need to RunPaxos.
    oQueue.push(poItem);
  }
  uint64_t iBatchHandleEndUS = Certain::GetCurrTimeUS();

  // 6. Run the Paxos protocol for the entry after the max committed one, then answer the batch.
  uint64_t iRunPaxosStartUS = Certain::GetCurrTimeUS();
  iEntry = iMaxCommitedEntry + 1;
  iRet = poCertain->RunPaxos(poItem->iEntityID, iEntry, example::OperCode::eBatchFunc, vecUUID,
                             oTable.GetWriteBatchString());
  BatchReturn(&oQueue, iRet);
  uint64_t iRunPaxosEndUS = Certain::GetCurrTimeUS();

  uint64_t iEndUS = Certain::GetCurrTimeUS();

  // 7. If RunPaxos succeeded, commit the write batch to the DB engine.
  if (iRet == 0) {
    poDBEngine->Commit(poItem->iEntityID, iEntry, oTable.GetWriteBatchString());
  }

  CertainLogInfo(
      "iEntityID %lu iPushTime %lu iLockTime %lu iPopTime %lu "
      "iCatchUpTime %lu iBatchHandleTime %lu iRunPaxos %lu iTotalUS %lu "
      "iRead %lu iWrite %lu",
      poItem->iEntityID, iPushEndUS - iPushStartUS, iLockEndUS - iLockStartUS,
      iPopEndUS - iPopStartUS, iCatchUpEndUS - iCatchUpStartUS,
      iBatchHandleEndUS - iBatchHandleStartUS, iRunPaxosEndUS - iRunPaxosStartUS, iEndUS - iStartUS,
      iRead, iWrite);

  iRet = poItem->iRet;
  return iRet;
}

注意步骤 4 中的 EntityCatchUp,在跑 Paxos 协议前会先确定本地所有的 Entry 已经提交到 DB。注意这里并不保证也不需要保证 Entry 是全局最新的,因为如果不是,跑协议的过程中会失败、然后触发 CatchUp,之后的博文会分析该过程。

来看 EntityCatchUp 的实现,位于 src/CertainWrapper.cpp

// Brings the data DB of an entity up to date by replaying write batches read
// from the PLog, one entry at a time, until CheckDBStatus reports OK.
//
// iEntityID         - entity to catch up.
// iMaxCommitedEntry - [out] reset to 0, loaded through GetEntityMeta, then
//                     incremented after every successful Commit.
// Returns 0 on success; the failing call's return code on error; or
// eRetCodeDBCommitLimited when iMaxCommitNum commits were performed and
// CheckDBStatus still does not report OK.
int clsCertainWrapper::EntityCatchUp(uint64_t iEntityID, uint64_t &iMaxCommitedEntry) {
  const uint32_t iMaxCommitNum = GetConf()->GetMaxCommitNum();

  uint32_t iCommitCnt = 0;

  uint64_t iEntry = 0;
  std::string strWriteBatch;

  uint32_t iFlag = 0;
  iMaxCommitedEntry = 0;
  // Read the max committed entry from the data DB; eRetCodeNotFound is not treated as an error.
  int iRet = m_poDBEngine->GetEntityMeta(iEntityID, iMaxCommitedEntry, iFlag);
  if (iRet != 0 && iRet != eRetCodeNotFound) {
    CertainLogError("iEntityID %lu GetEntityMeta ret %d", iEntityID, iRet);
    return iRet;
  }

  // Perform at most iMaxCommitNum commits.
  while (iCommitCnt < iMaxCommitNum) {
    // Check the DB status: OK ends the loop, anything but "DB lags behind" is an error.
    iRet = CheckDBStatus(iEntityID, iMaxCommitedEntry);
    if (iRet == eRetCodeOK) {
      break;
    } else if (iRet != eRetCodeDBLagBehind) {
      CertainLogError("CheckDBStatus iEntityID %lu ret %d", iEntityID, iRet);
      return iRet;
    }

    iEntry = iMaxCommitedEntry + 1;

    OSS::ReportGetAndCommit();

    // Fetch from the PLog the write batch of the entry right after iMaxCommitedEntry.
    iRet = GetWriteBatch(iEntityID, iEntry, strWriteBatch);
    if (iRet != 0) {
      CertainLogError("E(%lu, %lu) GetWriteBatch ret %d", iEntityID, iEntry, iRet);
      return iRet;
    }

    TIMERMS_START(iCommitUseTimeMS);
    // Commit that write batch to the data DB.
    iRet = m_poDBEngine->Commit(iEntityID, iEntry, strWriteBatch);
    TIMERMS_STOP(iCommitUseTimeMS);
    OSS::ReportDBCommit(iRet, iCommitUseTimeMS);
    if (iRet != 0) {
      CertainLogError("E(%lu, %lu) Commit ret %d", iEntityID, iEntry, iRet);
      return iRet;
    }

    iCommitCnt++;
    iMaxCommitedEntry++;
  }

  // The commit budget is used up: re-check once, and report the limit if the DB is still not OK.
  if (iCommitCnt == iMaxCommitNum && CheckDBStatus(iEntityID, iMaxCommitedEntry) != eRetCodeOK) {
    CertainLogError("iEntityID %lu iCommitCnt == iMaxCommitNum %u", iEntityID, iMaxCommitNum);
    return Certain::eRetCodeDBCommitLimited;
  }

  return 0;
}

// Compares the committed progress of an entity with its chosen progress.
//
// iEntityID      - entity to check.
// iCommitedEntry - entry the data DB has committed up to.
// Returns:
//   eRetCodeOK          - iCommitedEntry >= iMaxChosenEntry.
//   eRetCodeDBLagBehind - the DB still has chosen entries to commit.
//   eRetCodeCatchUp     - contiguous chosen progress is behind the max chosen
//                         entry, or the DB is GetMaxCatchUpNum() or more
//                         entries behind the contiguous chosen progress.
//   other               - error from GetMaxChosenEntry / SyncWaitCmd.
int clsCertainWrapper::CheckDBStatus(uint64_t iEntityID, uint64_t iCommitedEntry) {
  uint64_t iMaxContChosenEntry = 0;
  uint64_t iMaxChosenEntry = 0;
  uint64_t iLeaseTimeoutMS = 0;

  bool bTriggleRecovered = false;

  // Read the entity's iMaxContChosenEntry and iMaxChosenEntry from memory.
  int iRet = m_poEntityGroupMng->GetMaxChosenEntry(iEntityID, iMaxContChosenEntry, iMaxChosenEntry,
                                                   iLeaseTimeoutMS);
  if (iRet == eRetCodeNotFound) {
    // Nothing in memory for this EntityID: recover it through the PLog.
    bTriggleRecovered = true;

    // Build a Recover Cmd.
    clsRecoverCmd *poCmd = new clsRecoverCmd(iEntityID, iCommitedEntry);
    clsAutoDelete<clsRecoverCmd> oAuto(poCmd);

    poCmd->SetTimestampUS(GetCurrTimeUS());

    // Send the Recover Cmd and wait for its result.
    iRet = SyncWaitCmd(poCmd);
    if (iRet != 0) {
      CertainLogError("iEntityID %lu SyncWaitCmd ret %d", iEntityID, iRet);
      return iRet;
    }

    // Take iMaxContChosenEntry and iMaxChosenEntry from the Recover Cmd result.
    iMaxContChosenEntry = poCmd->GetMaxContChosenEntry();
    iMaxChosenEntry = poCmd->GetMaxChosenEntry();
  } else if (iRet != 0) {
    CertainLogError("iEntityID %lu GetMaxChosenEntry iRet %d", iEntityID, iRet);
    return iRet;
  } else if (iLeaseTimeoutMS > 0) {
    // Lease related (analysed in a later article): sleep for the lease timeout, then re-read.
    OSS::ReportLeaseWait();
    poll(NULL, 0, iLeaseTimeoutMS);

    CertainLogError("iEntityID %lu wait iLeaseTimeoutMS %lu", iEntityID, iLeaseTimeoutMS);

    iRet = m_poEntityGroupMng->GetMaxChosenEntry(iEntityID, iMaxContChosenEntry, iMaxChosenEntry,
                                                 iLeaseTimeoutMS);
    if (iRet != 0) {
      CertainLogError("iEntityID %lu iLeaseTimeoutMS %lu ret %d", iEntityID, iLeaseTimeoutMS, iRet);
      return iRet;
    }
  }

  // iMaxContChosenEntry may update posterior to iCommitedEntry.
  if (iMaxContChosenEntry < iCommitedEntry) {
    iMaxContChosenEntry = iCommitedEntry;
  }

  // The DB is GetMaxCatchUpNum() or more entries behind the contiguous chosen progress.
  if (iCommitedEntry + m_poConf->GetMaxCatchUpNum() <= iMaxContChosenEntry) {
    // All entrys of the entity are eliminated, help trigger db catchup.
    if (iMaxContChosenEntry == iMaxChosenEntry) {
      CertainLogError("notify_db iEntityID %lu entrys: %lu %lu", iEntityID, iCommitedEntry,
                      iMaxContChosenEntry);
      // Notify the DBWorker so that it recovers asynchronously.
      clsDBWorker::NotifyDBWorker(iEntityID);
    }

    CertainLogError("iEntityID %lu entrys: %lu %lu %lu", iEntityID, iCommitedEntry,
                    iMaxContChosenEntry, iMaxChosenEntry);
    if (!bTriggleRecovered) {
      // Trigger a recover (also done by sending a Recover Cmd, per the article)
      // unless one was already issued above.
      TriggeRecover(iEntityID, iCommitedEntry);
    }
    return eRetCodeCatchUp;
  }

  // Local contiguous chosen progress is behind the known max chosen entry: catch up first.
  if (iMaxContChosenEntry < iMaxChosenEntry) {
    CertainLogError("iEntityID %lu entrys: %lu %lu %lu", iEntityID, iCommitedEntry,
                    iMaxContChosenEntry, iMaxChosenEntry);
    if (!bTriggleRecovered) {
      // Trigger a recover unless one was already issued above.
      TriggeRecover(iEntityID, iCommitedEntry);
    }
    return eRetCodeCatchUp;
  }

  if (iCommitedEntry >= iMaxChosenEntry) {
    // Only when committed has reached chosen is the DB up to date locally.
    return eRetCodeOK;
  } else {
    // Otherwise the committed progress still has to catch up.
    return eRetCodeDBLagBehind;
  }
}

int clsCertainWrapper::GetWriteBatch(uint64_t iEntityID, uint64_t iEntry, string &strWriteBatch,
                                     uint64_t *piValueID) {
  EntryRecord_t tRecord;
  // 从 PLog 中读取对应的 Record
  int iRet = m_poPLogEngine->GetRecord(iEntityID, iEntry, tRecord);
  if (iRet != 0) {
    if (iRet != eRetCodeNotFound) {
      CertainLogFatal("BUG probably E(%lu, %lu) ret %d", iEntityID, iEntry, iRet);
      return iRet;
    }

    CertainLogInfo("E(%lu, %lu) not found", iEntityID, iEntry);
    return eRetCodeNotFound;
  }

  // 要确定已经 Chosen 了
  if (!tRecord.bChosen) {
    CertainLogInfo("unchosen: %s", EntryRecordToString(tRecord).c_str());
    return eRetCodeNotFound;
  }

  if (piValueID != NULL) {
    *piValueID = tRecord.tValue.iValueID;
  }

  // 拿到已经 Chosen 的 WriteBatch
  strWriteBatch = tRecord.tValue.strValue;

  return 0;
}

这里有几个进度:

  1. 已经提交到 Data DB 的进度:CommittedEntry
  2. 获取到的最大连续 Chosen 进度:MaxContChosenEntry
  3. 已知的全局最大 Chosen 进度:MaxChosenEntry

只有 CommittedEntry < MaxContChosenEntry == MaxChosenEntry 时,DB 才可以直接通过读取 PLog 的方式来追赶到本地最新的进度,其他情况均需要先进行 Recover。接下来继续看看这里的 Recover 是怎么实现的。

// Pushes a client command onto the IO request queue of the entity's worker and
// blocks on a notification pipe until the command is done.
//
// poCmd - command to run; its pipe index and IO tracker are set here.
// Returns poCmd->GetResult() on success, the GetIdlePipeIdx error code,
// eRetCodeQueueFailed when the push fails, or eRetCodePipeWaitFailed when
// waiting on the pipe fails.
int clsCertainWrapper::SyncWaitCmd(clsClientCmd *poCmd) {
  uint64_t iEntityID = poCmd->GetEntityID();
  uint64_t iEntry = poCmd->GetEntry();

  uint32_t iPipeIdx;
  // Get an idle notification pipe.
  int iRet = m_poPipeMng->GetIdlePipeIdx(iPipeIdx, iEntityID);
  if (iRet != 0) {
    CertainLogError("E(%lu, %lu) GetIdlePipeIdx ret %d", iEntityID, iEntry, iRet);
    return iRet;
  }

  poCmd->SetPipeIdx(iPipeIdx);

  // Pick the IO worker by hashing the entity id.
  uint32_t iIOWorkerID = Hash(iEntityID) % m_poConf->GetIOWorkerNum();
  poCmd->SetIOTracker(IOTracker_t(0, 0, iIOWorkerID));

  // Pick the entity worker, and its IO request queue, the same way.
  uint32_t iEntityWorkerID = Hash(iEntityID) % m_poConf->GetEntityWorkerNum();
  clsIOReqQueue *poIOReqQueue = m_poQueueMng->GetIOReqQueue(iEntityWorkerID);

  // Push the Cmd onto that IO request queue; give the pipe back if the push fails.
  iRet = poIOReqQueue->PushByMultiThread(poCmd);
  if (iRet != 0) {
    m_poPipeMng->PutIdlePipeIdx(iPipeIdx);
    CertainLogError("PushByMultiThread ret %d", iRet);
    return eRetCodeQueueFailed;
  }

  // The command's address is passed to SyncWaitByPipeIdx as the check value.
  uintptr_t iCheck = (uintptr_t)poCmd;

  CertainLogDebug("iPipeIdx %u iPtr %lu E(%lu, %lu) iUUID %lu", iPipeIdx, iCheck, iEntityID, iEntry,
                  poCmd->GetUUID());

  bool bOneMoreTry = false;
  // Wait for the pipe notification. Per the article, the other end of the pipe
  // is written once the Cmd completes, and the read here ends the wait.
  iRet = m_poPipeMng->SyncWaitByPipeIdx(iPipeIdx, iCheck);
  if (iRet == eRetCodePipePtrErr) {
    bOneMoreTry = true;
    // Try one more time, prev timeout ptr may come first.
    // There's probably some BUG in certain.
    iRet = m_poPipeMng->SyncWaitByPipeIdx(iPipeIdx, iCheck);
  }

  if (iRet != 0) {
    CertainLogFatal("BUG probably ibOneMoreTry %u ret %d cmd: %s", bOneMoreTry, iRet,
                    poCmd->GetTextCmd().c_str());
    m_poPipeMng->PutIdlePipeIdx(iPipeIdx);
    return eRetCodePipeWaitFailed;
  }

  // Give the notification pipe back.
  m_poPipeMng->PutIdlePipeIdx(iPipeIdx);

  return poCmd->GetResult();
}

塞入 IO 队列中的 Recover Cmd 会在 EntityWorker 中被处理掉,这里的调用关系非常复杂:

// Worker loop (excerpt; `...` marks code elided by the article). The part shown
// takes one command from the IO request queue and hands it to DoWithIOReq.
void clsEntityWorker::Run() {
  while (1) {
    ...
    // 2.Do with IO request.
    poCmd = NULL;
    iRet = m_poIOReqQueue->TakeByOneThread(&poCmd);
    if (iRet == 0) {
      Assert(poCmd != NULL);
      bHasWork = true;

      iRet = DoWithIOReq(poCmd);
      if (iRet < 0) {
        CertainLogError("DoWithIOReq ret %d cmd %s", iRet, poCmd->GetTextCmd().c_str());
      }

      // The command is deleted here unless the handler returned eRetCodePtrReuse.
      // NOTE(review): presumably the pointer is then still referenced elsewhere — confirm.
      if (iRet != eRetCodePtrReuse) {
        delete poCmd, poCmd = NULL;
      }
    }
    ...
  }
}

// Routes a command taken from the IO request queue to its handler.
//
// When learn-only mode is enabled, every non-Paxos command is answered through
// InvalidClientCmd with eRetCodeRejectAll (returning eRetCodePtrReuse), and
// Paxos commands return eRetCodeRejectAll directly.
//
// poCmd - command to dispatch; must belong to an entity hashed to this worker.
// Returns the handler's result, one of the learn-only codes above, or 0 for an
// unknown command id (after Assert(false)).
int clsEntityWorker::DoWithIOReq(clsCmdBase *poCmd) {
  if (clsCertainWrapper::GetInstance()->GetConf()->GetEnableLearnOnly()) {
    if (poCmd->GetCmdID() != kPaxosCmd) {
      clsClientCmd *poClientCmd = dynamic_cast<clsClientCmd *>(poCmd);
      // Check the result of the cast: poCmd itself was already dereferenced
      // above, so asserting on it would verify nothing.
      AssertNotEqual(poClientCmd, NULL);
      InvalidClientCmd(poClientCmd, eRetCodeRejectAll);
      return eRetCodePtrReuse;
    } else {
      return eRetCodeRejectAll;
    }
  }

  uint64_t iEntityID = poCmd->GetEntityID();
  // The entity must hash to this worker.
  AssertEqual(Hash(iEntityID) % m_poConf->GetEntityWorkerNum(), m_iWorkerID);
  CertainLogInfo("cmd: %s", poCmd->GetTextCmd().c_str());

  clsPaxosCmd *poPaxosCmd = NULL;
  clsClientCmd *poClientCmd = NULL;
  clsRecoverCmd *poRecoverCmd = NULL;

  // Downcast by command id and forward to the matching handler.
  switch (poCmd->GetCmdID()) {
    case kWriteBatchCmd:
      poClientCmd = dynamic_cast<clsClientCmd *>(poCmd);
      return DoWithClientCmd(poClientCmd);

    case kRecoverCmd:
      poRecoverCmd = dynamic_cast<clsRecoverCmd *>(poCmd);
      return DoWithRecoverCmd(poRecoverCmd);

    case kPaxosCmd:
      poPaxosCmd = dynamic_cast<clsPaxosCmd *>(poCmd);
      return DoWithPaxosCmd(poPaxosCmd);

    default:
      CertainLogError("cmd: %s", poCmd->GetTextCmd().c_str());
      Assert(false);
  }

  return 0;
}

// Handles a Recover Cmd for an entity (`{...}` marks branches elided by the article).
// If the entity is already in memory, its progress is refreshed and the command
// is answered (or parked until the PLog load finishes); otherwise a new
// EntityInfo is created and the command waits for the load.
// Every visible path returns eRetCodePtrReuse.
int clsEntityWorker::DoWithRecoverCmd(clsRecoverCmd *poCmd) {
  uint64_t iEntityID = poCmd->GetEntityID();
  uint64_t iMaxCommitedEntry = poCmd->GetMaxCommitedEntry();
  CertainLogInfo("cmd: %s", poCmd->GetTextCmd().c_str());

  // Entry 0 is special: CheckIfWaitRecoverCmd creates it to stand for a pending
  // RecoverCmd, so none may exist at this point.
  EntryInfo_t *ptInfo = m_poEntryMng->FindEntryInfo(iEntityID, 0);
  AssertEqual(ptInfo, NULL);

  EntityInfo_t *ptEntityInfo = m_poEntityMng->FindEntityInfo(iEntityID);
  if (ptEntityInfo != NULL) {
    // The EntityInfo is already in memory.
    CertainLogError("Failover E(%lu, %lu) Entrys: %lu %lu %lu ref %d %u", iEntityID,
                    iMaxCommitedEntry, ptEntityInfo->iMaxContChosenEntry,
                    ptEntityInfo->iMaxChosenEntry, ptEntityInfo->iMaxPLogEntry,
                    ptEntityInfo->iRefCount, ptEntityInfo->bRangeLoading);

    if (poCmd->IsEvictEntity()) {...}

    Assert(m_poEntityMng->Refresh(ptEntityInfo));

    if (poCmd->IsCheckGetAll()) {...}

    // Check whether a catch-up is needed, and whether the DB worker must be notified.
    CheckForCatchUp(ptEntityInfo, INVALID_ACCEPTOR_ID, 0);
    CheckIfNeedNotifyDB(ptEntityInfo);

    // Report the current progress back through the command.
    poCmd->SetMaxChosenEntry(uint64_t(ptEntityInfo->iMaxChosenEntry));
    poCmd->SetMaxContChosenEntry(uint64_t(ptEntityInfo->iMaxContChosenEntry));

    AssertEqual(ptEntityInfo->poClientCmd, NULL);
    if (ptEntityInfo->iMaxContChosenEntry < ptEntityInfo->iMaxChosenEntry) {
      // Local contiguous progress is behind the max chosen entry: answer with the CatchUp code.
      InvalidClientCmd(poCmd, eRetCodeCatchUp);
      return eRetCodePtrReuse;
    } else if (ptEntityInfo->iMaxPLogEntry == INVALID_ENTRY) {
      // The PLog is not loaded yet; if a load is in progress the Cmd is parked with a timeout.
      CheckIfWaitRecoverCmd(ptEntityInfo, poCmd);
      return eRetCodePtrReuse;
    }

    // Every other case completes with OK.
    InvalidClientCmd(poCmd, eRetCodeOK);
  } else {
    // No EntityInfo in memory for this entity.
    if (poCmd->IsEvictEntity()) {...}

    if (!m_poEntityMng->CheckAndEliminate()) {...}

    // Create one.
    ptEntityInfo = m_poEntityMng->CreateEntityInfo(iEntityID);
    if (ptEntityInfo == NULL) {
      CertainLogFatal("CreateEntityInfo failed cmd: %s", poCmd->GetTextCmd().c_str());
      InvalidClientCmd(poCmd, eRetCodeRouteErr);
      return eRetCodePtrReuse;
    }

    // Check whether a catch-up is needed, then park the command until the load finishes.
    CheckForCatchUp(ptEntityInfo, INVALID_ACCEPTOR_ID, 0);

    CheckIfWaitRecoverCmd(ptEntityInfo, poCmd);
    return eRetCodePtrReuse;
  }

  return eRetCodePtrReuse;
}

// Completes a client command with the given result and hands it to the IO
// worker router so that the waiting side gets notified.
//
// poCmd   - command to complete; a null pointer is ignored.
// iResult - result code stored on the command.
// Returns true when the router accepted the command; false when poCmd is null
// or the router refused it (the command is then kept in m_poWaitingGoList).
bool clsEntityWorker::InvalidClientCmd(clsClientCmd *poCmd, int iResult) {
  if (poCmd == NULL) return false;

  poCmd->SetResult(iResult);

  const bool bFailed = (iResult != eRetCodeOK);
  if (bFailed) {
    CertainLogError("InvalidClientCmd cmd %u uuid %lu result: %d", poCmd->GetCmdID(),
                    poCmd->GetUUID(), iResult);
  }

  // Let the IO worker deliver the completion.
  const int iGoRet = m_poIOWorkerRouter->Go(poCmd);
  if (iGoRet == 0) {
    return true;
  }

  CertainLogError("BUG probably Go ret %d poCmd: %s", iGoRet, poCmd->GetTextCmd().c_str());

  // sum of list size <= MAX_ASYNC_PIPE_NUM
  m_poWaitingGoList.push_back(poCmd);
  return false;
}

// Decides whether a Recover Cmd waits for the range load in progress.
// While the entity is range loading, the command is attached to the entity and
// entry 0 is created and put on the timeout list (timeout handling is covered
// later in the article). Otherwise the command fails at once with eRetCodeQueueFull.
void clsEntityWorker::CheckIfWaitRecoverCmd(EntityInfo_t *ptEntityInfo, clsRecoverCmd *poCmd) {
  if (ptEntityInfo->bRangeLoading) {
    ptEntityInfo->poClientCmd = poCmd;
    // Entry 0 again: it stands for the pending RecoverCmd.
    EntryInfo_t *ptRecoverEntry = m_poEntryMng->CreateEntryInfo(ptEntityInfo, 0);
    m_poEntryMng->AddTimeout(ptRecoverEntry, m_poConf->GetRecoverTimeoutMS());
    return;
  }

  // If the cmd is most likely timeout eventually, do fast failover.
  // It should range load from plog, but failed as queue full.
  InvalidClientCmd(poCmd, eRetCodeQueueFull);
}

// Notifies the DBWorker to catch up when the last notified entry is behind the
// entity's contiguous chosen progress, and records the new notified position
// once the notification succeeds.
void clsEntityWorker::CheckIfNeedNotifyDB(EntityInfo_t *ptEntityInfo) {
  const bool bBehind = ptEntityInfo->iNotifyedEntry < ptEntityInfo->iMaxContChosenEntry;
  if (!bBehind) {
    return;
  }

  const int iNotifyRet = clsDBWorker::NotifyDBWorker(ptEntityInfo->iEntityID);
  if (iNotifyRet == 0) {
    ptEntityInfo->iNotifyedEntry = ptEntityInfo->iMaxContChosenEntry;
    return;
  }

  CertainLogError("NotifyDBWorker iEntityID %lu ret %d", ptEntityInfo->iEntityID, iNotifyRet);
}

DoWithRecoverCmd 里最核心的调用就是 CheckForCatchUp 了,用以检查并实施 CatchUp。该函数在 EntityWorker 中频繁出现,涉及的逻辑非常复杂,下一节详述。

2. CheckForCatchUp

CheckForCatchUp 会有两种行为,一种是 MaxPLogEntry == INVALID_ENTRY,即还没有从 PLog 中加载记录,则调用 RangeLoadFromPLog 加载 PLog;另一种是 MaxContChosenEntry < MaxChosenEntry,本地的进度晚于全局的进度,则调用 LimitedCatchUp 做限制性恢复。

// Pre-creates the entries needed for catch-up and then drives LimitedCatchUp.
// CheckForCatchUp may eliminate current EntryInfo_t.
// So it must be called in the end of use of EntryInfo_t.
//
// ptEntityInfo        - entity to check.
// iDestAcceptorID     - forwarded to LimitedCatchUp.
// iDestMaxChosenEntry - raises the entity's iMaxChosenEntry when it is larger.
void clsEntityWorker::CheckForCatchUp(EntityInfo_t *ptEntityInfo, uint32_t iDestAcceptorID,
                                      uint64_t iDestMaxChosenEntry) {
  uint64_t iEntityID = ptEntityInfo->iEntityID;

  if (ptEntityInfo->iMaxChosenEntry < iDestMaxChosenEntry) {
    CertainLogInfo("iMaxChosenEntry %lu iDestMaxChosenEntry %lu", ptEntityInfo->iMaxChosenEntry,
                   iDestMaxChosenEntry);

    ptEntityInfo->iMaxChosenEntry = iDestMaxChosenEntry;
  }

  // Pre-create the entries in [MaxPLogEntry + 1, MaxChosenEntry], at most
  // GetMaxCatchUpNum() of them; stop at the entry limit or at the first entry
  // that already exists.
  if (ptEntityInfo->iMaxPLogEntry != INVALID_ENTRY &&
      ptEntityInfo->iMaxPLogEntry < ptEntityInfo->iMaxChosenEntry) {
    uint32_t iPreOpenCnt = 0;
    for (uint32_t i = 1; i <= m_poConf->GetMaxCatchUpNum(); ++i) {
      uint64_t iEntry = ptEntityInfo->iMaxPLogEntry + i;
      if (iEntry > ptEntityInfo->iMaxChosenEntry) {
        break;
      }

      if (CheckIfEntryNumLimited(iEntityID, false)) {
        break;
      }

      EntryInfo_t *ptInfo = m_poEntryMng->FindEntryInfo(iEntityID, iEntry);
      if (ptInfo == NULL) {
        ptInfo = m_poEntryMng->CreateEntryInfo(ptEntityInfo, iEntry);
        iPreOpenCnt++;
      } else {
        break;
      }
    }
    if (iPreOpenCnt > 0) {
      CertainLogError("E(%lu, %lu) iPreOpenCnt %u", iEntityID, ptEntityInfo->iMaxChosenEntry,
                      iPreOpenCnt);
    }
  }

  // A RangeLoad is already in progress: return.
  if (ptEntityInfo->bRangeLoading) {
    CertainLogInfo("iEntityID %lu entrys: %lu %lu %lu", iEntityID,
                   ptEntityInfo->iMaxContChosenEntry, ptEntityInfo->iCatchUpEntry,
                   ptEntityInfo->iMaxChosenEntry);
    return;
  }

  // A GetAll is pending: return.
  if (ptEntityInfo->bGetAllPending) {
    CertainLogError("EntityID %lu GetAllPending, not need catch up", iEntityID);
    return;
  }

  // RangeLoadFromPLog --> iMaxPLogEntry == INVALID_ENTRY
  // LimitedCatchUp --> iMaxContChosenEntry < iMaxChosenEntry
  int iLoopCnt = 0, iCatchUpCnt = 0;
  while (ptEntityInfo->iMaxContChosenEntry < ptEntityInfo->iMaxChosenEntry) {
    int iRet = LimitedCatchUp(ptEntityInfo, iDestAcceptorID);
    if (iRet < 0) {
      CertainLogError("LimitedCatchUp ret %d", iRet);
      break;
    }

    iLoopCnt++;
    iCatchUpCnt += iRet;

    // LimitedCatchUp may have started a RangeLoad; stop and wait for that
    // asynchronous load to finish.
    if (ptEntityInfo->bRangeLoading) {
      break;
    }

    // LimitedCatchUp left its loop through a break (the contiguous chosen
    // progress is still below the catch-up target), so break here as well.
    if (ptEntityInfo->iMaxContChosenEntry < ptEntityInfo->iCatchUpEntry) {
      break;
    }

    // Nothing was caught up in this round: stop.
    if (iRet == 0) {
      break;
    }
  }

  // One remain group and one newly extend group is processed at most.
  // But it is possible when db lags badly, and entry increases quickly.
  if (iLoopCnt >= 3) {
    CertainLogError("Check if db lags badly Cnt(%d, %d) E(%lu, %lu)", iLoopCnt, iCatchUpCnt,
                    iEntityID, ptEntityInfo->iMaxChosenEntry);
  }

  // The PLog has not been loaded yet (iMaxPLogEntry == INVALID_ENTRY), nothing
  // is left to catch up and no load is running: load it via RangeLoadFromPLog.
  if (ptEntityInfo->iMaxPLogEntry == INVALID_ENTRY &&
      ptEntityInfo->iMaxContChosenEntry == ptEntityInfo->iMaxChosenEntry &&
      !ptEntityInfo->bRangeLoading) {
    RangeLoadFromPLog(ptEntityInfo);
  }
}

RangeLoadFromPLog 在其他地方也有调用,将在下一小节分析;而 LimitedCatchUp 仅在此处有调用:

// Sets the catch-up target, walks the entries up to it and pushes those already
// chosen to the DB worker (PushCmdToDBWorker), which is how catch-up happens.
// When an EntryInfo is missing it tries RangeLoadFromPLog / LoadFromPLogWorker
// to restore it and leaves the loop.
//
// ptEntityInfo    - entity being caught up.
// iDestAcceptorID - becomes the entity's active acceptor unless it is INVALID_ACCEPTOR_ID.
// Returns the number of entries caught up in this call (iCatchUpCnt).
int clsEntityWorker::LimitedCatchUp(EntityInfo_t *ptEntityInfo, uint32_t iDestAcceptorID) {
  // MaxContChosenEntry <= CatchUpEntry <= MaxChosenEntry
  AssertNotMore(ptEntityInfo->iMaxContChosenEntry, ptEntityInfo->iCatchUpEntry);
  AssertNotMore(ptEntityInfo->iCatchUpEntry, ptEntityInfo->iMaxChosenEntry);

  // Used to make LimitedCatchUp amortized complexity O(1).
  bool bExtend = false;

  uint64_t iEntityID = ptEntityInfo->iEntityID;

  // Set the catch-up target once the previous target has been reached.
  if (ptEntityInfo->iMaxContChosenEntry == ptEntityInfo->iCatchUpEntry) {
    uint64_t iOldCatchUpEntry = ptEntityInfo->iCatchUpEntry;
    CertainLogInfo("extend iEntityID %lu entrys: %lu %lu %lu %lu", iEntityID,
                   ptEntityInfo->iMaxContChosenEntry, ptEntityInfo->iCatchUpEntry,
                   ptEntityInfo->iMaxChosenEntry, ptEntityInfo->iMaxPLogEntry);
    // Catch up at most GetMaxCatchUpNum() entries, never beyond iMaxChosenEntry.
    ptEntityInfo->iCatchUpEntry = min(ptEntityInfo->iCatchUpEntry + m_poConf->GetMaxCatchUpNum(),
                                      uint64_t(ptEntityInfo->iMaxChosenEntry));
    bExtend = true;

    OSS::ReportBatchCatchUp(ptEntityInfo->iCatchUpEntry - iOldCatchUpEntry);
  }

  if (iDestAcceptorID != INVALID_ACCEPTOR_ID) {
    ptEntityInfo->iActiveAcceptorID = iDestAcceptorID;
  }

  int iReadyCnt = 0, iCatchUpCnt = 0;

  for (uint64_t iEntry = ptEntityInfo->iMaxContChosenEntry + 1;
       iEntry <= ptEntityInfo->iCatchUpEntry; ++iEntry) {
    AssertEqual(ptEntityInfo->bRangeLoading, false);

    EntryInfo_t *ptInfo = m_poEntryMng->FindEntryInfo(iEntityID, iEntry);
    if (ptInfo == NULL) {
      // The EntryInfo is not in memory.
      // If the catch-up range was just extended, try a RangeLoad and leave.
      if (bExtend) {
        RangeLoadFromPLog(ptEntityInfo);
        break;
      }

      if (CheckIfEntryNumLimited(iEntityID, false)) {
        CertainLogError("E(%lu, %lu) catchup limited by entry", iEntityID, iEntry);
        break;
      }

      // Per the article, the entry may also be missing because the LRU cache
      // dropped it; LoadFromPLogWorker restores that single entry.
      if (ptEntityInfo->iMaxPLogEntry == INVALID_ENTRY || iEntry <= ptEntityInfo->iMaxPLogEntry) {
        ptInfo = m_poEntryMng->CreateEntryInfo(ptEntityInfo, iEntry);
        if (LoadFromPLogWorker(ptInfo) != 0) {
          m_poEntryMng->DestroyEntryInfo(ptInfo);
          break;
        }
      } else {
        ptInfo = m_poEntryMng->CreateEntryInfo(ptEntityInfo, iEntry);
      }

      CertainLogError("Check it E(%lu, %lu)", iEntityID, iEntry);
    }

    // The entry is certain and already chosen.
    if (!ptInfo->bUncertain && ptInfo->poMachine->GetEntryState() == kEntryStateChosen) {
      // It is exactly the next entry to hand over.
      if (ptEntityInfo->iMaxContChosenEntry + 1 == iEntry) {
        // Push it onto the DB worker's queue.
        PushCmdToDBWorker(ptInfo);
        ptEntityInfo->iMaxContChosenEntry++;
        // One more entry caught up.
        iCatchUpCnt++;
      }
      // Keep going: chosen entries never reach the `!bExtend` break at the end of the loop body.
      continue;
    }

    // Not chosen yet and not on the timeout list, which (per the article) means
    // it is not waiting for the result of an asynchronous operation.
    if (!ptInfo->bUncertain && !m_poEntryMng->WaitForTimeout(ptInfo)) {
      // Activate this EntryInfo.
      if (!ActivateEntry(ptInfo)) {
        break;
      }
      iReadyCnt++;
    }

    // Reaching this point without a freshly extended range ends the loop.
    if (!bExtend) {
      break;
    }
  }
  // When the loop ended through a break, MaxContChosenEntry < CatchUpEntry.

  if (iCatchUpCnt > 1 || iReadyCnt > 0) {
    CertainLogInfo(
        "iEntityID %lu Entrys: %lu <= %lu <= %lu "
        "iCatchUpCnt %d iReadyCnt %d",
        ptEntityInfo->iEntityID, ptEntityInfo->iMaxContChosenEntry, ptEntityInfo->iCatchUpEntry,
        ptEntityInfo->iMaxChosenEntry, iCatchUpCnt, iReadyCnt);
  }

  return iCatchUpCnt;
}

3. RangeLoadFromPLog

CheckForCatchUp 和 DoWithPaxosCmd 中都会调用 RangeLoadFromPLog,用来批量地将 PLog 中的 Record 记录恢复到内存中。通过构造一条 RecoverCmd 启动该流程:

// Starts a range load for an entity by queueing a clsRecoverCmd on the PLog
// request queue and marking the entity as bRangeLoading.
//
// Returns 0 when the command was queued, or when loading was skipped because of
// the entry limit or the catch-up limit; -2 when EnterPLogReqQueue fails (the
// bRangeLoading mark is then cleared and the command deleted).
int clsEntityWorker::RangeLoadFromPLog(EntityInfo_t *ptEntityInfo) {
  if (CheckIfEntryNumLimited(ptEntityInfo->iEntityID)) {
    CertainLogError("iEntityID %lu stop loading by entry limit", ptEntityInfo->iEntityID);
    return 0;
  }

  // The catch-up limit is only checked while the entity is behind the max chosen entry.
  if (ptEntityInfo->iMaxContChosenEntry < ptEntityInfo->iMaxChosenEntry) {
    if (m_poEntryMng->CheckIfCatchUpLimited(NULL)) {
      CertainLogError("iEntityID %lu stop loading by catchup limit", ptEntityInfo->iEntityID);
      return 0;
    }
  }

  // Mark the entity as range loading; it must not be loading already.
  AssertEqual(ptEntityInfo->bRangeLoading, false);
  ptEntityInfo->bRangeLoading = true;

  // Build a Recover Cmd.
  clsRecoverCmd *poCmd =
      new clsRecoverCmd(ptEntityInfo->iEntityID, ptEntityInfo->iMaxContChosenEntry);

  // Pass along whether the entity was range loaded before, the currently known
  // MaxPLogEntry, and the maximum number of records to load.
  poCmd->SetRangeLoaded(ptEntityInfo->bRangeLoaded);
  poCmd->SetMaxPLogEntry((uint64_t)ptEntityInfo->iMaxPLogEntry);
  poCmd->SetMaxNum(m_poConf->GetMaxCatchUpNum());

  CertainLogInfo("cmd: %s", poCmd->GetTextCmd().c_str());

  // Push it onto the PLog request queue.
  int iRet = clsPLogWorker::EnterPLogReqQueue(poCmd);
  if (iRet != 0) {
    ptEntityInfo->bRangeLoading = false;
    delete poCmd, poCmd = NULL;

    CertainLogError("iEntityID %lu EnterPLogReqQueue ret %d", ptEntityInfo->iEntityID, iRet);
    return -2;
  }

  return 0;
}

PLogWorker 拿到 RecoverCmd 后,调用 FillRecoverCmd

// Fills in a RecoverCmd with progress and records read from storage.
// First reads MaxCommitedEntry from the DB engine. If the entity has not been
// range loaded yet, only the entity's MaxPLogEntry is read from the PLog and the
// function returns. Otherwise the entity's records from MaxCommitedEntry up to
// MaxLoadingEntry are loaded from the PLog.
//
// Returns eRetCodeOK on success, or a negative code naming the failed step:
// -1 GetEntityMeta, -5 GetPLogEntityMeta, -2 LoadUncommitedEntrys,
// -3 StringToEntryRecord, -4 GetValue.
int clsPLogWorker::FillRecoverCmd(clsRecoverCmd *poRecoverCmd) {
  CertainLogDebug("cmd: %s", poRecoverCmd->GetTextCmd().c_str());

  uint64_t iEntityID = poRecoverCmd->GetEntityID();

  // Defaults to 0.
  uint64_t iMaxCommitedEntry = 0;
  TIMERUS_START(iGetEntityMetaUseTimeUS);
  uint32_t iFlag = 0;
  // Read MaxCommitedEntry from the data DB; eRetCodeNotFound is not treated as an error.
  int iRet = m_poDBEngine->GetEntityMeta(iEntityID, iMaxCommitedEntry, iFlag);
  TIMERUS_STOP(iGetEntityMetaUseTimeUS);
  s_poGetEntityMetaTimeStat->Update(iGetEntityMetaUseTimeUS);
  if (iRet != 0 && iRet != eRetCodeNotFound) {
    CertainLogFatal("GetEntityMeta iFlag %u cmd: %s ret %d", iFlag,
                    poRecoverCmd->GetTextCmd().c_str(), iRet);
    return -1;
  }

  // Per the article, kDBFlagCheckGetAll is never set anywhere in the code base,
  // so this branch is elided and ignored.
  if (iFlag == kDBFlagCheckGetAll) {...}

  typedef vector<pair<uint64_t, EntryRecord_t> > EntryRecordList_t;
  EntryRecordList_t tEntryRecordList;

  // When MaxPLogEntry is enabled (the default, per the article) and the entity
  // has not been range loaded yet, only MaxPLogEntry is read from the PLog; the
  // command is then marked HasMore and returned right away.
  if (m_poConf->GetEnableMaxPLogEntry() > 0 && !poRecoverCmd->IsRangeLoaded()) {
    AssertEqual(poRecoverCmd->GetMaxPLogEntry(), INVALID_ENTRY);
    PLogEntityMeta_t tMeta = {0};
    TIMERUS_START(iGetPLogMetaUseTimeUS);
    // Read the entity's MaxPLogEntry from the PLog.
    iRet = m_poPLogEngine->GetPLogEntityMeta(iEntityID, tMeta);
    TIMERUS_STOP(iGetPLogMetaUseTimeUS);
    OSS::ReportPLogGetMetaKeyTimeMS(iRet, iGetPLogMetaUseTimeUS / 1000);
    if (iRet != 0 && iRet != Certain::eRetCodeNotFound) {
      CertainLogFatal("iEntityID %lu GetPLogEntityMeta ret %d", iEntityID, iRet);
      return -5;
    } else {
      // Either the value read from the PLog, or tMeta's zero-initialised default.
      uint64_t iMaxPLogEntry = tMeta.iMaxPLogEntry;

      // Affected by the GetAll logic; ignored for now in the article.
      if (iMaxPLogEntry < iMaxCommitedEntry) {
        iMaxPLogEntry = iMaxCommitedEntry;
      }

      // Store the MaxPLogEntry / MaxCommitedEntry that were read.
      poRecoverCmd->SetMaxPLogEntry(iMaxPLogEntry);
      poRecoverCmd->SetMaxCommitedEntry(iMaxCommitedEntry);
      poRecoverCmd->SetMaxLoadingEntry(iMaxCommitedEntry);
      poRecoverCmd->SetEntryRecordList(tEntryRecordList);
      // Mark HasMore: more data follows, only the progress markers were read here.
      poRecoverCmd->SetHasMore(true);

      // Return right away.
      return eRetCodeOK;
    }
  }

  bool bHasMore = false;
  vector<pair<uint64_t, string> > vecRecord;
  uint64_t iMaxLoadingEntry = iMaxCommitedEntry + poRecoverCmd->GetMaxNum();
  TIMERUS_START(iRangeLoadUseTimeUS);
  // Load the entity's records from MaxCommitedEntry up to MaxLoadingEntry from the PLog.
  iRet = m_poPLogEngine->LoadUncommitedEntrys(iEntityID, iMaxCommitedEntry, iMaxLoadingEntry,
                                              vecRecord, bHasMore);
  TIMERUS_STOP(iRangeLoadUseTimeUS);
  s_poLoadUncommitedEntrysTimeStat->Update(iRangeLoadUseTimeUS);
  OSS::ReportPLogRangeLoadTimeMS(iRet, iRangeLoadUseTimeUS / 1000);
  // Log range loads slower than 100000 us.
  // NOTE(review): iPLogCommitedEntry is not declared anywhere in this excerpt;
  // presumably its declaration sits in code elided by the article — confirm
  // against the upstream source.
  if (iRangeLoadUseTimeUS > 100000) {
    CertainLogError("E(%lu, %lu) entrys: %lu %lu iRangeLoadUseTimeUS %lu", iEntityID,
                    iMaxCommitedEntry, iPLogCommitedEntry, iMaxLoadingEntry, iRangeLoadUseTimeUS);
  }
  if (iRet != 0) {
    CertainLogFatal("LoadUncommitedEntrys E(%lu, %lu) ret %d", iEntityID, iMaxCommitedEntry, iRet);
    return -2;
  }

  // Walk over the loaded records.
  for (uint32_t i = 0; i < vecRecord.size(); ++i) {
    uint64_t iEntry = vecRecord[i].first;

    // Deserialize the record.
    EntryRecord_t tRecord;
    iRet = StringToEntryRecord(vecRecord[i].second, tRecord);
    if (iRet != 0) {
      CertainLogFatal("E(%lu, %lu) StringToEntryRecord ret %d", iEntityID, iEntry, iRet);
      return -3;
    }

    // Read the value when the record has a value id but does not carry the value itself.
    if (tRecord.tValue.iValueID > 0 && !tRecord.tValue.bHasValue) {
      TIMERUS_START(iGetValueUseTimeUS);
      iRet = m_poPLogEngine->GetValue(iEntityID, iEntry, tRecord.tValue.iValueID,
                                      tRecord.tValue.strValue);
      TIMERUS_STOP(iGetValueUseTimeUS);
      s_poGetTimeStat->Update(iGetValueUseTimeUS);
      OSS::ReportPLogGetValueTimeMS(iRet, iGetValueUseTimeUS / 1000);

      if (iGetValueUseTimeUS > 100000) {
        CertainLogError("E(%lu, %lu) iGetValueUseTimeUS %lu", iEntityID, iEntry,
                        iGetValueUseTimeUS);
      }
      if (iRet != 0) {
        CertainLogFatal("GetValue ret %d E(%lu, %lu) record: %s", iRet, iEntityID, iEntry,
                        EntryRecordToString(tRecord).c_str());
        return -4;
      }
      tRecord.tValue.bHasValue = true;
    }

    tEntryRecordList.push_back(pair<uint64_t, EntryRecord_t>(iEntry, tRecord));
  }

  // Write what was read into the RecoverCmd.
  poRecoverCmd->SetMaxCommitedEntry(iMaxCommitedEntry);
  // Fewer records may actually have been read, so the name does not quite match
  // the value stored here.
  poRecoverCmd->SetMaxLoadingEntry(iMaxLoadingEntry);
  // The records that were actually read.
  poRecoverCmd->SetEntryRecordList(tEntryRecordList);
  poRecoverCmd->SetHasMore(bHasMore);

  return eRetCodeOK;
}

而后 PLogWorker 会把这条 RecoverCmd 塞回到回复队列里,EntityWorker 会调用 RangeRecoverFromPLog 处理:

// Handles a RecoverCmd that has come back from the PLog side.
//
// Steps, in order:
//   1. Clear the entity's bRangeLoading flag.
//   2. If the command failed, fail a pending client RecoverCmd, possibly queue a
//      GetAll request, and return.
//   3. Raise the entity's progress counters (iMaxChosenEntry, iMaxContChosenEntry,
//      iCatchUpEntry, iNotifyedEntry) from the values carried by the command.
//   4. Rebuild an EntryInfo for every loaded record and push contiguous chosen
//      entries to the DB worker.
//   5. Update iMaxPLogEntry, mark the entity as range-loaded, dispatch the
//      waiting messages, check for catch-up and answer a pending client
//      RecoverCmd.
//
// Returns 0 on every path visible here; failures are reported through
// InvalidClientCmd and the log rather than through the return value.
int clsEntityWorker::RangeRecoverFromPLog(clsRecoverCmd *poCmd) {
  uint64_t iEntityID = poCmd->GetEntityID();
  uint64_t iEntry = 0;

  EntityInfo_t *ptEntityInfo = m_poEntityMng->FindEntityInfo(iEntityID);
  AssertNotEqual(ptEntityInfo, NULL);

  // The range load is over; clear the in-flight flag.
  AssertEqual(ptEntityInfo->bRangeLoading, true);
  ptEntityInfo->bRangeLoading = false;

  if (poCmd->GetResult() != eRetCodeOK) {
    // The recover command failed: report the error and return.
    // Fail the pending client command if it is a RecoverCmd.
    if (ptEntityInfo->poClientCmd != NULL && ptEntityInfo->poClientCmd->GetCmdID() == kRecoverCmd) {
      InvalidClientCmd(ptEntityInfo, poCmd->GetResult());
    }

    // For a failed check-get-all command, queue a request to the GetAll worker,
    // unless one is already pending or the last GetAll finished less than
    // 1000 ms ago. (The GetAll path itself is not covered here.)
    if (!ptEntityInfo->bGetAllPending && poCmd->IsCheckGetAll() &&
        GetCurrTimeMS() >= ptEntityInfo->iGetAllFinishTimeMS + 1000) {
      clsPaxosCmd *po = new clsPaxosCmd(GetPeerAcceptorID(ptEntityInfo), iEntityID, 0, NULL, NULL);

      int iRet = clsGetAllWorker::EnterReqQueue(po);
      if (iRet != 0) {
        // The queue did not accept the command, so it is still ours to free.
        delete po, po = NULL;
      }

      CertainLogError("iEntityID %lu triggle getall by peer ret %d", iEntityID, iRet);

      ptEntityInfo->bGetAllPending = true;
    }
    return 0;
  }

  // <Entry, Record>
  typedef vector<pair<uint64_t, EntryRecord_t> > EntryRecordList_t;
  const EntryRecordList_t &tEntryRecordList = poCmd->GetEntryRecordList();
  AssertNotMore(tEntryRecordList.size(), m_poConf->GetMaxCatchUpNum());

  // Raise the known chosen progress from what the command reports.
  // Some of these updates are redundant.
  if (ptEntityInfo->iMaxChosenEntry < poCmd->GetMaxCommitedEntry()) {
    ptEntityInfo->iMaxChosenEntry = poCmd->GetMaxCommitedEntry();
  }
  if (poCmd->IsHasMore()) {
    if (ptEntityInfo->iMaxChosenEntry < poCmd->GetMaxLoadingEntry()) {
      ptEntityInfo->iMaxChosenEntry = poCmd->GetMaxLoadingEntry();
    }
  }
  if (tEntryRecordList.size() > 0) {
    // Named separately so it does not shadow the function-level iEntry.
    uint64_t iLastEntry = tEntryRecordList.rbegin()->first;
    if (tEntryRecordList.rbegin()->second.bChosen) {
      if (ptEntityInfo->iMaxChosenEntry < iLastEntry) {
        ptEntityInfo->iMaxChosenEntry = iLastEntry;
      }
    } else {
      // The last record is not chosen, so only the entry before it is counted.
      if (ptEntityInfo->iMaxChosenEntry < iLastEntry - 1) {
        ptEntityInfo->iMaxChosenEntry = iLastEntry - 1;
      }
    }
  }

  AssertNotMore(ptEntityInfo->iMaxContChosenEntry, ptEntityInfo->iCatchUpEntry);
  if (ptEntityInfo->iMaxContChosenEntry < poCmd->GetMaxCommitedEntry()) {
    // Whatever is committed locally is necessarily known.
    ptEntityInfo->iMaxContChosenEntry = poCmd->GetMaxCommitedEntry();
  }
  if (ptEntityInfo->iCatchUpEntry < ptEntityInfo->iMaxContChosenEntry) {
    ptEntityInfo->iCatchUpEntry = ptEntityInfo->iMaxContChosenEntry;
  }
  if (ptEntityInfo->iNotifyedEntry < poCmd->GetMaxCommitedEntry()) {
    // Committed entries never need a notification.
    ptEntityInfo->iNotifyedEntry = poCmd->GetMaxCommitedEntry();
  }

  // Walk the loaded records and rebuild the matching EntryInfo objects.
  for (uint32_t i = 0; i < tEntryRecordList.size(); ++i) {
    iEntry = tEntryRecordList[i].first;
    AssertLess(poCmd->GetMaxCommitedEntry(), iEntry);
    AssertNotMore(iEntry, poCmd->GetMaxLoadingEntry());

    EntryInfo_t *ptInfo = m_poEntryMng->FindEntryInfo(iEntityID, iEntry);
    if (ptInfo == NULL) {
      const EntryRecord_t &tRecord = tEntryRecordList[i].second;

      // Every record is subject to the entry-count limit and is skipped when
      // the limit is hit, except a chosen record that directly follows
      // iMaxContChosenEntry.
      if (!(tRecord.bChosen && ptEntityInfo->iMaxContChosenEntry + 1 == iEntry)) {
        if (CheckIfEntryNumLimited(iEntityID)) {
          CertainLogError("E(%lu, %lu) catchup limited by entry", iEntityID, iEntry);
          continue;
        }
      }

      ptInfo = m_poEntryMng->CreateEntryInfo(ptEntityInfo, iEntry);

      // Restore the freshly created EntryInfo from the record.
      AssertNotMore(0, RecoverEntry(ptInfo, tRecord));
      // iMaxChosenEntry was already raised above, so this is probably redundant.
      UpdateMaxChosenEntry(ptEntityInfo, ptInfo);
    } else {
      CertainLogError("E(%lu, %lu) st %d maybe loaded by plog", iEntityID, iEntry,
                      ptInfo->poMachine->GetEntryState());
    }

    // Speed up applying DB.
    if (!ptInfo->bUncertain && ptInfo->poMachine->GetEntryState() == kEntryStateChosen) {
      // Only the entry right after iMaxContChosenEntry is pushed to the DB worker.
      if (ptEntityInfo->iMaxContChosenEntry + 1 == iEntry) {
        PushCmdToDBWorker(ptInfo);

        ptEntityInfo->iMaxContChosenEntry++;

        if (ptEntityInfo->iCatchUpEntry < ptEntityInfo->iMaxContChosenEntry) {
          AssertEqual(ptEntityInfo->iCatchUpEntry + 1, ptEntityInfo->iMaxContChosenEntry);
          ptEntityInfo->iCatchUpEntry++;
        }
        AssertNotMore(ptEntityInfo->iCatchUpEntry, ptEntityInfo->iMaxChosenEntry);
      }
    }
  }

  if (!poCmd->IsHasMore()) {
    // No more records to read.
    if (tEntryRecordList.size() > 0) {
      // iMaxPLogEntry is the last entry that was read.
      ptEntityInfo->iMaxPLogEntry = tEntryRecordList.rbegin()->first;
    } else {
      // Nothing was read: iMaxPLogEntry is the command's max committed entry.
      ptEntityInfo->iMaxPLogEntry = poCmd->GetMaxCommitedEntry();
    }
  } else if (!ptEntityInfo->bRangeLoaded && poCmd->GetMaxPLogEntry() != INVALID_ENTRY) {
    // First completed range load, and the command carries a max PLog entry.
    CertainLogImpt("iEntityID %lu iMaxPLogEntry %lu -> %lu", iEntityID, ptEntityInfo->iMaxPLogEntry,
                   poCmd->GetMaxPLogEntry());
    Assert(!poCmd->IsRangeLoaded());

    if (ptEntityInfo->iMaxPLogEntry == INVALID_ENTRY ||
        ptEntityInfo->iMaxPLogEntry < poCmd->GetMaxPLogEntry()) {
      // Adopt the command's value as iMaxPLogEntry.
      ptEntityInfo->iMaxPLogEntry = poCmd->GetMaxPLogEntry();
    } else {
      CertainLogFatal("iEntityID %lu iMaxPLogEntry %lu -> %lu", iEntityID,
                      ptEntityInfo->iMaxPLogEntry, poCmd->GetMaxPLogEntry());
    }
  }

  // Mark the range load as done.
  ptEntityInfo->bRangeLoaded = true;

  // NOTE(review): iEmptyEntryCnt is not declared anywhere in this excerpt;
  // presumably its declaration and updates were elided. Confirm against the
  // full source.
  CertainLogInfo("iEntityID %lu entrys: %lu %lu %lu iEmptyEntryCnt %u", iEntityID,
                 poCmd->GetMaxCommitedEntry(), poCmd->GetMaxLoadingEntry(),
                 ptEntityInfo->iMaxPLogEntry, iEmptyEntryCnt);

  // Dispatch the waiting messages (what they are is analysed elsewhere).
  // The pointers are copied into a scratch buffer and the entity's slots are
  // cleared before DoWithWaitingMsg runs. The buffer is a std::vector so it is
  // released correctly; a malloc'd block held by std::unique_ptr<char> would be
  // released with delete, which is undefined behaviour.
  uint32_t iWaitingMsgPtrSize = sizeof(clsPaxosCmd *) * m_iAcceptorNum;
  std::vector<clsPaxosCmd *> vecWaitingMsg(m_iAcceptorNum);

  memcpy(vecWaitingMsg.data(), ptEntityInfo->apWaitingMsg, iWaitingMsgPtrSize);
  memset(ptEntityInfo->apWaitingMsg, 0, iWaitingMsgPtrSize);

  int iRet = DoWithWaitingMsg(vecWaitingMsg.data(), m_iAcceptorNum);
  m_poMemCacheCtrl->UpdateTotalSize(ptEntityInfo);
  if (iRet != 0) {
    CertainLogError("DoWithWaitingMsg ret %d", iRet);
  }

  // Check whether a catch-up should be started.
  CheckForCatchUp(ptEntityInfo, INVALID_ACCEPTOR_ID, 0);

  if (ptEntityInfo->poClientCmd != NULL && ptEntityInfo->poClientCmd->GetCmdID() == kRecoverCmd) {
    // The pending client command is a RecoverCmd: hand it the max chosen entry.
    clsRecoverCmd *po = dynamic_cast<clsRecoverCmd *>(ptEntityInfo->poClientCmd);

    po->SetMaxChosenEntry(uint64_t(ptEntityInfo->iMaxChosenEntry));

    if (ptEntityInfo->iMaxContChosenEntry < ptEntityInfo->iMaxChosenEntry) {
      // Still behind the max chosen entry: answer with the catch-up code.
      InvalidClientCmd(ptEntityInfo, eRetCodeCatchUp);
    } else {
      // Fully caught up: answer OK.
      InvalidClientCmd(ptEntityInfo, eRetCodeOK);
    }
  }

  return 0;
}

总的来说,RangeLoadFromPLog 会异步地启动恢复流程,经过 FillRecoverCmd 读取 Record 记录,再回到 RangeRecoverFromPLog 恢复成 EntryInfo、更新进度信息。

4. DoWithTimeout + ActivateEntry

EntityWorker 的 Loop 循环中也需要处理各类超时情况,包括 Entry 的超时和 Cmd 的超时:

// Handles an EntryInfo whose timeout has fired.
//
// Exactly one of the following cases is taken, checked in this order:
//   - entry 0: time out the client command (asserted to be a RecoverCmd if set);
//   - the client command targets this entry: time it out and re-arm the timeout;
//   - the entry is still uncertain: only log;
//   - a GetAll is pending on the entity: time out the client command and
//     destroy the entry;
//   - otherwise: re-activate the entry through ActivateEntry.
//
// Returns eRetCodePtrReuse in every case.
int clsEntityWorker::DoWithTimeout(EntryInfo_t *ptInfo) {
  EntityInfo_t *ptEntityInfo = ptInfo->ptEntityInfo;
  if (ptEntityInfo == NULL) {
    CertainLogFatal("iEntry %lu", ptInfo->iEntry);
  }
  AssertNotEqual(ptEntityInfo, NULL);

  const uint64_t iEntityID = ptEntityInfo->iEntityID;
  const uint64_t iEntry = ptInfo->iEntry;
  AssertEqual(Hash(iEntityID) % m_poConf->GetEntityWorkerNum(), m_iWorkerID);

  if (iEntry == 0) {
    // Entry 0 is the slot paired with a RecoverCmd.
    if (ptEntityInfo->poClientCmd != NULL) {
      AssertEqual(ptEntityInfo->poClientCmd->GetCmdID(), kRecoverCmd);
    }

    // A timeout here means the command itself timed out.
    InvalidClientCmd(ptEntityInfo, eRetCodeTimeout);
  } else if (ptEntityInfo->poClientCmd != NULL &&
             ptEntityInfo->poClientCmd->GetEntry() == iEntry) {
    // The client command waiting on this entry timed out; the entry itself is
    // put back on the timeout list.
    InvalidClientCmd(ptEntityInfo, eRetCodeTimeout);
    m_poEntryMng->AddTimeout(ptInfo, 10000);
  } else if (ptInfo->bUncertain) {
    // The entry is still marked uncertain (the message hints at a busy disk).
    CertainLogError("Check if disk busy E(%lu, %lu) st %d", iEntityID, iEntry,
                    ptInfo->poMachine->GetEntryState());
  } else if (ptEntityInfo->bGetAllPending) {
    // A GetAll is pending: give up on the command and drop the entry.
    InvalidClientCmd(ptEntityInfo, eRetCodeTimeout);
    m_poEntryMng->DestroyEntryInfo(ptInfo);

    CertainLogError("E(%lu, %lu) GetAllPending, timeout and destroy entry", iEntityID, iEntry);
  } else {
    // Nothing special applies: re-activate the entry.
    ActivateEntry(ptInfo);
  }

  return eRetCodePtrReuse;
}

ActivateEntry 激活操作大致分为三种情况:本地确定 Chosen 的,可以清理返回;远程 Chosen 的,找 ActiveAcceptor 同步;还没有 Chosen 的,广播一下。

// Re-activates an entry that is neither uncertain nor waiting for a timeout
// (both are asserted).
//
// Returns true when the entry was put back to work (a noop proposal, a sync
// with a peer acceptor, or a broadcast) and false when it was cleaned up or
// left alone.
bool clsEntityWorker::ActivateEntry(EntryInfo_t *ptInfo) {
  EntityInfo_t *ptEntity = ptInfo->ptEntityInfo;
  uint64_t iEntityID = ptEntity->iEntityID;
  uint64_t iEntry = ptInfo->iEntry;

  uint32_t iSelfAcceptorID = ptEntity->iLocalAcceptorID;
  clsEntryStateMachine *poStateMachine = ptInfo->poMachine;
  AssertEqual(ptInfo->bUncertain, false);
  AssertEqual(m_poEntryMng->WaitForTimeout(ptInfo), false);

  // The entry is newly open and has no activity: drop it.
  const bool bIdleNewEntry =
      ptEntity->iMaxChosenEntry < iEntry && poStateMachine->IsLocalEmpty();
  if (bIdleNewEntry) {
    CertainLogInfo("iEntityID %lu entrys: %lu %lu %lu ref %d", iEntityID,
                   ptEntity->iMaxContChosenEntry, ptEntity->iCatchUpEntry,
                   ptEntity->iMaxChosenEntry, ptEntity->iRefCount);
    CleanUpEntry(ptInfo);
    return false;
  }

  // The entry has been sent to the DB worker: drop it.
  if (ptEntity->iMaxContChosenEntry >= iEntry) {
    CleanUpEntry(ptInfo);
    return false;
  }

  // Already chosen: no need to broadcast, just make it a likelier candidate
  // for elimination and wait for it to be eliminated or committed.
  if (poStateMachine->GetEntryState() == kEntryStateChosen) {
    CertainLogError("E(%lu, %lu) st %d", iEntityID, iEntry, poStateMachine->GetEntryState());
    m_poEntryMng->IncreaseEliminatePriority(ptInfo);
    return false;
  }

  if (ptEntity->iMaxChosenEntry < iEntry) {
    // Not chosen locally and not known to be chosen anywhere: broadcast,
    // unless the entry has already been retried too often.
    if (ptInfo->iInteractCnt > 3) {
      CleanUpEntry(ptInfo);
      return false;
    }

    const EntryRecord_t &tLocalRecord = poStateMachine->GetRecord(iSelfAcceptorID);
    CertainLogError("Broadcast E(%lu, %lu) st %u iMaxChosenEntry %lu r[%u] %s", iEntityID, iEntry,
                    poStateMachine->GetEntryState(), ptEntity->iMaxChosenEntry, iSelfAcceptorID,
                    EntryRecordToString(tLocalRecord).c_str());

    ptInfo->iInteractCnt++;
    BroadcastToRemote(ptInfo, ptInfo->poMachine);
    OSS::ReportTimeoutBroadcast();

    // Wait longer, as broadcast is heavy, and many candidates to reply.
    // Timeout of 30 seconds.
    m_poEntryMng->AddTimeout(ptInfo, 30000);

    return true;
  }

  // Catch-up path: the entry is not chosen locally but lies at or below the
  // entity's max chosen entry, so sync it from a peer.
  if (m_poEntryMng->CheckIfCatchUpLimited(ptInfo)) {
    CertainLogError("E(%lu, %lu) catchup limited", iEntityID, iEntry);
    return false;
  }

  // Machine A fix the entry only.
  // The entry has been through more than two interactions.
  if (ptInfo->iInteractCnt > 2) {
    CertainLogError("May need fix E(%lu, %lu) st %d", iEntityID, iEntry,
                    ptInfo->poMachine->GetEntryState());
    OSS::ReportMayNeedFix();

    if (m_poConf->GetEnableAutoFixEntry()) {
      CertainLogZero("ProposeNoop for Fix E(%lu, %lu) st %d", iEntityID, iEntry,
                     ptInfo->poMachine->GetEntryState());

      // Start an empty (noop) proposal to repair the entry.
      ProposeNoop(ptEntity, ptInfo);
      return true;
    }
  }

  // More than three interactions without success: drop the entry.
  if (ptInfo->iInteractCnt > 3) {
    CleanUpEntry(ptInfo);
    return false;
  }

  // Count this attempt, then sync the entry's record with the peer acceptor.
  ptInfo->iInteractCnt++;
  uint32_t iPeerAcceptorID = GetPeerAcceptorID(ptEntity);

  // bRemoteUpdated is raised only for the duration of the sync call.
  AssertEqual(ptInfo->bRemoteUpdated, false);
  ptInfo->bRemoteUpdated = true;
  SyncEntryRecord(ptInfo, iPeerAcceptorID, 0);
  ptInfo->bRemoteUpdated = false;

  OSS::ReportSingleCatchUp();

  // Put the entry back on the timeout list, 15 seconds.
  m_poEntryMng->AddTimeout(ptInfo, 15000);

  CertainLogError("sync acceptor %u E(%lu, %lu) st %d", iPeerAcceptorID, iEntityID, iEntry,
                  ptInfo->poMachine->GetEntryState());

  return true;
}

5. 总结

PaxosStore 的代码大多是异步的,一个长的处理流程会拆成多个小块,梳理起来确实会有些复杂。本篇把这些细碎的逻辑拎出来阅读,下一步将会把这些小流程串起来,看看完整的 Paxos / CatchUp 流程。