
The Absolute Simplest Way to Start Reading the Kafka Source Code

source link: http://mp.weixin.qq.com/s?__biz=MzI0NTIxNzE1Ng%3D%3D&%3Bmid=2651218594&%3Bidx=4&%3Bsn=88523400e8ad4522d82d18636d0f3361

1 Message Handling Entry Point

The following is the entry point of Kafka's message handling on the broker, i.e. the method that dispatches every request a client sends to the server.

/**
 * Top-level method that handles all requests and multiplexes to the right api
 */
def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
      format(request.requestObj, request.connectionId, request.securityProtocol, request.session.principal))
    request.requestId match {
      case RequestKeys.ProduceKey => handleProducerRequest(request)
      case RequestKeys.FetchKey => handleFetchRequest(request)
      case RequestKeys.OffsetsKey => handleOffsetRequest(request)
      case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
      case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
      case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
      case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
      case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
      case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
      case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
      case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
      case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
      case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
      case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
      case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)
      case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request)
      case RequestKeys.ListGroupsKey => handleListGroupsRequest(request)
      case requestId => throw new KafkaException("Unknown api code " + requestId)
    }
  } catch {
    case e: Throwable =>
      if (request.requestObj != null)
        request.requestObj.handleError(e, requestChannel, request)
      else {
        val response = request.body.getErrorResponse(request.header.apiVersion, e)
        val respHeader = new ResponseHeader(request.header.correlationId)

        /* If request doesn't have a default error response, we just close the connection.
           For example, when produce request has acks set to 0 */
        if (response == null)
          requestChannel.closeConnection(request.processor, request)
        else
          requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response)))
      }
      error("error when handling request %s".format(request.requestObj), e)
  } finally
    request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
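
For orientation, here is a minimal client-side sketch (not part of the broker source; the broker address and topic name are illustrative assumptions) showing the kind of request that lands in the branches above: a producer send() becomes a ProduceRequest that handle() dispatches to handleProducerRequest via RequestKeys.ProduceKey.

// Client-side sketch; "localhost:9092" and "test-topic" are assumptions for illustration.
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProduceRequestDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // This send() becomes a ProduceRequest on the broker, handled by the ProduceKey branch above.
    producer.send(new ProducerRecord[String, String]("test-topic", "key", "value")).get()
    producer.close()
  }
}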

2 Where the In-Memory Offset Information Comes From

/**
 * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
 * returns the current offset or it begins to sync the cache from the log (and returns an error code).
 */
def getOffsets(group: String, topicPartitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = {
  trace("Getting offsets %s for group %s.".format(topicPartitions, group))

  if (isGroupLocal(group)) {
    if (topicPartitions.isEmpty) {
      // Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.)
      offsetsCache.filter(_._1.group == group).map { case (groupTopicPartition, offsetAndMetadata) =>
        (groupTopicPartition.topicPartition, OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError))
      }.toMap
    } else {
      topicPartitions.map { topicAndPartition =>
        val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
        (groupTopicPartition.topicPartition, getOffset(groupTopicPartition))
      }.toMap
    }
  } else {
    debug("Could not fetch offsets for group %s (not offset coordinator).".format(group))
    topicPartitions.map { topicAndPartition =>
      val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
      (groupTopicPartition.topicPartition, OffsetMetadataAndError.NotCoordinatorForGroup)
    }.toMap
  }
}

As the code above shows, offset fetches are served from the in-memory offsetsCache. That cache is populated when offsets are committed and when a group's offsets partition is loaded, as shown in the two methods below.
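
Before walking through those two code paths, here is a simplified, self-contained sketch of the cache pattern. The names SimpleOffsetCache and GroupTopicPartitionKey are hypothetical; the real cache lives in GroupMetadataManager and stores richer OffsetAndMetadata values.

// A minimal sketch of an in-memory offset cache keyed by (group, topic, partition).
import scala.collection.concurrent.TrieMap

final case class GroupTopicPartitionKey(group: String, topic: String, partition: Int)

final class SimpleOffsetCache {
  private val cache = TrieMap.empty[GroupTopicPartitionKey, Long]

  // called on offset commit and when offsets are loaded from the log
  def putOffset(key: GroupTopicPartitionKey, offset: Long): Unit =
    cache.put(key, offset)

  // called on offset fetch; None plays the role of "unknown offset"
  def getOffset(key: GroupTopicPartitionKey): Option[Long] =
    cache.get(key)
}

object SimpleOffsetCacheDemo extends App {
  val cache = new SimpleOffsetCache
  val key = GroupTopicPartitionKey("my-group", "test-topic", 0)
  cache.putOffset(key, 42L)       // simulate an offset commit
  println(cache.getOffset(key))   // Some(42) -- simulate an offset fetch
}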

// This method is executed when handling an offset commit
/**
 * Store offsets by appending it to the replicated log and then inserting to cache
 */
def prepareStoreOffsets(groupId: String,
                        consumerId: String,
                        generationId: Int,
                        offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
                        responseCallback: immutable.Map[TopicAndPartition, Short] => Unit): DelayedStore = {
  // first filter out partitions with offset metadata size exceeding limit
  val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) =>
    validateOffsetMetadataLength(offsetAndMetadata.metadata)
  }

  // construct the message set to append
  val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
    new Message(
      key = GroupMetadataManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition),
      bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)
    )
  }.toSeq

  val offsetTopicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(groupId))

  val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
    new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))

  // set the callback function to insert offsets into cache after log append completed
  def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
    // the append response should only contain the topics partition
    if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition))
      throw new IllegalStateException("Append status %s should only have one partition %s"
        .format(responseStatus, offsetTopicPartition))

    // construct the commit response status and insert
    // the offset and metadata to cache if the append status has no error
    val status = responseStatus(offsetTopicPartition)

    val responseCode =
      if (status.error == ErrorMapping.NoError) {
        filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
          // insert the offset and metadata into the cache
          putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata)
        }
        ErrorMapping.NoError
      } else {
        debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s"
          .format(filteredOffsetMetadata, groupId, consumerId, generationId, ErrorMapping.exceptionNameFor(status.error)))

        // transform the log append error code to the corresponding the commit status error code
        if (status.error == ErrorMapping.UnknownTopicOrPartitionCode)
          ErrorMapping.ConsumerCoordinatorNotAvailableCode
        else if (status.error == ErrorMapping.NotLeaderForPartitionCode)
          ErrorMapping.NotCoordinatorForConsumerCode
        else if (status.error == ErrorMapping.MessageSizeTooLargeCode
          || status.error == ErrorMapping.MessageSetSizeTooLargeCode
          || status.error == ErrorMapping.InvalidFetchSizeCode)
          Errors.INVALID_COMMIT_OFFSET_SIZE.code
        else
          status.error
      }

    // compute the final error codes for the commit response
    val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
      if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
        (topicAndPartition, responseCode)
      else
        (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
    }

    // finally trigger the callback logic passed from the API layer
    responseCallback(commitStatus)
  }

  DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback)
}

// This method is scheduled to run asynchronously at startup
/**
 * Asynchronously read the partition from the offsets topic and populate the cache
 */
def loadGroupsForPartition(offsetsPartition: Int,
                           onGroupLoaded: GroupMetadata => Unit) {
  val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
  scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)

  def loadGroupsAndOffsets() {
    info("Loading offsets and group metadata from " + topicPartition)

    loadingPartitions synchronized {
      if (loadingPartitions.contains(offsetsPartition)) {
        info("Offset load from %s already in progress.".format(topicPartition))
        return
      } else {
        loadingPartitions.add(offsetsPartition)
      }
    }

    val startMs = SystemTime.milliseconds
    try {
      replicaManager.logManager.getLog(topicPartition) match {
        case Some(log) =>
          var currOffset = log.logSegments.head.baseOffset
          val buffer = ByteBuffer.allocate(config.loadBufferSize)
          // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
          inWriteLock(offsetExpireLock) {
            val loadedGroups = mutable.Map[String, GroupMetadata]()
            val removedGroups = mutable.Set[String]()

            while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
              buffer.clear()
              val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
              messages.readInto(buffer, 0)
              val messageSet = new ByteBufferMessageSet(buffer)
              messageSet.foreach { msgAndOffset =>
                require(msgAndOffset.message.key != null, "Offset entry key should not be null")
                val baseKey = GroupMetadataManager.readMessageKey(msgAndOffset.message.key)

                if (baseKey.isInstanceOf[OffsetKey]) {
                  // load offset
                  val key = baseKey.key.asInstanceOf[GroupTopicPartition]
                  if (msgAndOffset.message.payload == null) {
                    if (offsetsCache.remove(key) != null)
                      trace("Removed offset for %s due to tombstone entry.".format(key))
                    else
                      trace("Ignoring redundant tombstone for %s.".format(key))
                  } else {
                    // special handling for version 0:
                    // set the expiration time stamp as commit time stamp + server default retention time
                    val value = GroupMetadataManager.readOffsetMessageValue(msgAndOffset.message.payload)
                    // add the offset to the cache
                    putOffset(key, value.copy(
                      expireTimestamp = {
                        if (value.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
                          value.commitTimestamp + config.offsetsRetentionMs
                        else
                          value.expireTimestamp
                      }
                    ))
                    trace("Loaded offset %s for %s.".format(value, key))
                  }
                } else {
                  // load group metadata
                  val groupId = baseKey.key.asInstanceOf[String]
                  val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload)
                  if (groupMetadata != null) {
                    trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}")
                    removedGroups.remove(groupId)
                    loadedGroups.put(groupId, groupMetadata)
                  } else {
                    loadedGroups.remove(groupId)
                    removedGroups.add(groupId)
                  }
                }

                currOffset = msgAndOffset.nextOffset
              }
            }

            loadedGroups.values.foreach { group =>
              val currentGroup = addGroup(group)
              if (group != currentGroup)
                debug(s"Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed " +
                  s"because there is already a cached group with generation ${currentGroup.generationId}")
              else
                onGroupLoaded(group)
            }

            removedGroups.foreach { groupId =>
              val group = groupsCache.get(groupId)
              if (group != null)
                throw new IllegalStateException(s"Unexpected unload of active group ${group.groupId} while " +
                  s"loading partition ${topicPartition}")
            }
          }

          if (!shuttingDown.get())
            info("Finished loading offsets from %s in %d milliseconds."
              .format(topicPartition, SystemTime.milliseconds - startMs))
        case None =>
          warn("No log found for " + topicPartition)
      }
    } catch {
      case t: Throwable =>
        error("Error in loading offsets from " + topicPartition, t)
    } finally {
      loadingPartitions synchronized {
        ownedPartitions.add(offsetsPartition)
        loadingPartitions.remove(offsetsPartition)
      }
    }
  }
}

3 Offset Commit Implementation

When a consumer commits the offsets of the messages it has consumed, the broker treats the received offset commit as a normal produce request; offset commits go through the same processing path as regular producer requests. Below are some properties of the built-in producer used for this:

  • producer.type=sync

  • request.required.acks=-1

  • key.serializer.class=StringEncoder

Once the data has been appended to the leader's local log and all replicas have caught up with the leader, the leader checks whether the produce request targets the "offsets topic" (since the broker handles offset commits with the same logic as ordinary produce requests, a separate branch is needed for offset requests); if so, it asks the offset manager to add the offset (for delayed produce requests, the update happens when the delayed produce request completes). Because acks=-1 is used, offsets are handed to the offset manager only after they have been successfully replicated to all brokers in the ISR.

[Figure: offset commit processing flow]
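
From the client side, this path is triggered by an explicit offset commit. Below is a minimal consumer sketch; the broker address, topic and group names are illustrative assumptions. commitSync() produces the OffsetCommitRequest that the coordinator appends to the internal offsets topic and then inserts into its cache.

// Client-side sketch (assumed broker address, topic and group names).
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

object OffsetCommitDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "demo-group")
    props.put("enable.auto.commit", "false")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    val tp = new TopicPartition("test-topic", 0)
    consumer.assign(Collections.singletonList(tp))

    // Explicitly commit offset 100 for the partition; the coordinator persists it in the
    // offsets topic and then puts it into its offset cache.
    consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)))
    consumer.close()
  }
}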

4 Offset Fetch Implementation

On startup, the consumer requests offset information from the broker. A single offset fetch request can contain multiple topic-partitions; the consumer uses its cached metadata to decide which partitions should be requested from which broker. The response reflects the broker's state: for example, if the broker is still loading offsets, it returns a "loading" status.
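
As a client-side illustration (the broker address, topic and group names are assumptions), committed() below issues an offset fetch to the group coordinator; while the coordinator is still loading the offsets partition, the broker answers with a "load in progress" error code, which the client typically retries.

// Client-side sketch of an offset fetch.
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object OffsetFetchDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "demo-group")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    val committed = consumer.committed(new TopicPartition("test-topic", 0))
    // null means no offset has ever been committed for this group/partition
    println(if (committed == null) "no committed offset" else s"committed offset = ${committed.offset}")
    consumer.close()
  }
}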

When a broker's role changes for some partitions of the "offsets topic", i.e. it becomes the leader or a follower of those partitions, a LeaderAndIsrRequest is triggered:

  • If the broker becomes the leader for some partitions of the "offsets topic", it reads the logs
    of those partitions and loads the offsets into the in-memory offset table (cache).

  1. Any commit request for these partitions still updates the offsets table; stale offsets read
     from the log file are prevented from overwriting offsets from more recent commit requests.

  2. Offset fetch requests for keys managed by a partition of the "offsets topic" are not served
     until loading succeeds; in the meantime the broker returns an OffsetFetchResponse carrying
     OffsetLoadingCode to the consumer client.

If the broker becomes a follower: just like for any other Kafka topic, the follower fetches the data from the leader.
Since the follower's offset manager is no longer responsible for those partitions, it cleans up the corresponding data when the cleanup method is called.

5 Kafka File Storage

In Kafka, messages are organized by topic.

  • Partition: a physical grouping within a topic. A topic can be divided into multiple partitions, and each partition is an ordered queue.

  • Segment: physically, a partition is made up of multiple segments.

  • Offset: each partition consists of a sequence of ordered, immutable messages that are continuously appended to it. Every message in a partition carries a monotonically increasing sequence number called the offset, which uniquely identifies the message within that partition.

├── data0
│ ├── cleaner-offset-checkpoint
│ ├── client_mblogduration-35
│ │ ├── 00000000000004909731.index
│ │ ├── 00000000000004909731.log // a segment: the log file, roughly 1 GB each
│ │ ├── 00000000000005048975.index // the number in the file name is the base offset
│ │ ├── 00000000000005048975.log
│ ├── client_mblogduration-37
│ │ ├── 00000000000004955629.index
│ │ ├── 00000000000004955629.log
│ │ ├── 00000000000005098290.index
│ │ ├── 00000000000005098290.log
│ ├── __consumer_offsets-33
│ │ ├── 00000000000000105157.index
│ │ └── 00000000000000105157.log
│ ├── meta.properties
│ ├── recovery-point-offset-checkpoint
│ └── replication-offset-checkpoint

  • cleaner-offset-checkpoint: stores the last cleaned offset of each log.

  • meta.properties: stores the broker.id.

  • recovery-point-offset-checkpoint: records how far each log has been flushed to disk; everything below the recovery point has already been flushed.

  • replication-offset-checkpoint: stores the high watermark (HW) of each replica. The HW marks the messages that have been committed; all data below the HW is in sync and consistent across the replicas.
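
As a small illustration of the naming scheme in the listing above (a standalone sketch, not broker code): the 20-digit number in a segment file name is the base offset, i.e. the offset of the first message in that segment, so finding the segment that contains a given offset is a floor lookup over the sorted base offsets.

// Sketch: parse base offsets from segment file names and find the segment for a target offset.
object SegmentLookupDemo extends App {
  // base offsets taken from the file names in the listing above
  val baseOffsets = Vector(4909731L, 5048975L)

  def parseBaseOffset(fileName: String): Long =
    fileName.stripSuffix(".log").stripSuffix(".index").toLong

  // the segment whose base offset is the largest one <= targetOffset
  def segmentFor(target: Long): Option[Long] =
    baseOffsets.takeWhile(_ <= target).lastOption

  println(parseBaseOffset("00000000000005048975.log")) // 5048975
  println(segmentFor(5000000L))                        // Some(4909731)
  println(segmentFor(5100000L))                        // Some(5048975)
}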

6 Leader-Follower Synchronization Mechanism

As an example, assume a topic with a single partition and a replication factor of 2, i.e. one leader replica and one follower replica. Let's walk through what happens to the replicas on the broker side when a producer sends a message, and how the partition HW gets updated.

The figure below shows the initial state. A few notes: initially both the leader's and the follower's HW and LEO are 0 (strictly speaking the source code initializes LEO to -1, but that does not affect the discussion). The leader's "remote LEO" is the follower's LEO as recorded on the leader, also initialized to 0. At this point the producer has not sent any message to the leader, while the follower is already sending FETCH requests to the leader continuously; since there is no data, nothing happens. It is worth noting that a FETCH request that finds no data is temporarily parked in the leader's purgatory and force-completed after a 500 ms timeout (the replica.fetch.wait.max.ms parameter). If the producer sends data while the request is parked, Kafka automatically wakes the FETCH request up so the leader can continue processing it.

[Figure: initial state -- leader and follower HW, LEO and remote LEO are all 0]

Case 1: the follower's FETCH request arrives after the leader has finished processing the PRODUCE request. The producer sends one message to the topic partition, and the state is now as shown in the figure below:

[Figure: state after the leader processes the PRODUCE request]

As the figure shows, on receiving the PRODUCE request the leader mainly does two things:

  • Write the message to the underlying log (which automatically advances the leader's LEO).

  • Try to update the leader's HW (triggered by the third condition in the earlier discussion of when the leader replica updates its HW). Since we assumed the follower has not yet sent a FETCH request, the remote LEO stored on the leader is still 0; the leader compares its own LEO with the remote LEO, finds the minimum is 0, which equals the current HW, and therefore does not update the partition HW.

So after the PRODUCE request has been processed, the leader's HW is still 0, its LEO is 1, and the remote LEO is still 0. Now suppose the follower sends its FETCH request (or had already sent it and it was simply queued in the broker's request queue); the state then changes as shown in the figure below:

[Figure: state after the first FETCH round]

In this example, when the follower's FETCH request arrives, the leader processes it in the following order:

  • Read the data from the underlying log.

  • Update remote LEO = 0 (why 0? Because the follower has not yet written this message. How does the leader know that? From the fetch offset carried in the follower's FETCH request.)

  • Try to update the partition HW: at this point leader LEO = 1 and remote LEO = 0, so partition HW = min(leader LEO, follower remote LEO) = 0.

  • Send the data together with the current partition HW (still 0) to the follower replica.

After receiving the FETCH response, the follower replica performs the following steps in order:

  • Write the data to its local log (which also advances the follower LEO).

  • Update the follower HW: take the minimum of its local LEO and the leader HW from the response, so follower HW = 0.

At this point the first FETCH RPC round is complete. Although both the leader and the follower have stored the message in their logs, the partition HW has not yet been updated; it is updated during the second FETCH RPC round, as shown in the figure below:

[Figure: state after the second FETCH round -- partition HW advances to 1]

In the figure above the follower sends the second FETCH request, and on receiving it the leader again performs the following steps in order:

  • Read the data from the underlying log.

  • Update remote LEO = 1 (why 1 this time? Because this FETCH RPC carries fetch offset 1, and it carries 1 because the follower's LEO was advanced to 1 at the end of the previous round).

  • Try to update the partition HW: now leader LEO = 1 and remote LEO = 1, so partition HW = min(leader LEO, follower remote LEO) = 1. Note that the partition HW finally gets updated here!

  • Send the data (there is in fact no new data) together with the current partition HW (now 1) to the follower replica.

Likewise, after receiving the FETCH response the follower replica performs the following steps:

  • Write to its local log; there is nothing to write, so the follower LEO stays at 1.

  • Update the follower HW: take the minimum of its local LEO and the leader HW from the response. Both are now 1, so follower HW = 1.

That completes the full broker-side flow after the producer sends a message. The message has been successfully replicated to both the leader's and the follower's logs, and the partition HW is 1, which means consumers can now consume the message at offset = 0.
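
The two rounds can be condensed into a tiny model (a simplified sketch, not Kafka code, with variable names chosen for illustration): the leader's HW is min(leader LEO, remote LEO), the remote LEO is taken from the follower's fetch offset, and the follower's HW is min(local LEO, leader HW from the response).

// Simplified simulation of the PRODUCE + two FETCH rounds described above.
object HwPropagationDemo extends App {
  var leaderLeo = 0L
  var remoteLeo = 0L
  var leaderHw = 0L
  var followerLeo = 0L
  var followerHw = 0L

  // PRODUCE: leader appends one message
  leaderLeo = 1
  leaderHw = math.min(leaderLeo, remoteLeo)             // still 0

  // FETCH round 1: follower asks for offset 0
  remoteLeo = followerLeo                               // 0, from the fetch offset
  leaderHw = math.min(leaderLeo, remoteLeo)             // 0
  followerLeo = 1                                       // follower writes the message
  followerHw = math.min(followerLeo, leaderHw)          // 0

  // FETCH round 2: follower asks for offset 1
  remoteLeo = followerLeo                               // 1
  leaderHw = math.min(leaderLeo, remoteLeo)             // 1 -- partition HW finally advances
  followerHw = math.min(followerLeo, leaderHw)          // 1

  println(s"leaderHW=$leaderHw followerHW=$followerHw") // leaderHW=1 followerHW=1
}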

All of the above is really meant to make one point: Kafka uses the HW to track how far replicas have been synchronized, and updating the HW usually requires an extra round of FETCH RPCs, which makes this design problematic. The problems it can cause include:

  • loss of replicated data

  • divergence between replicas

You can work through these cases yourself; they are not elaborated further here.

References

  • 1. Kafka水位(high watermark)与leader epoch的讨论 (a discussion of Kafka's high watermark and leader epoch)
