日志:每个程序员都应该知道的实时数据的统一抽象

翻译自:http://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying 原文作者:Jay Kreps

六年前我在一个特别有意思的时间加入了LinkedIn。我们整体集中式的数据库刚刚开始显露出它的局限性,我们需要开始向分布式系统转变。这是一次特别有意思的经历:我们在那些日子里建立,实施,运行了一套分布式图状的数据库,一个分布式的后台搜索,Hadoop的安装,以及第一和第二代键值对存储。

其中我学到的最有用的事情之一是我们建立的所有的东西的核心都有的一个简单的概念:日志。有时叫做预写日志(write-ahead logs)有时叫提交日志(commit logs)或者事务日志(transaction logs),日志几乎伴随着计算机的始终,而且是很多分布式数据系统和实时应用架构的心脏。

如果你不懂日志,那你不可能完全理解数据库,NoSQL存储,键值存储,冗余备份,paxos,hadoop,版本控制或者其它大多数的软件系统;可惜,大多数的软件工程师对它们并不熟悉。我想改变这一切。在这篇文章里,我会带着你走遍你需要知道的关于日志的每一件事,包括什么是日志以及怎么使用日志来做数据集成,实时处理,和系统构建。

第一部分:什么是日志?

一条日志可能是最简单的存储抽象。它只能追加,完全按时间顺序排序。它看起来像这个样子:

记录被追加在日志的尾部,并且从左向右读取。每一个条目被分配一个唯一的顺序的条目编号。

记录的顺序是按“时间”的概念定义的,也就是说左边的记录比右边的记录旧一些。日志的条目编号可以被理解为条目的“时间戳”。在一开始就论述以时间顺序排列似乎看起来有些奇怪,但是它是一个非常方便的特性,可以和几乎任何特定的物理时钟解耦。当我们想去了解分布式系统时,这个概念是必不可少的。

在本文的讨论中,每条记录具体的内容和格式是不重要的。并且,我们也不能仅仅保持向日志中追加记录,因为我们会时不时地将空间耗尽。我会过一会儿回来论述这个问题。

其实,日志和文件或表格并不是完全不同的。文件是一个存储字符的数组,表格是一个存储记录的数组,而日志其实就是一个这样的表格或文件,不过其中的记录是按照时间顺序排列的。

在这里,你可能会质疑为什么我们要讲一些如此简单的东西?一个只能追加记录的序列又怎么会与数据系统有关系?问题的答案在于日志有一个特殊的目的:它们记录了在什么时间发生了什么事情。对于分布式系统来说,在很多方面,这就是问题的核心所在。

但是在我们继续深入之前,让我们先来对一些容易混淆的东西做一下澄清。每一个程序员都熟悉另外一种定义的日志 – 就是应用程序可能会通过syslog或者log4j将一些没有结构化的错误消息或者跟踪信息写到本地文件中。为了澄清概念,我把它称作“程序日志”。程序日志是本篇文章中论述的日志概念的一种退化形式。最大的不同在于那种文本日志的目的主要是用于人们的阅读和“日记”或者“数据记录”,而本文要论述的日志是为了程序访问建立的。

(事实上,如果你仔细想一想,通过人类来阅读各个机器上日志的想法多少有些不合时宜。当很多服务器和服务一起工作的时候,这种方法很快会变得无法管理并且日志的目的也会很快变成输入一些查询然后画图去了解跨各个机器的行为 – 这时英文的文本文件就比这里要讨论的结构化日志差远了。)

数据库日志

我不知道日志这个概念是从哪里起源的 – 它可能就像二叉查找一样简单的连发明者都没有意识到它是一个发明。早在IBM的System R中就论述过它。在数据库中使用是因为有保持同步的各种数据结构和索引的存在。为了保证原子性和健壮性,数据库会在修改其维护的各种数据结构之前,先将它想要修改的条目信息写到日志里。日志记录了发生的事情,每个表格或者索引将操作的历史记录成为数据结构或索引的形式。因为日志是立即保存的,所以在发生系统奔溃之类的事件后,日志被用做主要的还原材料用来还原其它的那些持久化的数据结构。

随着时间的推移,日志的用途不断增长,从ACID(Atomicity原子性, Consistency一致性, Isolation隔离性, Durability持久性)的具体实现到数据库之间数据的备份。事实证明,对于一个数据库上发生的改变组成的序列应该同步地备份到一个远程的数据库中。Oracle,MySQL和PostgreSQL包扩日志的传送协议传输部分的日志到备份用的数据库中,这些备份用的数据库扮演奴隶的角色。Oracle已经通过XStreamsGoldenGate将日志产品化地作为非oracle数据使用者的订阅机制,MySQL和PostgreSQL中也有类似的东西,它们是很多数据架构的关键组件。

因为这样的原因,机器可读日志这个概念很大程度上被局限在数据库内部。使用日志作为数据舒服的机制似乎是偶然出现的。但是它极端的抽象性是它非常适合支持不同种类的消息,数据流和实时数据处理。

分布式系统中的日志

日志解决两个问题 – 将变化排序和发布数据 – 在分布式数据系统中尤其重要。在更新的顺序上达成一致(同意或不同意以及边界效应的应对)是这些系统设计的核心问题。

日志为中心的分布式系统源于一个简单的观察,我将其称为状态机复制原则:

如果两个相同的,确定性的进程起始于相同的状态并且得到以相同顺序排列的相同的输入,那么它们会产生相同的输出且终止于相同的状态。

这看起来可能有些让人糊涂,所以让我们来详细的分析一下这是什么意思。

确定性(Deterministic)意思是指这个进程与时间无关,不会让任何外来的输入影响其结果。比如,如果一个进程会受到其执行线程的特定顺序影响,或者调用了gettimeofday或其它一些无法重现的东西的时候,一般被称作非确定性。

进程的“状态”是指在进程终结的时候一台机器上保留的所有数据的状态,既包括在内存中的,也包括在磁盘上的。

对于获取同样顺序的相同输入,这正是日志的老本行。它基于一个非常直观的概念:如果你给两个确定的代码输入同样的日志,他们会产生相同的输出。

一个程序的分布式计算就很明显了。你可以将如何让多台机器做同一件事这个问题简化成如何实现一个稳定的分布式日志来作为这些进程的输入。在这里,日志的目的是排除所有的非确定性输入流,而保证每一个副本进程的输入保持同步。

关于这个原则,没有什么更多的东西供你了解了,简而言之,它就是在说“确定的过程产生确定的结果”。不过我认为它是分布式系统设计中最常用的工具之一。

这种方法的魅力之一,是日志中作为索引的时间戳扮演了所有副本状态的时钟的概念–你可以通过一个简单的数字(该副本上执行过的日志条目的时间中最后面的那个时间)来描述一个副本。用这个时间戳加上日志,可以描述副本整个的状态。

在系统中,根据日志的不同内容,有多种方式应用该原则。例如,我们可以记录下服务器的输入请求,或者记录下在响应请求的各个阶段服务的状态变化,或者是它执行的转换后的命令。理论上,我们甚至为每一台副本记录下其上运行的机器指令序列或者执行的函数名及参数。只要两个进程用相同的方式执行相同的输入,那么在不同副本上运行的进程会保持相同的状态。

对于日志的应用,不同领域的人们可能有不同的说法。搞数据库的人们在物理日志和逻辑日志两方面往往是不同的。物理日志意思是记录下每一行变化的数据。逻辑日志的意思不是记录变化的行,而是记录抵达该行数据变化的SQL命令(插入,更新以及删除)。

分布式系统的文献中,常常就处理和备份这两个方面做区分。“状态机模式(state machine model)”通常指一种“主动-主动(active-active)”模式,在这种模式中,我们持续地记录输入的请求以及每个副本对每个请求的处理。对这种模式稍加更改,就成为了“主从模式(primary-backup model)”,该模式选取一个副本作为主节点,并允许该主节点服务器请求按照到达的顺序处理这些请求,并且记录下在处理请求过程中其上各个状态的变化。其他的从属节点根据主节点的状态变化的顺序作出相应的改变,它们会与主节点保持同步,并且时刻准备着在主节点挂掉的时候取代其成为新的主节点。

alt

为了理解这两个模式的不同,让我们来看一个简单的例子。考虑一个具有副本节点的“算数运算服务器”,其上的状态用一个数字表示,并且对该数字的值运行加法和乘法。那么一个“主动-主动”的模式,可能记录下其上运行的转换命令,例如“+1”, “*2”等等,每一个副本将会运行这些转换命令从而得到相同的值。“主动-被动”模式会通过一个主节点来执行这些转换然后记录下其结果,例如“1”,“3”,“6”等。这个例子也很好的解释了为什么顺序是在副本间保持一致性的关键:如果改变加法和乘法的顺序,自然会得到不同的结果。

分布式日志可以被看做是一种用来表示一致性问题的数据结构模型。一条日志,总而言之,表示一系列关于“下一个”追加的值的决定。你不得不眯着眼睛仔细地用Paxos算法观察一条日志,即使建立日志是它们最常见的实际应用。在Paxos中,常常使用一个叫做“multi-paxos”的扩展协议,该协议将日志建模成为一个一致性问题的序列,日志中的每一格表示一个一致性问题。日志相比于其他协议比如ZABRAFTViewstamped Replication来说更为先进,它直接对维护分布式和一致性日志的问题进行了建模。

我怀疑我们对于一致性问题的观点在历史的轨道中有失偏颇,可能是由于近几年来在分布式计算领域方面的理论的发展超过了其现实中的应用。在现实中,一致性问题有点太简单了。计算机系统很少需要决定一个单独的值,他们几乎总是处理一系列请求。所以日志,相比一个简单的单值注册器来说,是一个更自然的抽象。

此外,对于算法的专注遮蔽了日志抽象系统中相关的需要。我觉得我们会变得更多地将日志视为一个成型的建筑物而非它具体的实现细节,正如我们日常谈论的哈希表而不关心它到底是基于线性探测的murmur哈希还是其他变种那样。日志会变成一个成型的接口,其内由很多算法和实现相互竞争而提供最好的保障和最优的性能。

更新日志101:表和事件成双成对

让我们先回到数据库一会儿。在用以记录变化的日志和一个表之间,存在千丝万缕的联系。比方说日志是所有信用记录,贷款和银行操作的记录;表就是当前所有账号的平衡。如果你有一个记录变化的日志,你就可以用这些变化来生成一个表用以取得当前的状态。这个表会为每个键(作为一个特点的日志时间)记录最新的状态。这种感觉,就好像日志是一个更加基本的数据结构,它除了可以用来创建原始表,也可以用来创建各种派生表。(嗯,是的,表可以看做是对于无关数据的加键存储)。

这个过程反过来也成立:如果你有一个表记录了更新,你可以将这个表的所有状态的更新存储并发布成一个“更新日志(changelog)”。这个“更新日志”就正是你做“近实时”备份所需要的。所以从这个角度你可以看到,表和事件是成双成对的:表提供数据,日志记录变化。日志的牛逼之处在于,如果它是一个完整的记录变化的日志,它不仅包含了表的最终版本的内容,而且运行从新创建任何存在过的版本。它高效,且按照表的所有历史状态备份排序。

alt

这也许会让你想起源代码的版本控制。在版本控制和数据库直接有一个紧密的关系。版本控制解决了一个分布式数据系统需要解决的非常相似的问题-管理分布式的并发的状态改变。版本控制系统常常以一系列补丁作为模型,这实际上就是日志。你直接与一个当前代码的“snapshot”打交道,这与表是类似的。你会发现在版本控制系统中,如同在分布式系统中一样,通过日志进行备份:当你更新的时候,你通过“拉”的方式获取最新的补丁,然后将其运用道自己当前的代码上。

最近一些同学从Datomic看到了一些想法,这个公司销售一种以日志为中心的数据库。这个介绍讲解了他们是如何将这种想法应用到他们的系统上的。当然,无独有偶,这些想法仅是这十年来分布式系统和数据库方面的文献的一部分。

这些看起来都有点理论化,别失望,我们会很快进入实战环节。

接下来呢

在本文的后续章节中,我会在内部分布式计算或抽象分布式计算模型之上,描述怎样的日志是一个好的日志。包括:

  1. 数据集成 – 让所有数据都能够在其存储和处理系统的组织中容易地被访问。
  2. 实时数据处理 – 运算导出数据流。
  3. 分布式系统设计 – 如何通过日志为中心的设计来让实际生产中的系统变的更简单。

这些问题的解决都围绕这一个想法,即将日志看做是一个独立的服务。

实践证明,一个有效的日志,源自于一个日志提供的简单的功能,即:生产一个持续的,可回放的历史记录。令人惊讶的是,这些问题的核心是一种能力,要求多台机器可以在确定的方式下,以它们自有的速度回放历史记录。

第二部分:数据集成

然我们首先来看看什么是“数据集成”并且为什么它很重要,然后让我们看看它为啥和日志有关系。

数据集成是指为数据建立一种可以使数据在所有服务和系统间使用的的组织形式。

“数据集成”这个名词也许不够通用,但是我找不到更好的词了。更为常见的术语“ETL”一般仅覆盖了数据集成的一部分内容–填充关系型数据仓库(relational data warehouse)。但我更想介绍是如何用ETL来做实时系统和流处理。

你估计在现下流行的对大数据的吹捧中,对数据集成鲜有耳闻,不过我相信,“如何使数据可用”这一现实问题是一个企业更值得关注的问题。

数据的有效使用一定程度上遵循马斯洛的需求层次理论。这个金字塔的基础包括收集相关数据,将其聚在一起并放在一个可以处理的环境中(可以是一个牛逼的实时查询系统或者仅仅是一个文本文件加上一个python的脚本)。数据需要生成为一种同一的模型,使其能够很容易的读和处理。一旦数据的同一形式建立以后,那就可以开始构建基础设施,来用多种方式处理数据了 – 比如MapReduce,实时查询系统等。

值得一提的一个显而易见的问题是:如果没有一个可靠的完整的数据流,Hadoop集群会比那些昂贵且难以组装的空调还难搞定。一旦数据和其处理构建好了,就可以将注意力转向更优雅的数据模型和连贯容易理解的语义了。最终,注意力将转换到更复杂的处理–更好的抽象,报告系统,以及处理和预测的算法。

在我看来,大部分的组织在金字塔底层都有很大的漏洞–它们还没有可靠且完整的数据流,就试图直接跳到高级数据建模的技术。这完全是本末倒置。

所以,问题是,我们如何才能构建一个可靠的数据流,来为一个组织中所有数据系统服务呢?

数据集成:两个难点

两个趋势使数据集成变得更加困难。

事件数据井喷

第一个趋势是越来越多的事件数据。事件数据记录了事情的发生而非事情本身。在网络系统中,这不仅是用户活动的记录,还包括机器层面的事件和对可靠的操作以及数据中心机器的价值的监视数据的统计。人们一般称其为“日志数据”,因为它常常写入应用程序的日志,但混淆了形式和功能。这个数据是现代网络的核心:Google的财富,毕竟,它是由建立在点击和展示次数上相关的管道构建的–这就是,事件。

这个东西不局限在互联网公司,只是完全数字化的互联网公司更容易使用它而已。金融数据早就已经以事件为中心了。射频识别(RFID)将这种记录的方式延伸到物理世界中。我认为这种趋势将会持续,传统的商业活动将数字化。

这种类型的事件数据,记录了发生的事情,并且往往比传统数据库大几个数量级。这给处理带来了明显的挑战。

专有数据系统的爆炸

第二个趋势来自于在过去5年来,各种专用数据库系统浮出水面,且尝尝以免费的形式出现。专用系统普遍存在于OLAP(在线分析处理),搜索简单在线存储](http://cassandra.apache.org/),批处理图形处理一些地方。

越来越多的数据,越来愈多的数据种类,以及越来越高的将这些数据放入更多系统的需求,导致了非常巨大的数据集成问题。

日志结构的数据流

日志是一种用来处理系统间数据流的自然数据结构。其方法极其简单:

取得所有组织的数据并将其放入中央日志中用来提供实时订阅。

每一个逻辑数据源都可以建模成它自己的日志。数据源可以是一个记录外部事件(比如点击或者页面访问)的应用程序,或者是一个支持修改的数据库表。每一个订阅它的系统用最快的速度从这个日志中读取信息,并将每个新的记录应用到它自己的存储中,随即递增其在日志中的位置。订阅者可以是任何类型的数据系统 – 一个高速缓存,Hadoop,其它网站的其它数据库,一个搜索引擎,等。

例如,日志的概念对每一个变化给出了一个可以被所有订阅者测量的逻辑时钟。这大大方便了不同状态的订阅者系统之间的交流,他们在日志的读取过程中,如同有一个共同的遵循的“时间点”。

具体来说,设想一个简单的情景,有一个数据库和一堆缓存服务器。由于系统间共享一个时间点的关系,日志因此可以提供一个同步所有服务器间更新的方法。比如我们在日志里写一个记录X,然后我们需要从缓存中做一次读取。如果我们想要保住我们没有看到过时的数据,我们仅仅需要确保我们没有从那些没有复制到X的缓存中读取就可以了。

日志也扮演了缓冲器(buffer)的角色,用来实现异步的数据消费(consumption)。有很多时候这都是很重要的,尤其是当有多个订阅者,他们也许用不同的频率消费数据。这意味着它允许订阅系统突然死机或者中断去维护一下再重启起来:订阅系统自己控制着自己消费数据的节奏。批处理系统比如Hadoop或者data warehouse也许按照每小时或者每天一次的频率消费数据,而一个实时查询系统也许需要每秒钟都在消费。原始的数据源和日志都和最终的那些各种各样的数据处理系统无关,消费系统可以随意增加和删除而不影响传输的过程。

特别重要的是:最终用来处理数据的目标系统仅仅知道日志而对原系统的细节一无所知。这种消费系统自己不用管是否数据来自RDBMS还是新型的键值存储器又或是其产生没经过某种实时查询系统。这看起来很不起眼,但的确是十分关键的。

我在这里使用“日志(log)”一词而不使用“消息系统(message system)”或者是“发布/订阅”,是因为它具有更特殊的语义,能更贴切地描述那个当你想要实现数据复制所需要的东西。我觉得”发布/订阅“和消息间接寻址差不多–如果你比较任何两个有前途的“发布/订阅”式的系统,你会发现他们保证非常不同的东西,大多数模块在这个领域都没什么用。你可以将日志想成扮演了一个拥有持久化保证和强顺序语义的消息系统。在分布式系统中,这种通信模块有时候被命名为原子广播(有点糟糕的名字)。

“每一个成功的数据管道都被设计地像日志;每一个坏掉的数据管道各有不同” – Cout Leo Tolstoy (作者自行翻译。)

(【译者注】原文是:“Happy families are all alike; every unhappy family is unhappy in its own way.”幸福的家庭都很相似,不幸的家庭各有不同。)

值得强调的是,日志依然仅仅是基础设施。它不是数据传输纽带故事的结局:余下的故事围绕着元数据(metadata),架构(schemas),兼容性以及其他控制数据结构和演化的细节。但是在一个可靠的,通用的处理数据流结构的方法出现以前,这些所谓的细节都是次要的。

在LinkedIn

当LinkedIn从一个集中关系数据库到一个分布式系统集群的演进过程中,我目睹了数据集成问题的快速产生。这些日子我们的主要数据系统包括:

这些每一个都是一个专用分布系统在其专有领域内提供高级的功能。

采用日志作为流动在LinkedIn间数据流的想法在我到那里之前其实就已经诞生了。在基础设施中我们最早开发的一个服务叫做databus,它是我们早期的Oracle数据库表的之上的一个抽象层,提供了日志的缓存功能,用来扩展数据库变化的订阅,以此使我们能够为社交图表提供数据和搜素索引。

接下来我会介绍一点历史背景。我是在2008年左右也就在我们已经发布了我们的键-值存储之后参与其中的。我的下一个项目是尝试配置一个Hadoop让其工作,并将一部分我们的推荐处理转移到上面去。我们在这方面缺乏经验,采用很自然的方法规划了几周时间做数据导入和导出,然后余下的时间我们就用来实现一些很炫的预测算法。从此开始了漫长的跋涉。

我们一开始的计划仅仅是从我们已有的Oracle data warehouse中取出数据。第一个发现是想要从Oracle快速提取数据是不现实的。更糟糕的是,data warehouse的处理和我们计划的Hadoop批处理不合适–很多处理都不可逆而且只对特定的报告有效。我们停止了data warehouse的使用转而直接使用元数据库和日志文件。最终我们实现了另一个通道用来将读取的数据写入我们的键值存储来提供结果。

这个普通数据拷贝被原始开发中的一个主导元素终结了。更糟的是,随便如果哪个管道在何时出现了问题,hadoop系统就变得非常没用–任何用坏数据来运行的算法,越是酷炫,得到的数据就越糟糕。

虽然我们已经将每样东西都用一种通用的方法构建,每一个新的数据源都通过可定制的配置文件进行配置。它还是被证明了是大量错误和失败的根源。我们基于hadoop实现的网站功能变得很受欢迎,我们也发现有很多对我们感兴趣的工程师。每一个用户有一大堆系统想要和一大堆新的数据源集成。

一些事情慢慢在我眼前清晰起来。

第一,我们一直以来构建的管道,即便有很多混乱的地方,但是确实非常有价值。仅仅是使数据能够在新的处理系统(hadoop)上处理,就为我们打开了很多的可能性。对数据的新的运算变成了可能,而这是在以前很难做到的。很多新的产品和分析仅仅诞生于将多个数据单元放到一起,而这在之前的专用系统中是被锁住无法运用的。

第二,很明显,可靠的数据负载需要植根于的数据管道的支持。如果我们获得所有我们需要的结构,我们就可以让hadoop自动加载数据,这样添加新数据源或者处理模式(schema)的变化都不需要投入手工–数据会魔法般的出现在HDFS中,并且Hive tables会为新数据自动生成合适的列。

第三,我们依然只有很低的数据覆盖量。那是指,如果你看看LinkedIn在Hadoop上的数据占其全部数据的百分比,就可以知道它其实只是非常不完整的一部分。想要完全覆盖并不是件容易的事,对每一个投入运行的新数据源都需要很大的投入。

我们为每个数据源和数据终端进行的处理,和可定制数据负载的构建,显然是不可能的。我们有成千上万的数据系统和数据库。如果将这些所有都连起来,那将导致对每一批系统都要构建可定制的管道,如下图所示:

请注意,数据常常从两个方向流过来,很多系统(数据库,Hadoop)既是数据源也是数据的目的地。这意味着我们最终结果是需要为每个系统建立两条管道:一条用来获得输入的数据,一条用来获得输出的数据。

这显然需要耗费大量的人力且永远不能成功。要达到完全连接,结果会需要N的平方个管道。

所以,我们应该转而采用如下更通用的东西:

尽可能多的,我们需要将数据消费者和数据源隔离开。在理想状态下,他们应该仅仅与单一的数据来源连接,而这个数据来源会提供一切他们需要的东西。

这个想法是加入一个新的数据系统–它可以是一个数据源或者是一个数据目的地–仅仅需要创建它与单一数据管道的连接,而不需要连接每一个数据的消费者。

经验让我专注于构建Kafka,用来将那些我们在消息系统中看到的东西和数据库及分布式系统内部流行的日志的概念结合起来。我们需要有样东西能够在所有活动的数据,以及最终的比如在Hadoop之外的数据部署,数据监控等各种用途中扮演中央管道的角色。

在很长的时间里,Kafka是一个很独特(或者说很奇怪)的基础设施产品–既不是一个数据库,也不是日志文件的搜集系统,也不是一个传统的消息系统。但是最近Amazon也提供了一个非常非常像Kafka的服务,叫做Kinesis。相似的地方包括分区处理的方式,数据持久化,和Kafka API里很奇怪的高层和低层次消费者的划分。我对此很高兴。AWS将它作为一种服务意味着你创建的是一种很好的基础设施抽象。他们对此的看法和我正在描述的东西极其相似:它是一种内容的管道,连接了他们所有的分布式系统–DynamoDB,RedShift,S3等–也包括使用EC2作为分布式流处理的基础。

ETL与数据仓库(Data Warehouse)的关系

让我们来聊聊数据仓库(data warehousing)。数据仓库是指一个由干净的,完整的数据结构组成的库房,用来支持分析。这是一个伟大的想法。不为人知的是,数据仓库需要定期的从各个数据源提取数据,然后改写成某种可以理解的表格,再将其加载到中央数据仓库中。拥有这种中央数据库,并在其上含有你所有数据干净的备份,对于数据集约型分析和处理来说是一笔巨大的宝贵财富。从高层次来看,这种方法,不论你使用传统数据仓库比如Oracle,Terdatal,Hadoop还是转而使用改变了加载和改写的顺序的ELT,都不会变太多。

一个含有干净的,完整的数据的数据仓库是一个巨大的财富,然后达到这一点采用的机械的方法却很落后。

中央数据组织的关键问题在于联结干净完整的数据到数据仓库。数据仓库是批处理查询基础设施的一部分,该基础设施适用于很多各种各样的报表或单个分析,尤其是当查询涉及简单的计数,聚合以及过滤。但是如果只拥有批处理系统作为干净完整数据的数据仓库,意味着这些数据无法适应系统的实时数据加载–实时数据处理,搜索索引,监控系统,等。

在我看来,ETL其实是两件事情。第一,它是一个数据萃取和清洁的过程–从本质上解放了那些锁在各种系统组织中得数据,除去了那些系统特有的,没用的信息。第二,数据的结构被重新调整成适应数据仓库查询的结构(比如调整成适应某种关系型数据库的形式,schema是星星的还是雪花的,也可能被调整成某种高效的格式等)。将这两者合二为一就是问题所在。干净的,完整的数据集应该是实时的,也就是低延迟处理的,并可以在其它实时存储系统中被索引。

我觉得这还带来了更多地好处:它使得数据仓库ETL具有更好地组织扩展性。数据仓库组的一个典型的问题就是他们负责收集和清理公司内所有其它组生成的数据。得到和付出并不成正比:数据生产者常常并不是很清楚数据仓库中数据的使用,常常不停地生成一些非常难提取,需要非常大量的,难以扩展的转化才能变成有用形式的数据。当然,负责中央数据库的组不可能很容易的管理和扩充数据库来以追随公司中其它组的脚步,所以数据的覆盖范围总是那么的参差不齐,数据流是那么的脆弱,变化是如此的慢。

一个更好的做法是拥有一个中央管道,即日志,配合一个良好设计的API,用来往里面添加数据。数据的生产者必须承担起为管道的集成提供干净,结构优良的数据来源。这意味着作为系统设计和实现的一部分,他们必须考虑要将得到的数据转化成一种良好的结构再发送到中央管道中。加入新的存储系统对数据仓库小组来说是易如反掌的事情,因为他们有一个中央集成的节点。数据仓库小组只用处理更简单的问题,即讲中央日志中的结构化的数据读出来然后转到一种他们系统需要的特有形式。

如果想要在传统数据仓库之外增加外部数据系统,那么组织的可拓展性变得尤为重要。比如说,有人想为组织内所有的数据增加可搜索的功能,或者说有人想要提供数据流亚秒级的监控,实时的趋势图和报警功能。在这些使用案例中,由传统数据仓库或者即使是Hadoop集群所构成的基础设施都显得不合时宜。更糟糕的是,为了支持数据库负载而构建的基于ETL的处理管道,对于向这些其它系统提供数据,可能排不上用场,这使得重构这些个基础设施的部分的开销大得没法承受,就像使用数据仓库一样。这种看上去的不可行性也多少解释了为什么大多数企业都没有这种能力,能很容易的访问它们所有的数据。相比之下,如果一个组织已经构建了提供统一,结构良好的数据基础,让新系统完全访问所有数据,需要的仅仅是一点点管道集成的工作了。

这种架构提供个别需要的清洁和转换数据有几种不同的方案:

  1. 可以由数据提供者来做,即在将数据写入整个公司日志之前进行处理。
  2. 可以由日志自己实时地做(将数据变成新的,转换后的日志)。
  3. 可以由数据处理终端做,可以作为其读取数据时功能的一部分。

最好的模式是由数据提供者在提交这些日志之前做这些清洁的工作。这样可以保证数据具有规范的形式,并且不保留任何为了连接它的生产者或它的存储系统而设置的特有的代码。这些细节最好由创建这些数据的人来做,毕竟他们最了解自己的数据。任何在这个阶段的逻辑应该确保是无损和可逆的。

任何形式的实时数据增加转换操作应该作为原始数据的又一个新的数据生产过程。这包括比如事件数据的会话或者增加的那些新的感兴趣的区域。原始数据依然存在,而实时处理产生并提交了携带额外数据的新日志。

最后,只有那些终端系统特殊需求的数据整合部分是应该由读取数据时来做的。这可能包括将数据转换成某种特殊的星或雪花结构,用于分析和做数据仓库的报表。在该阶段,基本上大部分的地方都已经在干净整洁,结构统一的数据流的支持之上和传统ETL处理对应上了,所以是很容易进行的。

日志文件和事件

让我们来讲讲这个架构带来的额外的好处:它提供了解耦,事件驱动的系统。

在web行业,典型的获取活动数据的方式就是讲其记录到一个文本文件中,这样它可以被用来提取并加入数据仓库或者Hadoop,然后聚合和查询。这和所有批处理ETL存在同样的问题:它伴随着数据流入数据仓库的兼容性和处理的时间计划。

在LinkedIn,我们使用以日志为中心的方式构件了我们的事件数据处理。我们使用Kafka作为中央,多订阅者的事件日志。我们定义了几百个不同的时间类型,每一种代表一种特定的行为的唯一属性。这涵盖了所有网页浏览(page views),广告展示,搜索,服务调用和应用程序异常。

要理解这样做的好处,想象一个简单地事件–显示一个发布在工作页面的工作。这个工作页面应该仅仅包含需要显示工作的逻辑。但是,在动态网站上,为了让展示工作的效果更好,很容易就加入了不属于展示工作部分的逻辑。比如说,我们需要整合如下系统:

  1. 我们需要讲数据发到Hadoop和数据仓库以提供离线处理需求。
  2. 我们需要计算查看的次数来确保访问者不是在做某些内容窃取。
  3. 我们需要统计访问量以展示在工作发布者的分析页面上。
  4. 我们需要记录访问来确保我们推荐给用户的工作可以给用户留下印象(我们不希望每次展示的都是同样地内容)。
  5. 我们的推荐系统可能需要记录下访问,才能正确地计算出一个工作的受欢迎程度。
  6. 等等。。

很快,简单地展现一个工作就变得非常的复杂。并且,我们再加一些其他用来展现工作的地方–手机客户端啥的–那这里的逻辑都需要一起拿过去,复杂程度继续增加。更糟糕的是,这些我们我们需要定义接口的系统现在变成了某种交织的形式–一个负责做显示工作功能的人需要知道很多其他系统和功能,才能确保这些功能都集成正确。这个例子仅仅是问题的一个玩具般的简单形态,真正的应用程序只会更复杂。

“事件驱动”的方式提供了一种简化这种问题的途径。工作显示页面现在仅仅显示一个工作和记录这个工作相应属性的情况,访问者和其他有用的关于这个工作的信息。每个其它感兴趣的系统–比如推荐系统,安全系统,工作发布者分析系统和数据仓库–都只是订阅这里的数据然后做它们自己的处理。展示部分的代码不需要知道其它部分,也不需要因为新加入了一个数据消费者而做改变。

构建可扩展的日志

当然,将生产者和订阅者分离并不是什么新东西。但是如果你想要让日志扮演在所有消费者扩展的网站发生的所有事情的实时的多订阅者的角色的话,那么拓展性将使一个主要的挑战。使用日志作为万能的集成机制,如果当我们需要拓展的时候,我们不能将它构建的又快,又便宜又具有足够的拓展性的话,那就没啥意思了。

搞系统的人们的一种典型的思维是把分布式日志看做一种慢,庞大的抽象(而且常常只把它当做某种Zookeeper关心的“元数据”)。但是如果真正好好想一想然后再专注的实践一下如果记录大的数据流的话,就会发现这不是事实的真相。在LinkedIn,我们每天要写到Kafka的不同的消息超过600亿个(如果算上数据中心间的镜像的话,那得有好几千亿).

为了支持这种拓展,我们使用Kafka的时候有几个技巧:

  1. 将日志分区
  2. 采用批处理写和读优化吞吐量
  3. 避免不必要的数据拷贝

为了支持水平拓展,我们将日志划分成了好几个分区:

每一个分区都是一个绝对排序的日志,但分区和分区之间并没有排序(除非你的消息中本身含有了某种时钟信息)。具体消息被发送到哪个分区是由发送者控制的,很多人选择使用某种键(比如用户的ID)来划分分区。分区使得那些没有前后关联性的日志分散到不同的片区中,并使得系统的吞吐量可以与Kafka集群的大小保持线性。

每个分区可以设置一个数字来进行冗余备份,每一个备份都是一个分区日志的独立的拷贝。在任何时候,他们中的一个扮演了领导的角色;如果领导挂了,另一个备份会变成新的领导。

在分区和分区之间没有全局的顺序多少是一个局限,但是我们不认为它是主要矛盾。事实上,与日志交互的过程常常是从成百上千的分布式的进程中来的,所以和他们谈什么全局顺序是完全没有意义的,毕竟他们自己也不考虑全局顺序。所以,我们转而只保证每一个分区内部自己是排序的,Kafka保证从一个发送者向统一分区追加消息会被按照他们发送的先后顺序进行追加。

日志,就如文件系统一样,可以很容易优化成读写线性的模式。日志可以把小的读请求和写请求合起来变成一个大得,高吞吐量的操作。Kafka一直积极的奉行这种优化。在从客户端到服务器端进行的传送数据中,批处理一直伴随在写硬盘,服务器间的冗余备份,数据到消费者的转换,和标记数据被发送的整个过程中。

最后,Kafka采用简单地二进制结构在内存日志,硬盘日志和网络数据转换之间进行管理。这使得我们可以实现很多大量的优化,包括零拷贝数据转换

在这些各种优化效果的累积下,你基本可以使用硬盘和网络所能支持的速度读和写消息,甚至能管理远远超过内存限制的数据集。

这篇文章的主要目的不是介绍Kafka,所以我们不准备对此讨论更多细节。你可以通过阅读《Building LinkedIn’s Real-time Activity Data Pipeline》来了解更多关于LinkedIn这方面的方法的细节,也可以通过阅读《Kafka Documentation关于设计的讨论章节》来更好地了解Kafka的设计概况。

第三部分:日志与实时流处理

到目前为止,我仅仅介绍了那些等同于用一种很花哨的方式将数据从一个地方拷贝到另一个地方的东西。但是在存储系统直接传输字节并不是故事的结尾。而是用它引出:“日志”是“流”的另一个名字,日志是流处理(stream processing)的心脏。

但是,等一下,流处理到底是个啥东西?

如果你是一个90年代末和21世纪初期数据库文学和半吊子的数据基础架构产品的粉丝,你可能可以将流处理与构建SQL引擎的工作或者事件驱动流程里面“盒子和箭头”组成的接口联系起来。

如果你追随过开源数据系统的井喷,你可能可以将流处理与一些这方面的系统–比如:StormAkkaS4Samza联系起来。但大多数人只是将这些视为某种异步消息处理系统,与支持集群RPC(远程过程调用)层没太大不同(事实上,这个领域的某些东西确实就是这样)。

这两种观点都有一些局限性。流处理和SQL一点关系也没有,而且也不局限在实时处理上。没有任何人规定说你不能用各种各样不同的语言来计算和对昨天或者一个月前的数据进行流处理。

我将流处理看做一种更加宽泛的东西:一种能持续数据处理的基础设施。我认为数据模型可以和MapReduce或者其他分布式处理框架那样宽泛,但是具有能在低延迟下生成结果的能力。

处理模块真正的驱动者是数据收集的方法。采用批处理方式收集的数据很自然地也是按批处理方式处理的。当数据收集是持续性,它很自然地处理也是持续性的。美国人口调查提供了一种好的例子来说明批处理。人口调查定期举行,并且采用暴力遍历的方式,让人挨家挨户的区调查。这种做法在1790年刚开始人口普查的时候还比较实用。当时的数据收集本质上就是批处理的,骑着马走遍所有的地方,把记录写在纸上。然后把这个片区的数据传达给中央,由人把所有的数据再加起来。而如今,当你描述人口普查的过程,首先想到的是为什么我们不记录每一个人的出生和死亡的时间,就可以持续的计算出人口总数,并且不论是哪种粒度上的统计。

这是一个极端的例子,但是很多数据转换过程依然依靠定期的收集,然后再一大堆的统一转换和集成。对于一大堆收集上来的东西最自然的方式只有批处理。而当这些过程被持续的输入所代替,自然的方法开始朝着持续处理迈进,这样让资源的使用更平滑,也降低了延时。

LinkedIn,比如,就基本上完全没有批数据收集。我们最主要的数据要么是活动数据要么是数据库修改,他和两者都是持续性发生的。事实上,当你想想那些做生意的,底层的机理基本都是持续处理–实时发生的事件,就如杰克.鲍尔会对我们说的那样。当数据是以批收集上来的,它往往因为是手工收集的,要么是缺乏数字化的,要么是由于某些没有数字化的过程导致的历史遗留问题。在邮递员时代,发送和接受数据常常都是非常缓慢的。自动化的第一次尝试往往携带了过去的流程,所以也常常会持续一个比较长的事件。

那些每天跑一次的批处理任务其实和设置了一天为一个窗口期的持续计算任务是一样的。底下的数据,当然,总是变化的。这在LinkedIn很常见(让他们跑在Hadoop上的机理非常诡异),我们实现了一整个框架,用来管理递增的Hadoop工作流程。

这样看来,很容易就对流处理有了新的认识:它仅仅是一个针对正在处理的带有时间戳的底层数据的处理过程,并且不要求数据的静态快照,所以它可以生成用户可以控制的输出流,而不用等待数据集处理完。在这种情形下,一个重要的概念,是流处理其实是一种将处理时间设置为实时的批处理。

那么为什么会有传统意义上的流处理系统被做成应用呢?我想最大的原因是以前缺乏实时数据收集导致持续的流处理不过是学院派的想法而已。

我想正是由于实时数据收集上的稀少,才导致了数据流处理的价值。他们的客户还依然在使用着以文件为导向,每日批处理的ETL和数据集成。公司专注于构建基于实时数据流处理引擎的流处理系统,可是在当时很少有人真正使用它。事实上,当我刚加入LinkedIn时,有一个公司卖给我们一个非常酷的流处理系统,但是由于我们的数据都是以文件的形式每个小时来收集的,我们能使用的最好的做法是每过一个小时将数据流入流处理系统!唯一的例外实际上证明了一个道理:金融领域,一个流处理已经取得了一些成功的领域,正是因为这个领域的实时流处理已经是常态,系统的瓶颈已经变成了在处理环境。

即使是在一个健康的批处理生态系统中,我认为其实流处理作为基础设施的风格也有很宽阔的空间。我觉得它弥补了基础设施中实时请求/响应服务和离线批处理操作之间的空缺。对于现代互联网公司,我认为大概有25%的情况是属于这个范畴。

事实证明,日志解决了一些在流处理中最关键的技术问题,一会儿我会说,但是最大的问题在于它恰好解决了让数据在实时的多订阅者环境下可被使用的问题。如果你对这方面更多细节感兴趣,我们有一个开源项目Samaza,这是一个运用了很多这方面的想法实实在在构建的一个流处理系统。在这篇文档中,我们讨论了更多这样的程序的更多细节。

数据流图

流处理最有意思的部分不在于流处理系统内部的构造,而在于早期关于数据集成的讨论,即我们如何扩展自己关于传输什么数据的想法。我们会讨论最主要的数据产生或者主要数据的日志–各种各样系统执行产生的事件和数据的行。但是流处理允许我们也可以将某个模块产生的数据作为另外一个模块的输入。这些派生出来的输入在数据消费者看来和它们处理的主要的数据来源没什么不同。这些派生输入可以封装任何复杂的东西。

让我们再对此多说两句。一个流处理任务,为了我们某种目的,会是一种从日志中读出来,然后写到日志或其他系统里去的东西。它们所使用的输入输出组合成一个处理过程的流程图。事实上,使用中心化日志的方式,你可以将所有组织内部的数据捕获,变形,流动看做一系列日志和那些向它们写数据的过程。

流处理器不需要多高深的框架:它可以是任何一个或多个进程,用来从日志中读和写,但是需要额外的基础设施来支持管理处理的代码。

集成中日志的目的主要有以下两点:

第一,它保证了数据集支持多个订阅者并且数据是严格排序的。回一下我们的“状态备份”原则,来重温以下排序的重要性。举个更具体的例子,考虑一个数据库的更新流–如果我们将两个更新调换顺序,并放到同样的进程中处理,我们可能产生完全错误的输出。这个顺序比起那些通过TCP提供来的东西更有保障,因为它不仅仅局限在单个点到点的链接或者建立在各种错误和重新链接的进程的基础上。

第二,日志提供了处理的缓冲区。这点是非常基本的。如果处理过程是一个异步方式,上层数据生产的速度很可能比下层数据消费的速度要快。当这种情况发生时,处理必须停止,缓存或者丢弃数据。丢弃数据显然是不可能的,暂停也会导致整个流程图的停顿。日志扮演了一个非常非常巨大的缓冲区,可以让某个进程重启,失败而不影响流程图中其他部分的处理。这种隔离手段,在多人协同开发,和大组织架构下扩展数据流时是非常重要的,我们不能运行由某个任务导致整个处理流停滞的情况出现。

Storm和Samza两者都是采用这种方式构建的,并且可以支持Kafka或其他类似系统作为他们的日志系统。

有状态的实时处理

一些实时流处理仅仅是无状态的在每次记录时做一些转换,但是很多使用的情况是一些更复杂的统计,聚合或者不同流之间的连接。一种可能,比如,想要增加一个流事件的属性(比如点击流),使用用户点击时的其他信息–将这些点击流加入用户账号的数据库中。不变的,这种类型的处理最终需要请求一些由处理器管理的状态:例如,当统计一个计数,你有一个计数的数字一直需要管理。如果一个处理器自己有可能出错挂机,你如何能保证这个状态能被管理的正确呢?

最简单的办法是把状态存在内存里。但是如果处理器宕机,就会失去他保持的中间状态。如果状态被跨区存储,处理器可以只是跳回日志中该区开始的时刻。但是,如果某一个任务是计算超过一个小时的计数,这也许是不切实际的。

另一个办法是将所有的状态存储到远程的存储系统中,然后通过网络回到本地存储。这个的问题在于没有本地数据,而且会产生有很多网络中的环。

我们如何才能支持一种类似“表”的东西,对于我们处理的问题进行分区?

重温我们关于表和日志双重性的讨论,它告诉我们的就正好是能够将流转换成共同位于我们处理过程中的表的工具,还有管理这些表中错误和误差的机制。

流处理器可以将其状态保存到本地的“表”或“索引”中–bdbleveldb,或甚至是一些不太常见的比如Lucenefastbit索引。这些存储的数据是从输入流来的(也许首先进过了一些改造)。它可以生产一个本地索引的更新日志(changelog),这个本地索引是用来当遇到宕机或重启时的状态恢复。这种机制使其对于输入流可以保持多分区状态,支持各种本地索引类型。

当某个进程挂掉,它就从更新日志(changelog)中重新读取它的索引。日志在这里扮演了一种将本地状态转换为按时间排序的递增备份记录的角色。

这种状态管理的途径有一个优雅的特点,就是处理器的状态同样也按日志的方式管理。我们可以把这种日志想象成我们将那些需要写到数据表里的变化写到日志中。事实上,处理器也有一些非常类似多分区表的东西。因为状态本身就是一个日志,其它处理器就可以订阅它。这在当处理的目标是更新最终状态且这个状态是处理的自然输出时非常有用。

当为了数据集成的需要,将从数据库中出来的日志进行结合时,日志/表的双重性的作用变的非常明显。对各种事件流的结合,更新日志可以从数据库提取并且对多种不同的流处理进行不同的索引。

我们在Samaza中给出了更多这种状态管理风格的细节,更多的实践请参阅这些例子

日志压缩

当然,我们不能奢望对任何时间的所有状态都保持一份完整的日志。除非有人要用无限的空间,所以日志有时是必须清洁处理一下的。为了更具体一些,我会在此讲一下在Kafka中我们是如何实现的。在Kafka中,日志的清洁有两个选项,取决于数据是否包含键控跟新(Kyed updates)或事件数据。对事件数据来说,Kafka只管理一个范围的数据。一般来说,被设置为几天,不过这个范围既可以设置为时间范围也可以是空间范围。对于键控数据来说,完整的日志的一个很好地特性是,你可以重放它来重新创建原系统的状态(比如可以用来在另一个系统里重现)。

但是,随着时间的推移,存储完整的日志需要耗费越来越多的空间,而且这种重放操作需要越来越久的时间。于是,在Kafka中,我们支持几种不同的存储。较之简单地丢弃陈旧的日志,我们转而删除不用的记录–比如一条主键最近被更新了的记录。在删除的同时,我们依然确保日志含有完整的数据源系统的备份,但是现在我们不能再重新创作“所有”原系统的状态,仅仅保持最近的几个状态。我们称这个功能叫做“日志压缩(Log Compaction)”。

第四部分:系统构建

最后一个话题,我们要讨论一下日志在线上数据系统的设计上扮演的角色。

这里关于日志的两种角色有一个类比,第一个角色是日志为一个分布式数据库中的数据流服务,还有一个角色是日志在一个大的组织架构下为数据集成服务。在两种情况下,它都负责数据的流动,一致性和重建。那么,如果是一个组织,但是没有非常复杂的分布式数据系统,又会如何呢?

解耦合(Unbundling)?

所以也许你觉得没啥,你可以将整个你公司系统和数据流看成一个分布式系统。你可以将所有单个的查询为导向的系统(Redis,SOLR,Hive tables等)仅仅看做你的数据的一种特殊的索引。你可以把流处理系统,比如Storm和Samza仅仅看做一个设计非常好的触发和显示的实体机制。搞传统数据库的人,我注意到,非常喜欢这种视角,因为它可以最终解释每个做各种不同数据系统的人都在干什么–那些东西不过是不同类型的索引!

不可否认今天的数据系统类型已经泛滥,但现实中复杂性确依然存在。尤其是在关系型数据库盛行的日子里,公司里有很多很多关系型数据库!所以也许在主要的也就是所有数据来到一个地方这个问题没解决之前,真正的基层并不存在。这里有很多种动机,需要将数据放到各种独立的系统中:为了扩展性,地理位置的限制,安全的考虑,以及性能隔离是最主要的几个原因。但这些问题都可以被一个好的系统解决:比如一个公司可以用一个单独的Hadoop集群来集中所有的数据并对大量各种各样的组织提供服务。

所以用一种简单地方式来把数据转移到分布式系统变得可能:将各种繁多但小型的系统实例归到一起组成大得集群。很多系统在这一点上并不那么容易:比如有的不具有安全性,或者无法保证性能隔离,或者有的无法很好地扩展。但是这些问题都是可以解决的。

我认为之所以各种系统的数目形成爆发性增长是因为构建分布式数据系统的复杂性导致的。把问题切成一个一个小的查询或者将每个系统的功能划分成一个一个子集,是为了每个部分的事情好构建。但是将这些一大堆的系统放在一起运行起来,那就太复杂了。

我认为,未来在这个问题上,有三个可以参考的角度。

第一个可能的选择是继续维持现状:采用各种独立的系统依然或多或少有它存在的原因。比如因为分布式很难攻克,或者独立的专用系统用起来更舒服或者强大。如果上述情况存在,那数据集成问题还有一个重要的核心问题就是如何很好地发挥数据的作用。在这种情况下,搞一个外部日志系统用来集成数据,是十分重要的。

第二个可能的选择是对于那些有足够一般性的单个系统,采用重新聚合的方式,将所有不同的功能放入一个超级系统中。这个超级系统表面上看起来就像关系型数据库,但是在组织内使用时,比起各种各样零散的系统,这样一个单一的系统用起来是完全不一样的。这样的话,世界上除了这个系统本身内部已经解决了的集成问题以外,再没有其他集成问题了。我认为在实际项目中,构建这样的一个复杂的系统,是不太可能的。

还有一个办法,作为工程师的我被它深深的吸引住了。一个很有趣的现象是,新一代的数据系统大多数都是开源的。开源带来了另一个可能性:数据基础设施可以分拆成服务和应用系统api的集合。比如你所看到的那些Java世界的东西:

如果你把这些东西放在一起,眯起眼看一下,你会发现分布式数据系统的构建变得像玩乐高玩具一样。你可以把这些零件组合起来创造各种各样的系统。当然这明显不是最终用户关心的主要问题,他们更关心最终的API和其实现,但是它却是一种通向构建可持续演化的可以提供各种需求的模块化的简单单一系统的道路。试想如果分布式系统的构建成本从几年变成了几个礼拜,且可靠,灵活,那么为合并成单一系统的压力自然就消失不见了。

日志在系统架构中的位置

假设我们有一个外部日志系统可以帮助每个独立系统在去除自身复杂性的同时享有可靠的共享的日志,以下事情我觉得是这个日志可以做的:

  • 负责数据一致性(不论是不是事件或零时数据),通过顺序并发更新节点。
  • 提供节点间数据的备份
  • 为数据消费者提供“提交”语义(例如只有当你确保信息没有丢失时才能发出“acknowleging”)
  • 系统提供外部数据订阅喂养功能。
  • 提供恢复节点的丢失信息已经重建新备份的能力。
  • 处理节点间的负载均衡。

这其实是一个分布式系统的重要部分之一。事实上,剩下的部分主要是关于面向客户的查询API和索引策略。这个部分系统和系统直接是非常不同的:例如,一个纯文本搜索查询也许需要查询所有的分区,但是一个根据主键的查询也许只需要差负责该健数据的一个节点即可。

以下是其工作原理。系统分为两个逻辑单元:日志层和服务层。日志按照顺序抓取状态跟新。服务层节点存储那些各种查询服务需要的索引(比如键值存储可能是诸如平衡多路树(btree),SSTable(Sorted Strings Table),而搜索系统可能采用相反的索引)。写操作,即使可能是由服务层代理完成,却也能直接写到日志层中。写日志产生了一个逻辑时间戳(比如日志中的位置)。如果这个系统是分区的,我假设它是,那么日志层和服务层的节点会有相同数量的分区,即使他们也许拥有完全不同数量的机器。

服务层节点订阅了日志,然后用最快的速度,以和日志存储中相同的顺序,将数据写入它们本地索引中。

客户端(Client)可以通过在查询中提供写数据时用的时间戳,以“读你所写”的语义从任何一个节点中读取数据–服务层节点收到该查询时,会比较目标时间戳和其索引点,如果需要,可以延时发送,直到它索引的时间达到至少该时间戳的时间,以避免提供过期的数据。

服务层的节点之间,也许或也许不需要有“主节点”或者“主节点选举”的概念。更多时候,服务层的节点可以完全不需要主节点,因为日志是所有人共同的源头。

一个分布式系统必须要做的比较棘手的问题是,重建那些挂掉的节点或者从节点上依次删除分区。一个典型的途径是让节点只维护一个固定时间窗口的数据,并且在分区之间通过快照(snapshot)的方式存储备份。日志基本上可以自身持有完整数据的拷贝并且自己处理日志的垃圾回收。这样就从服务层去除了很大一部分复杂性,因为服务层都各个不同,而到了日志里,就可以统一处理了。

通过构建这样一个日志系统,你得到了一个开发完全的订阅(subscription)API,用来访问那些存储着的本用以喂养ETL,而现如今已经转移到另外一个系统里的数据。事实上,很多系统通过提供不同的索引,可以互相分享同一个日志,如下图所示:

请注意看看这个以日志为中心的系统,是如何做到其自身是一个数据流的提供者的同时,也是一个从其它系统读数据的数据消费者的。同样地,一个流处理器能够读取多个流输入,然后通过使用其它系统索引数据后的输出结果来为他们提供服务。

我觉得这种将系统看成日志层和查询API层的想法非常发人深省,因为它使你将查询从已有的完善的系统概念中分离开。这种方法即便是对于理解那些不是用该方法构建的系统时也是大有裨益的。

Kafka和Bookeeper都是一致性日志,其实这并没有太大价值,因为这不是需求。你可以仅仅简单地将Dynamo类型的数据库当做一种一致性的AP日志和一种键值服务层。这种类型的日志有一点棘手的是,它会重复发送陈旧数据,并且依赖于数据订阅者来处理它(非常像Dynamo自己)。

备份日志(尤其是完全拷贝)被很多人视为一种浪费。可实际上,有很多因素让这个根本不成问题。首先,日志是一种可划分的高效存储机制。我们在生产环境的Kafka服务器上,每个数据中心存储了超过75TB的数据。与此同时,很多服务器需要非常多的内存来提供高效数据服务(文本搜索,比如,就常常全都放在内存里)。这种服务器常常需要优化硬件配置。比如大多数我们线上数据系统要么就加大内存,要么就是用固态硬盘。相反,日志系统仅仅只是线性读写,所以它只要有一个好几个TB的大硬盘就可以非常高兴了。最后,正如上面图片中的那样,一旦数据是由多个系统提供服务的,那日志的花费就由各个索引系统分摊。这种组合让一个外部日志系统的成本显得很廉价。

这个图正是LinkedIn构建的很多实时查询系统的模型。这些系统喂养数据(使用数据总线作为日志抽象或者来自Kafka这样的专用日志)并且提供特别的分区,索引,以及在数据流之上的查询能力。就是采用这样的方式,我们实现了我们的搜索,社交图,以及OLAP查询系统。事实上,在实际服务中,将单数据喂养(既可以是线上数据喂养也可以是从Hadoop来的派生喂养)转换成多个服务系统协调工作是很常见的。这已被证明是一种巨大的简化假设。这些系统根本不需要对外开放“写”的API,Kafka和数据库用来存储,并通过日志将变化流交给合适的查询系统。写操作由本地节点通过一个特别的分区进行处理。这些节点只用一直闭着眼睛将日志提供的喂养誊写到它们自己的存储就行。一个挂掉的节点可以通过重新读取之前的日志而被重新构建。

其中,每个不同的系统对日志的依赖程度是不一样的。一个完全依赖系统将日志用于数据分区,节点重构,重平衡,和数据一致性和传播的方方面面。在这种设置中,实际上服务层无外乎是一种“缓存”结构,使特定类型的写操作可以直接到日志中去。

结语

如果你一直看到这里,你已经知道了我所了解的关于日志的大部分知识。

这里还有几个有意思的东西或许你想要看看:

每个人好像对同一件事情会使用不同的术语,所以很难将各个不同企业,开源世界,数据库,分布式系统的同志们联系起来。但不论如何,这里有一些大方向上的指针。

学术论文,系统,演讲,以及博客:

  • 一个好的状态机 state machine原备份 primary-backup概论
  • PacificA是一个微软实现基于日志的分布式存储系统的通用框架
  • Spanner不是所有人都喜欢他们日志里的逻辑时钟。Google的新数据库采用物理时间,并通过将时间戳作为范围,对不确定时钟偏移直接进行了建模。
  • Datanomic:数据库的解构是一个来自Clojure的创造者Rich Hickey在其创业公司的数据库产品上的非常棒的演讲。
  • 回滚恢复协议的消息传递系统综述。我认为这是一篇在错误度量和用日志恢复外部数据库实践中非常有帮助的介绍。
  • Reactive Manifesto–其实我不太清楚什么叫做响应式编程,但是我觉得可能是和“事件驱动”编程差不多的东西。这个链接没有太多信心,但是这个Martin Odersky开的课程(基于Scala语言)看起来非常棒!
  • Paxos!
    • 原始论文在这里。Leslie Lamport在这儿有一段有趣的故事,这个算法其实早在80年代就已经创造出来了,但是一直到1998年才得以发表,原因是因为审稿人不喜欢论文中的希腊寓言,而Lamport又不想改变它。
    • 即使原始论文发布,但是算法还是不容易理解。Lamport又试了一遍,这次甚至包括了一些“没有意思的细节”,包括如何将这个算法应用到新一届的自动化计算机中。但它仍然没有被广泛理解。
    • Fred SchneiderButler Lampson各自给出了更多关于Paxos应用于真实系统的细节。
    • 一小撮Google的工程师在Chubby项目中总结了他们实现Paxos的经验
    • 我其实觉得所有关于Paxos的论文真的是十分难懂但是我一直坚持刻苦专研。不过现在你需要这样了,因为John Ousterhout的这个视频(关于日志结构的文件系统)让它变得非常简单易懂。不知道什么原因,这些一致性算法如果是通过将它们在讲解中画出来,比今天的呈现在纸上好理解的多。讽刺的是,这个视频当时是为了说明Paxos是如何如何的难以理解而制作出来的。
    • 使用Paxos来构建具有可扩展性和持久化的数据存储:这篇牛掰的论文讲述了使用日志来构建数据存储,作者是Jun,他是最早期Kafka的作者之一。
  • Paxos也有竞争者!事实上,这里面的每一个都更贴近日志的实现,可能更适合用在实际工作中:
  • 你可以看到日志在不同的真实的分布式数据库中起到的作用。
    • PNUTS是一个尝试用日志为中心的思想设计传统大型可扩展分布式数据库的尝试。
    • HBaseBigtable也都是日志在现代数据库中使用的例子。
    • LinkedIn自己的一个分布式数据库Espresso,就像PNUTs,使用日志作为备份,但有一点不同的是,使用底层表自己作为日志的数据源。
  • 如果你需要比较几个备份算法,那么这篇论文可以帮到你。
  • 备份:理论和实践是一本非常棒的书,汇总了很多关于分布式系统总结的文章。很多章节都可以在网上找到(比如:第145678章)。
  • 流处理。这个东西涵盖了太多的内容很难总结它,但下面这些是一些我喜欢的东西。

有一些不同名字的解决同样问题的企业软件,他们有更小的可扩展性。。还有XML。。哈哈,只是玩笑,嗯,算是吧。

有意思的开源项目:

  • Kafka是一个“日志作为服务”的项目,也是这篇文章大部分内容的基础。
  • BookeeperHedwig也是另外的两个“日志作为服务”的项目。他们似乎更针对数据系统内部,然后才是事件数据。
  • Databus是一个为数据库表提供了一个类似日志层的系统。
  • Akka是一个Scala语言的著名框架。它有一个插件,叫做eventsourced,提供了持久化和日志记录。
  • Samza是一个在LinkedIn我们正在搞的流处理框架。它使用了这篇文章很多的观点,并且很好地与Kafka集成,将其作为底层日志。
  • Storm是一个著名的与Kafka集成得很好的流处理框架。
  • Spark Streaming是一个流处理框架,是Spark的一部分。
  • Summingbird是一个在Storm或Hadoop之上的层,提供了一个容易使用的计算抽象层。

我努力在这个领域跟上时代,所以你如果知道什么新鲜的东西,请告诉我。

让我给你留一个消息:

Written on April 11, 2015