Databus是一个低延迟、可靠的、支持事务的、保持一致性的数据变更抓取系统。由LinkedIn于2013年开源。Databus通过挖掘数据库日志的方式,将数据库变更实时、可靠的从数据库拉取出来,业务可以通过定制化client实时获取变更并进行其他业务逻辑。
Databus有以下特点:
数据源和消费者之间的隔离。
数据传输能保证顺序性和至少一次交付的高可用性。
从变化流的任意时间点进行消费,包括通过bootstrap获取所有数据。
分区消费
源一致性保存,消费不成功会一直消费直到消费成功
databus整体架构
上图中介绍了Databus系统的构成,包括Relays、Bootstrap Service和Client lib等。Bootstrap Service中包括Bootstrap Producer和Bootstrap Server。快速变化的Consumer直接从Relay中取事件。如果一个Consumer的数据更新大幅落后,它要的数据就不在Relay的日志中,而是需要请求Bootstrap Service,返回的将会是自Consumer上次处理变更之后的所有数据变更Snapshot。
Source Databases:MySQL以及Oracle数据源
Relays:负责抓取和存储数据库变更,全内存存储,也可配置使用mmap内存映射文件方式
Schema Registry:数据库数据类型到Databus数据类型的一个转换表
Bootstrap Service:一个特殊的客户端,功能和Relays类似,负责存储数据库变更,主要是磁盘存储
Application:数据库变更消费逻辑,从Relay中拉取变更,并消费变更
Client Lib:提供挑选关注变更的API给消费逻辑
Consumer Code:变更消费逻辑,可以是自身消费或者再将变更发送至下游服务
DataBus Client 状态机
以下两个图描述了从relay server获取events到event buffer所涉及的事件序列和状态转换
- 图1是启动顺序概述
- 图2是事件记录的详细流程
在线消费模式
总体概述如下
- 客户端启动时,状态为pick-server,客户端试图随机连接一个relay,异常时重试,直到最大重试次数
- relay选定后,客户端进入Request sources状态并且开始从relay检索资源列表
- source匹配错误,如果收到的响应中的sourceID校验错误,客户端切回到pick-relay状态
- source匹配正确,客户端进入request register状态,同样,如果返回的schema size错误,客户端切回pick-relay状态
- 如果资源列表和schema都正确,设置dispatcher开始调度从stream中接收的事件,并且客户端状态按照配置切换为BOOTSTRAP或者REQUEST STREAM
5.1 Null relay filter ,relay filter未定义,从配置中初始化relay-filter,并且执行合并和重复数据消除
5.2 Null channel,和relay丢失链接,抛出异常并重启流程
三级状态机设计,databus ConnectionState,ServerSetChangeMessage,LifecycleMessage,3层分工明确,层层递进,ConnectionStateMessage负责具体和relay连接的各种状态表示,比如流的读写,连接的打开关闭,shcema的注册,ServerSetChangeMessage负责serverInfo状态变更处理,LifecycleMessage负责整个组件生命周期管理,三种状态分别对应三个状态机,
RelayPullThread, BasePullThread,AbstractActorMessageQueue,三者为顺序继承关系,RelayPullThread extend BasePullThread extend AbstractActorMessageQueue,
- 状态分级处理的设计思路很巧妙,值得我们借鉴,类似的思想比如java类加载中的双亲委派,原则都是将基础的,顶层的资源交给基础的,顶层的管理者来管理。
- 多维状态机的思路可以用到电商业务早期的订单状态机模型,一个订单包含多个资源,上层是订单状态机,下层是资源状态机
在线消费-故障模式
大部分对relay的请求,一般可能发生两种错误,http请求阶段的Request error和Response error
BOOTSTRAP 模型
以下两个图描述了获取客户端bootstrap所涉及的事件序列和状态转换
- 图1是启动顺序概述
- 图2是事件记录的详细流程
总体描述如下
- 如果没有可用的checkpoint,将会重置为window scn,同时会选取一个bootstrap,逻辑和pick relay类似
- 连接上bootstrap server之后,如果没有有效的resume checkpoint,客户端请求开始Svn
BOOTSTRAP 失败模型
Databus 2.0 Client Design
databus客户端关注从relay消费events,必须整合一个databus client library,提供API以选择他们关注的变更流
Relay Connection
relay connection使用relay http interface来从relay获取实时更新流
bootstrap Connection
bootstrap Connection使用bootstrap http interface从bootstrap server 追溯早期更新,持久化可以在本地(local file),也可以是共享空间(zookeeper)
Dispatcher
dispatcher读取从online streams和bootstrap进来的events,消费代码回调,主要返回如下
1: 确定正确的回调
2:错误与超时监控
3:确保客户端保持事件消费的进程
Consumer Code Callbacks
Checkpoint persistence
ckpt是一个消费者消费更新流的内部表示,ckpt对象默认为json格式1
{"windowOffset":-1,"snapshot_offset":-1,"prevScn":-1,"windowScn":5984508975840,"consumption_mode":"ONLINE_CONSUMPTION"}
Bootstrap checkpoint
1 | {"snapshot_offset":0,"prevScn":5984488321377,"windowScn":5984488321377,"consumption_mode":"BOOTSTRAP_SNAPSHOT","windowOffset":-1,"bootstrap_since_scn":5984488321377,"bootstrap_start_scn":-1,"bootstrap_target_scn":-1,"bootstrap_snapshot_source_index":0,"bootstrap_catchup_source_index":0,"bootstrap_server_info":null,"snapshot_source":"com.linkedin.databus.example.Person"} |
Databus 2.0 Relay Design
Databus Relays负责如下工作:
- 读取数据库或者其他数据源变更行,并且序列化为内存buff中的databus数据变更事件
- 监听来自客户端的请求(包括bootstrap等其他链接的relay或者事件消费者),并且将数据变更事件流式传入到客户端
一个relay包含一个或者多个circular event buffer用于存储databus事件,事件按照系统变更号(SCNS)递增,这些buffer可以存储在直接内存或者通过MMP映射到文件系统。
每个缓冲区还有一个对应的内存稀疏索引,称为scn索引,以及一个maxscn读写器, maxscn读写器周期性地保持在被拉入relay的事件中看到的最高scn的值。
relay通过一个请求处理器(它监听一个netty通道)将来自databus客户端的请求字段化,relay公开了一个restful接口,既可用于拉去event,也可用于relay的自身管理
如果事件缓冲区的内容被配置为MMP,那么在关闭时,中继还会保存scn索引和一些元数据,以便在重新启动时保留事件。
Database Event Producer
DB event producer定时轮询数据源的变更,如果检测到这些变更,会将变更转换为Avro记录
从jdbc行到avro记录的转换是基于存储在shcema registry中的Avro schemas自动完成的,schemas从数据源的oracle schema自动生成的。
Avro记录序列化到包含特定的databus元数据和二进制序列化信息的databus events中,
AbstractEventProducer 类为开发事件生产者提供了一个框架,OracleEventProducer在读取oracle数据变更的txlog表时生产dbusEvents
MaxSCN Reader/Writer读写器
maxscn读写器用于跟踪 Database Event Producer(dbep)的进度。读取器在dbep启动期间使用。如果未指定起始scn,则读取器读取最新的scn
JMX 暂不做介绍
在从dbep读取并存储在relay event buffer中的每一批更新之后,dbep使用maxscn writer存储最后一个已处理的scn,目前,MAXSCN读写器将SCN持久化到本地文件中,内容如下
5984592768094:Thu Dec 13 00:54:07 UTC 2012,除了本地文件存储,还可以选择RDBMS或者zookeeper
Schema Registry
schema register是一个包含所有被databus感知到的数据表schemas的集合