基于Docker结合Canal实现MySQL实时增量数据传输功能

Canal的介绍

Canal的历史由来

在早期的时候,阿里巴巴公司因为杭州和美国两个地方的机房都部署了数据库实例,但因为跨机房同步数据的业务需求 ,便孕育而生出了Canal,主要是基于trigger(触发器)的方式获取增量变更。从2010年开始,阿里巴巴公司开始逐步尝试数据库日志解析,获取增量变更的数据进行同步,由此衍生出了增量订阅和消费业务。

当前的Canal支持的数据源端MySQL版本包括:5.1.x 、5.5.x 、5.6.x、5.7.x、8.0.x。

Canal的应用场景

目前普遍基于日志增量订阅和消费的业务,主要包括:

  1. 基于数据库增量日志解析,提供增量数据订阅和消费
  2. 数据库镜像 数据库实时备份
  3. 索引构建和实时维护(拆分异构索引、倒排索引等)
  4. 业务Cache刷新
  5. 带业务逻辑的增量数据处理
  6. Canal的工作原理

在介绍Canal的原理之前,我们先来了解下MySQL主从复制的原理。

MySQL主从复制原理

  • MySQL Master将数据变更的操作写入二进制日志binary log中, 其中记录的内容叫做二进制日志事件binary log events,可以通过show binlog events命令进行查看
  • MySQL Slave会将Master的binary log中的binary log events拷贝到它的中继日志relay log
  • MySQL Slave重读并执行relay log中的事件,将数据变更映射到它自己的数据库表中

了解了MySQL的工作原理,我们可以大致猜想到Canal应该也是采用类似的逻辑去实现增量数据订阅的功能,那么接下来我们看看实际上Canal的工作原理是怎样的?

Canal工作原理

  • Canal模拟MySQL Slave的交互协议,伪装自己为MySQL Slave,向MySQL Master发送dump协议
  • MySQL Master收到dump请求,开始推送binary log给Slave(也就是Canal)
  • Canal解析binary log对象(数据为byte流)

基于这样的原理与方式,便可以完成数据库增量日志的获取解析,提供增量数据订阅和消费,实现MySQL实时增量数据传输的功能。

既然Canal是这样的一个框架,又是纯Java语言编写而成,那么我们接下来就开始学习怎么使用它并把它用到我们的实际工作中。

Canal的Docker环境准备

因为目前容器化技术的火热,本文通过使用Docker来快速搭建开发环境,而传统方式的环境搭建,在我们学会了Docker容器环境搭建后,也能自行依葫芦画瓢搭建成功。由于本篇主要讲解Canal,所以关于Docker的内容不会涉及太多,主要会介绍Docker的基本概念和命令使用。 如果你想和更多容器技术专家交流,可以加我微信liyingjiese,备注『加群』。群里每周都有全球各大公司的最佳实践以及行业最新动态 。

什么是Docker

相信绝大多数人都使用过虚拟机VMware,在使用VMware进行环境搭建的时候,只需提供了一个普通的系统镜像并成功安装,剩下的软件环境与应用配置还是如我们在本机操作一样在虚拟机里也操作一遍,而且VMware占用宿主机的资源较多,容易造成宿主机卡顿,而且系统镜像本身也占用过多空间。

为了便于大家快速理解Docker,便与VMware做对比来做介绍,Docker提供了一个开始,打包,运行APP的平台,把APP(应用)和底层infrastructure(基础设施)隔离开来。Docker中最主要的两个概念就是镜像(类似VMware的系统镜像)与容器(类似VMware里安装的系统)。

什么是Image(镜像)

  • 文件和meta data的集合(root filesystem)
  • 分层的,并且每一层都可以添加改变删除文件,成为一个新的image
  • 不同的image可以共享相同的layer
  • Image本身是read-only的

什么是Container(容器)

  • 通过Image创建(copy)
  • 在Image layer之上建立一个container layer(可读写)
  • 类比面向对象:类和实例
  • Image负责APP的存储和分发,Container负责运行APP

Docker的网络介绍

Docker的网络类型有三种:

  • Bridge:桥接网络。默认情况下启动的Docker容器,都是使用Bridge,Docker安装时创建的桥接网络,每次Docker容器重启时,会按照顺序获取对应的IP地址,这个就导致重启下,Docker的IP地址就变了。
  • None:无指定网络。使用 --network=none,Docker容器就不会分配局域网的IP。
  • Host:主机网络。使用--network=host,此时,Docker容器的网络会附属在主机上,两者是互通的。例如,在容器中运行一个Web服务,监听8080端口,则主机的8080端口就会自动映射到容器中。

创建自定义网络:(设置固定IP)

docker network create --subnet=172.18.0.0/16 mynetwork

查看存在的网络类型docker network ls:

搭建Canal环境

附上Docker的下载安装地址==> Docker Download

下载Canal镜像docker pull canal/canal-server

下载MySQL镜像docker pull mysql,下载过的则如下图:

查看已经下载好的镜像docker images:

接下来通过镜像生成MySQL容器与canal-server容器:

##生成mysql容器
docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root mysql
##生成canal-server容器
docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server
## 命令介绍
--net mynetwork #使用自定义网络
--ip #指定分配ip

查看Docker中运行的容器docker ps:

MySQL的配置修改

以上只是初步准备好了基础的环境,但是怎么让Canal伪装成Salve并正确获取MySQL中的binary log呢?

对于自建MySQL,需要先开启Binlog写入功能,配置binlog-format为ROW模式,通过修改MySQL配置文件来开启bin_log,使用find / -name my.cnf查找my.cnf,修改文件内容如下:

[mysqld]
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 选择ROW模式
server_id=1 # 配置MySQL replaction需要定义,不要和Canal的slaveId重复

进入MySQL容器docker exec -it mysql bash。

创建链接MySQL的账号Canal并授予作为MySQL slave的权限,如果已有账户可直接GRANT:

mysql -uroot -proot
# 创建账号
CREATE USER canal IDENTIFIED BY 'canal';
# 授予权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
# 刷新并应用
FLUSH PRIVILEGES;

数据库重启后,简单测试 my.cnf 配置是否生效:

show variables like 'log_bin';
show variables like 'log_bin';
show master status;

canal-server的配置修改

进入canal-server容器docker exec -it canal-server bash

编辑canal-server的配置vi canal-server/conf/example/instance.properties

更多配置请参考==>Canal配置说明

重启canal-server容器docker restart canal-server 进入容器查看启动日志:

docker exec -it canal-server bash
tail -100f canal-server/logs/example/example.log

至此,我们的环境工作准备完成!

拉取数据并同步保存到ElasticSearch

本文的ElasticSearch也是基于Docker环境搭建,所以读者可执行如下命令:

# 下载对镜像
docker pull elasticsearch:7.1.1
docker pull mobz/elasticsearch-head:5-alpine
# 创建容器并运行
docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1
docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine

环境已经准备好了,现在就要开始我们的编码实战部分了,怎么通过应用程序去获取Canal解析后的binlog数据。首先我们基于Spring Boot搭建一个canal demo应用。结构如下图所示:

Student.java

package com.example.canal.study.pojo;
import lombok.Data;
import java.io.Serializable;
// @Data 用户生产getter、setter方法
@Data
public class Student implements Serializable {
private String id;
private String name;
private int age;
private String sex;
private String city;
} 

CanalConfig.java

package com.example.canal.study.common;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;
/**
* @author haha
*/
@Configuration
public class CanalConfig {
// @Value 获取 application.properties配置中端内容
@Value("${canal.server.ip}")
private String canalIp;
@Value("${canal.server.port}")
private Integer canalPort;
@Value("${canal.destination}")
private String destination;
@Value("${elasticSearch.server.ip}")
private String elasticSearchIp;
@Value("${elasticSearch.server.port}")
private Integer elasticSearchPort;
@Value("${zookeeper.server.ip}")
private String zkServerIp;
// 获取简单canal-server连接
@Bean
public CanalConnector canalSimpleConnector() {
 CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp, canalPort), destination, "", "");
 return canalConnector;
}
// 通过连接zookeeper获取canal-server连接
@Bean
public CanalConnector canalHaConnector() {
 CanalConnector canalConnector = CanalConnectors.newClusterConnector(zkServerIp, destination, "", "");
 return canalConnector;
}
// elasticsearch 7.x客户端
@Bean
public RestHighLevelClient restHighLevelClient() {
 RestHighLevelClient client = new RestHighLevelClient(
   RestClient.builder(new HttpHost(elasticSearchIp, elasticSearchPort))
 );
 return client;
}
} 

CanalDataParser.java

由于这个类的代码较多,文中则摘出其中比较重要的部分,其它部分代码可从GitHub上获取:

public static class TwoTuple<A, B> {
 public final A eventType;
 public final B columnMap;
 public TwoTuple(A a, B b) {
  eventType = a;
  columnMap = b;
 }
}
public static List<TwoTuple<EventType, Map>> printEntry(List<Entry> entrys) {
 List<TwoTuple<EventType, Map>> rows = new ArrayList<>();
 for (Entry entry : entrys) {
  // binlog event的事件事件
  long executeTime = entry.getHeader().getExecuteTime();
  // 当前应用获取到该binlog锁延迟的时间
  long delayTime = System.currentTimeMillis() - executeTime;
  Date date = new Date(entry.getHeader().getExecuteTime());
  SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  // 当前的entry(binary log event)的条目类型属于事务
  if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
   if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
    TransactionBegin begin = null;
    try {
     begin = TransactionBegin.parseFrom(entry.getStoreValue());
    } catch (InvalidProtocolBufferException e) {
     throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
    }
    // 打印事务头信息,执行的线程id,事务耗时
    logger.info(transaction_format,
      new Object[]{entry.getHeader().getLogfileName(),
        String.valueOf(entry.getHeader().getLogfileOffset()),
        String.valueOf(entry.getHeader().getExecuteTime()),
        simpleDateFormat.format(date),
        entry.getHeader().getGtid(),
        String.valueOf(delayTime)});
    logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
    printXAInfo(begin.getPropsList());
   } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
    TransactionEnd end = null;
    try {
     end = TransactionEnd.parseFrom(entry.getStoreValue());
    } catch (InvalidProtocolBufferException e) {
     throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
    }
    // 打印事务提交信息,事务id
    logger.info("----------------\n");
    logger.info(" END ----> transaction id: {}", end.getTransactionId());
    printXAInfo(end.getPropsList());
    logger.info(transaction_format,
      new Object[]{entry.getHeader().getLogfileName(),
        String.valueOf(entry.getHeader().getLogfileOffset()),
        String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
        entry.getHeader().getGtid(), String.valueOf(delayTime)});
   }
   continue;
  }
  // 当前entry(binary log event)的条目类型属于原始数据
  if (entry.getEntryType() == EntryType.ROWDATA) {
   RowChange rowChage = null;
   try {
    // 获取储存的内容
    rowChage = RowChange.parseFrom(entry.getStoreValue());
   } catch (Exception e) {
    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
   }
   // 获取当前内容的事件类型
   EventType eventType = rowChage.getEventType();
   logger.info(row_format,
     new Object[]{entry.getHeader().getLogfileName(),
       String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
       entry.getHeader().getTableName(), eventType,
       String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
       entry.getHeader().getGtid(), String.valueOf(delayTime)});
   // 事件类型是query或数据定义语言DDL直接打印sql语句,跳出继续下一次循环
   if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
    logger.info(" sql ----> " + rowChage.getSql() + SEP);
    continue;
   }
   printXAInfo(rowChage.getPropsList());
   // 循环当前内容条目的具体数据
   for (RowData rowData : rowChage.getRowDatasList()) {
    List<CanalEntry.Column> columns;
    // 事件类型是delete返回删除前的列内容,否则返回改变后列的内容
    if (eventType == CanalEntry.EventType.DELETE) {
     columns = rowData.getBeforeColumnsList();
    } else {
     columns = rowData.getAfterColumnsList();
    }
    HashMap<String, Object> map = new HashMap<>(16);
    // 循环把列的name与value放入map中
    for (Column column: columns){
     map.put(column.getName(), column.getValue());
    }
    rows.add(new TwoTuple<>(eventType, map));
   }
  }
 }
 return rows;
} 

ElasticUtils.java

package com.example.canal.study.common;
import com.alibaba.fastjson.JSON;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.Map;
/**
* @author haha
*/
@Slf4j
@Component
public class ElasticUtils {
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
 * 新增
 * @param student
 * @param index 索引
 */
public void saveEs(Student student, String index) {
 IndexRequest indexRequest = new IndexRequest(index)
   .id(student.getId())
   .source(JSON.toJSONString(student), XContentType.JSON)
   .opType(DocWriteRequest.OpType.CREATE);
 try {
  IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
  log.info("保存数据至ElasticSearch成功:{}", response.getId());
 } catch (IOException e) {
  log.error("保存数据至elasticSearch失败: {}", e);
 }
}
/**
 * 查看
 * @param index 索引
 * @param id _id
 * @throws IOException
 */
public void getEs(String index, String id) throws IOException {
 GetRequest getRequest = new GetRequest(index, id);
 GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
 Map<String, Object> fields = response.getSource();
 for (Map.Entry<String, Object> entry : fields.entrySet()) {
  System.out.println(entry.getKey() + ":" + entry.getValue());
 }
}
/**
 * 更新
 * @param student
 * @param index 索引
 * @throws IOException
 */
public void updateEs(Student student, String index) throws IOException {
 UpdateRequest updateRequest = new UpdateRequest(index, student.getId());
 updateRequest.upsert(JSON.toJSONString(student), XContentType.JSON);
 UpdateResponse response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
 log.info("更新数据至ElasticSearch成功:{}", response.getId());
}
/**
 * 根据id删除数据
 * @param index 索引
 * @param id _id
 * @throws IOException
 */
public void DeleteEs(String index, String id) throws IOException {
 DeleteRequest deleteRequest = new DeleteRequest(index, id);
 DeleteResponse response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
 log.info("删除数据至ElasticSearch成功:{}", response.getId());
}
} 

BinLogElasticSearch.java

package com.example.canal.study.action;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.canal.study.common.CanalDataParser;
import com.example.canal.study.common.ElasticUtils;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* @author haha
*/
@Slf4j
@Component
public class BinLogElasticSearch {
@Autowired
private CanalConnector canalSimpleConnector;
@Autowired
private ElasticUtils elasticUtils;
//@Qualifier("canalHaConnector")使用名为canalHaConnector的bean
@Autowired
@Qualifier("canalHaConnector")
private CanalConnector canalHaConnector;
public void binLogToElasticSearch() throws IOException {
 openCanalConnector(canalHaConnector);
 // 轮询拉取数据
 Integer batchSize = 5 * 1024;
 while (true) {
  Message message = canalHaConnector.getWithoutAck(batchSize);
//   Message message = canalSimpleConnector.getWithoutAck(batchSize);
  long id = message.getId();
  int size = message.getEntries().size();
  log.info("当前监控到binLog消息数量{}", size);
  if (id == -1 || size == 0) {
   try {
    // 等待2秒
    Thread.sleep(2000);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  } else {
   //1. 解析message对象
   List<CanalEntry.Entry> entries = message.getEntries();
   List<CanalDataParser.TwoTuple<CanalEntry.EventType, Map>> rows = CanalDataParser.printEntry(entries);
   for (CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple : rows) {
    if(tuple.eventType == CanalEntry.EventType.INSERT) {
     Student student = createStudent(tuple);
     // 2。将解析出的对象同步到elasticSearch中
     elasticUtils.saveEs(student, "student_index");
     // 3.消息确认已处理
//     canalSimpleConnector.ack(id);
     canalHaConnector.ack(id);
    }
    if(tuple.eventType == CanalEntry.EventType.UPDATE){
     Student student = createStudent(tuple);
     elasticUtils.updateEs(student, "student_index");
     // 3.消息确认已处理
//     canalSimpleConnector.ack(id);
     canalHaConnector.ack(id);
    }
    if(tuple.eventType == CanalEntry.EventType.DELETE){
     elasticUtils.DeleteEs("student_index", tuple.columnMap.get("id").toString());
     canalHaConnector.ack(id);
    }
   }
  }
 }
}
/**
 * 封装数据至Student
 * @param tuple
 * @return
 */
private Student createStudent(CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple){
 Student student = new Student();
 student.setId(tuple.columnMap.get("id").toString());
 student.setAge(Integer.parseInt(tuple.columnMap.get("age").toString()));
 student.setName(tuple.columnMap.get("name").toString());
 student.setSex(tuple.columnMap.get("sex").toString());
 student.setCity(tuple.columnMap.get("city").toString());
 return student;
}
/**
 * 打开canal连接
 *
 * @param canalConnector
 */
private void openCanalConnector(CanalConnector canalConnector) {
 //连接CanalServer
 canalConnector.connect();
 // 订阅destination
 canalConnector.subscribe();
}
/**
 * 关闭canal连接
 *
 * @param canalConnector
 */
private void closeCanalConnector(CanalConnector canalConnector) {
 //关闭连接CanalServer
 canalConnector.disconnect();
 // 注销订阅destination
 canalConnector.unsubscribe();
}
} 

CanalDemoApplication.java(Spring Boot启动类)

package com.example.canal.study;
import com.example.canal.study.action.BinLogElasticSearch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author haha
*/
@SpringBootApplication
public class CanalDemoApplication implements ApplicationRunner {
@Autowired
private BinLogElasticSearch binLogElasticSearch;
public static void main(String[] args) {
 SpringApplication.run(CanalDemoApplication.class, args);
}
// 程序启动则执行run方法
@Override
public void run(ApplicationArguments args) throws Exception {
 binLogElasticSearch.binLogToElasticSearch();
}
} 

application.properties

server.port=8081
spring.application.name = canal-demo
canal.server.ip = 192.168.124.5
canal.server.port = 11111
canal.destination = example
zookeeper.server.ip = 192.168.124.5:2181
zookeeper.sasl.client = false
elasticSearch.server.ip = 192.168.124.5
elasticSearch.server.port = 9200

Canal集群高可用的搭建

通过上面的学习,我们知道了单机直连方式的Canala应用。在当今互联网时代,单实例模式逐渐被集群高可用模式取代,那么Canala的多实例集群方式如何搭建呢!

基于ZooKeeper获取Canal实例

准备ZooKeeper的Docker镜像与容器:

docker pull zookeeper
docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper
docker run -d --name canal-server2 --net mynetwork --ip 172.18.0.8 -p 11113:11113 canal/canal-server

1、机器准备:

  • 运行Canal的容器IP: 172.18.0.4 , 172.18.0.8
  • ZooKeeper容器IP:172.18.0.3:2181
  • MySQL容器IP:172.18.0.6:3306

2、按照部署和配置,在单台机器上各自完成配置,演示时instance name为example。

3、修改canal.properties,加上ZooKeeper配置并修改Canal端口:

canal.port=11113
canal.zkServers=172.18.0.3:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

4、创建example目录,并修改instance.properties:

canal.instance.mysql.slaveId = 1235
#之前的canal slaveId是1234,保证slaveId不重复即可
canal.instance.master.address = 172.18.0.6:3306

注意: 两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置。

启动两个不同容器的Canal,启动后,可以通过tail -100f logs/example/example.log查看启动日志,只会看到一台机器上出现了启动成功的日志。

比如我这里启动成功的是 172.18.0.4:

查看一下ZooKeeper中的节点信息,也可以知道当前工作的节点为172.18.0.4:11111:

[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1} 

客户端链接, 消费数据

可以通过指定ZooKeeper地址和Canal的instance name,canal client会自动从ZooKeeper中的running节点获取当前服务的工作节点,然后与其建立链接:

[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1} 

对应的客户端编码可以使用如下形式,上文中的CanalConfig.java中的canalHaConnector就是一个HA连接:

CanalConnector connector = CanalConnectors.newClusterConnector("172.18.0.3:2181", "example", "", "");

链接成功后,canal server会记录当前正在工作的canal client信息,比如客户端IP,链接的端口信息等(聪明的你,应该也可以发现,canal client也可以支持HA功能):

[zk: localhost:2181(CONNECTED) 4] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"192.168.124.5:59887","clientId":1001} 

数据消费成功后,canal server会在ZooKeeper中记录下当前最后一次消费成功的binlog位点(下次你重启client时,会从这最后一个位点继续进行消费):

[zk: localhost:2181(CONNECTED) 5] get /otter/canal/destinations/example/1001/cursor

{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalName":"binlog.000004","position":2169,"timestamp":1562672817000}} 

停止正在工作的172.18.0.4的canal server:

docker exec -it canal-server bash
cd canal-server/bin
sh stop.sh

这时172.18.0.8会立马启动example instance,提供新的数据服务:

[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.8:11111","cid":1} 

与此同时,客户端也会随着canal server的切换,通过获取ZooKeeper中的最新地址,与新的canal server建立链接,继续消费数据,整个过程自动完成。

异常与总结

elasticsearch-head无法访问Elasticsearch

es与es-head是两个独立的进程,当es-head访问es服务时,会存在一个跨域问题。所以我们需要修改es的配置文件,增加一些配置项来解决这个问题,如下:

[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/
[root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml
# 文件末尾加上如下配置
http.cors.enabled: true
http.cors.allow-origin: "*"

修改完配置文件后需重启es服务。

elasticsearch-head查询报406 Not Acceptable

解决方法:

1、进入head安装目录;

2、cd _site/

3、编辑vendor.js 共有两处

#6886行 contentType: "application/x-www-form-urlencoded
改成 contentType: "application/json;charset=UTF-8"
 #7574行 var inspectData = s.contentType === "application/x-www-form-urlencoded" &&
改成 var inspectData = s.contentType === "application/json;charset=UTF-8" &&

使用elasticsearch-rest-high-level-clientorg.elasticsearch.action.index.IndexRequest.ifSeqNo

#pom中除了加入依赖
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.1.1</version>
</dependency>
#还需加入
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.1.1</version>
</dependency>

相关参考: git hub issues

为什么ElasticSearch要在7.X版本不能使用type?

参考: 为什么ElasticSearch要在7.X版本去掉type

使用spring-data-elasticsearch.jar报org.elasticsearch.client.transport.NoNodeAvailableException

由于本文使用的是elasticsearch7.x以上的版本,目前spring-data-elasticsearch底层采用es官方TransportClient,而es官方计划放弃TransportClient,工具以es官方推荐的RestHighLevelClient进行调用请求。 可参考 RestHighLevelClient API

设置Docker容器开启启动

如果创建时未指定 --restart=always ,可通过update 命令
docker update --restart=always [containerID]

Docker for Mac network host模式不生效

Host模式是为了性能,但是这却对Docker的隔离性造成了破坏,导致安全性降低。 在性能场景下,可以用--netwokr host开启Host模式,但需要注意的是,如果你用Windows或Mac本地启动容器的话,会遇到Host模式失效的问题。原因是Host模式只支持Linux宿主机。

参见官方文档:  https://docs.docker.com/network/host/  。

客户端连接ZooKeeper报authenticate using SASL(unknow error)

  • zookeeper.jar与Dokcer中的ZooKeeper版本不一致
  • zookeeper.jar使用了3.4.6之前的版本

出现这个错的意思是ZooKeeper作为外部应用需要向系统申请资源,申请资源的时候需要通过认证,而sasl是一种认证方式,我们想办法来绕过sasl认证。避免等待,来提高效率。

在项目代码中加入System.setProperty("zookeeper.sasl.client", "false");,如果是Spring Boot项目可以在application.properties中加入zookeeper.sasl.client=false

参考: Increased CPU usage by unnecessary SASL checks 。

如果更换canal.client.jar中依赖的zookeeper.jar的版本

把Canal的官方源码下载到本机git clone  https://github.com/alibaba/canal.git  ,然后修改client模块下pom.xml文件中关于ZooKeeper的内容,然后重新mvn install:

把自己项目依赖的包替换为刚刚mvn install生产的包:

关于选型的取舍

总结

以上所述是小编给大家介绍的基于Docker结合Canal实现MySQL实时增量数据传输功能,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对我们网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!

(0)

相关推荐

  • 详解Android——蓝牙技术 带你实现终端间数据传输

    蓝牙技术在智能硬件方面有很多用武之地,今天我就为大家分享一下蓝牙在Android系统下的使用方法技巧,并实现一下两个终端间数据的传输. 蓝牙(Bluetooth)是一种短距离的无线通信技术标准,蓝牙协议分为4层,即核心协议层.电缆替代协议层.电话控制协议层和采纳的其它协议层. 这4种协议中最重要的是核心协议.蓝牙的核心协议包括基带.链路管理.逻辑链路控制和适应协议四部分.其中链路管理(LMP)负责蓝牙组件间连接的建立.逻辑链路控制与适应协议(L2CAP)位于基带协议层上,属于数据链路层,是一个为

  • ajax数据传输方式实例详解

    本文实例讲述了ajax数据传输方式.分享给大家供大家参考,具体如下: 在异步应用程序中发送和接收信息时,常见的可以选择以纯文本和XML作为数据格式(可参考<jQuery学习笔记之Ajax用法实例详解>),现在还有一种比较流行的方式:JSON(JavaScript Object Notation).好了,下面举例说明这三种数据格式在ajax的异步应用. 一.纯文本方式 1.发送/接收数据: Code is cheap.看代码: testJs.js // 此函数等价于document.getEle

  • 如何使用json在前后台进行数据传输实例介绍

    上一篇博客写到用javascript生成多组文本,可以让数据的输入不受显示,现在我们需要把这些输入写入数据库,这里就用到json传入. 首先,我们来写一下后台如何生成要传输的数据 [html] 复制代码 代码如下: function generateDtb() { //写入 var txtName = document.getElementById("txtName").value; //创建数组 var dtb = new Array(); //通过循环把数据写入到数组并返回 for

  • Java使用TCP实现数据传输实例详解

    Java使用TCP实现数据传输实例详解 TCP所提供服务的主要特点: 1.面向连接的传输: 2.端到端的通信: 3.高可靠性,确保传输数据的正确性,不出现丢失或乱序: 4.全双工方式传输: 5.采用字节流方式,即以字节为单位传输字节序列: 6.紧急数据传送功能. TCP传输需要建立客户端和服务器端,即Socket和Server Socket , 建立连接后,通过Socket中的IO流进行数据的传输 .传输结束后关闭Socket. 客户端和服务器端是两个独立的应用程序. 以下是实现基本的TCP数据

  • 使用jQuery fancybox插件打造一个实用的数据传输模态弹出窗体

    模态窗体已经成为Web开发人员设计界面时经常要使用的传输数据的方式.通过模态窗口,可以提高网站的可用性.正好项目的需要,有个客户想要模态弹出的窗体来提交网站的反馈,经过一番测试实现了,我使用jQuery fancybox插件来创建一个漂亮的模态窗体,提交表单的数据在服务器端实现Ajax调用.你可以在你的邮件里收到用户发送的反馈消息 html代码 header部分主要的JS文件如下引入jquery代码和fancybox代码 复制代码 代码如下: <script type="text/java

  • ajax用json实现数据传输

    JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式.它基于ECMAScript的一个子集. JSON采用完全独立于语言的文本格式,但是也使用了类似于C语言家族的习惯(包括C.C++.C#.Java.JavaScript.Perl.Python等).这些特性使JSON成为理想的数据交换语言. 易于人阅读和编写,同时也易于机器解析和生成(一般用于提升网络传输速率). json简单说就是javascript中的对象和数组,所以这两种结构就是对象和数组两种结构,

  • 用JSON做数据传输格式中的一些问题总结

    向客户端提供JSON数据的方式 一. 用WCF提供Json数据 用WCF向客户端提供Json数据我们需要注意, A. 契约的定义, 在WebInvokeAttribute 或者 WebGetAttribute中的ResponseFormat设置为WebMessageForm.Json, 复制代码 代码如下: [WebInvoke(Method = "POST", ResponseFormat = WebMessageFormat.Json, RequestFormat = WebMes

  • Vuejs 用$emit与$on来进行兄弟组件之间的数据传输通信

    最近在学习vue组件鸡组件之前通信问题,正好看到,以此来留作笔记. <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8" /> <title>Vue2-单一事件管理组件通信</title> <script src="vue.js"></script> <script type=

  • asp.net Web Service 接口大量数据传输解决方案

    具体请看下面的流程图及其说明 1,流程图 2,流程说明:线程1开始请求接口获取1W条数据,当数据成功获取后,接口是闲置的,这时我们开始线程2获取数据,同时线程1继续执行获取数据的后续工作,如果转换数据,这里我用的办法是,使用预先定义的实体对象格式 反序列化XML (据说这种方式比遍历XML或是载入到DataSet中循环 读取都要高效的,具体我没有测试过 ,哈哈)将数据插入到数据后停止线程1.使用这样线程循环的办法处理所有线程读取数据.

  • 详解ABP框架中的数据过滤器与数据传输对象的使用

    数据过滤器(Data filters) 在数据库开发中,我们一般会运用软删除(soft-delete)模式,即不直接从数据库删除数据,而是标记这笔数据为已删除.因此,如果实体被软删除了,那么它就应该不会在应用程序中被检索到.要达到这种效果,我们需要在每次检索实体的查询语句上添加SQL的Where条件IsDeleted = false.这是个乏味的工作,但它是个容易被忘掉的事情.因此,我们应该要有个自动的机制来处理这些问题. ABP提供数据过滤器(Data filters),它使用自动化的,基于规

随机推荐