A: replicate
1: A message is considered \"committed\" when all in sync replicas for that partition have applied it to their log. Only committed messages are ever given out to the consumer. This means that the consumer need not worry about potentially seeing a message that could be lost if the leader fails. Producers, on the other hand, have the option of either waiting for the message to be committed or not, depending on their preference for tradeoff between latency and durability. This preference is controlled by the request.required.acks setting that the producer uses.
2: 多个follower 和leader之间数据交互是leader把数据push到follower上还是follower到leader上拉数据?
a)The fundamental guarantee a log replication algorithm must provide is that if we tell the client a message is committed, and the leader fails, the new leader we elect must also have that message. This yields a tradeoff: if the leader waits for more followers to acknowledge a message before declaring it committed then there will be more potentially electable leaders.
b)Producer在发布消息到某个Partition时,先通过Zookeeper找到该Partition的Leader,然后无论该Topic的 Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。 Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送 ACK。一旦Leader收到了ISR(i-sync replica)中的所有Replica的ACK,该消息就被认为已
经commit了,Leader将增加HW(是什么?)并且向 Producer发送ACK。
为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消 息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被 Consumer消费。但考虑到这种场景非常少见,可以认为这种方式在性能和数据持久化上做了一个比较好的平衡。(注意A中提到的:只有所有的备份中的数据写入log中,才会认为是commit的,而只有commit的message才会输出给consumer)
Consumer读消息也是从Leader读取,只有被commit过的消息(offset低于HW的消息)才会暴露给Consumer。
Kafka Replication的数据流如下图所示
3:kafka部署机制和分配Replica的算法如下:
个典型的部署方式是一个Topic的Partition数量大于Broker的数量
1. 将所有Broker(假设共n个Broker)和待分配的Partition排序
2. 将第i个Partition分配到第(i mod n)个Broker上
3. 将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
4:ACK前需要保证有多少个备份
和大部分分布式系统一样,Kafka处理失败需要明 确定义一个Broker是否“活着”。对于Kafka而言,Kafka存活包含两个条件,一是它必须维护与Zookeeper的session(这个通过 Zookeeper的Heartbeat机制来实现)。二是Follower必须能够及时将Leader的消息复制过来,不能“落后太多”。
Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)。如果一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。这里所描述的“落后太多”指Follower复制的 消息落后于Leader后的条数超过预定值(该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.max.messages配置,其默认值是
4000)或者
Follower
超过一定时间(该值可在
$KAFKA_HOME/config/server.properties中通过replica.lag.time.max.ms来配置,其默认值是10000)未向Leader发送fetch请求。。
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的Follower都复制完,这条消息才会被认为 commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,Follower异步的从Leader复 制数据,数据只要被Leader写入log就被认为已经commit,这种
情况下如果Follower都复制完都落后于Leader,而如果Leader 突然宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数 据,这样极大的提高复制性能(批量写磁盘),极大减少了Follower与Leader的差距。
需要说明的是,Kafka只解决 fail/recover,不处理“Byzantine”(“拜占庭”)问题。一条消息只有被ISR里的所有Follower都从Leader复制过去才 会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而造成数据丢失(Consumer无法消费 这些数据)。而对于Producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks来设置。这种机制确保了只要ISR有一个或以上的Follower,一条被commit的消息就不会丢失。
B: delivery guarantee
有这么几种可能的delivery guarantee:
• At most once 消息可能会丢,但绝不会重复传输
• At least one 消息绝不会丢,但可能会重复传输
• Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要
的。
当Producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果 Producer发送数据给broker后,遇到网络问题而造成通信
中断,那Producer就无法判断该条消息是否已经commit。虽然Kafka无 法确定网络故障期间发生了什么,但是Producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once。截止到目前(Kafka 0.8.2版本,2015-03-04),这一Feature还并未实现,有希望在Kafka未来的版本中实现。(所以目前默认情况下一条消息从 Producer到broker是确保了At least once,可通过设置Producer异步发送实现At most once)。
接下来讨论的是消息从broker到Consumer的delivery guarantee语义。(仅针对Kafka consumer high level API)。Consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中保存该Consumer在该 Partition中读取的消息的offset。该Consumer下一次再读该Partition时会从下一条开始读取。如未commit,下一次读取 的开始位置会跟上一次commit之后的开始位置相同。当然可以将Consumer设置为autocommit,即Consumer一旦读到数据立即自动 commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际使用中应用程序并非在Consumer读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消 息从broker和consumer的delivery guarantee semantic。
• 读完消息先commit再处理消息。这种模式下,如果Consumer在commit后还没
来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once
• 读完消息先处理再commit。这种模式下,如果在处理完消息之后commit之前
Consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once。在很多使用场景下,消息都有一个主键,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可
以认为是 Exactly once。(笔者认为这种说法比较牵强,毕竟它不是Kafka本身提供的机制,主键本身也并不能完全保证操作的幂等性。而且实际上我们说delivery guarantee 语义是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们不应该把处理过程的特性——如是否幂等性,当成Kafka本身的Feature)
• 如果一定要做到Exactly once,就需要协调offset和实际操作的输出。精典的做法
是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种 方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,Consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一 起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)
总之,Kafka默认保证At least once,并且允许通过设置Producer异步提交来实现At most once。而Exactly once要求与外部存储系统协作,幸运的是Kafka提供的offset可以非常直接非常容易得使用这种方式。
因篇幅问题不能全部显示,请点此查看更多更全内容