在最近学习databus的过程中,除了看源码,还有很重要的就是实操,databus搭建,测试,链接msyql,orcle,将整个系统跑起来,有助于源码的理解,下面以msyql为例,将整个流程执行一遍
准备mysql
1:首先确认据库是否开启binlog,如果binlog没有开启,可以通过set sql_log_bin=1命令来启用
2:配置数据库binlog_format=ROW, show variables like ‘binlog_format‘可查看日志格式, set globle binlog_format=ROW可修改,最好在my.cnf中修改
3:binlog_checksum设置为空,show global variables like ‘binlog_checksum’命令可查看,set binlog_checksum=none可设值,同样最好在my.cnf中修改
4:确认my.cnf中有server_id配置,设置之后需要重启
下载源码
Databus官网下载源码,下载地址https://github.com/linkedin/databus.git.
我们需要用到databus目录下的databus2-example文件夹,在此基础上改造并运行,目录结构及介绍如下:
database:数据库模拟相关的脚本和工具
databus2-example-bst-producer-pkg:bootstrap producer的属性配置文件夹,包括bootstrap producer和log4j属性文件,build脚本以及bootstrap producer的启动和停止脚本。
databus2-example-client-pkg:client的属性配置文件夹,包括各种属性文件和启动和停止脚本。
databus2-example-client:client源代码,包含启动主类和消费者代码逻辑。
databus2-example-relay-pkg:relay的属性配置文件夹,包含监控的表的source信息和Avro schema。
databus2-example-relay:relay的启动主类。
schemas_registry:存放表的avsc文件。
Relay端配置
Relay属性文件:databus2-example-relay-pkg/conf/relay-or-person.properties为relay配置,包括端口号,buffer存储策略,maxScn存放地址等信息,重点关注maxScn的存放地址,relay启动之后会在改目录下创建MaxScn_Person文件,需要确保有权限写入
databus.relay.dataSources.sequenceNumbersHandler.file.scnDir=/tmp/maxScn
被监控表配置文件:databus2-example-relay-pkg/conf/sources-or-person.json
其中URI format:mysql://username/password@mysql_host[:mysql_port]/mysql_serverid/binlog_prefix,注意%2F为转义字符,用户名为root,数据库密码为root。
1 | { |
注册Avro schema到index.schemas_registry文件,databus2-example-relay-pkg/schemas_registry/index.schemas_registry文件中添加行com.linkedin.events.example.or_test.Person.1.avsc ,每定义一个Avro schema都需要添加进去,relay运行时会到此文件中查找表对应的定义的Avro schema。
Client端配置
Client属性文件:databus2-example-client-pkg/conf/client-person.properties的内容如下配置,包括端口号,buffer存储策略,checkpoint持久化等信息:
databus2-example-client/src/main/java下的PersonConsumer类是消费逻辑回调代码,主要是取出每一个event后依次打印每个字段的名值对,主要代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private ConsumerCallbackResult processEvent(DbusEvent event, DbusEventDecoder eventDecoder)
{
GenericRecord decodedEvent = eventDecoder.getGenericRecord(event, null);
try {
Utf8 firstName = (Utf8)decodedEvent.get("firstName");
Utf8 lastName = (Utf8)decodedEvent.get("lastName");
Long birthDate = (Long)decodedEvent.get("birthDate");
Utf8 deleted = (Utf8)decodedEvent.get("deleted");
LOG.info("firstName: " + firstName.toString() +
", lastName: " + lastName.toString() +
", birthDate: " + birthDate +
", deleted: " + deleted.toString());
} catch (Exception e) {
LOG.error("error decoding event ", e);
return ConsumerCallbackResult.ERROR;
}
return ConsumerCallbackResult.SUCCESS;
}
databus2-example-client/src/main/java下的PersonClient类是relay的启动主类,主要是设置启动Client的配置信息,将消费者实例注册到监听器中,后续可对其进行回调,主要代码如下:
1 | public static void main(String[] args) throws Exception |
build-启动-测试
Build:Databus采用gradle进行编译,所以需要安装gradle环境,安装安成后进入databus根目录执行命令gradle -Dopen_source=true assemble即可完成build,成功后在databus根目录下生成名为build的文件夹
启动Relay:
cd build/databus2-example-relay-pkg/distributions
tar -zxvf databus2-example-relay-pkg.tar.gz解压
执行启动脚本 ./bin/start-example-relay.sh or_person -Y ./conf/sources-or-person.json
执行命令 curl -s http://localhost:11115/sources返回如下内容说明启动成功:
启动Client:
cd build/databus2-example-client-pkg/distributions
tar -zxvf databus2-example-client-pkg.tar.gz解压
执行启动脚本 ./bin/start-example-client.sh person
执行命令 curl http://localhost:11115/relayStats/outbound/http/clients返回如下内容说明启动成功:
测试
Relay和Client启动成功后,就已经开始对person表进行数据变更捕获了,现在向person表插入一条如下记录,databus2-example-relay-pkg/distributions/logs下的relay.log记录如下:
问题
Could not find first log file name in binary log index file
1: 首先检查binlog是否存在,确认my.cnf中binlog目录,如果没有重新添加目录,重启mysql
2:检查当前用户是否有权限读取binlog,binlog的读取需要赋权,具体如下1
2mysql>CREATE USER 'user'@ 'X.X.X.X' IDENTIFIED BY 'password';
mysql>GRANT REPLICATION SLAVE ON *.* TO 'user'@'X.X.X.X' IDENTIFIED BY 'password';
3: 检查databus-relay-conf中配置的binlog前缀是否可mysql中my.cnf中binlog前缀一致,例如两者都应该为mysql-bin1
2relay-conf: "uri" : "mysql://root%2Froot@127.0.0.1:3306/1/mysql-bin"
my.cnf : log_bin=/app/mysql/datalog/mysql-bin
3: 如果是主从复制遇到这个问题,一般是由于master重启之后导致两边binlog-index不同步,操作步骤如下
slave执行 :mysql> slave stop;
master执行:mysql>SHOW MASTER STATUS;
mysql> flush logs;
slave执行 :mysql> CHANGE MASTER TO MASTER_LOG_FILE='mysqld-bin.000011',MASTER_LOG_POS=106;
mysql> slave start;
mysql> show slave status\G;