当需要两张表数据同步的时候,我们会想到几种方案? 最简单的一种方式就是触发器的方式。例如A 同步到B ,可以通过下面的sql来添加触发器
create trigger tri_trade_update
after UPDATE
on `A`
for each row
begin
update `B`
set
company_id = new.`company_id`,
supplier_id =new.`supplier_id`
WHERE id=old.`id`;
end;
但是这种方式有一定的弊端和局限性,首先就是只局限同库,并且会增大数据库开销,以及无法实现一些自定义的逻辑。
canal是阿里推出的一个开源的中间件,它的原理类似于mysql主从的原理,它把自己伪装成mysql的从库,这样就可以从主库中获取binLog来复制数据
接下来我们自己搭建一个canal同步mysql的一个demo。我们都通过docker来安装mysql和canal,这样比较方便
一、首先需要安装mysql容器
1、首先在docker搜索mysql的官方镜像。我们选择最新的版本latest然后拉取下来 2、运行这个镜像 3、使用命令把mysql容器内部存储数据文件拷贝到外部目录存储。
docker cp mysql:/etc/mysql/my.cnf /Users/admin/WORK/docker/mysql/config
docker cp mysql:/var/lib/mysql /Users/admin/WORK/docker/mysql/data
4、 删除运行的mysql容器 5、 重新启动mysql容器 -v 挂载容器文件到外部目录,这样我们就可以持久化mysql的数据不会随着容器关闭而消失。 -p 将容器内mysql的端口映射到外部端口上 -e 设置环境变量
docker run -d \
--name mysql \
-p 3306:3306 \
-v /Users/admin/WORK/docker/mysql/config/my.cnf:/etc/mysql/my.cnf \
-v /Users/admin/WORK/docker/mysql/data/mysql:/var/lib/mysql \
-e MYSQL_ROOT_PASSWORD=123456 \
mysql:latest
二、其次我们安装canal容器
1、首先在docker搜索canal-server的官方镜像。选择最新的版本latest然后拉取下来 2、运行这个镜像 3、使用命令把mysql容器内部存储数据文件拷贝到外部目录存储。
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /Users/admin/WORK/docker/cancal/conf
4、 删除运行的canal容器 5、 重新启动canal容器 -v 挂载容器文件到外部目录,这样我们就可以持久化mysql的数据不会随着容器关闭而消失。 -p 将容器内mysql的端口映射到外部端口上 -e 设置环境变量
docker run -d \
--name canal \
-p 11111:11111 \
-v /Users/admin/WORK/docker/cancal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
canal/canal-server:latest
三、同步canal和mysql
修改本地my.cnf文件。将canal同步需要的配置添加上
log-bin=mysql-bin
binlog-format=ROW
server_id=1
创建canal账号,并赋予同步权限
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
create database canal;
show variables like 'log_bin';
修改instance.properties文件
修改canal.instance.master.address配置,ip改成mysql容器的内部ip。注意容器跟容器访问不能使用127.0.0.1
重启mysql容器,重启canal容器
进入canal容器中,观察日志more canal-server/logs/example/example.log 查看是否报错
四、java编写运行canal客户端代码
package logistics.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class CanalTest {
public static void main(String[] args) {
String ip = "127.0.0.1";
String destination = "example";
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress(ip, 11111), destination, "", ""
);
canalConnector.connect();
canalConnector.subscribe();
int batchSize = 5 * 1024;
while (true) {
Message message = canalConnector.getWithoutAck(batchSize);
long id = message.getId();
int size = message.getEntries().size();
System.out.println("当前监控到的binLog消息数量是:" + size);
if (id == -1 || size == 0) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
List<Entry> entries = message.getEntries();
for (Entry entry : entries) {
System.out.println("----------------------------------------");
System.out.println("当前的二进制日志的条目(entry)类型是:" + entry.getEntryType());
if (entry.getEntryType() == EntryType.ROWDATA) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
System.out.println("事件类型是:" + rowChange.getEventType());
for (RowData rowData : rowChange.getRowDatasList()) {
System.out.println("改变前的数据:" + rowData.getBeforeColumnsList());
System.out.println("改变后的数据:" + rowData.getAfterColumnsList());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
canalConnector.ack(id);
}
}
}
}
测试
开启客户端,mysql执行sql语句。我们可以看到客户端日志变化,说明成功了 
碰到的问题
1、mysql启动时报错。Different lower_case_table_names settings for server (‘1‘) and data dictionary (‘0‘)
解决方法: my.cnf里面添加lower_case_table_names = 1。同时删除data里面的内容,去掉data目录的挂载,重新启动容器再复制一份到data中。这样再次启动就不会报错了
2、canal日志报错MySQL8.0 caching_sha2_password Auth failed,无法连接mysql
解决方法: mysql执行下面语句
ALTER USER 'canal'@'%' IDENTIFIED BY 'canal' PASSWORD EXPIRE NEVER;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
FLUSH PRIVILEGES;
|