Design Data Intensive Applications读书笔记

系统设计读书笔记。祝我好运。

Chapter 1: Relable, Scalable and Maintainable

Reliability

系统能持续正常工作,即便遇到hardware/software faults甚至human errors.
fault: 系统中的一个component deviated from its spec.
failure: system as a whole stops providing required service.
由于出现fault的几率不可能为0,因此倾向于设计fault-tolerant而不是fault-preventing的系统。

Hardware Faults

  • 计算机系统中的硬盘、内存、power grid等等零件都可能出问题,这可以通过redundancy来减少整个系统宕机的概率。
  • 随着数据量增加,增加备份也无法解决,就需要考虑如何在某个机器宕机的情况下通过软件调度来防止整个服务崩溃。

Software Faults

  • 软件bug通常隐藏很深,直到某种特殊情况出现触发了bug。
  • 没有快速的解决办法,只能在实现的时候多考虑assumptions和interactions、写更全面的测试、process isolation、crash and auto-restart、持续measure/monitor/anlayze behaviors in production.

Human Errors

  • 用户总是不可靠的,operator的不当操作占系统崩溃的75%~90%。
  • 在设计时就要多考虑人为因素,让他们自然地do the right thing, minimize opportunities for error; 提供sandbox让他们熟悉使用方法,随意犯错而不会影响production; 全面测试unit/integration/manual; 假设有human error提供快速recovery的机制; 设置detailed and clear monitoring.

Scalability

系统不见得可以一致reliable,例如随着数据量暴增,系统需要cope with increased load.

如何衡量Load

  • 需要根据系统特性很简洁地描述。服务器的RPS? 数据库的读写比例? 用户同时在线数? 缓存的hit rate?
  • Twitter例子
    • 需求:实现用户发推(max 12k RPS)和看个人timeline(300k RPS).
    • 挑战:如何快速为用户提供timeline。直接从数据库读其实也就是join,但是请求多的话DB无法handle,因此可以考虑为每个用户维护timeline的cache,只是发推的时候就需要更新对应用户的cache了。由于发推的RPS远小于读,cache的方法对读更有优势,但是对于follower众多的大号,发推需要更新的cache可能非常多,因此可以考虑针对这种大号直接写入DB,在timeline单独从DB读大号的推。

如何衡量Performance

  • 两个问题:维持当前系统软硬件不变,load可以提高到多少?为了提供某种load和相应的performance,需要增加多少resources?
  • Throughput: 单位时间内处理的records数量,或处理一定规模数据所需要的时间;
  • Response time: 网络延迟 + queuing + service time; 虽然用起来基本是同义词,但Latency特制其中的awaiting time.
  • Percentile: 响应时间如果直接数学平均,无法反映大致有多少请求受到影响。p95, p99, p999表示有95%, 99%, 99.9%的请求的响应时间快于某个值。
  • Tail latencies: outliers的响应时间有时也很重要,对于亚麻,大客户请求的数据量大,响应时间也就更长,不能不关注。
  • SLO/SLA: Service Level Objective/Agreements, 用percentile来判断系统是否up。
  • Load generating: 一定要independent of response time才能模拟真实情况,不能等到收到响应才轰下一个请求。

Maintainability

软件开发本身开销不算最大,日后的维护、升级、迁移是大头。

Operability

保证operation组能让系统保持运行.

  • 高质量的monitoring
  • 适当的logging来track down出问题(结果不对或者速度太慢)的根源
  • 保证系统能升级,尤其是security patches.
  • 正确的configuration
  • 开发时遵循best practice

Simplicity

保证代码简介,让新加入的码农也能理解代码。一个很有效的方法是abstraction,用high-level的类来描述统一的行为。

  • 防止explosion of the state space
  • 让系统中的modules尽量decouple,减少dependencies
  • 保证一致的naming
  • 减少短视的hacks

Evolvability

方便后续rafactor、加入新feature,与前面的simplicity息息相关。

  • Agile working patterns比较方便change
  • TDD: test-driven development

Chapter 2: Data Models and Query Languages

分层式系统中,每一层都隐藏下层的complexity,向上层提供clean data model。

Relational Model v.s. Document Model

  • relational有严格的schema,且方便处理transactional process.
  • NoSQL(Not only SQL)对于大数据、write heavy的场景更具有scalability,且可定制query
    • schema flexibility
    • better performance due to locality
    • closer to the data structure in the application code level
  • Object-Relational Mismatch: 在代码中和在数据库中的model并不是一致的,尽管有ORM帮忙转换,但本质上还是无法保证一一对应。例如领英中一个人物毕业的学校、就职的公司通常都会有另外的table来存,再通过各种join来取,而不是直接存在用户table里。
  • Normalization: 在relationalDB中通常用ID来定位records,是因为它只在数据库中有意义,而对人类有意义的内容存在table中,防止了存储重复信息到每一行中。此外用ID而不是plain-text还可以防止ambiguity,提高update的容易程度,更好地localization,更方便搜索等。
  • Many-to-One: Normalization需要有多个entity归属于同一个category的情况才值得normalize,否则就还是像document一样save nested records within their parent record.

SQL/NoSQL的优势劣势

  • 若系统中的数据本身就有类似document的一对多结构,比如a tree of one-to-many && the tree is loaded at once,或者records之间没有关联,那么用Document DB就好,relational需要shredding,即把数据打碎存在不同的table中。
  • 若数据是interconnected, many-to-many,那还是relational自带的join比较方便管理数据之间的关系,强行用document还需要自己模拟join的代码(多次query)
  • Schema flexibility: 准确地说是schema-on-read(结构是implicit的,只有读取的时候才会interpret);而relational则是schema-on-write(explicit structure, 数据库会确保存入时符合格式)。当需要改变data format的时候,schema-less只要在代码层面加上针对新、旧数据读取的判断代码即可,而relational就需要migration进行ALTER(通常几ms就完成,除了mysql会复制所有数据)。此外,如果存储对象中有很多种类的数据,或者数据的结构时收外部系统控制的,这时规定schema的话hurt more than it helps.
  • Data Locality: document存储通常会直接读取整个block,如果确实需要这么多数据(如渲染网页)那么就有locality的优势,relational需要在各个table间收集再返回整个block(虽然Google’s Spanner DB也可以一次读一个block)。但document的缺陷是写入时需要对整个block重写,因此要尽量避免增加row size,在不改变encoded size of a document的情况下修改可以in-place完成。
  • Convergence: relational DB逐渐支持存储JSON blob,document DB也逐渐支持join操作,两者正在融合各自的优势,代码层面就可以根据应用场景选择代码实现。

Query Language for Data

  • Imperative: 按照一定顺序执行给定操作,如代码中的for-loop。
  • Declarative: SQL只给定所需data的pattern(取的条件和返回的顺序等)。更简洁且隐藏了底层的优化,且支持并行计算。
  • CSS selector就是个declarative例子,不需要javascript循环查找特定的元素。
  • Map-Reduce Query: map和reduce两个纯粹的function实现query,介于imperative和declarative之间(需要写函数,支持并行)

Graph-Like Data Models

  • 如果many-to-many关系很强,需要考虑用graph存放数据。
  • 图由vertices和edges组成。vertices并不一定是相同类型的东西,例如fb中每个节点可以是用户/评论/登陆等事件。
  • 若用relational DB表示图,需要两个表:一个存放vertices(含id和属性键值对),一个存放边(含id、始终点、label、属性键值对)。
  • 任何vertex都可以连接到任何vertex;给定任何vertex,可以知道输入/输出的边,从而遍历整个图(双向);合理使用label可以将大量信息存在一个图中。
  • Neo4j目前最流行,index free graph DB.

Chapter 3: Storage and Retrieval

虽然我们并不会实现数据存储引擎,但有必要了解底层具体实现,以选择更适合应用(如analytics为主还是transactional workload为主)的引擎。
Access pattern? Read heavy, write heavy, read/write ratio? Read from id/name? Write key uniqueness?

Data Structures in Relational DB

主流存储引擎分为log-structured和page-oriented。log-strutured底层实现其实就是log(append-only sequence of records),涉及concurrency control, reclaiming disk space, hanling errors, etc. log的特点是写入很快,但查找如果不加优化就是O(N)的,因此需要开发者根据query的pattern加上合适的index提升查找速度。

Hash Indexes

  • 假设存储的是key-value型的数据,在内存中维护一个hashmap,每次写入数据时往hashmap里放key和对应在log文件中的byte offset方便查找即可。注意这个log只能append,因此每次都无脑地往hashmap里更新key-offset即可。
  • 为了防止log过大,可以用segment的方法拆分到不同file中,再定时执行compaction只保留最末的key并merge。
  • 删除key则需要一个特殊的标识tombstone,这样在后续的merge中对应的key就会被删。
  • crash recovery: 内存一崩就全没,因此可以dump到硬盘上,崩后重新读入内存。由于只允许append,因此不需要检测更新到一半崩溃的问题。
  • partially written data: Bitcask files含有checksum保证文件准确完整。
  • concurrency control: 既然只允许append,就只允许一个writer就好了。
  • 不足:a) index必须全部容纳到内存中,因此key的数量有限。 b) Range query并不高效,需要现在hashmap中作一个range查找再去file中读取。
  • multi-field index: Postrgres支持R-tree算法,利用rectangular来定位索引。

Sorted String Tables(SSTables) and Log-Structured Merge-Trees

  • 将log中的key排序存放,在merge segment的时候就用mergesort即可快速合并,如果在多个segment中出现相同的key则保留最晚被修改的segment即可。
  • 内存中hashmap只需要存一部分key的offset即可,快速定位到前后两个key之后查找即可。
  • 在disk上维持sorted structure是可行的,如B-tree,但更方便的还是用in-memory的red-black tree或者AVL Tree,之后再写入logfile。
    1. 每次写入都存入in-memory balanced tree structure(memtable).
    2. 到达一定阈值后存入disk,这就是most recent segment。
    3. 读取时先尝试从in-memory读,没有再按照时间顺序从最近到最古老的logfile中查找。
    4. 时不时进行merging and compaction来取出重复的/删除了的key。
    5. 为了防止in-memory的宕机丢失,可以纯append到一个单独的file当中,不必维持顺序,宕机后重新读取再排序即可。

B-Trees(最流行的index方式。)

  • 也需要维护排好序的key,不是将数据存到segment中,而是将数据拆分成固定大小(通常4KB左右)的disk blocks/pages,每个block通过地址来定位。每个block中存放key-value/addr,有点类似指针的概念,但不是内存而是disk。
  • 查找是根据key,在某个范围内的需要跳转到某个地址查找,这样一路查找到leaf page就可以得到value或者存放的地址。branching factor通常是几百。
  • 增加则是到leaf page直接加,如果该page剩余空间不足,则将它拆分成两个half full page,插入后更新parent page,加入一个key指向拆分出来的新page的头部。这一系列操作一旦崩了,就会丢失数据,因此需要额外的write-ahead log,一个append-only的存放所有修改操作的文件,方便恢复数据。
  • concurrency control: 需要latches(轻量级锁)来保证数据一致。

B-Tree v.s. LSM-Trees

  • LSM写入更快,B-Tree写入时由于WAL的存在需要写两次,如果涉及page split那就更复杂了。Log-structure由于merging和compaction也需要多次写数据,但这个write-amplification还是更快,因为是sequential的在disk上优势更大。
  • LSM产生的文件更小,B-Tree可能在page末尾有没用上的空间,LSM一波compaction能省很多空间。
  • LSM的compaction过程可能影响正常的读写速度,毕竟disk资源有限。
  • LSM的compaction可能跟不上正常写入的速度,这样disk会最终被占满,而读取也会变慢。
  • LSM的key存在多个地方,B-tree的一个优势是每个key只会在index中出现一次,这样可以保证强transaction.

Other Index Structures

  • 除了key-value这种primary index,很多时候还需要加secondary index,即允许重复的key对应不同的value。实现方式可以是将value存成list,或者将key加上id使其unique.
    • global secondary index: write慢,因为需要存入各个partition;read就很快,因为明确知道存在哪个partition。
    • local secondary index: write会很快,因为只需要写入一个partition,但read就慢,因为需要fan-out找所有partition。
  • Heap file index: key指向一个heapfile,值统一存在heap中,每个index就指向heap中的一个位置即可。
  • clustered index: 将所有数据直接存储在index之内,而不只是一个reference。这样可以减少在heapfile中查找的时间。
  • covering index: 二者的结合,部分indexes cover了数据,其他的则还是需要到实际file中查找。
  • multi-column indexes: 同时查找多个keys。例如concatenated index就是将多个index拼接起来如(a-b-c),但无法高效查找b/c/b-c. 当然也可以将多维的转换为一维的再使用B-Tree,另外是更高级的multi-dimentional indexes如R-Tree.
  • full-text search and fuzzy indexes: 允许模糊搜索,不用完全和key吻合(拼错)或允许同义词。例如Lucene就允许搜索一定edit distance之内的词。
  • in-memory DB: Memcache本意是用来缓存的,但也可以作为DB,只需要提升durability通过在disk写changelog,或时不时往disk写snapshot,或replicate到其他机器。需要指出的是in-memory速度快主要不是来自于从内存中读数据,而是因为它避免了将数据encode成方便存入disk的结构。例如Redis可以存priorityQueue和set等。in-memoryDB可容纳数据的大小也可以超过物理内存,因为可以swap(系统的swap是以memory page为单位,这里的swap是以每一行record为单位)。
  • NVM: non-volatile memory,不需要swap也可以在内存里长久地存数据了。

Transaction Processing or Analytics?

transaction意思是a group of reads and writes that form a logical unit. 它并不需要保证ACID,只需要提供低延迟的读写。
| Property | Transaction processing systems(OLTP) | Analytic systems(OLAP) |
| :——-: | :——-: | :———-: |
|读|每次按照key来query少量数据|aggregate大量数据|
|写|random-access, 来自用户的低延迟写|Bulk import(ETL)或事件流|
|主要用于|用户通过web访问|内部数据分析,支持决策|
|数据表示的是|数据当前最新的状态|history of events over time|
|Dataset size|Gigabytes to Terabytes|Terabytes to Petabytes|

Data Warehousing

  • 如果直接在OLTP数据库上进行数据分析,很可能会影响正常执行的来自用户的query。因此需要利用Extract-Transform-Load将数据转入独立的Data Warehouse后再做分析。
  • indexing对于OLTP数据库有很大帮助,但对于analytics不是很好。
  • DB vendors要么专注transaction processing,要么专注analytics workloads.
  • star schema: 将各个独立的table之间的关系visualize了。dim_table就是各个独立的表(who, where, when ,how, why),fact_table就集合了所有dim表的column,显示地表示数据之间的关系。

Column-Oriented Storage

  • 通常fact表的列多大100+,行数达到trillions,因此在fact表上做query需要优化。
  • analytics中每次select用不到那么多列,通常只需要4~5列。
  • 通常OLTP DB都是row-oriented,在不需要用到所有列的情况下还是会load很多无用的属性进内存,然后再根据WHERE筛选。因此可以存成column-oriented,这样可以减少很多不必要的load.
  • Column Compression: 每个column中所有的值可能有很多重复的,因此可以利用bitmap进行encoding,减少每一列需要存储的数据量。
  • vectorized processing: bandwidth瓶颈除了从disk载入内存外,还有内存到CPU cache. 为了充分利用CPU的cache,需要尽量让数据能fit到L1 cache里面。
  • 写入column-oriented的DB不是很直接,例如B-Tree涉及compression就很可能需要将所有column files都重写一遍了。但LSM可以先存入内存,然后才写入diskfile,之后才会merge,这正是Vertica底层实现。
  • Wide columne Database不是column oriented storage. 比如Cassandra和HBase是wide column DB但他们不是column oriented,因为他们同一个column family是一起读取的。真正的column oriented会是每个column一个file。
  • parquet是一个column oriented非常高效的一种file format,兼容hadoop、spark计算框架,和查询引擎hive。

Sort Order in Column Storage

  • 根据query pattern选择对某些列进行排序。每次需要一整行地进行排序才能保证同一行的来自同一个entity。
  • sorted的好处是bitmap压缩时可以节省bit,因为可以保证相同的value排序后是相邻的。
  • 在设置replica时可以将不同的sorted order存入不同的replica中,按需选择即可。

Materialize Aggregation

  • data warehouse中很可能会反复调用相同的count/sum/avg/min/max,因此可以将这些常用的query缓存起来。
  • materialized view: 将常用结果存入独立的表中。OLTP中不常用,因为write-heavy,每次写都需要更新这些缓存的数据,速度慢。
  • data cube: 额外维护一些cell,每个cell存放特定column组合的aggregate数据,在这些cell上进行aggregate就快多了。

总结与比较

  • Analytics DB: column oriented,主要是data analysis用,如Presto, Hive, Parquet格式
  • Transactional DB: row oriented.
    • B-tree: performance稳定,transaction handle比较好,read比较fragmented。如MySQL、PostgreSQL、MongoDB。
    • Log Structured Storage: sequential write效率高,但tune得不好可能由于compaction影响性能甚至不predictable。如Cassandra、HBase。

Chapter 4: Encoding and Evolution

  • 系统总是在变化的,因此DB也难免需要更改schema。对于schema-on-write,DB有唯一指定的schema,可以通过migration(ALTER)实现;schema-on-read则涉及对代码的升级。
  • 对于server-side应用,可以用rolling upgrade,将新代码部署到一个node上再逐步推广,防止全部down。对于client-side应用,就at the mercy of the user了,只能等客户自己更新。
  • 由于可能同时存在新旧版本的代码/数据,需要注意backward compatibility(新代码可以读旧代码写入的数据)和forward compatibility(旧代码也能读新代码写入的数据)。

Formats for Encoding Data

Encoding/Decoding指的是将数据在内存结构(objects/structs/lists/arrays/hashtables/trees)和传输存储结构(self-contained sequence of bytes)相互转换。

Language-Specific Formats

部分编程语言自带将in-memory object转成byte sequence的库,使用倒是非常方便,但弊端也很多,因此尽可能避免使用。

  • 生成出来的byte seq通常和语言绑定,无法简单地在另一个语言上decode。
  • decode的时候需要instantiate各种类,攻击者可能利用这个随机decode并执行随机代码。
  • versioning data也是问题,通常不支持compatibility.
  • 效率不高,如CPU处理时间、生成的byte seq占据空间等。

JSON, XML and Binary Variants

  • 通常他们已经是不错的选择了,虽然仍有一些缺陷需要指出:
    • encoding for numbers比较模糊,例如XML和CSV不区分数字和字符串,JSON不区分整数、小数,对于大数字支持不佳等。
    • JSON和XML支持unicode,但是不支持binary strings,往往需要转成Base64存入,这样占空间多了33%
    • XML和JSON可能需要在代码层面写死schema才能正确地读写。
    • CSV没有schema,但本身比较模糊,对于escape有正式的规定但很多库没有正确实现。
  • Binary Encoding: JSON比XML更简洁,但还是占用较多空间。JSON的binary encoding “MessagePack”利用byte头部规定后续的种类、长度,从而将JSON转换成binary encoding.
  • Thrift:Facebook发明的encode/decode库,定义schema,其中包含了tag、数据类型。
    • BinaryProtocol:和MessagePack相比,无需在binary中重复存tag,只有tagNumber和数据类型。
    • CompactProtocol: 将数据类型和tagNumber压缩到一起
  • Protocol Buffer: Google发明的encode/decode库,定义schema,其中包含了tag、数据类型。和Thrift CompactProtocol类似,将fieldType和tagNumber融合。
  • Field Tags Evolutions: 可以任意往schema中添加新的field,只是不能规定是required,因为旧代码也需要读懂新的schema。而只要有不重复的tagNumber,新代码也可以读懂旧代码写的数据。如果想删除field,则只能删除optional的,且不能再使用被删掉的tagNumber.
  • Datatypes Evolutions: 一定程度是允许的,但需要注意32-bit和64-bit只能是单向的转换,不能降低精度。
  • Avro: Apache发明的encode/decode库,定义schema,其中没有tagNumber,直接存域名、数据类型。encode后的binary直接是长度+内容的形式拼接而成,需要读写双方都按照约定的schema严格操作。
    • 读写双方的schema不用完全一致,只需要compatible即可。
    • schema evolution也是可行的,只是需要注意前后兼容性,只能添加或删除带有默认值的field。注意null在Avro中只有当它在union branch中才是合法的默认值。
    • reader如何知道writer的schema呢?毕竟在每一个record都附带schema很浪费空间。这就需要直冲Avro的应用场景了:(1) 大文件,含有大量records,就只需要一份schema; (2) 含有独立写入record的数据库,维护不同version的schema,按需读取; (3) 通过网络发送数据时可以在建立连接时就确定schema,之后的数据就按照该schema解读。
    • Dynamiccaly generated schemas: Avro对频繁更新的schema更友好,因为每次改变后只需要从DB export成Avro schema即可。而Thrift/ProtoBuf则需要手动改变DB -> field tags的映射。
    • Code generation: 静态类型语言如Java/Cpp/C#都需要在读取数据时指明类型,但JS/Ruby/Python就不用。Thrift和ProtoBuf依赖于code generation,而Avro可以在不生成代码的情况下正常进行analyze.

Modes of Dataflow

Dataflow Through Databases
  • 可以理解为sending a message to a future self,先写入,之后读出来,因此backward compatible很重要。另一方面如果rolling upgrade,新的代码写入而旧的代码需要读取,则需要考虑forward compatible.
  • 数据可能在DB中存放很久,若更新schema,理论上可以全部重新写一遍,但更普遍的做法是在读的时候填充以前不存在的field,这样就变相允许schema evolution了。
  • Archival存储通常就直接用最新的schema dump到数据库,这样就包含了各种不同历史时期写入的数据了,用Avro进行分析比较合适。
Dataflow Through Services
  • 最常见的就是server-client结构了,通过API call进行数据交换。server本身也可以是client,这样就把一个复杂的系统decompose成独立的子系统,每个子系统可以独立维护、改变,这就是SOA/微服务架构。
  • REST: 是一种phylosophy, 思想是用url来辨认资源,并用HTTP的特性实现缓存管理、身份验证、内容类型交换等,efficiency较低,主要针对public。
  • SOAP: 基于XML的一种规范,尽可能不依赖HTTP的特性而使用自身的WS-*特性。
  • RPC: remote procedure call本意是想让请求远程服务和调用本地函数一样方便, binary encoding使得它efficiency较高,主要用于internal的。但缺陷很明显:unpredictable, waste of resource for encoding
    • 本地函数调用要么成功要么失败,远程调用无法控制,因为牵扯网络问题,可能需要重发
    • 本地函数调用要么返回结果、抛异常、或死循环毫无回应;而远程调用可能timeout无法回复,也可能request没发成功,无法知道原因
    • 无脑重发请求也不科学,可能请求已经执行成功而回复丢失,并不需要重发
    • 本地函数执行时间基本恒定,而远程调用牵扯的因素就多多了
    • 本地调用时参数可以直接用索引、指针传,发送远程就要encode,复杂很多,占用空间也大
    • 远程函数的实现可能是不同的语言,需要将参数翻译成对应的type才能执行
  • 进化版RPC: 承认远程调用与本地调用的区别,例如引入Futures/Promoises来实现asynchronized调用,可以并行调用多个远程服务。
Message-Passing Dataflow
  • 异步传送消息到一个中间的message broker/message queue中,由接收方从中读取,注意是单向的,要想回复就得走另一个中间件。这样比直接用RPC发送有以下好处
    • 中间件有buffer作用,防止接收方爆炸
    • 自动重发,如果监测到crash
    • 发送者不需要知道接受者的ip/端口
    • 同一个消息可以同时发给多个接收方
    • 解耦合:发送方只负责发,不需要关心谁consume.
  • Message Broker: 一个进程发送到一个named queue/topic,broker确保这些信息最终会递送到consumers/subscribers处。如果需要,接收方可以chain一个另外的queue/topic进行回复。在queue中的通常就是byte和metadata,不是特定的data model.
  • Distributed Actors Framework: actor model用于单一进程的concurrency控制,逻辑不在thread level而是封装到了actor中。一个actor就是一个client/entity,通过发送消息和其他actors互动。若将服务分布到多个node上就形成了分布式actor,如果需要跨node就需要encode成byte sequence通过网络发送。这时候如果要保证前后兼容性和rolling upgrade,可能需要对框架做一些customize,例如替换掉内置的encode库。

Chapter 5: Replication

为了给用户提供地理上更近的dc、保证部分dc宕机后仍能工作、增加读吞吐量,我们需要将数据复制多几分存入不同的node中,难点在于如何更新这些replica。replication vs isolation: replication是leader告诉follower做什么就什么,isolation/concurrency_control是多个request到同一个节点需要决定采用谁作为winner。

Leader and Follower

  • leader负责存write(当然也可以read),follower可以read。每次有update时由leader推送给followers
  • 通常可以设置是synchronous还是async,通常follower都是async的不然delay太大。设置成sync实际指的是一个follower是sync其余的全部是async。
  • SQL(PostgreSQL, MySQL), NoSQL(MongoDB, Expresso), distributed message brokers(Kafka, RabbitMQ)都采用这种模式
Add Followers
  1. 在某一时刻(traffic少时)take a snapshot。尽可能不lock写操作以防损害availability
  2. 将snapshot从leader复制到follower
  3. follower向leader请求从snapshot到当前的所有更新,需要提供取snapshot的时刻(Postgres叫log sequence number)
  4. follower同步完后就caught up了
Follower Outages (Catch-up Recovery)
  • 每个节点都会维护一个log,因此可以知道从什么时刻开始损失data,起来后找leader要损失的data即可
  • statement: 最compact的方式,但可能为了一个很小的更新跑一个开销很大的query。
  • Write-ahead log: 本身是用于durablilty的(B-tree write时会写log,crash-recovery用的),格式是byte sequence,需要node内部的实现、版本是相兼容的。Postgres/oracle用这种。
  • Logical log: 专门用于replication的,描述row-level write的,有先后顺序,兼容性好。MySQL用这种。
  • Trigger based: overhead较高
Leader Outages (Failover)
  1. 确认leader挂了:通过headtbeat看看能否timeout之内收到响应。但这个timeout时间需要tune
  2. 找新的leader:通常取拥有最新data的。损失掉的data无法挽回,可能会导致corrupted data(例如self-increment id重复使用造成数据泄漏)
  3. 配置剩余node让他们承认新leader,包括挂掉的leader起来后也需要知道自己成了follower。当系统同时出现两个leader就很糟糕了。
Replication Lag
  • 当数据更新不及时,follower的数据和leader不一致。eventual consistency并不在意多久会同步。
  • Read-after-write consistency: 读必须在写之后,通过只从leader读用户可能修改过的data、在多少多少秒之内从leader读、记录最后一次写的timestamp等
  • Monotonic reads: 不保证strong consistency,但保证用户连续多次读取的结果是按照时间先后顺序的(因果性casuality),不会read back of the time,例如先看见一条评论刷新一下就不见了。可以通过route to the same follower实现。
  • Consistent prefix reads: 一系列write发生时,任何后续的读取都会按照写入顺序读到结果,尽管不一定是最新的。可以通过route to the same leader实现。

Multi-Leader Replication

  • 由于有多个leader,意味着write可能出现conflict,如先后修改某个record为不同值、或预定系统中出现多个leader同时占用某个资源。
  • transaction为了防止double spending,为了避免出现写conflict造成$$问题,通常只允许一个datacenter,只有一个leader,这样就不会出现两次交易都ok,但covergent的状态是不合理的(如账面为负数了)。
  • detect conflict: 关键在于apply update的顺序,用timestamp不够可靠(每个机器自身的时间不准确)、version vectors也不够靠谱,因此尽量避免使用multi-leader replication.
  • CRDT(conflict-free replicated data type): 接受写入的所有东西,读的时候把所有存下来的内容都返回,是client的responsbility来确定最终采用什么数据,有点像unordered hashmap。由于没有data loss所以没有conflict,但client的逻辑会非常复杂。
  • Event Sourcing: 不存snapshot,存full history,按需生成snapshot.
  • Mergeable persistent data structures: 显示记录history,用类似git版本管理的three-way merge function。
  • Operational Transformation: 用于并发编辑ordered list的,google doc用的这种。

Leaderless Replication(Dynamo Style)

  • 没有所谓的leader,所有node都接受read和write,但read和write会分发给每一个节点确保一致,通过version确定最新版数据。Dynamo, Cassandra都是这种。
  • Read repair: 在read的时候可能发现一部分replica数据过时了,这时会触发write进行更新
  • Anti-entropy process: 在后台定时比较replica的数据,如有需要就复制
  • 不能保证transaction,是一个best effort的DB。
Quorum
  • 写的时候定义n个节点里 >= w个节点成功才是valid的、读的时候定义 >= r个节点成功才是valid的,这里的w和r通常是ceil(n+1 / 2)保证至少有一个节点是最新数据。w + r > n时就能tolerate一些down的节点,同时数据至少一个是最新的。
  • request都是同时发出的,因此当写收到的response >= w或读收到的 >= r就可以直接返回了。
  • 实际使用中有很多edge cases无法保证返回的是最新数据,因此只能保证eventual consistency
  • 由于在leader-less中写的顺序并不是保证按照发生顺序的,因此就operability来说只能估计数据有多stale。
  • Sloppy Quorum可以理解为不那么严格执行的restriction,当没有w个好的节点时能够暂时写入不在n个节点中的其他节点,之后在hinted handoff时才会写回之前fail的节点。
Detecting Concurrent Writes
  • 虽然leaderless说是可以保证eventual consistency,他们自动同步非常不靠谱
  • Dynamo将conflict resolution甩锅给用户自己定义/实现。

Chapter 6: Partition/Sharding (driven by scalability)

将数据拆分开分布在不同的分区里,每个数据只能属于一个分区,加强scalability和fault-tolerence,通常结合replication使用,这样数据虽然只在一个分区里,但在不同节点都有备份。

Partitioning of Key-Value Data

分区不希望skew造成一个分区特别臃肿(hotspot),也不能随机分不然查找很慢。

By Key Range (Bigtable, HBase)

  • 类似于字典里排序,将key按照一定的range划分存储,需要有pre-knowledge。这个range可能是admin定义、也可能是DB自动选择并更新
  • 在每个partition内部key也是排好序的方便查找
  • 依然有hotspot,例如连续读取的传感器数据,按照timestamp分区的话就是在最新的分区疯狂write/read,可以结合别的key进一步分区(先按照传感器id、再按时间),不过查找时就需要range search了。

By Hash of Key (Cassandra, MongoDB)

  • 好的hash可以对于skew的input生成均匀分布的output
  • 语言内置的hash不能用如java、ruby,因为不同的process可能hash结果不同。
  • 正是因为hash出来结果分布均匀,进行range query时很低效,因为原本相邻的key都打散了,因此Riak之类的都不支持primary key的range query。
  • Cassandra对compound primary key的第一个column做hash划分、再对剩余column做Key Range划分。

Relieve Hot Spots

  • Hash只能缓解skew但没法避免,因为只要是相同的id就一定会hash成相同的key,对于名人推特的爆炸评论可能就会同时写到一个partition。
  • 可以在写时加个随机数(两位数以内就能有100种可能),在读的时候就需要从这100个里面都读
  • 需要bookkeeper分辨哪些id需要hash时加随机数。

Partitioning Secondary Indexes

index不能uniquely identify a record,但可以高效查找某个值的occurence。89

By Document (Local Index)

  • 每个partition完全独立,维护的secondary index也只会指向自己持有的record
  • 写入新纪录的时候只涉及当前id所在的partition。
  • 读(根据index进行filter)的时候需要到所有的partition读取所有符合条件的record,这个过程叫scatter/gather。
  • MongoDB, Riak, Cassandra, ElasticSearch用的这个

By Term (Global Index)

  • 每个index包含了所有符合条件的record id,无论是否在当前的partition上。
  • index按照term或hash of term进行partition,这个index不会出现在别的partition里。
  • 读很高效,根据term取对应的partition找到所有id,再到对应各个partition取record即可。
  • 写比较复杂,因为一条record里的各个index可能分布在不同的partition里,需要distributed transaction across all partitions,通常是async的。
  • DynamoDB用的这种。

Rebalancing

当node的硬件需要升级或者故障需要替换,就需要migrate现有的partition。需要尽量保证各个partition的分布均匀,同时随着node数量继续增加不会出现不必要的migrate。

Fixed number of partitions

  • 将每个node的partition切分得较小,这样当新的node加进来就只需要从每个node弄一小部分出来,每一个partition可以理解为一个独立的db系统比如mysql,因此切割过细会有很大overhead。
  • 注意变的是assignment of partition to the nodes;key->partition和partition数量都不变。
  • partition数量不变,决定每个partition的size非常关键,需要对数据/traffic有足够的认识和估计,不适用于data会疯狂增长的数据(否则重新split非常麻烦)。
  • Riak, Elasticsearch, Couchbase, Voldemort用这种.

Dynamic partitioning

  • key range DB要是没有设置好boundary,很可能在fixed num里面把所有数据都存在一个partition里面了。
  • 每个partition的volume会根据size of the dataset决定是否merge或者split。
  • 适用于key range DB(HBase)和hash key(MongoDB).

Partition proportional to nodes

  • 每个node内的partition数量是固定的,当加入新的node时也就顺势增加partition了。
  • unfair split: 加入node的时候会随机选现有的partition进行split然后存入新加入的node,这个过程可能会不均衡,随着时间会慢慢重新均衡。
  • Cassandra和Ketama用的这个。

Routing Requests

随着rabalance,每个partition存放的node都一直在变,需要知道怎么找到partition所处的node才能取数据。有以下几种方法:

  • knowledge在每个node(传话筒):client随机连接节点(RoundRobin LB),该节点确定partition在哪并发送query到正确地址,最后返回结果。Cassandra, Riak用这种。可以用gossip,也可以用ZooKeeper.
  • knowledge在routing tier: 有一个转发层维护数据的存储位置,定向转发request。通常会结合一个独立的coordination service如ZooKeeper来维护cluster的metadata(partition -> nodes),每当partition有变化就会通知routing tier. Expresso, HBase, Kafka用这种。
  • knowledge在client(直连):client需要维护数据存储位置,直接找到节点取数据,注意这个client不是终端用户,而是data center里的另一个service,不然就泄露了。
  • Kubernete也提供service discovery

References