[转]-分布式系统的事务处理

当我们在生产线上用一台服务器来提供数据服务的时候,我会遇到如下的两个问题:

1)一台服务器的性能不足以提供足够的能力服务于所有的网络请求。

2)我们总是害怕我们的这台服务器停机,造成服务不可用或是数据丢失。

于是我们不得不对我们的服务器进行扩展,加入更多的机器来分担性能上的问题,以及来解决单点故障问题。 通常,我们会通过两种手段来扩展我们的数据服务:

1)数据分区:就是把数据分块放在不同的服务器上(如:uid % 16,一致性哈希等)。

2)数据镜像:让所有的服务器都有相同的数据,提供相当的服务。

对于第一种情况,我们无法解决数据丢失的问题,单台服务器出问题时,会有部分数据丢失。所以,数据服务的高可用性只能通过第二种方法来完成——数据的冗余存储(一般工业界认为比较安全的备份数应该是3份,如:Hadoop和Dynamo)。 但是,加入更多的机器,会让我们的数据服务变得很复杂,尤其是跨服务器的事务处理,也就是跨服务器的数据一致性。这个是一个很难的问题。 让我们用最经典的Use Case:“A帐号向B帐号汇钱”来说明一下,熟悉RDBMS事务的都知道从帐号A到帐号B需要6个操作:

  1. 从A帐号中把余额读出来。
  2. 对A帐号做减法操作。
  3. 把结果写回A帐号中。
  4. 从B帐号中把余额读出来。
  5. 对B帐号做加法操作。
  6. 把结果写回B帐号中。

为了数据的一致性,这6件事,要么都成功做完,要么都不成功,而且这个操作的过程中,对A、B帐号的其它访问必需锁死,所谓锁死就是要排除其它的读写操作,不然会有脏数据的问题,这就是事务。那么,我们在加入了更多的机器后,这个事情会变得复杂起来:

1)在数据分区的方案中:如果A帐号和B帐号的数据不在同一台服务器上怎么办?我们需要一个跨机器的事务处理。也就是说,如果A的扣钱成功了,但B的加钱不成功,我们还要把A的操作给回滚回去。这在跨机器的情况下,就变得比较复杂了。

2)在数据镜像的方案中:A帐号和B帐号间的汇款是可以在一台机器上完成的,但是别忘了我们有多台机器存在A帐号和B帐号的副本。如果对A帐号的汇钱有两个并发操作(要汇给B和C),这两个操作发生在不同的两台服务器上怎么办?也就是说,在数据镜像中,在不同的服务器上对同一个数据的写操作怎么保证其一致性,保证数据不冲突?

同时,我们还要考虑性能的因素,如果不考虑性能的话,事务得到保证并不困难,系统慢一点就行了。除了考虑性能外,我们还要考虑可用性,也就是说,一台机器没了,数据不丢失,服务可由别的机器继续提供。 于是,我们需要重点考虑下面的这么几个情况:

1)容灾:数据不丢、结点的Failover

2)数据的一致性:事务处理

3)性能:吞吐量 、 响应时间

前面说过,要解决数据不丢,只能通过数据冗余的方法,就算是数据分区,每个区也需要进行数据冗余处理。这就是数据副本:当出现某个节点的数据丢失时可以从副本读到,数据副本是分布式系统解决数据丢失异常的唯一手段。所以,在这篇文章中,简单起见,我们只讨论在数据冗余情况下考虑数据的一致性和性能的问题。简单说来:

1)要想让数据有高可用性,就得写多份数据。

2)写多份的问题会导致数据一致性的问题。

3)数据一致性的问题又会引发性能问题

这就是软件开发,按下了葫芦起了瓢。

一致性模型

说起数据一致性来说,简单说有三种类型(当然,如果细分的话,还有很多一致性模型,如:顺序一致性,FIFO一致性,会话一致性,单读一致性,单写一致性,但为了本文的简单易读,我只说下面三种):

1)Weak 弱一致性:当你写入一个新值后,读操作在数据副本上可能读出来,也可能读不出来。比如:某些cache系统,网络游戏其它玩家的数据和你没什么关系,VOIP这样的系统,或是百度搜索引擎(呵呵)。

2)Eventually 最终一致性:当你写入一个新值后,有可能读不出来,但在某个时间窗口之后保证最终能读出来。比如:DNS,电子邮件、Amazon S3,Google搜索引擎这样的系统。

3)Strong 强一致性:新的数据一旦写入,在任意副本任意时刻都能读到新值。比如:文件系统,RDBMS,Azure Table都是强一致性的。

从这三种一致型的模型上来说,我们可以看到,Weak和Eventually一般来说是异步冗余的,而Strong一般来说是同步冗余的,异步的通常意味着更好的性能,但也意味着更复杂的状态控制。同步意味着简单,但也意味着性能下降。 好,让我们由浅入深,一步一步地来看有哪些技术:

Master-Slave

首先是Master-Slave结构,对于这种加构,Slave一般是Master的备份。在这样的系统中,一般是如下设计的:

1)读写请求都由Master负责。

2)写请求写到Master上后,由Master同步到Slave上。

从Master同步到Slave上,你可以使用异步,也可以使用同步,可以使用Master来push,也可以使用Slave来pull。 通常来说是Slave来周期性的pull,所以,是最终一致性。这个设计的问题是,如果Master在pull周期内垮掉了,那么会导致这个时间片内的数据丢失。如果你不想让数据丢掉,Slave只能成为Read-Only的方式等Master恢复。

当然,如果你可以容忍数据丢掉的话,你可以马上让Slave代替Master工作(对于只负责计算的结点来说,没有数据一致性和数据丢失的问题,Master-Slave的方式就可以解决单点问题了) 当然,Master Slave也可以是强一致性的, 比如:当我们写Master的时候,Master负责先写自己,等成功后,再写Slave,两者都成功后返回成功,整个过程是同步的,如果写Slave失败了,那么两种方法,一种是标记Slave不可用报错并继续服务(等Slave恢复后同步Master的数据,可以有多个Slave,这样少一个,还有备份,就像前面说的写三份那样),另一种是回滚自己并返回写失败。(注:一般不先写Slave,因为如果写Master自己失败后,还要回滚Slave,此时如果回滚Slave失败,就得手工订正数据了)你可以看到,如果Master-Slave需要做成强一致性有多复杂。

Master-Master

Master-Master,又叫Multi-master[1],是指一个系统存在两个或多个Master,每个Master都提供read-write服务。这个模型是Master-Slave的加强版,数据间同步一般是通过Master间的异步完成,所以是最终一致性。 Master-Master的好处是,一台Master挂了,别的Master可以正常做读写服务,他和Master-Slave一样,当数据没有被复制到别的Master上时,数据会丢失。很多数据库都支持Master-Master的Replication的机制。

另外,如果多个Master对同一个数据进行修改的时候,这个模型的恶梦就出现了——对数据间的冲突合并,这并不是一件容易的事情。看看Dynamo的Vector Clock的设计(记录数据的版本号和修改者)就知道这个事并不那么简单,而且Dynamo对数据冲突这个事是交给用户自己搞的。就像我们的SVN源码冲突一样,对于同一行代码的冲突,只能交给开发者自己来处理。(在本文后后面会讨论一下Dynamo的Vector Clock)

Two/Three Phase Commit

这个协议的缩写又叫2PC,中文叫两阶段提交。在分布式系统中,每个节点虽然可以知晓自己的操作时成功或者失败,却无法知道其他节点的操作的成功或失败。当一个事务跨越多个节点时,为了保持事务的ACID特性,需要引入一个作为协调者的组件来统一掌控所有节点(称作参与者)的操作结果并最终指示这些节点是否要把操作结果进行真正的提交(比如将更新后的数据写入磁盘等等)。 两阶段提交的算法如下:

第一阶段

  1. 协调者会问所有的参与者结点,是否可以执行提交操作。
  2. 各个参与者开始事务执行的准备工作:如:为资源上锁,预留资源,写undo/redo log……
  3. 参与者响应协调者,如果事务的准备工作成功,则回应“可以提交”,否则回应“拒绝提交”。

第二阶段

  • 如果所有的参与者都回应“可以提交”,那么,协调者向所有的参与者发送“正式提交”的命令。参与者完成正式提交,并释放所有资源,然后回应“完成”,协调者收集各结点的“完成”回应后结束这个Global Transaction。
  • 如果有一个参与者回应“拒绝提交”,那么,协调者向所有的参与者发送“回滚操作”,并释放所有资源,然后回应“回滚完成”,协调者收集各结点的“回滚”回应后,取消这个Global Transaction。

我们可以看到,2PC说白了就是第一阶段做Vote,第二阶段做决定的一个算法,也可以看到2PC这个事是强一致性的算法。在前面我们讨论过Master-Slave的强一致性策略,和2PC有点相似,只不过2PC更为保守一些——先尝试再提交。 2PC用的是比较多的,在一些系统设计中,会串联一系列的调用,比如:A -> B -> C -> D,每一步都会分配一些资源或改写一些数据。比如我们B2C网上购物的下单操作在后台会有一系列的流程需要做。如果我们一步一步地做,就会出现这样的问题,如果某一步做不下去了,那么前面每一次所分配的资源需要做反向操作把他们都回收掉,所以,操作起来比较复杂。现在很多处理流程(Workflow)都会借鉴2PC这个算法,使用 try -> confirm的流程来确保整个流程的能够成功完成。 举个通俗的例子,西方教堂结婚的时候,都有这样的桥段:

1)牧师分别问新郎和新娘:你是否愿意……不管生老病死……(询问阶段)

2)当新郎和新娘都回答愿意后(锁定一生的资源),牧师就会说:我宣布你们……(事务提交)

这是多么经典的一个两阶段提交的事务处理。 另外,我们也可以看到其中的一些问题, A)其中一个是同步阻塞操作,这个事情必然会非常大地影响性能。 B)另一个主要的问题是在TimeOut上,比如,

1)如果第一阶段中,参与者没有收到询问请求,或是参与者的回应没有到达协调者。那么,需要协调者做超时处理,一旦超时,可以当作失败,也可以重试。

2)如果第二阶段中,正式提交发出后,如果有的参与者没有收到,或是参与者提交/回滚后的确认信息没有返回,一旦参与者的回应超时,要么重试,要么把那个参与者标记为问题结点剔除整个集群,这样可以保证服务结点都是数据一致性的。

3)糟糕的情况是,第二阶段中,如果参与者收不到协调者的commit/fallback指令,参与者将处于“状态未知”阶段,参与者完全不知道要怎么办,比如:如果所有的参与者完成第一阶段的回复后(可能全部yes,可能全部no,可能部分yes部分no),如果协调者在这个时候挂掉了。那么所有的结点完全不知道怎么办(问别的参与者都不行)。为了一致性,要么死等协调者,要么重发第一阶段的yes/no命令。

两段提交最大的问题就是第3)项,如果第一阶段完成后,参与者在第二阶没有收到决策,那么数据结点会进入“不知所措”的状态,这个状态会block住整个事务。也就是说,协调者Coordinator对于事务的完成非常重要,Coordinator的可用性是个关键。 因些,我们引入三段提交,三段提交在Wikipedia[2]上的描述如下,他把二段提交的第一个段break成了两段:询问,然后再锁资源。最后真正提交。三段提交的示意图如下:

三段提交的核心理念是:在询问的时候并不锁定资源,除非所有人都同意了,才开始锁资源

理论上来说,如果第一阶段所有的结点返回成功,那么有理由相信成功提交的概率很大。这样一来,可以降低参与者Cohorts的状态未知的概率。也就是说,一旦参与者收到了PreCommit,意味他知道大家其实都同意修改了。这一点很重要。下面我们来看一下3PC的状态迁移图:(注意图中的虚线,那些F,T是Failuer或Timeout,其中的:状态含义是 q – Query,a – Abort,w – Wait,p – PreCommit,c – Commit)

从上图的状态变化图我们可以从虚线(那些F,T是Failuer或Timeout)看到——如果结点处在P状态(PreCommit)的时候发生了F/T的问题,三段提交比两段提交的好处是,三段提交可以继续直接把状态变成C状态(Commit),而两段提交则不知所措

其实,三段提交是一个很复杂的事情,实现起来相当难,而且也有一些问题。

看到这里,我相信你有很多很多的问题,你一定在思考2PC/3PC中各种各样的失败场景,你会发现Timeout是个非常难处理的事情,因为网络上的Timeout在很多时候让你无所事从,你也不知道对方是做了还是没有做。于是你好好的一个状态机就因为Timeout成了个摆设

一个网络服务会有三种状态:1)Success,2)Failure,3)Timeout,第三个绝对是恶梦,尤其在你需要维护状态的时候

Two Generals Problem(两将军问题)

Two Generals Problem[3] 两将军问题是这么一个思维性实验问题: 有两支军队,它们分别有一位将军领导,现在准备攻击一座修筑了防御工事的城市。这两支军队都驻扎在那座城市的附近,分占一座山头。一道山谷把两座山分隔开来,并且两位将军唯一的通信方式就是派各自的信使来往于山谷两边。不幸的是,这个山谷已经被那座城市的保卫者占领,并且存在一种可能,那就是任何被派出的信使通过山谷是会被捕。 请注意,虽然两位将军已经就攻击那座城市达成共识,但在他们各自占领山头阵地之前,并没有就进攻时间达成共识。两位将军必须让自己的军队同时进攻城市才能取得成功。因此,他们必须互相沟通,以确定一个时间来攻击,并同意就在那时攻击。如果只有一个将军进行攻击,那么这将是一个灾难性的失败。 这个思维实验就包括考虑他们如何去做这件事情。下面是我们的思考:

1)第一位将军先发送一段消息“让我们在上午9点开始进攻”。然而,一旦信使被派遣,他是否通过了山谷,第一位将军就不得而知了。任何一点的不确定性都会使得第一位将军攻击犹豫,因为如果第二位将军不能在同一时刻发动攻击,那座城市的驻军就会击退他的军队的进攻,导致他的军对被摧毁。

2)知道了这一点,第二位将军就需要发送一个确认回条:“我收到您的邮件,并会在9点的攻击。”但是,如果带着确认消息的信使被抓怎么办?所以第二位将军会犹豫自己的确认消息是否能到达。

3)于是,似乎我们还要让第一位将军再发送一条确认消息——“我收到了你的确认”。然而,如果这位信使被抓怎么办呢?

4)这样一来,是不是我们还要第二位将军发送一个“确认收到你的确认”的信息。

靠,于是你会发现,这事情很快就发展成为不管发送多少个确认消息,都没有办法来保证两位将军有足够的自信自己的信使没有被敌军捕获。

这个问题是无解的。两个将军问题和它的无解证明首先由E.A.Akkoyunlu,K.Ekanadham和R.V.Huber于1975年在《一些限制与折衷的网络通信设计》一文中发表,就在这篇文章的第73页中一段描述两个黑帮之间的通信中被阐明。 1978年,在Jim Gray的《数据库操作系统注意事项》一书中(从第465页开始)被命名为两个将军悖论。作为两个将军问题的定义和无解性的证明的来源,这一参考被广泛提及。

这个实验意在阐明:试图通过建立在一个不可靠的连接上的交流来协调一项行动的隐患和设计上的巨大挑战。

从工程上来说,一个解决两个将军问题的实际方法是使用一个能够承受通信信道不可靠性的方案,并不试图去消除这个不可靠性,但要将不可靠性削减到一个可以接受的程度。比如,第一位将军排出了100位信使并预计他们都被捕的可能性很小。在这种情况下,不管第二位将军是否会攻击或者受到任何消息,第一位将军都会进行攻击。另外,第一位将军可以发送一个消息流,而第二位将军可以对其中的每一条消息发送一个确认消息,这样如果每条消息都被接收到,两位将军会感觉更好。然而我们可以从证明中看出,他们俩都不能肯定这个攻击是可以协调的。他们没有算法可用(比如,收到4条以上的消息就攻击)能够确保防止仅有一方攻击。再者,第一位将军还可以为每条消息编号,说这是1号,2号……直到n号。这种方法能让第二位将军知道通信信道到底有多可靠,并且返回合适的数量的消息来确保最后一条消息被接收到。如果信道是可靠的话,只要一条消息就行了,其余的就帮不上什么忙了。最后一条和第一条消息丢失的概率是相等的。

两将军问题可以扩展成更变态的拜占庭将军问题 (Byzantine Generals Problem),其故事背景是这样的:拜占庭位于现在土耳其的伊斯坦布尔,是东罗马帝国的首都。由于当时拜占庭罗马帝国国土辽阔,为了防御目的,因此每个军队都分隔很远,将军与将军之间只能靠信差传消息。 在战争的时候,拜占庭军队内所有将军必需达成一致的共识,决定是否有赢的机会才去攻打敌人的阵营。但是,军队可能有叛徒和敌军间谍,这些叛徒将军们会扰乱或左右决策的过程。这时候,在已知有成员谋反的情况下,其余忠诚的将军在不受叛徒的影响下如何达成一致的协议,这就是拜占庭将军问题。

Paxos算法

Wikipedia上的各种Paxos算法[4]的描述非常详细,大家可以去围观一下。

Paxos 算法解决的问题是在一个可能发生上述异常的分布式系统中如何就某个值达成一致,保证不论发生以上任何异常,都不会破坏决议的一致性。一个典型的场景是,在一个分布式数据库系统中,如果各节点的初始状态一致,每个节点都执行相同的操作序列,那么他们最后能得到一个一致的状态。为保证每个节点执行相同的命令序列,需要在每一条指令上执行一个「一致性算法」以保证每个节点看到的指令一致。一个通用的一致性算法可以应用在许多场景中,是分布式计算中的重要问题。从20世纪80年代起对于一致性算法的研究就没有停止过。

Notes:Paxos算法是莱斯利·兰伯特(Leslie Lamport,就是 LaTeX 中的”La”,此人现在在微软研究院)于1990年提出的一种基于消息传递的一致性算法。由于算法难以理解起初并没有引起人们的重视,使Lamport在八年后1998年重新发表到ACM Transactions on Computer Systems上(The Part-Time Parliament[5])。即便如此paxos算法还是没有得到重视,2001年Lamport 觉得同行无法接受他的幽默感,于是用容易接受的方法重新表述了一遍(Paxos Made Simple[6])。可见Lamport对Paxos算法情有独钟。近几年Paxos算法的普遍使用也证明它在分布式一致性算法中的重要地位。2006年Google的三篇论文初现“云”的端倪,其中的Chubby Lock服务使用Paxos作为Chubby Cell中的一致性算法,Paxos的人气从此一路狂飙。(Lamport 本人在 他的blog 中描写了他用9年时间发表这个算法的前前后后)

注:Amazon的AWS中,所有的云服务都基于一个ALF(Async Lock Framework)的框架实现的,这个ALF用的就是Paxos算法。我在Amazon的时候,看内部的分享视频时,设计者在内部的Principle Talk里说他参考了ZooKeeper的方法,但他用了另一种比ZooKeeper更易读的方式实现了这个算法。

简单说来,Paxos的目的是让整个集群的结点对某个值的变更达成一致。Paxos算法基本上来说是个民主选举的算法——大多数的决定会成个整个集群的统一决定。任何一个点都可以提出要修改某个数据的提案,是否通过这个提案取决于这个集群中是否有超过半数的结点同意(所以Paxos算法需要集群中的结点是单数)。

这个算法有两个阶段(假设这个有三个结点:A,B,C):

第一阶段:Prepare阶段

A把申请修改的请求Prepare Request发给所有的结点A,B,C。注意,Paxos算法会有一个Sequence Number(你可以认为是一个提案号,这个数不断递增,而且是唯一的,也就是说A和B不可能有相同的提案号),这个提案号会和修改请求一同发出,任何结点在“Prepare阶段”时都会拒绝其值小于当前提案号的请求。所以,结点A在向所有结点申请修改请求的时候,需要带一个提案号,越新的提案,这个提案号就越是是最大的。

如果接收结点收到的提案号n大于其它结点发过来的提案号,这个结点会回应Yes(本结点上最新的被批准提案号),并保证不接收其它<n的提案。这样一来,结点上在Prepare阶段里总是会对最新的提案做承诺。

优化:在上述 prepare 过程中,如果任何一个结点发现存在一个更高编号的提案,则需要通知 提案人,提醒其中断这次提案。

第二阶段:Accept阶段

如果提案者A收到了超过半数的结点返回的Yes,然后他就会向所有的结点发布Accept Request(同样,需要带上提案号n),如果没有超过半数的话,那就返回失败。

当结点们收到了Accept Request后,如果对于接收的结点来说,n是最大的了,那么,它就会修改这个值,如果发现自己有一个更大的提案号,那么,结点就会拒绝修改。

我们可以看以,这似乎就是一个“两段提交”的优化。其实,2PC/3PC都是分布式一致性算法的残次版本,Google Chubby的作者Mike Burrows说过这个世界上只有一种一致性算法,那就是Paxos,其它的算法都是残次品。

我们还可以看到:对于同一个值的在不同结点的修改提案就算是在接收方被乱序收到也是没有问题的。

关于一些实例,你可以看一下Wikipedia中文中的“Paxos样例”一节,我在这里就不再多说了。对于Paxos算法中的一些异常示例,大家可以自己推导一下。你会发现基本上来说只要保证有半数以上的结点存活,就没有什么问题。

多说一下,自从Lamport在1998年发表Paxos算法后,对Paxos的各种改进工作就从未停止,其中动作最大的莫过于2005年发表的Fast Paxos[7]。无论何种改进,其重点依然是在消息延迟与性能、吞吐量之间作出各种权衡。为了容易地从概念上区分二者,称前者Classic Paxos,改进后的后者为Fast Paxos。

前面,我们说过,要想让数据有高可用性,就需要冗余数据写多份。写多份的问题会带来一致性的问题,而一致性的问题又会带来性能问题。从上图我们可以看到,我们基本上来说不可以让所有的项都绿起来,这就是著名的CAP理论:一致性,可用性,分区容忍性,你只可能要其中的两个。

NWR模型

最后我还想提一下Amazon Dynamo的NWR模型。这个NWR模型把CAP的选择权交给了用户,让用户自己的选择你的CAP中的哪两个

所谓NWR模型。N代表N个备份,W代表要写入至少W份才认为成功,R表示至少读取R个备份。配置的时候要求W+R > N。 因为W+R > N, 所以 R > N-W 这个是什么意思呢?就是读取的份数一定要比总备份数减去确保写成功的倍数的差值要大。

也就是说,每次读取,都至少读取到一个最新的版本。从而不会读到一份旧数据。当我们需要高可写的环境的时候,我们可以配置W = 1 如果N=3 那么R = 3。 这个时候只要写任何节点成功就认为成功,但是读的时候必须从所有的节点都读出数据。如果我们要求读的高效率,我们可以配置 W=N R=1。这个时候任何一个节点读成功就认为成功,但是写的时候必须写所有三个节点成功才认为成功。

NWR模型的一些设置会造成脏数据的问题,因为这很明显不是像Paxos一样是一个强一致的东西,所以,可能每次的读写操作都不在同一个结点上,于是会出现一些结点上的数据并不是最新版本,但却进行了最新的操作。

所以,Amazon Dynamo引了数据版本的设计。也就是说,如果你读出来数据的版本是v1,当你计算完成后要回填数据后,却发现数据的版本号已经被人更新成了v2,那么服务器就会拒绝你。版本这个事就像“乐观锁”一样。

但是,对于分布式和NWR模型来说,版本也会有恶梦的时候——就是版本冲的问题,比如:我们设置了N=3 W=1,如果A结点上接受了一个值,版本由v1 -> v2,但还没有来得及同步到结点B上(异步的,应该W=1,写一份就算成功),B结点上还是v1版本,此时,B结点接到写请求,按道理来说,他需要拒绝掉,但是他一方面并不知道别的结点已经被更新到v2,另一方面他也无法拒绝,因为W=1,所以写一分就成功了。于是,出现了严重的版本冲突。

Amazon的Dynamo把版本冲突这个问题巧妙地回避掉了——版本冲这个事交给用户自己来处理。

于是,Dynamo引入了Vector Clock(矢量钟?!)这个设计。这个设计让每个结点各自记录自己的版本信息,也就是说,对于同一个数据,需要记录两个事:1)谁更新的我,2)我的版本号是什么。

下面,我们来看一个操作序列:

1)一个写请求,第一次被节点A处理了。节点A会增加一个版本信息(A,1)。我们把这个时候的数据记做D1(A,1)。 然后另外一个对同样key的请求还是被A处理了于是有D2(A,2)。这个时候,D2是可以覆盖D1的,不会有冲突产生。

2)现在我们假设D2传播到了所有节点(B和C),B和C收到的数据不是从客户产生的,而是别人复制给他们的,所以他们不产生新的版本信息,所以现在B和C所持有的数据还是D2(A,2)。于是A,B,C上的数据及其版本号都是一样的。

3)如果我们有一个新的写请求到了B结点上,于是B结点生成数据D3(A,2; B,1),意思是:数据D全局版本号为3,A升了两新,B升了一次。这不就是所谓的代码版本的log么?

4)如果D3没有传播到C的时候又一个请求被C处理了,于是,以C结点上的数据是D4(A,2; C,1)。

5)好,最精彩的事情来了:如果这个时候来了一个读请求,我们要记得,我们的W=1 那么R=N=3,所以R会从所有三个节点上读,此时,他会读到三个版本:

    • A结点:D2(A,2)
    • B结点:D3(A,2;  B,1);
    • C结点:D4(A,2;  C,1)

6)这个时候可以判断出,D2已经是旧版本(已经包含在D3/D4中),可以舍弃。

7)但是D3和D4是明显的版本冲突。于是,交给调用方自己去做版本冲突处理。就像源代码版本管理一样。

很明显,上述的Dynamo的配置用的是CAP里的A和P。

我非常推大家都去看看这篇论文:《Dynamo:Amazon’s Highly Available Key-Value Store[8]》,如果英文痛苦,你可以看看译文[9](译者不详)。

(全文完)

(转载自 酷 壳 – CoolShell.cn 分布式系统的事务处理[10] ,请勿用于任何商业用途)

Posted in 系统架构 | Leave a comment

Announcing Snowflake

A while back we announced on our API developers list that we would change the way we generate unique ID numbers for tweets.

While we’re not quite ready to make this change, we’ve been hard at work on Snowflake which is the internal service to generate these ids. To give everyone a chance to familiarize themselves with the techniques we’re employing and how it’ll affect anyone building on top of the Twitter platform we are open sourcing the Snowflake code base today.

Before I go further, let me provide some context.

The Problem
We currently use MySQL to store most of our online data. In the beginning, the data was in one small database instance which in turn became one large database instance and eventually many large database clusters. For various reasons, the details of which merit a whole blog post, we’re working to replace many of these systems with the Cassandra distributed database or horizontally sharded MySQL (using gizzard).

Unlike MySQL, Cassandra has no built-in way of generating unique ids – nor should it, since at the scale where Cassandra becomes interesting, it would be difficult to provide a one-size-fits-all solution for ids. Same goes for sharded MySQL.

Our requirements for this system were pretty simple, yet demanding:

We needed something that could generate tens of thousands of ids per second in a highly available manner. This naturally led us to choose an uncoordinated approach.

These ids need to be roughly sortable, meaning that if tweets A and B are posted around the same time, they should have ids in close proximity to one another since this is how we and most Twitter clients sort tweets.[1]

Additionally, these numbers have to fit into 64 bits. We’ve been through the painful process of growing the number of bits used to store tweet ids before. It’s unsurprisingly hard to do when you have over 100,000 different codebases involved.

Options
We considered a number of approaches: MySQL-based ticket servers (like flickr uses), but those didn’t give us the ordering guarantees we needed without building some sort of re-syncing routine. We also considered various UUIDs, but all the schemes we could find required 128 bits. After that we looked at Zookeeper sequential nodes, but were unable to get the performance characteristics we needed and we feared that the coordinated approach would lower our availability for no real payoff.

Solution
To generate the roughly-sorted 64 bit ids in an uncoordinated manner, we settled on a composition of: timestamp, worker number and sequence number.

64 位大致排序的数字,由时间戳、节点号和序列编号组成。

Sequence numbers are per-thread and worker numbers are chosen at startup via zookeeper (though that’s overridable via a config file).

序列编号是每个节点(每个线程和进程)本地启动时选择的序号(节点号),而节点号则由 ZooKeeper 维护(尽管可以通过配置文件覆盖)

We encourage you to peruse and play with the code: you’ll find it on github. Please remember, however, that it is currently alpha-quality software that we aren’t yet running in production and is very likely to change.

还没有在生产上使用 Tuesday, June 1, 2010 | By Ryan King (@rk) [22:33 UTC],不知道现在在生产上使用了没

Feedback
If you find bugs, please report them on github. If you are having trouble understanding something, come ask in the #twinfra IRC channel on freenode. If you find anything that you think may be a security problem, please email security@twitter.com (and cc myself: ryan@twitter.com).

[1] In mathematical terms, although the tweets will no longer be sorted, they will be k-sorted. We’re aiming to keep our k below 1 second, meaning that tweets posted within a second of one another will be within a second of one another in the id space too.

转自:Announcing Snowflake

Posted in 数据结构与算法, 系统架构 | Leave a comment

maven中设置代理服务器

开发中经常遇到国外的maven服务器无法访问,还有就是服务器没有外网地址,这将依赖包无法下载,解决这个问题很是简单,只需要设置代理服务器就可以了,让maven使用代理服务器去访问库服务。

第一步:需要找一个代理服务器(废话)
第二步:修改(没有的话添加)~/.m2/settings.xml
比如

<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
                      http://maven.apache.org/xsd/settings-1.0.0.xsd">
    <localRepository/>
    <interactiveMode/>
    <usePluginRegistry/>
    <offline/>
    <pluginGroups/>
    <servers/>
    <mirrors/>
    <proxies>
        <proxy>
            <id>ssh</id>
            <active>true</active>
            <protocol>http</protocol>
            <username></username>
            <password></password>
            <host>127.0.0.1</host>
            <port>8118</port>
            <nonProxyHosts>localhost:127.0.0.1</nonProxyHosts>
        </proxy>
        <proxy>
            <id>ssh</id>
            <active>true</active>
            <protocol>https</protocol>
            <username></username>
            <password></password>
            <host>127.0.0.1</host>
            <port>8118</port>
            <nonProxyHosts>localhost:127.0.0.1</nonProxyHosts>
        </proxy>
    </proxies>
    <profiles/>
    <activeProfiles/>
</settings>

第三步:使用
/data/bin/apache-maven-3.1.1/bin/mvn -Djavax.net.ssl.trustStore=/home/jiaguotian/.m2/jssecacerts -s /home/jiaguotian/_m2/settings2.xml -DskipTests=true clean install

第四步:查看依赖库下载是否正常

[INFO] ------------------------------------------------------------------------
Downloading: http://maven.twttr.com/com/twitter/maven-finagle-thrift-plugin/0.0.7/maven-finagle-thrift-plugin-0.0.7.jar
500/15012 KB   

Posted in 猿の生活 | Leave a comment

Java 类的加载、链接和初始化

一个JAVA类从字节代码到能够被JVM中使用,需要经过加载、链接和初始化的步骤,通过类加载器可以动态的去加载JAVA类,但是链接和初始化是在JAVA类之前会发生的动作,考虑一个事情,如果一个类一开始没有被使用(只是声明了一个变量,并没有创建其实例或者是访问其中的静态域),然后我们把编译好的类的字节代码删除,再起一个LOADER去加载另一个类,这个类使用刚删除字节码的类,那么程序这时会抛出异常,因为被删除的类真正被使用到了,这人之前在hadoop的一个项目中遇到过,后来采用访问静态域的方法,去让JVM初始化它。

我们看一下加载器的分类:启动加载器(JVM原生代码实现)和用户自定义加载器(继承自java.lang.ClassLoader类)。JVM中最常使用到的是系统类加载器,它用来启动JAVA应用程序,在上一个BLOG中我们就是用system类加载器去完成字节码到加载的。我们可以通过java.lang.ClassLoader.getSystemClassLoader()方法获取它。

层次结构:每个类加载器都有一个父类加载器(getParent()得到)
代理模式:类加载器即可以自己完成类的加载工作,也可以人把这个任务代理给其它类加载器去做。(初始类加载器,定义类加载器)比如类A中IMPORT了类B,那么类A的定义类加载器负责启动类B的加载过程

流程一般是:1:代理给父类加载器 2:尝试自己加载(ClassLoader.loadClass),但也有例外的情况。

同样的类被不同的类加载器加载后,得到的JAVA类是不同的,所以在同一个JVM中同样名称的类是可以同时存在的。

下面我们看一下链接的情况:
根据实现的不同可能会有不同的策略,策略1,链接时递归把所有依赖的形式引用都解析。策略2,只在真正需要的时候才进行解析

JAVA类第一次被使用到时,JVM会对它进行初始化。主要是执行静态代码块和初始化静态域。在它被初始化之前,如果它有直接父类,直接父类也需要被初始化,但是接口的初始化,不会引起父接口的初始化。
在下面情况下类和接口才会被初始化,创建实例、调用静态方法、给JAVA类或接口中声名的静态域赋值、访问类或接口的静态域,并且该域不是常值变量时声明这个域的类或接口会被初始化,顶层类执行ASSERT语句。

关于类的加载实现,可以参考上一篇BOGE的CLassLoaderUtil的实现,另外创建自己的加载器并不复杂,只需要从ClassLoader类继承并实现相应的方法就可以了。

Posted in Java语言 | Leave a comment

java动态生成字节码

java的字节代码的增加在很多的java框架中都可以看到,java开发人员最熟悉的spring框架也在使用它对字节的代码进行增加。这类增强字节码的技术基本都应用在减少冗余和屏蔽实现细节上,比如spring使用cglib对字节码的增强实现面向方面的编程aop和自动代理类的生成等上。

一个编译好了的class文件的字节码结构(从asm4-guide中截取):

structure_of_a_compiled_class

现在有不少开源的库可以对字节码进行修改或者是生成字节代码内容,最有名的可能就是cglib和aspactJ了吧,这两个库底层使用的是asm的框架去生成和修改字节码(原本它们是使用apache的bcel来处理的,但这个框架已经N年没有更新过了,以至于项目页面上列出的那些使用它的框架都已经另寻别法了,基本上都转移到ASM上去了),使用这些类库可以在一定的程度上降低开发的复杂度。

比如,我们用spring的jdbc查询出实体数据后,把值赋给java bean的过程会用到一个RowMapper的对象,这个对象就比较适合去动态生成,下面请看我在annotate-dal项目中的代码,使用它做为例子吧。

EntitySupport.java

private RowMapper initRowMapper(EntityMeta entityMeta) {
    final String rowMapperClass = entityMeta.getEntityClass().getName() + "RowMapper";
    try {
        return (RowMapper) Class.forName(rowMapperClass).newInstance();
    } catch (ClassNotFoundException e) {
        DalClassLoader.loadRowMapperClass(entityMeta);
        try {
            return (RowMapper) Class.forName(rowMapperClass).newInstance();
        } catch (Exception e1) {
            throw new DalException("can not init mapper instance");
        }
    } catch (InstantiationException e) {
        throw new DalException("object cannot be instantiated.", e);
    } catch (IllegalAccessException e) {
        throw new DalException("can not access class constructor.", e);
    }
}

DalClassLoader.java


/**
 * Created by jiaguotian on 14-3-20.
 */
public final class DalClassLoader implements Opcodes {
    private static final Logger LOGGER = LoggerFactory.getLogger(DalClassLoader.class);

    // 这里省略了其它的代码....

    protected static Class<RowMapper> loadRowMapperClass(EntityMeta entityMeta) {
        ClassWriter cw = new ClassWriter(0);
        MethodVisitor mv;

        final String entityClassName = entityMeta.getEntityClass().getName().replace('.', '/');
        final String entityAsmClassDesc = "L" + entityClassName + ";";

        final String rowMapperClass = entityMeta.getEntityClass().getName() + "RowMapper";
        final String rowMapperClassName = rowMapperClass.replace('.', '/');
        final String rowMapperClassAsmDesc = "L" + rowMapperClassName + ";";

        final String classNameResultSet = ResultSet.class.getName().replace('.', '/');
        final String asmDescResultSet = "L" + classNameResultSet + ";";
        /**
         * com/tianjiaguo/site
         */
        final String classNameRowMapper = RowMapper.class.getName().replace('.', '/');

        cw.visit(V1_7, ACC_PUBLIC + ACC_SUPER, rowMapperClassName, null, CLASS_NAME_OBJECT, new String[]{classNameRowMapper});

        cw.visitSource("$$EntityHelper.java", null);
        int line = 1;

        {
            mv = cw.visitMethod(ACC_PUBLIC, "<init>", "()V", null, null);
            mv.visitCode();
            Label l0 = new Label();
            mv.visitLabel(l0);
            mv.visitLineNumber(line++, l0);
            mv.visitVarInsn(ALOAD, 0);
            mv.visitMethodInsn(INVOKESPECIAL, CLASS_NAME_OBJECT, "<init>", "()V");
            mv.visitInsn(RETURN);
            Label l1 = new Label();
            mv.visitLabel(l1);
            mv.visitLocalVariable("this", rowMapperClassAsmDesc, null, l0, l1, 0);
            mv.visitMaxs(1, 1);
            mv.visitEnd();
        }
        {
            mv = cw.visitMethod(ACC_PUBLIC, "mapRow", String.format("(%sI)%s", asmDescResultSet, ASM_DESC_OBJECT), null, new String[]{CLASS_NAME_SQL_EXCEPTION});
            mv.visitCode();
            Label l0 = new Label();
            mv.visitLabel(l0);
            mv.visitLineNumber(line++, l0);
            mv.visitTypeInsn(NEW, entityClassName);
            mv.visitInsn(DUP);
            mv.visitMethodInsn(INVOKESPECIAL, entityClassName, "<init>", "()V");
            mv.visitVarInsn(ASTORE, 3);

            Label l1 = new Label();
            {
                ImmutableMap.Builder<String, ColumnMeta> builder = ImmutableMap.builder();
                builder.putAll(entityMeta.getPrimaryMetas());
                builder.putAll(entityMeta.getNormalMetas());
                ImmutableMap<String, ColumnMeta> metaMap = builder.build();
                ImmutableSet<Map.Entry<String, ColumnMeta>> entries = metaMap.entrySet();
                for (Map.Entry<String, ColumnMeta> entry : entries) {
                    ColumnMeta columnMeta = entry.getValue();

                    Label label = new Label();
                    mv.visitLabel(label);
                    mv.visitLineNumber(line++, label);
                    mv.visitVarInsn(ALOAD, 3);
                    mv.visitVarInsn(ALOAD, 1);
                    mv.visitLdcInsn(columnMeta.getColumnName());
                    TypeConstant.TypeWrapper typeWrapper = TypeConstant.TYPE_WRAPPERS.get(columnMeta.getFieldType());
                    mv.visitMethodInsn(INVOKEINTERFACE, classNameResultSet, typeWrapper.getMethodName, String.format("(%s)%s", ASM_DESC_STRING, typeWrapper.asmDesc));
                    mv.visitMethodInsn(INVOKEVIRTUAL, entityClassName, columnMeta.getFieldSetMethod().getName(), String.format("(%s)V", typeWrapper.asmDesc));
                }
            }
            Label l4 = new Label();
            mv.visitLabel(l4);
            mv.visitLineNumber(line++, l4);
            mv.visitVarInsn(ALOAD, 3);
            mv.visitInsn(ARETURN);
            Label l5 = new Label();
            mv.visitLabel(l5);
            mv.visitLocalVariable("this", rowMapperClassAsmDesc, null, l0, l5, 0);
            mv.visitLocalVariable("rs", asmDescResultSet, null, l0, l5, 1);
            mv.visitLocalVariable("rowNum", TypeConstant.getAsmDesc(Integer.TYPE), null, l0, l5, 2);
            mv.visitLocalVariable("entity", entityAsmClassDesc, null, l1, l5, 3);
            mv.visitMaxs(3, 4);
            mv.visitEnd();
        }
        cw.visitEnd();
        return ClassLoaderUtil.toClass(cw.toByteArray(), rowMapperClass, java.lang.ClassLoader.getSystemClassLoader(), null);
    }

}

在这里生成class的字节代码中定义出相应的java类,下面给出从字节代码生成class文件的classLoader的程序的源码,主要原理是使用反射去调用系统的ClassLoaderUtil方法


/**
 * TODO:类说明
 * <p/>
 * Date: 14-3-17
 * Time: 下午2:48
 * Author: jiaguo.tian
 */
public class ClassLoaderUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClassLoaderUtil.class);

    private static final String CLASS_DIR = "/tmp/campaign-dal/" + System.currentTimeMillis();
    private static Method DEFINECLASS1, DEFINECLASS2;

    static {
        try {
            AccessController.doPrivileged(new PrivilegedExceptionAction() {
                public Object run() throws Exception {
                    Class cl = Class.forName("java.lang.ClassLoader");
                    DEFINECLASS1 = cl.getDeclaredMethod("defineClass", new Class[]{String.class, byte[].class, int.class, int.class});

                    DEFINECLASS2 = cl.getDeclaredMethod("defineClass", new Class[]{String.class, byte[].class, int.class, int.class, ProtectionDomain.class});
                    return null;
                }
            });
        } catch (PrivilegedActionException pae) {
            throw new DalException("cannot initialize class loader", pae.getException());
        }
    }

    public static Class toClass(byte[] bytes, String name, ClassLoader loader, ProtectionDomain domain)
            throws DalException {
        try {
            if (LOGGER.isDebugEnabled()) {
                FileOutputStream fos = null;
                try {
                    File file = new File(CLASS_DIR, name.replace('.', '/') + ".class");
                    file.getParentFile().mkdirs();
                    fos = new FileOutputStream(file, true);
                    fos.write(bytes);
                    fos.flush();
                } catch (Exception e) {
                    throw new DalException(e);
                } finally {
                    try {
                        fos.close();
                    } catch (IOException iex) {
                        LOGGER.warn(iex.getMessage(), iex);
                    }
                }
            }
            Method method;
            Object[] args;
            if (domain == null) {
                method = DEFINECLASS1;
                args = new Object[]{name, bytes, new Integer(0), new Integer(bytes.length)};
            } else {
                method = DEFINECLASS2;
                args = new Object[]{name, bytes, new Integer(0), new Integer(bytes.length), domain};
            }
            return (Class) toClass2(method, loader, args);
        } catch (RuntimeException e) {
            throw new DalException(e);
        } catch (InvocationTargetException e) {
            throw new DalException(new CompileException(e.getTargetException()));
        } catch (Exception e) {
            throw new DalException(new CompileException(e));
        }
    }

    private static synchronized Object toClass2(Method method, ClassLoader loader, Object[] args) throws Exception {
        method.setAccessible(true);
        try {
            return method.invoke(loader, args);
        } finally {
            method.setAccessible(false);
        }
    }
}

在做这个东西时本来是想使用apache的bcel类来动态生成class文件的,无奈啊,它提供的api太怪异,搞了半天都失败了,又鉴于其开发活跃程度不高,曾经的使用者也都转投别的项目的旗下,放弃之,转而使用asm框架去做这个事情,用asm框架还是比较容易的。

ASM框架的USER GUIDE


Posted in Java语言 | Leave a comment

kafka监控

监控数据源

JMX RMI方式启动Broker,Consumer,Producer

-ea -Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.port=9996

通过JMX RMI方式连接

service:jmx:rmi:///jndi/rmi://127.0.0.1:9998/jmxrmi

监控数据

broker

bean name: kafka:type=kafka.SocketServerStats(每次启动都会清空这部分数据)

def getProduceRequestsPerSecond: Double
  def getFetchRequestsPerSecond: Double
  def getAvgProduceRequestMs: Double
  def getMaxProduceRequestMs: Double
  def getAvgFetchRequestMs: Double
  def getMaxFetchRequestMs: Double
  def getBytesReadPerSecond: Double
  def getBytesWrittenPerSecond: Double
  def getNumFetchRequests: Long
  def getNumProduceRequests: Long
  def getTotalBytesRead: Long
  def getTotalBytesWritten: Long
  def getTotalFetchRequestMs: Long
  def getTotalProduceRequestMs: Long

bean name: kafka:type=kafka.BrokerAllTopicStat(每次启动都会清空这部分数据)
bean name: kafka:type=kafka.BrokerTopicStat.topic(每次启动都会清空这部分数据)

def getMessagesIn: Long  写入消息的数量
  def getBytesIn: Long   写入的byte数量
  def getBytesOut: Long   读出byte的数量
  def getFailedProduceRequest: Long   失败的生产数量
  def getFailedFetchRequest: Long  失败的读取操作数量

不是太重要的属性
bean name: kafka:type=kafka.LogFlushStats

def getFlushesPerSecond: Double
  def getAvgFlushMs: Double
  def getTotalFlushMs: Long
  def getMaxFlushMs: Double
  def getNumFlushes: Long

bean name: kafka:type=logs.topic-pattern

def getName: String    监控项目的名字,格式  topic+”-”+分区ID,比如 guoguo_t_1-0,guoguo_t_1-1
  def getSize: Long 执久化文件的大小
  def getNumberOfSegments: Int  执久化文件的数量
  def getCurrentOffset: Long   基于当前写入kafka的文件的byte偏移量
  def getNumAppendedMessages: Long    追加数据,每次重启清空

其它的需要监控的数据项目:

java堆(堆的内存使用情况,非堆的内存使用情况等)
GC信息(GC次数,GC总时间等)

consumer


消费者的状态
bean name: kafka:type=kafka.ConsumerStats

def getPartOwnerStats: String
比如:guoguo_t_1: [
    {
        0-1,  //  broker+分区的信息
        fetchoffset: 58246,  取的offset,已经消费的offset
        consumeroffset: 58246
    }{ 0-0,  fetchoffset: 2138747,consumeroffset: 2138747}]
  def getConsumerGroup: String    消费者的组,比如guoguo_group_1
  def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long  有多少byte消息没有读取
  def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long 已经消费了多少byte的数据
  def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long

bean name: kafka:type=kafka.ConsumerAllTopicStat (所有topic的数据的汇总,重启数据也会被清空)
kafka:type=kafka.ConsumerTopicStat.topic(重启数据也会被清空)

def getMessagesPerTopic: Long
  def getBytesPerTopic: Long

bean name: kafka:type=kafka.SimpleConsumerStats

def getFetchRequestsPerSecond: Double 每秒种发起的取数据请求数
  def getAvgFetchRequestMs: Double 平均取数据请求用的ms数
  def getMaxFetchRequestMs: Double 最大取数据请求用的ms数
  def getNumFetchRequests: Long 取数据请求的数量
  def getConsumerThroughput: Double 消费者的吞吐量,字节每秒

Producer

bean name: kafka:type=kafka.KafkaProducerStats

def getProduceRequestsPerSecond: Double
  def getAvgProduceRequestMs: Double
  def getMaxProduceRequestMs: Double
  def getNumProduceRequests: Long

bean name: kafka.producer.Producer:type=AsyncProducerStats

def getAsyncProducerEvents: Int (发出消息数量,与所有消费者的getMessagesPerTopic值相关不应太大)
  def getAsyncProducerDroppedEvents: Int

Demo程序

package com.campaign.kafka
 
import javax.management._
import kafka.log.LogStatsMBean
import kafka.network.SocketServerStatsMBean
import kafka.server.BrokerTopicStatMBean
import javax.management.openmbean.CompositeData
import java.lang.management.{MemoryUsage, GarbageCollectorMXBean}
import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
 
 
/**
 * Created by jiaguotian on 14-1-13.
 */
object RmiMonitor {
  def main(args: Array[String]) {
    val jmxUrl: JMXServiceURL = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi")
    val connector: JMXConnector = JMXConnectorFactory.connect(jmxUrl)
    val mBeanServerconnection: MBeanServerConnection = connector.getMBeanServerConnection
 
    val domains: Array[String] = mBeanServerconnection.getDomains
    println("domains:")
    for (domain <- domains) {
      println("%25s:  %s".format("domain", domain))
    }
 
    println("-------------------------------")
    val beanSet: java.util.Set[ObjectInstance] = mBeanServerconnection.queryMBeans(null, null)
    val beans: Array[ObjectInstance] = beanSet.toArray(new Array[ObjectInstance](0)).sortWith((o1, o2) => o1.getClassName.compare(o2.getClassName) < 0)
    for (instance <- beans) {
      val objectName: ObjectName = instance.getObjectName
      println("%s %s".format(instance.getClassName, objectName))
    }
 
    println("-------------------------------")
 
    {
      val instance: ObjectName = ObjectName.getInstance("kafka:type=kafka.SocketServerStats")
      val bean: SocketServerStatsMBean = MBeanServerInvocationHandler.newProxyInstance(mBeanServerconnection,
        instance,
        classOf[SocketServerStatsMBean],
        true)
      println(instance.getCanonicalKeyPropertyListString)
      println("%25s:  %s".format("AvgFetchRequestMs", bean.getAvgFetchRequestMs))
      println("%25s:  %s".format("AvgProduceRequestMs", bean.getAvgProduceRequestMs))
      println("%25s:  %s".format("BytesReadPerSecond", bean.getBytesReadPerSecond))
      println("%25s:  %s".format("BytesWrittenPerSecond", bean.getBytesWrittenPerSecond))
      println("%25s:  %s".format("FetchRequestsPerSecond", bean.getFetchRequestsPerSecond))
      println("%25s:  %s".format("MaxFetchRequestMs", bean.getMaxFetchRequestMs))
      println("%25s:  %s".format("MaxProduceRequestMs", bean.getMaxProduceRequestMs))
      println("%25s:  %s".format("NumFetchRequests", bean.getNumFetchRequests))
      println("%25s:  %s".format("NumProduceRequests", bean.getNumProduceRequests))
      println("%25s:  %s".format("ProduceRequestsPerSecond", bean.getProduceRequestsPerSecond))
    }
    println("-------------------------------");
    {
      val objNames: java.util.Set[ObjectName] = mBeanServerconnection.queryNames(
        ObjectName.getInstance("java.lang:type=Memory*"), null)
      val array: Array[ObjectName] = objNames.toArray(new Array[ObjectName](0))
      for (name <- array) {
        val info: _root_.javax.management.MBeanInfo = mBeanServerconnection.getMBeanInfo(name)
        val attrInfos: Array[_root_.javax.management.MBeanAttributeInfo] = info.getAttributes
        println(name.toString)
        for (info <- attrInfos) {
          println(info.getName + " " + info.getType)
          info.getType match {
            case "javax.management.openmbean.CompositeData" =>
              val attribute: AnyRef = mBeanServerconnection.getAttribute(name, info.getName)
              val bean: MemoryUsage = MemoryUsage.from(attribute.asInstanceOf[CompositeData])
              println("%25s:  %s".format("Committed", bean.getCommitted()))
              println("%25s:  %s".format("Init", bean.getInit()))
              println("%25s:  %s".format("Max", bean.getMax()))
              println("%25s:  %s".format("Used", bean.getUsed()))
            case _ =>
          }
        }
      }
    }
    println("-------------------------------")
 
    {
      val objNames: java.util.Set[ObjectName] = mBeanServerconnection.queryNames(
        ObjectName.getInstance("java.lang:type=GarbageCollector,name=*"), null)
      val array: Array[ObjectName] = objNames.toArray(new Array[ObjectName](0))
      for (next <- array) {
        val bean: GarbageCollectorMXBean = MBeanServerInvocationHandler.newProxyInstance(mBeanServerconnection, next, classOf[GarbageCollectorMXBean], true)
        println("%25s:  %s".format("Name", bean.getName()))
        println("%25s:  %s".format("MemoryPoolNames", bean.getMemoryPoolNames()))
        println("%25s:  %s".format("ObjectName", bean.getObjectName()))
        println("%25s:  %s".format("Class", bean.getClass()))
        println("%25s:  %s".format("CollectionCount", bean.getCollectionCount()))
        println("%25s:  %s".format("CollectionTime", bean.getCollectionTime()))
      }
    }
 
 
    val TypeValuePattern = "(.*):(.*)=(.*)".r
    val kafka1: ObjectName = new ObjectName("kafka", "type", "*")
    val kafka: java.util.Set[ObjectInstance] = mBeanServerconnection.queryMBeans(kafka1, null)
    val kafkas: Array[ObjectInstance] = kafka.toArray(new Array[ObjectInstance](0)).sortWith((o1, o2) => o1.getClassName.compare(o2.getClassName) < 0)
    for (instance <- kafkas) {
      val objectName: ObjectName = instance.getObjectName
      println(instance.getClassName + " " + objectName)
 
      objectName.getCanonicalName match {
        case TypeValuePattern(domain, t, v) =>
          instance.getClassName match {
            case "kafka.log.LogStats" =>
              val oName: ObjectName = new ObjectName(domain, t, v)
              val bean: LogStatsMBean = MBeanServerInvocationHandler.newProxyInstance(mBeanServerconnection, oName, classOf[LogStatsMBean], true)
              println("%25s:  %s".format("CurrentOffset", bean.getCurrentOffset))
              println("%25s:  %s".format("Name", bean.getName()))
              println("%25s:  %s".format("NumAppendedMessages", bean.getNumAppendedMessages))
              println("%25s:  %s".format("NumberOfSegments", bean.getNumberOfSegments))
              println("%25s:  %s".format("Size", bean.getSize()))
            case "kafka.message.LogFlushStats" =>
              val oName: ObjectName = new ObjectName(domain, t, v)
              val bean: LogStatsMBean = MBeanServerInvocationHandler.newProxyInstance(mBeanServerconnection, oName, classOf[LogStatsMBean], true)
              println("%25s:  %s".format("CurrentOffset", bean.getCurrentOffset))
              println("%25s:  %s".format("Name", bean.getName()))
              println("%25s:  %s".format("NumAppendedMessages", bean.getNumAppendedMessages))
              println("%25s:  %s".format("NumberOfSegments", bean.getNumberOfSegments))
              println("%25s:  %s".format("Size", bean.getSize()))
            case "kafka.network.SocketServerStats" =>
              val oName: ObjectName = new ObjectName(domain, t, v)
              val bean: SocketServerStatsMBean = MBeanServerInvocationHandler.newProxyInstance(mBeanServerconnection, oName, classOf[SocketServerStatsMBean], true)
              println("%25s:  %s".format("BytesReadPerSecond", bean.getBytesReadPerSecond))
              println("%25s:  %s".format("AvgFetchRequestMs", bean.getAvgFetchRequestMs))
              println("%25s:  %s".format("AvgProduceRequestMs", bean.getAvgProduceRequestMs))
              println("%25s:  %s".format("BytesWrittenPerSecond", bean.getBytesWrittenPerSecond))
              println("%25s:  %s".format("FetchRequestsPerSecond", bean.getFetchRequestsPerSecond))
              println("%25s:  %s".format("MaxFetchRequestMs", bean.getMaxFetchRequestMs))
              println("%25s:  %s".format("MaxProduceRequestMs", bean.getMaxProduceRequestMs))
              println("%25s:  %s".format("NumFetchRequests", bean.getNumFetchRequests))
              println("%25s:  %s".format("NumProduceRequests", bean.getNumProduceRequests))
              println("%25s:  %s".format("ProduceRequestsPerSecond", bean.getProduceRequestsPerSecond))
              println("%25s:  %s".format("TotalBytesRead", bean.getTotalBytesRead))
            case "kafka.server.BrokerTopicStat" =>
              val oName: ObjectName = new ObjectName(domain, t, v)
              val bean: BrokerTopicStatMBean = MBeanServerInvocationHandler.newProxyInstance(mBeanServerconnection, oName, classOf[BrokerTopicStatMBean], true)
              println("%25s:  %s".format("BytesIn", bean.getBytesIn))
              println("%25s:  %s".format("BytesOut", bean.getBytesOut))
              println("%25s:  %s".format("FailedFetchRequest", bean.getFailedFetchRequest))
              println("%25s:  %s".format("FailedProduceRequest", bean.getFailedProduceRequest))
              println("%25s:  %s".format("MessagesIn", bean.getMessagesIn))
            case _ =>
          }
        case _ =>
      }
    }
  }
}

输出结果

domains:
                   domain:  JMImplementation
                   domain:  com.sun.management
                   domain:  kafka
                   domain:  java.nio
                   domain:  java.lang
                   domain:  java.util.logging
-------------------------------
com.sun.management.UnixOperatingSystem java.lang:type=OperatingSystem
javax.management.MBeanServerDelegate JMImplementation:type=MBeanServerDelegate
kafka.log.LogStats kafka:type=kafka.logs.guoguo_t_1-1
kafka.log.LogStats kafka:type=kafka.logs.guoguo_t_1-0
kafka.network.SocketServerStats kafka:type=kafka.SocketServerStats
kafka.utils.Log4jController kafka:type=kafka.Log4jController
sun.management.ClassLoadingImpl java.lang:type=ClassLoading
sun.management.CompilationImpl java.lang:type=Compilation
sun.management.GarbageCollectorImpl java.lang:type=GarbageCollector,name=ConcurrentMarkSweep
sun.management.GarbageCollectorImpl java.lang:type=GarbageCollector,name=ParNew
sun.management.HotSpotDiagnostic com.sun.management:type=HotSpotDiagnostic
sun.management.ManagementFactoryHelper$1 java.nio:type=BufferPool,name=direct
sun.management.ManagementFactoryHelper$1 java.nio:type=BufferPool,name=mapped
sun.management.ManagementFactoryHelper$PlatformLoggingImpl java.util.logging:type=Logging
sun.management.MemoryImpl java.lang:type=Memory
sun.management.MemoryManagerImpl java.lang:type=MemoryManager,name=CodeCacheManager
sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=Par Survivor Space
sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=CMS Perm Gen
sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=Par Eden Space
sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=Code Cache
sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=CMS Old Gen
sun.management.RuntimeImpl java.lang:type=Runtime
sun.management.ThreadImpl java.lang:type=Threading
-------------------------------
type=kafka.SocketServerStats
     getAvgFetchRequestMs:  0.0
   getAvgProduceRequestMs:  0.0
    getBytesReadPerSecond:  0.0
 getBytesWrittenPerSecond:  0.0
getFetchRequestsPerSecond:  -0.0
     getMaxFetchRequestMs:  0.0
   getMaxProduceRequestMs:  0.0
      getNumFetchRequests:  0
    getNumProduceRequests:  0
getProduceRequestsPerSecond:  -0.0
-------------------------------
java.lang:type=Memory
HeapMemoryUsage javax.management.openmbean.CompositeData
             getCommitted:  3194421248
                  getInit:  3221225472
                   getMax:  3194421248
                  getUsed:  163302248
NonHeapMemoryUsage javax.management.openmbean.CompositeData
             getCommitted:  24313856
                  getInit:  24313856
                   getMax:  136314880
                  getUsed:  14854816
ObjectPendingFinalizationCount int
Verbose boolean
ObjectName javax.management.ObjectName
-------------------------------
                  getName:  ParNew
       getMemoryPoolNames:  [Ljava.lang.String;@23652209
            getObjectName:  java.lang:type=GarbageCollector,name=ParNew
                 getClass:  class com.sun.proxy.$Proxy1
       getCollectionCount:  0
        getCollectionTime:  0
                  getName:  ConcurrentMarkSweep
       getMemoryPoolNames:  [Ljava.lang.String;@2c61bbb7
            getObjectName:  java.lang:type=GarbageCollector,name=ConcurrentMarkSweep
                 getClass:  class com.sun.proxy.$Proxy1
       getCollectionCount:  0
        getCollectionTime:  0
kafka.log.LogStats kafka:type=kafka.logs.guoguo_t_1-1
            CurrentOffset:  5519897
                     Name:  guoguo_t_1-1
      NumAppendedMessages:  0
         NumberOfSegments:  1
                     Size:  5519897
kafka.log.LogStats kafka:type=kafka.logs.guoguo_t_1-0
            CurrentOffset:  7600338
                     Name:  guoguo_t_1-0
      NumAppendedMessages:  0
         NumberOfSegments:  1
                     Size:  7600338
kafka.network.SocketServerStats kafka:type=kafka.SocketServerStats
       BytesReadPerSecond:  0.0
        AvgFetchRequestMs:  0.0
      AvgProduceRequestMs:  0.0
    BytesWrittenPerSecond:  0.0
   FetchRequestsPerSecond:  -0.0
        MaxFetchRequestMs:  0.0
      MaxProduceRequestMs:  0.0
         NumFetchRequests:  0
       NumProduceRequests:  0
 ProduceRequestsPerSecond:  -0.0
           TotalBytesRead:  0
kafka.utils.Log4jController kafka:type=kafka.Log4jController

Posted in kafka | Leave a comment

Gnome3 Classic面板图标删除

你是Gnome3 Classic的用户嘛?肯定是遇到了面板上的图标找不到删除的方法的问题吧,右键点击不行,天杀的Gnome3 Classic需要在点击右键的同时按下WIN键和ALT键。这是谁能想到的呢。

Posted in Unix/Linux | Leave a comment

kafka的ZookeeperConsumer实现

kafka的ZookeeperConsumer数据获取的步骤如下:

入口ZookeeperConsumerConnector def consume[T](topicCountMap: scala.collection.Map[String,Int], decoder: Decoder[T])
: Map[String,List[KafkaStream[T]]] 方法
客户端启动后会在消费者注册目录上添加子节点变化的监听ZKRebalancerListener,ZKRebalancerListener实例会在内部创建一个线程,这个线程定时检查监听的事件有没有执行(消费者发生变化),如果没有变化则wait1秒钟,当发生了变化就调用 syncedRebalance 方法,去rebalance消费者。

while (!isShuttingDown.get) {
          try {
            lock.lock()
            try {
              if (!isWatcherTriggered)
                cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag
            } finally {
              doRebalance = isWatcherTriggered
              isWatcherTriggered = false
              lock.unlock()
            }
            if (doRebalance)
              syncedRebalance
          } catch {
            case t => error("error during syncedRebalance", t)
          }

syncedRebalance方法在内部会调用def rebalance(cluster: Cluster): Boolean方法,去执行操作。
这个方法的伪代码如下:

// 关闭所有的数据获取者
closeFetchers
// 解除分区的所有者
releasePartitionOwnership
// 按规则得到当前消费者拥有的分区信息并保存到topicRegistry中
topicRegistry=getCurrentConsumerPartitionInfo
// 修改并重启Fetchers
updateFetchers

updateFetcher是这样实现的。

private def updateFetcher(cluster: Cluster) {
      // 遍历topicRegistry中保存的当前消费者的分区信息,修改Fetcher的partitions信息 
      var allPartitionInfos : List[PartitionTopicInfo] = Nil
      for (partitionInfos <- topicRegistry.values)
        for (partition <- partitionInfos.values)
          allPartitionInfos ::= partition
      info("Consumer " + consumerIdString + " selected partitions : " +
        allPartitionInfos.sortWith((s,t) => s.partition < t.partition).map(_.toString).mkString(","))

      fetcher match {
        case Some(f) =>
          // 调用fetcher的startConnections方法,初始化Fetcher并启动它
          f.startConnections(allPartitionInfos, cluster)
        case None =>
      }
    }

Fetcher在startConnections时,它先把topicInfo按brokerid去分组

for(info <- topicInfos) {
      m.get(info.brokerId) match {
        case None => m.put(info.brokerId, List(info))
        case Some(lst) => m.put(info.brokerId, info :: lst)
      }
    }

然后检查每组topicInfo对应的broker是否在当前集群中注册了

val brokers = ids.map { id =>
      cluster.getBroker(id) match {
        case Some(broker) => broker
        case None => throw new IllegalStateException("Broker " + id + " is unavailable, fetchers could not be started")
      }
    }

最后对每个broker创建一个FetcherRunnable线程,并启动它。这个线程负责从服务器上不断获取数据,把数据插入内部阻塞队列的操作。

// 对每个分区分别创建FetchRequest
val fetches = partitionTopicInfos.map(info =>
          new FetchRequest(info.topic, info.partition.partId, info.getFetchOffset, config.fetchSize))
// 批量执行fetch操作
        val response = simpleConsumer.multifetch(fetches : _*)

....
// 遍历返回获取到的数据
for((messages, infopti) <- response.zip(partitionTopicInfos)) {
          try {
            var done = false
// 当zk中存放的offset值不在kafka机器上存在时,比如consumer好久没有启动,相应的offset的数据已经在kafka集群中被过期删除清理掉了
            if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) {
              info("offset for " + infopti + " out of range")
              // see if we can fix this error
              val resetOffset = resetConsumerOffsets(infopti.topic, infopti.partition)
              if(resetOffset >= 0) {
                infopti.resetFetchOffset(resetOffset)
                infopti.resetConsumeOffset(resetOffset)
                done = true
              }
            }
// 如果成功了,把消息放到队列中,实际上是把当前分区信息、当前获取到的消息、当前获取使用的fetchoffset封装FetchedDataChunk对象,放到分区消息对象的内部队列中(chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset)))。
            if (!done)
              read += infopti.enqueue(messages, infopti.getFetchOffset)
          }

客户端用ConsumerIterator不断的从分区信息的内部队列中取数据。ConsumerIterator实现了IteratorTemplate的接口,它的内部保存一个Iterator的属性current,每次调用makeNext时会检查它,如果有则从中取否则从队列中取。

  protected def makeNext(): MessageAndMetadata[T] = {
    var currentDataChunk: FetchedDataChunk = null
    // if we don't have an iterator, get one,从内部变量中取数据
    var localCurrent = current.get()
    if(localCurrent == null || !localCurrent.hasNext) {
// 内部变量中取不到值,检查timeout的值
      if (consumerTimeoutMs < 0)
        currentDataChunk = channel.take // 是负数(-1),则表示永不过期,如果接下来无新数据可取,客户端线程会在channel.take阻塞住
      else {
// 设置了过期时间,在没有新数据可用时,pool会在相应的时间返回,返回值为空,则说明没有取到新数据,抛出timeout的异常
        currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
        if (currentDataChunk == null) {
          // reset state to make the iterator re-iterable
          resetState()
          throw new ConsumerTimeoutException
        }
      }
// kafka把shutdown的命令也做为一个datachunk放到队列中,用这种方法来保证消息的顺序性
      if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
        debug("Received the shutdown command")
        channel.offer(currentDataChunk)
        return allDone
      } else {
        currentTopicInfo = currentDataChunk.topicInfo
        if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
          error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
                        .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
          currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
        }
// 把取出chunk中的消息转化为iterator
        localCurrent = if (enableShallowIterator) currentDataChunk.messages.shallowIterator
                       else currentDataChunk.messages.iterator
// 使用这个新的iterator初始化current,下次可直接从current中取数据
        current.set(localCurrent)
      }
    }
// 取出下一条数据,并用下一条数据的offset值设置consumedOffset
    val item = localCurrent.next()
    consumedOffset = item.offset
// 解码消息,封装消息和它的topic信息到MessageAndMetadata对象,返回
    new MessageAndMetadata(decoder.toEvent(item.message), currentTopicInfo.topic)
  }

ConsumerIterator的next方法

  override def next(): MessageAndMetadata[T] = {
    val item = super.next()
    if(consumedOffset < 0)
      throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
// 使用makeNext方法设置的consumedOffset,去修改topicInfo的消费offset
    currentTopicInfo.resetConsumeOffset(consumedOffset)
    val topic = currentTopicInfo.topic
    trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
    ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
    ConsumerTopicStat.getConsumerAllTopicStat().recordMessagesPerTopic(1)
// 返回makeNext得到的item
    item
  }

KafkaStream对ConsumerIterator做了进一步的封装,我们调用stream的next方法就可以取到数据了(内部通过调用ConsumerIterator的next方法实现)

注意:
ConsumerIterator的实现可能会造成数据的重复发送(这要看生产者如何生产数据),FetchedDataChunk是一个数据集合,它内部会包含很多数据块,一个数据块可能包含多条消息,但同一个数据块中的消息只有一个offset,所以当一个消息块有多条数据,处理完部分数据发生异常时,消费者重新去取数据,就会再次取得这个数据块,然后消费过的数据就会被重新消费。

PS:有时间搞个图来说明这个获取数据的过程。

Posted in kafka | Leave a comment

kafka0.8.0找不到scala/Tuple2$mcJJ$sp问题

kafka的最新版本0.8.0做了很多改进,其提供的API接口相应的有较大的变动,在修改其客户端封装代码遇到了不少问题。

改了半天,代码都改好了,第一次执行test run就很受挫,报了一大堆的错误。

我的pom.xml的文件配置相关的部分最初是这样的


        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.2</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.3.4</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.6</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.8.1</version>
            <scope>test</scope>
        </dependency>

报如下的错误

Caused by: java.lang.ClassNotFoundException: scala.Tuple2$mcJJ$sp
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	... 35 more

这个错误解决很简单,和我们使用的kafka代码的版本有关系,配置文件中使用的kafka的artifactId是kafka_2.10,所以对应的scala也要使用2.10的,修改scala library的配置为:

<dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.0</version>
        </dependency>

重新启动测试,问题就木有了。

Posted in kafka | Leave a comment

分布式发布订阅消息系统Kafka架构设计

原文地址
kafka-design

 我们为什么要搭建该系统

Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础。现在它已为多家不同类型的公司 作为多种类型的数据管道(data pipeline)和消息系统使用。
活动流数据是所有站点在对其网站使用情况做报表时要用到的数据中最常规的部分。活动数据包括页面访问量(page view)、被查看内容方面的信息以及搜索情况等内容。这种数据通常的处理方式是先把各种活动以日志的形式写入某种文件,然后周期性地对这些文件进行统计分析。运营数据指的是服务器的性能数据(CPU、IO使用率、请求时间、服务日志等等数据)。运营数据的统计方法种类繁多。
近年来,活动和运营数据处理已经成为了网站软件产品特性中一个至关重要的组成部分,这就需要一套稍微更加复杂的基础设施对其提供支持。

 活动流和运营数据的若干用例

  • “动态汇总(News feed)”功能。将你朋友的各种活动信息广播给你
  • 相关性以及排序。通过使用计数评级(count rating)、投票(votes)或者点击率( click-through)判定一组给定的条目中那一项是最相关的.
  • 安全:网站需要屏蔽行为不端的网络爬虫(crawler),对API的使用进行速率限制,探测出扩散垃圾信息的企图,并支撑其它的行为探测和预防体系,以切断网站的某些不正常活动。
  • 运营监控:大多数网站都需要某种形式的实时且随机应变的方式,对网站运行效率进行监控并在有问题出现的情况下能触发警告。
  • 报表和批处理: 将数据装载到数据仓库或者Hadoop系统中进行离线分析,然后针对业务行为做出相应的报表,这种做法很普遍。

 活动流数据的特点

这种由不可变(immutable)的活动数据组成的高吞吐量数据流代表了对计算能力的一种真正的挑战,因其数据量很容易就可能会比网站中位于第二位的数据源的数据量大10到100倍。
传统的日志文件统计分析对报表和批处理这种离线处理的情况来说,是一种很不错且很有伸缩性的方法;但是这种方法对于实时处理来说其时延太大,而且还具有较高的运营复杂度。另一方面,现有的消息队列系统(messaging and queuing system)却很适合于在实时或近实时(near-real-time)的情况下使用,但它们对很长的未被处理的消息队列的处理很不给力,往往并不将数据持久化作为首要的事情考虑。这样就会造成一种情况,就是当把大量数据传送给Hadoop这样的离线系统后, 这些离线系统每个小时或每天仅能处理掉部分源数据。Kafka的目的就是要成为一个队列平台,仅仅使用它就能够既支持离线又支持在线使用这两种情况。
Kafka支持非常通用的消息语义(messaging semantics)。尽管我们这篇文章主要是想把它用于活动处理,但并没有任何限制性条件使得它仅仅适用于此目的。

 部署

下面的示意图所示是在LinkedIn中部署后各系统形成的拓扑结构。

要注意的是,一个单个的Kafka集群系统用于处理来自各种不同来源的所有活动数据。它同时为在线和离线的数据使用者提供了一个单个的数据管道,在线活动和异步处理之间形成了一个缓冲区层。我们还使用kafka,把所有数据复制(replicate)到另外一个不同的数据中心去做离线处理。
我们并不想让一个单个的Kafka集群系统跨越多个数据中心,而是想让Kafka支持多数据中心的数据流拓扑结构。这是通过在集群之间进行镜像或“同步”实现的。这个功能非常简单,镜像集群只是作为源集群的数据使用者的角色运行。这意味着,一个单个的集群就能够将来自多个数据中心的数据集中到一个位置。下面所示是可用于支持批量装载(batch loads)的多数据中心拓扑结构的一个例子:

请注意,在图中上面部分的两个集群之间不存在通信连接,两者可能大小不同,具有不同数量的节点。下面部分中的这个单个的集群可以镜像任意数量的源集群。要了解镜像功能使用方面的更多细节,请访问这里.

 主要的设计元素

Kafka之所以和其它绝大多数信息系统不同,是因为下面这几个为数不多的比较重要的设计决策:

1. Kafka在设计之时为就将持久化消息作为通常的使用情况进行了考虑。
2. 主要的设计约束是吞吐量而不是功能。
3. 有关哪些数据已经被使用了的状态信息保存为数据使用者(consumer)的一部分,而不是保存在服务器之上。
4. Kafka是一种显式的分布式系统。它假设,数据生产者(producer)、代理(brokers)和数据使用者(consumer)分散于多台机器之上。

以上这些设计决策将在下文中进行逐条详述。

 基础知识

首先来看一些基本的术语和概念。
消息指的是通信的基本单位。由消息生产者(producer)发布关于某话题(topic)的消息,这句话的意思是,消息以一种物理方式被发送给了作为代理(broker)的服务器(可能是另外一台机器)。若干的消息使用者(consumer)订阅(subscribe)某个话题,然后生产者所发布的每条消息都会被发送给所有的使用者。
Kafka是一个显式的分布式系统 —— 生产者、使用者和代理都可以运行在作为一个逻辑单位的、进行相互协作的集群中不同的机器上。对于代理和生产者,这么做非常自然,但使用者却需要一些特殊的支持。每个使用者进程都属于一个使用者小组(consumer group) 。准确地讲,每条消息都只会发送给每个使用者小组中的一个进程。因此,使用者小组使得许多进程或多台机器在逻辑上作为一个单个的使用者出现。使用者小组这个概念非常强大,可以用来支持JMS中队列(queue)或者话题(topic)这两种语义。为了支持队列 语义,我们可以将所有的使用者组成一个单个的使用者小组,在这种情况下,每条消息都会发送给一个单个的使用者。为了支持话题语义,可以将每个使用者分到它自己的使用者小组中,随后所有的使用者将接收到每一条消息。在我们的使用当中,一种更常见的情况是,我们按照逻辑划分出多个使用者小组,每个小组都是有作为一个逻辑整体的多台使用者计算机组成的集群。在大数据的情况下,Kafka有个额外的优点,对于一个话题而言,无论有多少使用者订阅了它,一条条消息都只会存储一次。

 消息持久化(Message Persistence)及其缓存

不要害怕文件系统!
在对消息进行存储和缓存时,Kafka严重地依赖于文件系统。 大家普遍认为“磁盘很慢”,因而人们都对持久化结(persistent structure)构能够提供说得过去的性能抱有怀疑态度。实际上,同人们的期望值相比,磁盘可以说是既很慢又很快,这取决于磁盘的使用方式。设计的很好的磁盘结构往往可以和网络一样快。
磁盘性能方面最关键的一个事实是,在过去的十几年中,硬盘的吞吐量正在变得和磁盘寻道时间严重不一致了。结果,在一个由6个7200rpm的SATA硬盘组成的RAID-5磁盘阵列上,线性写入(linear write)的速度大约是300MB/秒,但随即写入却只有50k/秒,其中的差别接近10000倍。线性读取和写入是所有使用模式中最具可预计性的一种方式,因而操作系统采用预读(read-ahead)和后写(write-behind)技术对磁盘读写进行探测并优化后效果也不错。预读就是提前将一个比较大的磁盘块中内容读入内存,后写是将一些较小的逻辑写入操作合并起来组成比较大的物理写入操作。关于这个问题更深入的讨论请参考这篇文章ACM Queue article;实际上他们发现,在某些情况下,顺序磁盘访问能够比随即内存访问还要快!

为了抵消这种性能上的波动,现代操作系变得越来越积极地将主内存用作磁盘缓存。所有现代的操作系统都会乐于将所有空闲内存转做磁盘缓存,即时在需要回收这些内存的情况下会付出一些性能方面的代价。所有的磁盘读写操作都需要经过这个统一的缓存。想要舍弃这个特性都不太容易,除非使用直接I/O。因此,对于一个进程而言,即使它在进程内的缓存中保存了一份数据,这份数据也可能在OS的页面缓存(pagecache)中有重复的一份,结构就成了一份数据保存了两次。
更进一步讲,我们是在JVM的基础之上开发的系统,只要是了解过一些Java中内存使用方法的人都知道这两点:

1. Java对象的内存开销(overhead)非常大,往往是对象中存储的数据所占内存的两倍(或更糟)。
2. Java中的内存垃圾回收会随着堆内数据不断增长而变得越来越不明确,回收所花费的代价也会越来越大。

由于这些因素,使用文件系统并依赖于页面缓存要优于自己在内存中维护一个缓存或者什么别的结构 —— 通过对所有空闲内存自动拥有访问权,我们至少将可用的缓存大小翻了一倍,然后通过保存压缩后的字节结构而非单个对象,缓存可用大小接着可能又翻了一倍。这么做下来,在GC性能不受损失的情况下,我们可在一台拥有32G内存的机器上获得高达28到30G的缓存。而且,这种缓存即使在服务重启之后会仍然保持有效,而不象进程内缓存,进程重启后还需要在内存中进行缓存重建(10G的缓存重建时间可能需要10分钟),否则就需要以一个全空的缓存开始运行(这么做它的初始性能会非常糟糕)。这还大大简化了代码,因为对缓存和文件系统之间的一致性进行维护的所有逻辑现在都是在OS中实现的,这事OS做起来要比我们在进程中做那种一次性的缓存更加高效,准确性也更高。如果你使用磁盘的方式更倾向于线性读取操作,那么随着每次磁盘读取操作,预读就能非常高效使用随后准能用得着的数据填充缓存。

这就让人联想到一个非常简单的设计方案:不是要在内存中保存尽可能多的数据并在需要时将这些数据刷新(flush)到文件系统,而是我们要做完全相反的事情。所有数据都要立即写入文件系统中持久化的日志中但不进行刷新数据的任何调用。实际中这么做意味着,数据被传输到OS内核的页面缓存中了,OS随后会将这些数据刷新到磁盘的。此外我们添加了一条基于配置的刷新策略,允许用户对把数据刷新到物理磁盘的频率进行控制(每当接收到N条消息或者每过M秒),从而可以为系统硬件崩溃时“处于危险之中”的数据在量上加个上限。
这种以页面缓存为中心的设计风格在一篇讲解Varnish的设计思想的文章中有详细的描述(文风略带有助于身心健康的傲气)。

 常量时长足矣

消息系统元数据的持久化数据结构往往采用BTree。 BTree是目前最通用的数据结构,在消息系统中它可以用来广泛支持多种不同的事务性或非事务性语义。 它的确也带来了一个非常高的处理开销,Btree运算的时间复杂度为O(log N)。一般O(log N)被认为基本上等于常量时长,但对于磁盘操作来讲,情况就不同了。磁盘寻道时间一次要花10ms的时间,而且每个磁盘同时只能进行一个寻道操作,因而其并行程度很有限。因此,即使少量的磁盘寻道操作也会造成非常大的时间开销。因为存储系统混合了高速缓存操作和真正的物理磁盘操作,所以树型结构(tree structure)可观察到的性能往往是超线性的(superlinear)。更进一步讲,BTrees需要一种非常复杂的页面级或行级锁定机制才能避免在每次操作时锁定一整颗树。实现这种机制就要为行级锁定付出非常高昂的代价,否则就必须对所有的读取操作进行串行化(serialize)。因为对磁盘寻道操作的高度依赖,就不太可能高效地从驱动器密度(drive density)的提高中获得改善,因而就不得不使用容量较小(< 100GB)转速较高的SAS驱动去,以维持一种比较合理的数据与寻道容量之比。

直觉上讲,持久化队列可以按照通常的日志解决方案的样子构建,只是简单的文件读取和简单地向文件中添加内容。虽然这种结果必然无法支持BTree实现中的丰富语义,但有个优势之处在于其所有的操作的复杂度都是O(1),读取操作并不需要阻止写入操作,而且反之亦然。这样做显然有性能优势,因为性能完全同数据大小之间脱离了关系 —— 一个服务器现在就能利用大量的廉价、低转速、容量超过1TB的SATA驱动器。虽然这些驱动器寻道操作的性能很低,但这些驱动器在大量数据读写的情况下性能还凑和,而只需1/3的价格就能获得3倍的容量。 能够存取到几乎无限大的磁盘空间而无须付出性能代价意味着,我们可以提供一些消息系统中并不常见的功能。例如,在Kafka中,消息在使用完后并没有立即删除,而是会将这些消息保存相当长的一段时间(比方说一周)。

 效率最大化

我们的假设是,系统里消息的量非常之大,实际消息量是网站页面浏览总数的数倍之多(因为每个页面浏览就是我们要处理的其中一个活动)。而且我们假设发布的每条消息都会被至少读取一次(往往是多次),因而我们要为消息使用而不是消息的产生进行系统优化,
导致低效率的原因常见的有两个:过多的网络请求和大量的字节拷贝操作。
为了提高效率,API是围绕这“消息集”(message set)抽象机制进行设计的,消息集将消息进行自然分组。这么做能让网络请求把消息合成一个小组,分摊网络往返(roundtrip)所带来的开销,而不是每次仅仅发送一个单个消息。

MessageSet实现(implementation)本身是对字节数组或文件进行一次包装后形成的一薄层API。因而,里面并不存在消息处理所需的单独的序列化(serialization)或逆序列化(deserialization)的步骤。消息中的字段(field)是按需进行逆序列化的(或者说,在不需要时就不进行逆序列化)。
由代理维护的消息日志本身不过是那些已写入磁盘的消息集的目录。按此进行抽象处理后,就可以让代理和消息使用者共用一个单个字节的格式(从某种程度上说,消息生产者也可以用它,消息生产者的消息要求其校验和(checksum)并在验证后才会添加到日志中)
使用共通的格式后就能对最重要的操作进行优化了:持久化后日志块(chuck)的网络传输。为了将数据从页面缓存直接传送给socket,现代的Unix操作系统提供了一个高度优化的代码路径(code path)。在Linux中这是通过sendfile这个系统调用实现的。通过Java中的API,FileChannel.transferTo,由它来简洁的调用上述的系统调用。

为了理解sendfile所带来的效果,重要的是要理解将数据从文件传输到socket的数据路径:

1. 操作系统将数据从磁盘中读取到内核空间里的页面缓存
2. 应用程序将数据从内核空间读入到用户空间的缓冲区
3. 应用程序将读到的数据写回内核空间并放入socke的缓冲区
4. 操作系统将数据从socket的缓冲区拷贝到NIC(网络借口卡,即网卡)的缓冲区,自此数据才能通过网络发送出去

这样效率显然很低,因为里面涉及4次拷贝,2次系统调用。使用sendfile就可以避免这些重复的拷贝操作,让OS直接将数据从页面缓存发送到网络中,其中只需最后一步中的将数据拷贝到NIC的缓冲区。
我们预期的一种常见的用例是一个话题拥有多个消息使用者。采用前文所述的零拷贝优化方案,数据只需拷贝到页面缓存中一次,然后每次发送给使用者时都对它进行重复使用即可,而无须先保存到内存中,然后在阅读该消息时每次都需要将其拷贝到内核空间中。如此一来,消息使用的速度就能接近网络连接的极限。
要得到Java中对send’file和零拷贝的支持方面的更多背景知识,请参考IBM developerworks上的这篇文章。

 端到端的批量压缩

多数情况下系统的瓶颈是网络而不是CPU。 这一点对于需要将消息在个数据中心间进行传输的数据管道来说,尤其如此。当然,无需来自Kafka的支持,用户总是可以自行将消息压缩后进行传输,但这么做的压缩率会非常低,因为不同的消息里都有很多重复性的内容(比如JSON里的字段名、web日志中的用户代理或者常用的字符串)。高效压缩需要将多条消息一起进行压缩而不是分别压缩每条消息。理想情况下,以端到端的方式这么做是行得通的 —— 也即,数据在消息生产者发送之前先压缩一下,然后在服务器上一直保存压缩状态,只有到最终的消息使用者那里才需要将其解压缩。
通过运行递归消息集,Kafka对这种压缩方式提供了支持。 一批消息可以打包到一起进行压缩,然后以这种形式发送给服务器。这批消息都会被发送给同一个消息使用者,并会在到达使用者那里之前一直保持为被压缩的形式。
Kafka支持GZIP和Snappy压缩协议。关于压缩的更多更详细的信息,请参见这里。

 客户状态

追踪(客户)消费了什么是一个消息系统必须提供的一个关键功能之一。它并不直观,但是记录这个状态是该系统的关键性能之一。状态追踪要求(不断)更新一个有持久性的实体的和一些潜在会发生的随机访问。因此它更可能受到存储系统的查询时间的制约而不是带宽(正如上面所描述的)。
大部分消息系统保留着关于代理者使用(消费)的消息的元数据。也就是说,当消息被交到客户手上时,代理者自己记录了整个过程。这是一个相当直观的选择,而且确实对于一个单机服务器来说,它(数据)能去(放在)哪里是不清晰的。又由于许多消息系统存储使用的数据结构规模小,所以这也是个实用的选择–因为代理者知道什么被消费了使得它可以立刻删除它(数据),保持数据大小不过大。

也许不显然的是,让代理和使用者这两者对消息的使用情况做到一致表述绝不是一件轻而易举的事情。如果代理每次都是在将消息发送到网络中后就将该消息记录为已使用的话,一旦使用者没能真正处理到该消息(比方说,因为它宕机或这请求超时了抑或别的什么原因),就会出现消息丢失的情况。为了解决此问题,许多消息系新加了一个确认功能,当消息发出后仅把它标示为已发送而不是已使用,然后代理需要等到来自使用者的特定的确认信息后才将消息记录为已使用。这种策略的确解决了丢失消息的问题,但由此产生了新问题。首先,如果使用者已经处理了该消息但却未能发送出确认信息,那么就会让这一条消息被处理两次。第二个问题是关于性能的,这种策略中的代理必须为每条单个的消息维护多个状态(首先为了防止重复发送就要将消息锁定,然后,然后还要将消息标示为已使用后才能删除该消息)。另外还有一些棘手的问题需要处理,比如,对于那些以发出却未得到确认的消息该如何处理?

 消息传递语义(Message delivery semantics)

系统可以提供的几种可能的消息传递保障如下所示:

  • 最多一次—这种用于处理前段文字所述的第一种情况。消息在发出后立即标示为已使用,因此消息不会被发出去两次,但这在许多故障中都会导致消息丢失。
  • 至少一次—这种用于处理前文所述的第二种情况,系统保证每条消息至少会发送一次,但在有故障的情况下可能会导致重复发送。
  • 仅仅一次—这种是人们实际想要的,每条消息只会而且仅会发送一次。

这个问题已得到广泛的研究,属于“事务提交”问题的一个变种。提供仅仅一次语义的算法已经有了,两阶段或者三阶段提交法以及Paxos算法的一些变种就是其中的一些例子,但它们都有与生俱来的的缺陷。这些算法往往需要多个网络往返(round trip),可能也无法很好的保证其活性(liveness)(它们可能会导致无限期停机)。FLP结果给出了这些算法的一些基本的局限。

Kafka对元数据做了两件很不寻常的事情。一件是,代理将数据流划分为一组互相独立的分区。这些分区的语义由生产者定义,由生产者来指定每条消息属于哪个分区。一个分区内的消息以到达代理的时间为准进行排序,将来按此顺序将消息发送给使用者。这么一来,就用不着为每一天消息保存一条元数据(比如说,将消息标示为已使用)了,我们只需为使用者、话题和分区的每种组合记录一个“最高水位标记”(high water mark)即可。因此,标示使用者状态所需的元数据总量实际上特别小。在Kafka中,我们将该最高水位标记称为“偏移量”(offset),这么叫的原因将在实现细节部分讲解。

 使用者的状态

在Kafka中,由使用者负责维护反映哪些消息已被使用的状态信息(偏移量)。典型情况下,Kafka使用者的library会把状态数据保存到Zookeeper之中。然而,让使用者将状态信息保存到保存它们的消息处理结果的那个数据存储(datastore)中也许会更佳。例如,使用者也许就是要把一些统计值存储到集中式事物OLTP数据库中,在这种情况下,使用者可以在进行那个数据库数据更改的同一个事务中将消息使用状态信息存储起来。这样就消除了分布式的部分,从而解决了分布式中的一致性问题!这在非事务性系统中也有类似的技巧可用。搜索系统可用将使用者状态信息同它的索引段(index segment)存储到一起。尽管这么做可能无法保证数据的持久性(durability),但却可用让索引同使用者状态信息保存同步:如果由于宕机造成有一些没有刷新到磁盘的索引段信息丢了,我们总是可用从上次建立检查点(checkpoint)的偏移量处继续对索引进行处理。与此类似,Hadoop的加载作业(load job)从Kafka中并行加载,也有相同的技巧可用。每个Mapper在map任务结束前,将它使用的最后一个消息的偏移量存入HDFS。
这个决策还带来一个额外的好处。使用者可用故意回退(rewind)到以前的偏移量处,再次使用一遍以前使用过的数据。虽然这么做违背了队列的一般协约(contract),但对很多使用者来讲却是个很基本的功能。举个例子,如果使用者的代码里有个Bug,而且是在它处理完一些消息之后才被发现的,那么当把Bug改正后,使用者还有机会重新处理一遍那些消息。

 Push和Pull相关问题

还有一个,就是到底是应该让使用者从代理那里吧数据Pull(拉)回来还是应该让代理把数据Push(推)给使用者。和大部分消息系统一样,Kafka在这方面遵循了一种更加传统的设计思路:由生产者将数据Push给代理,然后由使用者将数据代理那里Pull回来。近来有些系统,比如scribe和flume,更着重于日志统计功能,遵循了一种非常不同的基于Push的设计思路,其中每个节点都可以作为代理,数据一直都是向下游Push的。上述两种方法都各有优缺点。然而,因为基于Push的系统中代理控制着数据的传输速率,因此它难以应付大量不同种类的使用者。我们的设计目标是,让使用者能以它最大的速率使用数据。不幸的是,在Push系统中当数据的使用速率低于产生的速率时,使用者往往会处于超载状态(这实际上就是一种拒绝服务攻击)。基于Pull的系统在使用者的处理速度稍稍落后的情况下会表现更佳,而且还可以让使用者在有能力的时候往往前赶赶。让使用者采用某种退避协议(backoff protocol)向代理表明自己处于超载状态,可以解决部分问题,但是,将传输速率调整到正好可以完全利用(但从不能过度利用)使用者的处理能力可比初看上去难多了。以前我们尝试过多次,想按这种方式构建系统,得到的经验教训使得我们选择了更加常规的Pull模型。

 分发

Kafka通常情况下是运行在集群中的服务器上。没有中央的“主”节点。代理彼此之间是对等的,不需要任何手动配置即可可随时添加和删除。同样,生产者和消费者可以在任何时候开启。 每个代理都可以在Zookeeper(分布式协调系统)中注册的一些元数据(例如,可用的主题)。生产者和消费者可以使用Zookeeper发现主题和相互协调。关于生产者和消费者的细节将在下面描述。

 生产者

生产者自动负载均衡
对于生产者,Kafka支持客户端负载均衡,也可以使用一个专用的负载均衡器对TCP连接进行负载均衡调整。专用的第四层负载均衡器在Kafka代理之上对TCP连接进行负载均衡。在这种配置的情况,一个给定的生产者所发送的消息都会发送给一个单个的代理。使用第四层负载均衡器的好处是,每个生产者仅需一个单个的TCP连接而无须同Zookeeper建立任何连接。不好的地方在于所有均衡工作都是在TCP连接的层次完成的,因而均衡效果可能并不佳(如果有些生产者产生的消息远多于其它生产者,按每个代理对TCP连接进行平均分配可能会导致每个代理接收到的消息总数并不平均)。

采用客户端基于zookeeper的负载均衡可以解决部分问题。如果这么做就能让生产者动态地发现新的代理,并按请求数量进行负载均衡。类似的,它还能让生产者按照某些键值(key)对数据进行分区(partition)而不是随机乱分,因而可以保存同使用者的关联关系(例如,按照用户id对数据使用进行分区)。这种分法叫做“语义分区”(semantic partitioning),下文再讨论其细节。
下面讲解基于zookeeper的负载均衡的工作原理。在发生下列事件时要对zookeeper的监视器(watcher)进行注册:

  • 加入了新的代理
  • 有一个代理下线了
  • 注册了新的话题
  • 代理注册了已有话题。

生产者在其内部为每一个代理维护了一个弹性的连接(同代理建立的连接)池。通过使用zookeeper监视器的回调函数(callback),该连接池在建立/保持同所有在线代理的连接时都要进行更新。当生产者要求进入某特定话题时,由分区者(partitioner)选择一个代理分区(参加语义分区小结)。从连接池中找出可用的生产者连接,并通过它将数据发送到刚才所选的代理分区。

 异步发送

对于可伸缩的消息系统而言,异步非阻塞式操作是不可或缺的。在Kafka中,生产者有个选项(producer.type=async)可用指定使用异步分发出产请求(produce request)。这样就允许用一个内存队列(in-memory queue)把生产请求放入缓冲区,然后再以某个时间间隔或者事先配置好的批量大小将数据批量发送出去。因为一般来说数据会从一组以不同的数据速度生产数据的异构的机器中发布出,所以对于代理而言,这种异步缓冲的方式有助于产生均匀一致的流量,因而会有更佳的网络利用率和更高的吞吐量。

 语义分区

下面看看一个想要为每个成员统计一个个人空间访客总数的程序该怎么做。应该把一个成员的所有个人空间访问事件发送给某特定分区,因此就可以把对一个成员的所有更新都放在同一个使用者线程中的同一个事件流中。生产者具有从语义上将消息映射到有效的Kafka节点和分区之上的能力。这样就可以用一个语义分区函数将消息流按照消息中的某个键值进行分区,并将不同分区发送给各自相应的代理。通过实现kafak.producer.Partitioner接口,可以对分区函数进行定制。在缺省情况下使用的是随即分区函数。上例中,那个键值应该是member_id,分区函数可以是hash(member_id)%num_partitions。

 对Hadoop以及其它批量数据装载的支持

具有伸缩性的持久化方案使得Kafka可支持批量数据装载,能够周期性将快照数据载入进行批量处理的离线系统。我们利用这个功能将数据载入我们的数据仓库(data warehouse)和Hadoop集群。
批量处理始于数据载入阶段,然后进入非循环图(acyclic graph)处理过程以及输出阶段(支持情况在这里)。支持这种处理模型的一个重要特性是,要有重新装载从某个时间点开始的数据的能力(以防处理中有任何错误发生)。
对于Hadoop,我们通过在单个的map任务之上分割装载任务对数据的装载进行了并行化处理,分割时,所有节点/话题/分区的每种组合都要分出一个来。Hadoop提供了任务管理,失败的任务可以重头再来,不存在数据被重复的危险。

 实施细则

下面给出了一些在上一节所描述的低层相关的实现系统的某些部分的细节的简要说明。

 API 设计

生产者
APIs生产者 API 是给两个底层生产者的再封装

// -kafka.producer.SyncProducerandkafka.producer.async.AsyncProducer.
class Producer {
        
  /* Sends the data, partitioned by key to the topic using either the */
  /* synchronous or the asynchronous producer */
  public void send(kafka.javaapi.producer.ProducerData producerData);
  /* Sends a list of data, partitioned by key to the topic using either */
  /* the synchronous or the asynchronous producer */
  public void send(java.util.List< kafka.javaapi.producer.ProducerData> producerData);
  /* Closes the producer and cleans up */
  public void close();
}

该API的目的是将生产者的所有功能通过一个单个的API公开给其使用者(client)。新建的生产者可以:

  • 对多个生产者请求进行排队/缓冲并异步发送批量数据 —— kafka.producer.Producer提供了在将多个生产请求序列化并发送给适当的Kafka代理分区之前,对这些生产请求进行批量处理的能力(producer.type=async)。批量的大小可以通过一些配置参数进行控制。当事件进入队列时会先放入队列进行缓冲,直到时间到了queue.time或者批量大小到达batch.size为止,后台线程(kafka.producer.async.ProducerSendThread)会将这批数据从队列中取出,交给kafka.producer.EventHandler进行序列化并发送给适当的kafka代理分区。通过event.handler这个配置参数,可以在系统中插入一个自定义的事件处理器。在该生产者队列管道中的各个不同阶段,为了插入自定义的日志/跟踪代码或者自定义的监视逻辑,如能注入回调函数会非常有用。通过实现kafka.producer.asyn.CallbackHandler接口并将配置参数callback.handler设置为实现类就能够实现注入。
  • 使用用户指定的Encoder处理数据的序列化(serialization)
    interface Encoder<T> {
      public Message toMessage(T data);
    }

    Encoder的缺省值是一个什么活都不干的kafka.serializer.DefaultEncoder。

  • 提供基于zookeeper的代理自动发现功能 —— 通过使用zk.connect配置参数指定zookeeper的连接url,就能够使用基于zookeeper的代理发现和负载均衡功能。在有些应用场合,可能不太适合于依赖zookeeper。在这种情况下,生产者可以从broker.list这个配置参数中获得一个代理的静态列表,每个生产请求会被随即的分配给各代理分区。如果相应的代理宕机,那么生产请求就会失败。
  • 通过使用一个可选性的、由用户指定的Partitioner,提供由软件实现的负载均衡功能 —— 数据发送路径选择决策受kafka.producer.Partitioner的影响。
    interface Partitioner<T> {
       int partition(T key, int numPartitions);
    }

    分区API根据相关的键值以及系统中具有的代理分区的数量返回一个分区id。将该id用作索引,在broker_id和partition组成的经过排序的列表中为相应的生产者请求找出一个代理分区。缺省的分区策略是hash(key)%numPartitions。如果key为null,那就进行随机选择。使用partitioner.class这个配置参数也可以插入自定义的分区策略。

使用者API
我们有两个层次的使用者API。底层比较简单的API维护了一个同单个代理建立的连接,完全同发送给服务器的网络请求相吻合。该API完全是无状态的,每个请求都带有一个偏移量作为参数,从而允许用户以自己选择的任意方式维护该元数据。
高层API对使用者隐藏了代理的具体细节,让使用者可运行于集群中的机器之上而无需关心底层的拓扑结构。它还维护着数据使用的状态。高层API还提供了订阅同一个过滤表达式(例如,白名单或黑名单的正则表达式)相匹配的多个话题的能力。

底层API

class SimpleConsumer {
        
  /* Send fetch request to a broker and get back a set of messages. */
  public ByteBufferMessageSet fetch(FetchRequest request);
  /* Send a list of fetch requests to a broker and get back a response set. */
  public MultiFetchResponse multifetch(List<FetchRequest> fetches);
  /**
   * Get a list of valid offsets (up to maxSize) before the given time.
   * The result is a list of offsets, in descending order.
   * @param time: time in millisecs,
   *              if set to OffsetRequest$.MODULE$.LATIEST_TIME(), get from the latest offset available.
   *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
   */
  public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}

底层API不但用于实现高层API,而且还直接用于我们的离线使用者(比如Hadoop这个使用者),这些使用者还对状态的维护有比较特定的需求。
高层API

/* create a connection to the cluster */
ConsumerConnector connector = Consumer.create(consumerConfig);
interface ConsumerConnector {
        
  /**
   * This method is used to get a list of KafkaStreams, which are iterators over
   * MessageAndMetadata objects from which you can obtain messages and their
   * associated metadata (currently only topic).
   *  Input: a map of <topic, #streams>
   *  Output: a map of <topic, list of message streams>
   */
  public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
  /**
   * You can also obtain a list of KafkaStreams, that iterate over messages
   * from topics that match a TopicFilter. (A TopicFilter encapsulates a
   * whitelist or a blacklist which is a standard Java regex.)
   */
  public List<KafkaStream> createMessageStreamsByFilter(
      TopicFilter topicFilter, int numStreams);
  /* Commit the offsets of all messages consumed so far. */
  public commitOffsets()
  
  /* Shut down the connector */
  public shutdown()
}

该API的中心是一个由KafkaStream这个类实现的迭代器(iterator)。每个KafkaStream都代表着一个从一个或多个分区到一个或多个服务器的消息流。每个流都是使用单个线程进行处理的,所以,该API的使用者在该API的创建调用中可以提供所需的任意个数的流。这样,一个流可能会代表多个服务器分区的合并(同处理线程的数目相同),但每个分区只会把数据发送给一个流中。
createMessageStreams方法为使用者注册到相应的话题之上,这将导致需要对使用者/代理的分配情况进行重新平衡。为了将重新平衡操作减少到最小。该API鼓励在一次调用中就创建多个话题流。createMessageStreamsByFilter方法为发现同其过滤条件想匹配的话题(额外地)注册了多个监视器(watchers)。应该注意,createMessageStreamsByFilter方法所返回的每个流都可能会对多个话题进行迭代(比如,在满足过滤条件的话题有多个的情况下)。

 网络层

网络层就是一个特别直截了当的NIO服务器,在此就不进行过于细致的讨论了。sendfile是通过给MessageSet接口添加了一个writeTo方法实现的。这样就可以让基于文件的消息更加高效地利用transferTo实现,而不是使用线程内缓冲区读写方式。线程模型用的是一个单个的接收器(acceptor)线程和每个可以处理固定数量网络连接的N个处理器线程。这种设计方案在别处已经经过了非常彻底的检验,发现其实现起来简单、运行起来很快。其中使用的协议一直都非常简单,将来还可以用其它语言实现其客户端。

 消息

消息由一个固定大小的消息头和一个变长不透明字节数字的有效载荷构成(opaque byte array payload)。消息头包含格式的版本信息和一个用于探测出坏数据和不完整数据的CRC32校验。让有效载荷保持不透明是个非常正确的决策:在用于序列化的代码库方面现在正在取得非常大的进展,任何特定的选择都不可能适用于所有的使用情况。都不用说,在Kafka的某特定应用中很有可能在它的使用中需要采用某种特殊的序列化类型。MessageSet接口就是一个使用特殊的方法对NIOChannel进行大宗数据读写(bulk reading and writing to an NIOChannel)的消息迭代器。

 消息的格式

/**
         * A message. The format of an N byte message is the following:
         *
         * If magic byte is 0
         *
         * 1. 1 byte "magic" identifier to allow format changes
         *
         * 2. 4 byte CRC32 of the payload
         *
         * 3. N - 5 byte payload
         *
         * If magic byte is 1
         *
         * 1. 1 byte "magic" identifier to allow format changes
         *
         * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
         *
         * 3. 4 byte CRC32 of the payload
         *
         * 4. N - 6 byte payload
         *
         */

 日志

具有两个分区的、名称为”my_topic”的话题的日志由两个目录组成(即:my_topic_0和my_topic_1),目录中存储的是内容为该话题的消息的数据文件。日志的文件格式是一系列的“日志项”;每条日志项包含一个表示消息长度的4字节整数N,其后接着保存的是N字节的消息。每条消息用一个64位的整数偏移量进行唯一性标示,该偏移量表示了该消息在那个分区中的那个话题下发送的所有消息组成的消息流中所处的字节位置。每条消息在磁盘上的格式如下文所示。每个日志文件的以它所包含的第一条消息的偏移量来命名。因此,第一个创建出来的文件的名字将为00000000000.kafka,随后每个后加的文件的名字将是前一个文件的文件名大约再加S个字节所得的整数,其中,S是配置文件中指定的最大日志文件的大小。

消息的确切的二进制格式都有版本,它保持为一个标准的接口,让消息集可以根据需要在生产者、代理、和使用者直接进行自由传输而无须重新拷贝或转换。其格式如下所示:

On-disk format of a message
message length : 4 bytes (value: 1+4+n)
"magic" value  : 1 byte
crc            : 4 bytes
payload        : n bytes

将消息的偏移量作为消息的可不常见。我们原先的想法是使用由生产者产生的GUID作为消息id,然后在每个代理上作一个从GUID到偏移量的映射。但是,既然使用者必须为每个服务器维护一个ID,那么GUID所具有的全局唯一性就失去了价值。更有甚者,维护将从一个随机数到偏移量的映射关系带来的复杂性,使得我们必须使用一种重量级的索引结构,而且这种结构还必须与磁盘保持同步,这样我们还就必须使用一种完全持久化的、需随机访问的数据结构。如此一来,为了简化查询结构,我们就决定使用一个简单的依分区的原子计数器(atomic counter),这个计数器可以同分区id以及节点id结合起来唯一的指定一条消息;这种方法使得查询结构简化不少,尽管每次在处理使用者请求时仍有可能会涉及多次磁盘寻道操作。然而,一旦我们决定使用计数器,跳向直接使用偏移量作为id就非常自然了,毕竟两者都是分区内具有唯一性的、单调增加的整数。既然偏移量是在使用者API中并不会体现出来,所以这个决策最终还是属于一个实现细节,进而我们就选择了这种更加高效的方式。

 写操作

日志可以顺序添加,添加的内容总是保存到最后一个文件。当大小超过配置中指定的大小(比如说1G)后,该文件就会换成另外一个新文件。有关日志的配置参数有两个,一个是M,用于指出写入多少条消息之后就要强制OS将文件刷新到磁盘;另一个是S,用来指定过多少秒就要强制进行一次刷新。这样就可以保证一旦发生系统崩溃,最多会有M条消息丢失,或者最长会有S秒的数据丢失,

 读操作

可以通过给出消息的64位逻辑偏移量和S字节的数据块最大的字节数对日志文件进行读取。读取操作返回的是这S个字节中包含的消息的迭代器。S应该要比最长的单条消息的字节数大,但在出现特别长的消息情况下,可以重复进行多次读取,每次的缓冲区大小都加倍,直到能成功读取出这样长的一条消息。也可以指定一个最大的消息和缓冲区大小并让服务器拒绝接收比这个大小大一些的消息,这样也能给客户端一个能够读取一条完整消息所需缓冲区的大小的上限。很有可能会出现读取缓冲区以一个不完整的消息结尾的情况,这个情况用大小界定(size delimiting)很容易就能探知。

从某偏移量开始进行日志读取的实际过程需要先找出存储所需数据的日志段文件,从全局偏移量计算出文件内偏移量,然后再从该文件偏移量处开始读取。搜索过程通过对每个文件保存在内存中的范围值进行一种变化后的二分查找完成。
日志提供了获取最新写入的消息的功能,从而允许从“当下”开始消息订阅。这个功能在使用者在SLA规定的天数内没能正常使用数据的情况下也很有用。当使用者企图从一个并不存在的偏移量开始使用数据时就会出现这种情况,此时使用者会得到一个OutOfRangeException异常,它可以根据具体的使用情况对自己进行重启或者仅仅失败而退出。

以下是发送给数据使用者(consumer)的结果的格式。

MessageSetSend (fetch result)
total length     : 4 bytes
error code       : 2 bytes
message 1        : x bytes
...
message n        : x bytes
MultiMessageSetSend (multiFetch result)
total length       : 4 bytes
error code         : 2 bytes
messageSetSend 1
...
messageSetSend n

 删除

一次只能删除一个日志段的数据。 日志管理器允许通过可加载的删除策略设定删除的文件。 当前策略删除修改事件超过 N 天以上的文件,也可以选择保留最后 N GB 的数据。 为了避免删除时的读取锁定冲突,我们可以使用副本写入模式,以便在进行删除的同时对日志段的一个不变的静态快照进行二进制搜索。

 数据正确性保证

日志功能里有一个配置参数M,可对在强制进行磁盘刷新之前可写入的消息的最大条目数进行控制。在系统启动时会运行一个日志恢复过程,对最新的日志段内所有消息进行迭代,以对每条消息项的有效性进行验证。一条消息项是合法的,仅当其大小加偏移量小于文件的大小并且该消息中有效载荷的CRC32值同该消息中存储的CRC值相等。在探测出有数据损坏的情况下,就要将文件按照最后一个有效的偏移量进行截断。
要注意,这里有两种必需处理的数据损坏情况:由于系统崩溃造成的未被正常写入的数据块(block)因而需要截断的情况以及由于文件中被加入了毫无意义的数据块而造成的数据损坏情况。造成数据损坏的原因是,一般来说OS并不能保证文件索引节点(inode)和实际数据块这两者的写入顺序,因此,除了可能会丢失未刷新的已写入数据之外,在索引节点已经用新的文件大小更新了但在将数据块写入磁盘块之前发生了系统崩溃的情况下,文件就可能会获得一些毫无意义的数据。CRC值就是用于这种极端情况,避免由此造成整个日志文件的损坏(尽管未得到保存的消息当然是真的找不回来了)。

 分发

 Zookeeper目录

接下来讨论zookeeper用于在使用者和代理直接进行协调的结构和算法。

 记法

当一个路径中的元素是用xyz这种形式表示的时,其意思是, xyz的值并不固定而且实际上xyz的每种可能的值都有一个zookpeer z节点(znode)。例如,/topics/topic表示了一个名为/topics的目录,其中包含的子目录同话题对应,一个话题一个目录并且目录名即为话题的名称。也可以给出数字范围,例如0…5,表示的是子目录0、1、2、3、4。箭头->用于给出z节点的内容。例如/hello -> world表示的是一个名称为/hello的z节点,包含的值为”world”。

 代理节点的注册

/brokers/ids/[0...N] --> host:port (ephemeral node)

上面是所有出现的代理节点的列表,列表中每一项都提供了一个具有唯一性的逻辑代理id,用于让使用者能够识别代理的身份(这个必须在配置中给出)。在启动时,代理节点就要用/brokers/ids下列出的逻辑代理id创建一个z节点,并在自己注册到系统中。使用逻辑代理id的目的是,可以让我们在不影响数据使用者的情况下就能把一个代理搬到另一台不同的物理机器上。试图用已在使用中的代理id(比如说,两个服务器配置成了同一个代理id)进行注册会导致发生错误。
因为代理是以非长久性z节点的方式注册的,所以这个注册过程是动态的,当代理关闭或宕机后注册信息就会消失(至此要数据使用者,该代理不再有效)。

 代理话题的注册

/brokers/topics/[topic]/[0...N] --> nPartions (ephemeral node)

每个代理会都要注册在某话题之下,注册后它会维护并保存该话题的分区总数。

 使用者和使用者小组

为了对数据的使用进行负载均衡并记录使用者使用的每个代理上的每个分区上的偏移量,所有话题的使用者都要在Zookeeper中进行注册。
多个使用者可以组成一个小组共同使用一个单个的话题。同一小组内的每个使用者共享同一个给定的group_id。比如说,如果某个使用者负责用三台机器进行某某处理过程,你就可以为这组使用者分配一个叫做“某某”的id。这个小组id是在使用者的配置文件中指定的,并且这就是你告诉使用者它到底属于哪个组的方法。
小组内的使用者要尽量公正地划分出分区,每个分区仅为小组内的一个使用者所使用。

 使用者ID的注册

除了小组内的所有使用者都要共享一个group_id之外,每个使用者为了要同其它使用者区别开来,还要有一个非永久性的、具有唯一性的consumer_id(采用hostname:uuid的形式)。 consumer_id要在以下的目录中进行注册。

/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)

小组内的每个使用者都要在它所属的小组中进行注册并采用consumer_id创建一个z节点。z节点的值包含了一个<topic, #streams>的map。 consumer_id只是用来识别小组内活跃的每个使用者。使用者建立的z节点是个临时性的节点,因此如果这个使用者进程终止了,注册信息也将随之消失。

 数据使用者偏移追踪数据

使用者跟踪他们在每个分区中耗用的最大偏移量。这个值被存储在一个Zookeeper(分布式协调系统)目录中。

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value ((persistent node)

 分区拥有者注册表

每个代理分区都被分配给了指定使用者小组中的单个数据使用者。数据使用者必须在耗用给定分区前确立对其的所有权。要确立其所有权,数据使用者需要将其 id 写入到特定代理分区中的一个临时节点(ephemeral node)中。

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)

 代理节点的注册

代理节点之间基本上都是相互独立的,因此它们只需要发布它们拥有的信息。当有新的代理加入进来时,它会将自己注册到代理节点注册目录中,写下它的主机名和端口。代理还要将已有话题的列表和它们的逻辑分区注册到代理话题注册表中。在代理上生成新话题时,需要动态的对话题进行注册。

 使用者注册算法

当使用者启动时,它要做以下这些事情:

1. 将自己注册到它属小组下的使用者id注册表。
2. 注册一个监视使用者id列的表变化情况(有新的使用者加入或者任何现有使用者的离开)的变化监视器。(每个变化都会触发一次对发生变化的使用者所属的小组内的所有使用者进行负载均衡。)
3. 主次一个监视代理id注册表的变化情况(有新的代理加入或者任何现有的代理的离开)的变化监视器。(每个变化都会触发一次对所有小组内的所有使用者负载均衡。)
4. 如果使用者使用某话题过滤器创建了一个消息流,它还要注册一个监视代理话题变化情况(添加了新话题)的变化监视器。(每个变化都会触发一次对所有可用话题的评估,以找出话题过滤器过滤出哪些话题。新过滤出来的话题将触发一次对该使用者所在的小组内所有的使用者负载均衡。)
5. 迫使自己在小组内进行重新负载均衡。

!

 使用者重新负载均衡的算法

使用者重新复杂均衡的算法可用让小组内的所有使用者对哪个使用者使用哪些分区达成一致意见。使用者重新负载均衡的动作每次添加或移除代理以及同一小组内的使用者时被触发。对于一个给定的话题和一个给定的使用者小组,代理分区是在小组内的所有使用者中进行平均划分的。一个分区总是由一个单个的使用者使用。这种设计方案简化了实施过程。假设我们运行多个使用者以并发的方式同时使用同一个分区,那么在该分区上就会形成争用(contention)的情况,这样一来就需要某种形式的锁定机制。如果使用者的个数比分区多,就会出现有写使用者根本得不到数据的情况。在重新进行负载均衡的过程中,我们按照尽量减少每个使用者需要连接的代理的个数的方式,尝尝试着将分区分配给使用者。
每个使用者在重新进行负载均衡时需要做下列的事情:

1. 针对Ci所订阅的每个话题T
   2.   将PT设为生产话题T的所有分区
   3.   将CG设为小组内同Ci 一样使用话题T的所有使用者
   4.   对PT进行排序(让同一个代理上的各分区挨在一起)
   5.   对CG进行排序
   6.   将i设为Ci在CG中的索引值并让N = size(PT)/size(CG)
   7.   将从i*N到(i+1)*N - 1的分区分配给使用者Ci
   8.   将Ci当前所拥有的分区从分区拥有者注册表中删除
   9.   将新分配的分区加入到分区拥有者注册表中
        (我们可能需要多次尝试才能让原先的分区拥有者释放其拥有权)

在触发了一个使用者要重新进行负载均衡时,同一小组内的其它使用者也会几乎在同时被触发重新进行负载均衡。

本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们

Posted in kafka, 大数据 | Leave a comment