databus系统架构

Databus是一个低延迟、可靠的、支持事务的、保持一致性的数据变更抓取系统。由LinkedIn于2013年开源。Databus通过挖掘数据库日志的方式,将数据库变更实时、可靠的从数据库拉取出来,业务可以通过定制化client实时获取变更并进行其他业务逻辑。

Databus有以下特点:
数据源和消费者之间的隔离。
数据传输能保证顺序性和至少一次交付的高可用性。
从变化流的任意时间点进行消费,包括通过bootstrap获取所有数据。
分区消费
源一致性保存,消费不成功会一直消费直到消费成功

databus整体架构

avatar
上图中介绍了Databus系统的构成,包括Relays、Bootstrap Service和Client lib等。Bootstrap Service中包括Bootstrap Producer和Bootstrap Server。快速变化的Consumer直接从Relay中取事件。如果一个Consumer的数据更新大幅落后,它要的数据就不在Relay的日志中,而是需要请求Bootstrap Service,返回的将会是自Consumer上次处理变更之后的所有数据变更Snapshot。

Source Databases:MySQL以及Oracle数据源
Relays:负责抓取和存储数据库变更,全内存存储,也可配置使用mmap内存映射文件方式
Schema Registry:数据库数据类型到Databus数据类型的一个转换表
Bootstrap Service:一个特殊的客户端,功能和Relays类似,负责存储数据库变更,主要是磁盘存储
Application:数据库变更消费逻辑,从Relay中拉取变更,并消费变更
Client Lib:提供挑选关注变更的API给消费逻辑
Consumer Code:变更消费逻辑,可以是自身消费或者再将变更发送至下游服务

DataBus Client 状态机

以下两个图描述了从relay server获取events到event buffer所涉及的事件序列和状态转换

  1. 图1是启动顺序概述
    avatar
  2. 图2是事件记录的详细流程
    avatar

在线消费模式

总体概述如下

  1. 客户端启动时,状态为pick-server,客户端试图随机连接一个relay,异常时重试,直到最大重试次数
  2. relay选定后,客户端进入Request sources状态并且开始从relay检索资源列表
  3. source匹配错误,如果收到的响应中的sourceID校验错误,客户端切回到pick-relay状态
  4. source匹配正确,客户端进入request register状态,同样,如果返回的schema size错误,客户端切回pick-relay状态
  5. 如果资源列表和schema都正确,设置dispatcher开始调度从stream中接收的事件,并且客户端状态按照配置切换为BOOTSTRAP或者REQUEST STREAM
    5.1 Null relay filter ,relay filter未定义,从配置中初始化relay-filter,并且执行合并和重复数据消除
    5.2 Null channel,和relay丢失链接,抛出异常并重启流程

三级状态机设计,databus ConnectionState,ServerSetChangeMessage,LifecycleMessage,3层分工明确,层层递进,ConnectionStateMessage负责具体和relay连接的各种状态表示,比如流的读写,连接的打开关闭,shcema的注册,ServerSetChangeMessage负责serverInfo状态变更处理,LifecycleMessage负责整个组件生命周期管理,三种状态分别对应三个状态机,
RelayPullThread, BasePullThread,AbstractActorMessageQueue,三者为顺序继承关系,RelayPullThread extend BasePullThread extend AbstractActorMessageQueue,

  • 状态分级处理的设计思路很巧妙,值得我们借鉴,类似的思想比如java类加载中的双亲委派,原则都是将基础的,顶层的资源交给基础的,顶层的管理者来管理。
  • 多维状态机的思路可以用到电商业务早期的订单状态机模型,一个订单包含多个资源,上层是订单状态机,下层是资源状态机

在线消费-故障模式

avatar
大部分对relay的请求,一般可能发生两种错误,http请求阶段的Request error和Response error

BOOTSTRAP 模型

以下两个图描述了获取客户端bootstrap所涉及的事件序列和状态转换

  1. 图1是启动顺序概述
    avatar
  2. 图2是事件记录的详细流程
    avatar

总体描述如下

  1. 如果没有可用的checkpoint,将会重置为window scn,同时会选取一个bootstrap,逻辑和pick relay类似
  2. 连接上bootstrap server之后,如果没有有效的resume checkpoint,客户端请求开始Svn

BOOTSTRAP 失败模型

avatar

Databus 2.0 Client Design

databus客户端关注从relay消费events,必须整合一个databus client library,提供API以选择他们关注的变更流
avatar
Relay Connection
relay connection使用relay http interface来从relay获取实时更新流

bootstrap Connection

bootstrap Connection使用bootstrap http interface从bootstrap server 追溯早期更新,持久化可以在本地(local file),也可以是共享空间(zookeeper)

Dispatcher

dispatcher读取从online streams和bootstrap进来的events,消费代码回调,主要返回如下
1: 确定正确的回调
2:错误与超时监控
3:确保客户端保持事件消费的进程

Consumer Code Callbacks

Checkpoint persistence

ckpt是一个消费者消费更新流的内部表示,ckpt对象默认为json格式

1
{"windowOffset":-1,"snapshot_offset":-1,"prevScn":-1,"windowScn":5984508975840,"consumption_mode":"ONLINE_CONSUMPTION"}

Bootstrap checkpoint

1
{"snapshot_offset":0,"prevScn":5984488321377,"windowScn":5984488321377,"consumption_mode":"BOOTSTRAP_SNAPSHOT","windowOffset":-1,"bootstrap_since_scn":5984488321377,"bootstrap_start_scn":-1,"bootstrap_target_scn":-1,"bootstrap_snapshot_source_index":0,"bootstrap_catchup_source_index":0,"bootstrap_server_info":null,"snapshot_source":"com.linkedin.databus.example.Person"}

Databus 2.0 Relay Design

Databus Relays负责如下工作:

  1. 读取数据库或者其他数据源变更行,并且序列化为内存buff中的databus数据变更事件
  2. 监听来自客户端的请求(包括bootstrap等其他链接的relay或者事件消费者),并且将数据变更事件流式传入到客户端
    avatar
    一个relay包含一个或者多个circular event buffer用于存储databus事件,事件按照系统变更号(SCNS)递增,这些buffer可以存储在直接内存或者通过MMP映射到文件系统。
    每个缓冲区还有一个对应的内存稀疏索引,称为scn索引,以及一个maxscn读写器, maxscn读写器周期性地保持在被拉入relay的事件中看到的最高scn的值。
    relay通过一个请求处理器(它监听一个netty通道)将来自databus客户端的请求字段化,relay公开了一个restful接口,既可用于拉去event,也可用于relay的自身管理
    如果事件缓冲区的内容被配置为MMP,那么在关闭时,中继还会保存scn索引和一些元数据,以便在重新启动时保留事件。

Database Event Producer

DB event producer定时轮询数据源的变更,如果检测到这些变更,会将变更转换为Avro记录
从jdbc行到avro记录的转换是基于存储在shcema registry中的Avro schemas自动完成的,schemas从数据源的oracle schema自动生成的。
Avro记录序列化到包含特定的databus元数据和二进制序列化信息的databus events中,
AbstractEventProducer 类为开发事件生产者提供了一个框架,OracleEventProducer在读取oracle数据变更的txlog表时生产dbusEvents

MaxSCN Reader/Writer读写器

maxscn读写器用于跟踪 Database Event Producer(dbep)的进度。读取器在dbep启动期间使用。如果未指定起始scn,则读取器读取最新的scn

JMX 暂不做介绍

在从dbep读取并存储在relay event buffer中的每一批更新之后,dbep使用maxscn writer存储最后一个已处理的scn,目前,MAXSCN读写器将SCN持久化到本地文件中,内容如下
5984592768094:Thu Dec 13 00:54:07 UTC 2012,除了本地文件存储,还可以选择RDBMS或者zookeeper

Schema Registry

schema register是一个包含所有被databus感知到的数据表schemas的集合