Kafka server-side source code analysis: ReplicaManager
Published: 2019-05-10


ReplicaManager

ReplicaManager is responsible for managing and operating the replicas hosted on the Brokers of a cluster, and it also takes on part of the partition-management work.

class ReplicaManager(val config: KafkaConfig, // Kafka configuration
                     metrics: Metrics, // metrics registry
                     time: Time, // timer
                     val zkClient: KafkaZkClient, // ZooKeeper client
                     scheduler: Scheduler, // Kafka scheduler
                     val logManager: LogManager, // log manager
                     val isShuttingDown: AtomicBoolean, // whether the broker is shutting down
                     quotaManagers: QuotaManagers, // quota managers
                     val brokerTopicStats: BrokerTopicStats, // per-topic broker metrics
                     val metadataCache: MetadataCache, // broker metadata cache
                     logDirFailureChannel: LogDirFailureChannel,
                     // purgatory for delayed PRODUCE requests
                     val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
                     // purgatory for delayed FETCH requests
                     val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
                     // purgatory for delayed DELETE_RECORDS requests
                     val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
                     // purgatory for delayed ELECT_LEADERS requests
                     val delayedElectPreferredLeaderPurgatory: DelayedOperationPurgatory[DelayedElectPreferredLeader],
                     threadNamePrefix: Option[String]) extends Logging with KafkaMetricsGroup {
  ...
  // used to fence requests sent by stale Controllers
  @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch
  // responsible for fetching data from leader replicas to the followers on this broker
  val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower)
  ...
}

ReplicaManager implements three main functions:

  1. Replica read and write
  2. Partition and replica management
  3. ISR management

1. Replica read and write

appendRecords handles writing messages to replicas. The flow is as follows:

[Figure: appendRecords write flow]

def appendRecords(timeout: Long, // request timeout
                  requiredAcks: Short, // acks setting from the request
                  internalTopicsAllowed: Boolean, // whether writes to internal topics are allowed
                  isFromClient: Boolean, // whether the writer is a client (it may also be another broker)
                  entriesPerPartition: Map[TopicPartition, MemoryRecords], // records to append
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit, // response callback
                  delayedProduceLock: Option[Lock] = None,
                  recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()) {
  // the only valid values of requiredAcks are -1, 0 and 1; anything else is rejected
  if (isValidRequiredAcks(requiredAcks)) {
    val sTime = time.milliseconds
    // append the record sets to the local log via appendToLocalLog
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = isFromClient, entriesPerPartition, requiredAcks)
    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
    val produceStatus = localProduceResults.map { case (topicPartition, result) =>
      topicPartition -> ProducePartitionStatus(
        result.info.lastOffset + 1, // offset of the next message to be appended
        // wrap the append result in a PartitionResponse
        new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime,
          result.info.logStartOffset)) // response status
    }

    recordConversionStatsCallback(localProduceResults.mapValues(_.info.recordConversionStats))

    // we need to wait for other replicas to finish the write
    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
      // create a DelayedProduce operation
      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
      val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
    } else {
      // no need to wait for other replicas; the response can be sent right away
      val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
      // invoke the callback and return
      responseCallback(produceResponseStatus)
    }
  } else {
    val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
      topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
        LogAppendInfo.UnknownLogAppendInfo.firstOffset.getOrElse(-1), RecordBatch.NO_TIMESTAMP,
        LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
    }
    responseCallback(responseStatus)
  }
}
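To put appendRecords in context, here is a minimal caller sketch in the spirit of KafkaApis.handleProduceRequest. The wiring around the call (the helper name handleProduceLike, the println-based callback, the hard-coded timeout) is illustrative only and not the actual broker code:

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse

// Sketch of how a request handler might hand per-partition records to the replica manager.
def handleProduceLike(replicaManager: ReplicaManager,
                      entriesPerPartition: Map[TopicPartition, MemoryRecords]): Unit = {
  // hypothetical callback: the real broker turns these statuses into a ProduceResponse
  def sendResponseCallback(status: Map[TopicPartition, PartitionResponse]): Unit =
    status.foreach { case (tp, partitionResponse) =>
      println(s"$tp -> error=${partitionResponse.error}, baseOffset=${partitionResponse.baseOffset}")
    }

  replicaManager.appendRecords(
    timeout = 30000L,              // illustrative request timeout
    requiredAcks = (-1).toShort,   // acks=all: wait for the ISR
    internalTopicsAllowed = false, // clients may not write to internal topics
    isFromClient = true,
    entriesPerPartition = entriesPerPartition,
    responseCallback = sendResponseCallback)
}

With acks=-1 a call like this goes through the DelayedProduce path whenever other replicas still have to catch up; with acks=0 or 1 the callback fires as soon as the local append finishes.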

fetchMessages handles reading data from replicas. The flow is as follows:

[Figure: fetchMessages read flow]

def fetchMessages(timeout: Long, // request timeout
                  replicaId: Int, // replica id
                  fetchMinBytes: Int, // minimum number of bytes to fetch
                  fetchMaxBytes: Int, // maximum number of bytes to fetch
                  hardMaxBytesLimit: Boolean,
                  fetchInfos: Seq[(TopicPartition, PartitionData)], // what to read
                  quota: ReplicaQuota = UnboundedQuota,
                  // response callback
                  responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
                  isolationLevel: IsolationLevel) {
  // determine whether the fetch comes from a follower replica or from a consumer
  val isFromFollower = Request.isValidBrokerId(replicaId)
  val fetchOnlyFromLeader = replicaId != Request.DebuggingConsumerId && replicaId != Request.FutureLocalReplicaId
  // decide how far the caller may read, based on who sent the request:
  // - an ordinary consumer may read up to the high watermark
  // - a READ_COMMITTED consumer may read up to the Log Stable Offset
  // - a follower replica may read up to the LEO
  val fetchIsolation = if (isFromFollower || replicaId == Request.FutureLocalReplicaId)
    FetchLogEnd
  else if (isolationLevel == IsolationLevel.READ_COMMITTED)
    FetchTxnCommitted
  else
    FetchHighWatermark

  def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
    val result = readFromLocalLog(
      replicaId = replicaId,
      fetchOnlyFromLeader = fetchOnlyFromLeader,
      fetchIsolation = fetchIsolation,
      fetchMaxBytes = fetchMaxBytes,
      hardMaxBytesLimit = hardMaxBytesLimit,
      readPartitionInfo = fetchInfos,
      quota = quota)
    if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
    else result
  }

  // read from the local log
  val logReadResults = readFromLog()

  // check if this fetch request can be satisfied right away
  var bytesReadable: Long = 0
  var errorReadingData = false
  val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
  // count the total number of readable bytes
  logReadResults.foreach { case (topicPartition, logReadResult) =>
    if (logReadResult.error != Errors.NONE)
      errorReadingData = true
    bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
    logReadResultMap.put(topicPartition, logReadResult)
  }

  // the response can be sent immediately if any of the following four conditions holds:
  // 1. the request has no timeout, i.e. the caller wants an answer right away
  // 2. no data was requested
  // 3. enough data has already been accumulated
  // 4. an error occurred while reading
  if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
    // build the response
    val fetchPartitionData = logReadResults.map { case (tp, result) =>
      tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
        result.lastStableOffset, result.info.abortedTransactions)
    }
    // invoke the callback
    responseCallback(fetchPartitionData)
  } else {
    // the request cannot be completed immediately
    val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
    fetchInfos.foreach { case (topicPartition, partitionData) =>
      logReadResultMap.get(topicPartition).foreach(logReadResult => {
        val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
        fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
      })
    }
    val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
      fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
    // create a DelayedFetch operation
    val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
    val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }
    // try once more to complete the request; if it still cannot complete, hand it to the purgatory
    delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
  }
}
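The decision of whether a fetch can be answered right away boils down to the four conditions in the code above. A minimal, self-contained restatement of that predicate (the function and parameter names are illustrative, not a helper in the Kafka sources):

// Illustrative restatement of the "respond immediately?" check inside fetchMessages.
def canRespondImmediately(timeoutMs: Long,
                          requestedPartitions: Int,
                          bytesReadable: Long,
                          fetchMinBytes: Int,
                          errorReadingData: Boolean): Boolean =
  timeoutMs <= 0 ||                 // caller asked for an immediate answer
  requestedPartitions == 0 ||       // nothing was requested
  bytesReadable >= fetchMinBytes || // enough data accumulated already
  errorReadingData                  // an error occurred while reading

Only when all four conditions are false does the request get wrapped in a DelayedFetch and parked in the purgatory.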

2. Partition and replica management

Partition changes are driven by the Controller sending LeaderAndIsrRequest to the Brokers. When a Broker receives such a request, it calls the replica manager's becomeLeaderOrFollower method, which in turn runs the "become leader" and "become follower" logic so that the broker's replicas take on their new roles. The flow is as follows:

[Figure: becomeLeaderOrFollower flow]

The becomeLeaderOrFollower method

def becomeLeaderOrFollower(correlationId: Int,
                           leaderAndIsrRequest: LeaderAndIsrRequest,
                           onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
  leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
    stateChangeLogger.trace(s"Received LeaderAndIsr request $stateInfo " +
      s"correlation id $correlationId from controller ${leaderAndIsrRequest.controllerId} " +
      s"epoch ${leaderAndIsrRequest.controllerEpoch} for partition $topicPartition")
  }
  replicaStateChangeLock synchronized {
    // the Controller epoch carried by the LeaderAndIsrRequest is
    // smaller than the currently known Controller epoch
    if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
      stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller ${leaderAndIsrRequest.controllerId} with " +
        s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
        s"Latest known controller epoch is $controllerEpoch")
      // the Controller has changed, so answer with the corresponding error
      leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
    } else {
      val responseMap = new mutable.HashMap[TopicPartition, Errors]
      val controllerId = leaderAndIsrRequest.controllerId
      // update the cached Controller epoch
      controllerEpoch = leaderAndIsrRequest.controllerEpoch

      // First check partition's leader epoch
      val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()
      val newPartitions = new mutable.HashSet[Partition]

      // process every topic partition carried by the LeaderAndIsrRequest
      leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
        val partition = getPartition(topicPartition).getOrElse {
          val createdPartition = getOrCreatePartition(topicPartition)
          newPartitions.add(createdPartition)
          createdPartition
        }
        val currentLeaderEpoch = partition.getLeaderEpoch
        val requestLeaderEpoch = stateInfo.basePartitionState.leaderEpoch
        // the partition is in the Offline state
        if (partition eq ReplicaManager.OfflinePartition) {
          stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
            s"controller $controllerId with correlation id $correlationId " +
            s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
            "partition is in an offline log directory")
          responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
        } else if (requestLeaderEpoch > currentLeaderEpoch) {
          if (stateInfo.basePartitionState.replicas.contains(localBrokerId))
            partitionState.put(partition, stateInfo)
          else {
            stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
              s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
              s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
            responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
          }
        } else {
          stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
            s"controller $controllerId with correlation id $correlationId " +
            s"epoch $controllerEpoch for partition $topicPartition since its associated " +
            s"leader epoch $requestLeaderEpoch is not higher than the current " +
            s"leader epoch $currentLeaderEpoch")
          responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
        }
      }

      // determine the partitions for which this broker hosts the Leader replica
      val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
        stateInfo.basePartitionState.leader == localBrokerId
      }
      // determine the partitions for which this broker hosts a Follower replica
      val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys

      // call makeLeaders to run the "become leader" logic
      // for all partitions in partitionsTobeLeader
      val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
        makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
      else
        Set.empty[Partition]
      // call makeFollowers to run the "become follower" logic
      // for all partitions in partitionsToBeFollower
      val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
        makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap)
      else
        Set.empty[Partition]

      leaderAndIsrRequest.partitionStates.asScala.keys.foreach { topicPartition =>
        if (localReplica(topicPartition).isEmpty && (allPartitions.get(topicPartition) ne ReplicaManager.OfflinePartition))
          allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
      }

      if (!hwThreadInitialized) {
        startHighWaterMarksCheckPointThread()
        hwThreadInitialized = true
      }

      val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
      for (partition <- newPartitions) {
        val topicPartition = partition.topicPartition
        if (logManager.getLog(topicPartition, isFuture = true).isDefined) {
          partition.localReplica.foreach { replica =>
            val leader = BrokerEndPoint(config.brokerId, "localhost", -1)

            // Add future replica to partition's map
            partition.getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false)

            // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
            // replica from source dir to destination dir
            logManager.abortAndPauseCleaning(topicPartition)

            futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(leader,
              partition.getLeaderEpoch, replica.highWatermark.messageOffset))
          }
        }
      }
      replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)

      replicaFetcherManager.shutdownIdleFetcherThreads()
      replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
      onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
      new LeaderAndIsrResponse(Errors.NONE, responseMap.asJava)
    }
  }
}
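The heart of the method is splitting the partitions carried by the request into those this broker now leads and those it now follows. A stripped-down illustration of that split (types simplified; PartitionStateLite and splitByRole are made-up stand-ins, not Kafka classes):

// Simplified illustration of the leader/follower split performed in becomeLeaderOrFollower.
case class PartitionStateLite(leader: Int, leaderEpoch: Int, replicas: Seq[Int])

def splitByRole(localBrokerId: Int,
                states: Map[String, PartitionStateLite]): (Map[String, PartitionStateLite], Map[String, PartitionStateLite]) = {
  // this broker is the leader for these partitions
  val leaders = states.filter { case (_, s) => s.leader == localBrokerId }
  // for everything else it acts as a follower
  val followers = states -- leaders.keys
  (leaders, followers)
}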

The makeLeaders method

private def makeLeaders(controllerId: Int, // broker id of the Controller
                        epoch: Int, // Controller epoch
                        partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState], // partition info carried by the LeaderAndIsrRequest
                        correlationId: Int, // correlation id of the request, used only for logging
                        // error codes, grouped by topic partition
                        responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition] = {
  partitionState.keys.foreach { partition =>
    stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from " +
      s"controller $controllerId epoch $epoch starting the become-leader transition for " +
      s"partition ${partition.topicPartition}")
  }

  // initialize responseMap with Errors.NONE
  for (partition <- partitionState.keys)
    responseMap.put(partition.topicPartition, Errors.NONE)

  val partitionsToMakeLeaders = mutable.Set[Partition]()

  try {
    // First stop fetchers for all the partitions
    replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))
    // Update the partition information to be the leader
    partitionState.foreach { case (partition, partitionStateInfo) =>
      try {
        // call makeLeader() on each partition so that it becomes the leader
        if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) {
          partitionsToMakeLeaders += partition
          stateChangeLogger.trace(s"Stopped fetchers as part of become-leader request from " +
            s"controller $controllerId epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition} " +
            s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch})")
        } else
          stateChangeLogger.info(s"Skipped the become-leader state change after marking its " +
            s"partition as leader with correlation id $correlationId from controller $controllerId epoch $epoch for " +
            s"partition ${partition.topicPartition} (last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
            s"since it is already the leader for the partition.")
      } catch {
        case e: KafkaStorageException =>
          stateChangeLogger.error(s"Skipped the become-leader state change with " +
            s"correlation id $correlationId from controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
            s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) since " +
            s"the replica for the partition is offline due to disk error $e")
          val dirOpt = getLogDir(partition.topicPartition)
          error(s"Error while making broker the leader for partition $partition in dir $dirOpt", e)
          responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
      }
    }
  } catch {
    case e: Throwable =>
      partitionState.keys.foreach { partition =>
        stateChangeLogger.error(s"Error while processing LeaderAndIsr request correlationId $correlationId received " +
          s"from controller $controllerId epoch $epoch for partition ${partition.topicPartition}", e)
      }
      // Re-throw the exception for it to be caught in KafkaApis
      throw e
  }

  partitionState.keys.foreach { partition =>
    stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
      s"epoch $epoch for the become-leader transition for partition ${partition.topicPartition}")
  }

  partitionsToMakeLeaders
}

The makeFollowers method

private def makeFollowers(controllerId: Int, // broker id of the Controller
                          epoch: Int, // Controller epoch
                          partitionStates: Map[Partition, LeaderAndIsrRequest.PartitionState], // details of all partitions for which this broker becomes a follower
                          correlationId: Int, // correlation id linking request and response
                          responseMap: mutable.Map[TopicPartition, Errors] // per-partition result of the LeaderAndIsrRequest
                         ) : Set[Partition] = {
  partitionStates.foreach { case (partition, partitionState) =>
    stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
      s"epoch $epoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
      s"${partitionState.basePartitionState.leader}")
  }

  // initialize the result of every partition to Errors.NONE
  for (partition <- partitionStates.keys)
    responseMap.put(partition.topicPartition, Errors.NONE)

  val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()

  try {
    // iterate over all partitions in partitionStates
    partitionStates.foreach { case (partition, partitionStateInfo) =>
      val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
      try {
        // look up the leader Broker in the metadata cache
        metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
          case Some(_) =>
            // the leader is alive: partition.makeFollower() records the new leader and clears the ISR
            if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
              partitionsToMakeFollower += partition
            else
              stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " +
                s"follower with correlation id $correlationId from controller $controllerId epoch $epoch " +
                s"for partition ${partition.topicPartition} (last update " +
                s"controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
                s"since the new leader $newLeaderBrokerId is the same as the old leader")
          case None =>
            stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " +
              s"controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
              s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
              s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
            // still create the follower replica (and its log) for this partition
            partition.getOrCreateReplica(localBrokerId, isNew = partitionStateInfo.isNew)
        }
      } catch {
        case e: KafkaStorageException =>
          stateChangeLogger.error(s"Skipped the become-follower state change with correlation id $correlationId from " +
            s"controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
            s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) with leader " +
            s"$newLeaderBrokerId since the replica for the partition is offline due to disk error $e")
          val dirOpt = getLogDir(partition.topicPartition)
          error(s"Error while making broker the follower for partition $partition with leader " +
            s"$newLeaderBrokerId in dir $dirOpt", e)
          responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
      }
    }

    // remove the existing fetcher threads for these partitions
    replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
    partitionsToMakeFollower.foreach { partition =>
      stateChangeLogger.trace(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
        s"epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition} with leader " +
        s"${partitionStates(partition).basePartitionState.leader}")
    }

    // try to complete any pending delayed requests
    partitionsToMakeFollower.foreach { partition =>
      val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
      tryCompleteDelayedProduce(topicPartitionOperationKey)
      tryCompleteDelayedFetch(topicPartitionOperationKey)
    }

    partitionsToMakeFollower.foreach { partition =>
      stateChangeLogger.trace(s"Truncated logs and checkpointed recovery boundaries for partition " +
        s"${partition.topicPartition} as part of become-follower request with correlation id $correlationId from " +
        s"controller $controllerId epoch $epoch with leader ${partitionStates(partition).basePartitionState.leader}")
    }

    if (isShuttingDown.get()) {
      partitionsToMakeFollower.foreach { partition =>
        stateChangeLogger.trace(s"Skipped the adding-fetcher step of the become-follower state " +
          s"change with correlation id $correlationId from controller $controllerId epoch $epoch for " +
          s"partition ${partition.topicPartition} with leader ${partitionStates(partition).basePartitionState.leader} " +
          "since it is shutting down")
      }
    } else {
      // for every partition that becomes a follower on this broker,
      // determine the leader Broker and the initial fetch offset
      // we do not need to check if the leader exists again since this has been done at the beginning of this process
      val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
        val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get
          .brokerEndPoint(config.interBrokerListenerName)
        val fetchOffset = partition.localReplicaOrException.highWatermark.messageOffset
        partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
      }.toMap

      // add new fetcher threads using the leader Broker and fetchOffset determined above
      replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)

      partitionsToMakeFollower.foreach { partition =>
        stateChangeLogger.trace(s"Started fetcher to new leader as part of become-follower " +
          s"request from controller $controllerId epoch $epoch with correlation id $correlationId for " +
          s"partition ${partition.topicPartition} with leader ${partitionStates(partition).basePartitionState.leader}")
      }
    }
  } catch {
    case e: Throwable =>
      stateChangeLogger.error(s"Error while processing LeaderAndIsr request with correlationId $correlationId " +
        s"received from controller $controllerId epoch $epoch", e)
      throw e
  }

  partitionStates.keys.foreach { partition =>
    stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
      s"epoch $epoch for the become-follower transition for partition ${partition.topicPartition} with leader " +
      s"${partitionStates(partition).basePartitionState.leader}")
  }

  partitionsToMakeFollower
}

3. ISR management

Besides the two functions above, the replica manager has one more important responsibility: managing the ISR.
This is mainly embodied in two methods:

  • maybeShrinkIsr: periodically checks whether the ISR of each partition needs to shrink.
  • maybePropagateIsrChanges: periodically propagates ISR changes to the other Brokers in the cluster.

ReplicaManager's startup method schedules two periodic tasks, isr-expiration and isr-change-propagation:

def startup() {
  scheduler.schedule("isr-expiration", maybeShrinkIsr _,
    period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
  scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _,
    period = 2500L, unit = TimeUnit.MILLISECONDS)
  ...
}
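For intuition about how often these tasks actually run, here is a small sketch of the effective periods, assuming replica.lag.time.max.ms = 10000 (the default in the Kafka versions this article is based on; check your own configuration):

// Assumed value of replica.lag.time.max.ms; adjust for your config.
val replicaLagTimeMaxMs = 10000L
val isrExpirationPeriodMs = replicaLagTimeMaxMs / 2 // ISR shrink check every 5000 ms
val isrChangePropagationPeriodMs = 2500L            // propagation attempt every 2.5 s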

ReplicaManager.maybeShrinkIsr calls the maybeShrinkIsr method of the Partition class to shrink the ISR:

private def maybeShrinkIsr(): Unit = {
  trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")

  // Shrink ISRs for non offline partitions
  allPartitions.keys.foreach { topicPartition =>
    nonOfflinePartition(topicPartition).foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs))
  }
}

Partition.maybeShrinkIsr calls getOutOfSyncReplicas to determine which replicas should be removed from the ISR.

def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
  val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
    leaderReplicaIfLocal match {
      case Some(leaderReplica) =>
        // determine the replicas that need to be removed from the ISR
        val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
        if (outOfSyncReplicas.nonEmpty) {
          // remove them from the ISR
          val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
          assert(newInSyncReplicas.nonEmpty)
          info("Shrinking ISR from %s to %s. Leader: (highWatermark: %d, endOffset: %d). Out of sync replicas: %s."
            .format(inSyncReplicas.map(_.brokerId).mkString(","),
              newInSyncReplicas.map(_.brokerId).mkString(","),
              leaderReplica.highWatermark.messageOffset,
              leaderReplica.logEndOffset,
              outOfSyncReplicas.map { replica =>
                s"(brokerId: ${replica.brokerId}, endOffset: ${replica.logEndOffset})"
              }.mkString(" ")
            )
          )

          // update the cached ISR
          updateIsr(newInSyncReplicas)
          replicaManager.isrShrinkRate.mark()
          maybeIncrementLeaderHW(leaderReplica)
        } else {
          false
        }

      case None => false // do nothing if no longer leader
    }
  }

  if (leaderHWIncremented)
    tryCompleteDelayedRequests()
}

getOutOfSyncReplicas considers a replica out of sync when the current time minus the last time it caught up with the leader exceeds the value of the replica.lag.time.max.ms parameter.

def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
  val candidateReplicas = inSyncReplicas - leaderReplica

  // a replica is lagging if (now - lastCaughtUpTimeMs) > maxLagMs;
  // a replica that has not fetched from the leader for too long is kicked out of the ISR
  val laggingReplicas = candidateReplicas.filter(r =>
    r.logEndOffset != leaderReplica.logEndOffset && (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
  if (laggingReplicas.nonEmpty)
    debug("Lagging replicas are %s".format(laggingReplicas.map(_.brokerId).mkString(",")))

  laggingReplicas
}
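As a concrete illustration of that predicate (the numbers below are made up): with replica.lag.time.max.ms = 10000, a follower whose log end offset is behind the leader's and whose last catch-up was 12 seconds ago is dropped, while a follower that matches the leader's LEO, or one that caught up only 3 seconds ago, is kept.

// Made-up numbers to illustrate the getOutOfSyncReplicas condition.
val maxLagMs = 10000L
val nowMs = 1000000L
val leaderLeo = 500L

// (brokerId, logEndOffset, lastCaughtUpTimeMs)
val followers = Seq((1, 480L, nowMs - 12000L), (2, 500L, nowMs - 12000L), (3, 480L, nowMs - 3000L))

val lagging = followers.filter { case (_, leo, lastCaughtUp) =>
  leo != leaderLeo && (nowMs - lastCaughtUp) > maxLagMs
}
println(lagging.map(_._1)) // List(1): broker 2 matches the leader's LEO, broker 3 caught up recently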

After the ISR has shrunk, ReplicaManager still needs to propagate the result of this operation to the other Brokers in the cluster. This is done by maybePropagateIsrChanges.

def maybePropagateIsrChanges() {
  val now = System.currentTimeMillis()
  isrChangeSet synchronized {
    // ISR changes are propagated only if both conditions hold:
    // 1. there are pending ISR changes that have not been propagated yet
    // 2. there has been no ISR change in the last 5 seconds, or more than 60 seconds
    //    have passed since the last propagation
    if (isrChangeSet.nonEmpty &&
      (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
        lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
      // write the corresponding znode in ZooKeeper
      zkClient.propagateIsrChanges(isrChangeSet)
      // clear the isrChangeSet
      isrChangeSet.clear()
      // update the timestamp of the last ISR propagation
      lastIsrPropagationMs.set(now)
    }
  }
}
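The 5-second and 60-second windows referenced in the comment come from two constants on the ReplicaManager companion object. Quoted here from memory of the 2.x sources this article is based on, so verify against your own version:

object ReplicaManager {
  // no propagation while ISR changes are still arriving (5 s black-out),
  // but pending changes are flushed at least once per minute regardless
  val IsrChangePropagationBlackOut = 5000L
  val IsrChangePropagationInterval = 60000L
  ...
}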

