网站首页 > java教程 正文
Flink CDC在配置mysql时,可以指定几种方式来选择位点: INITIAL、EARLIEST_OFFSET、LATEST_OFFSET、SPECIFIC_OFFSETS、TIMESTAMP、SNAPSHOT。
INITIAL: 全量与增量
EARLIEST_OFFSET:最早位点
LATEST_OFFSET:最近的位点
SPECIFIC_OFFSETS:指定位点
TIMESTAMP:指定时间点
SNAPSHOT:全量
源码分析
设置该类型的cdc同步任务,机制会检查当前存在的binlog文件列表,因为每个文件是按顺序排列,同时对应的时间也是有顺序的,最终是通过二分法进行查找。
public static void main(String[] args) {
MySqlSource.<Data>builder()
.startupOptions(StartupOptions.timestamp(System.currentTimeMillis()))
.build();
}
当设置了cdc任务的类型为TIMESTAMP时,会通过以下的方法来获取对应的binlogfile,具体查看类 BinlogOffsetUtils.java
public static BinlogOffset initializeEffectiveOffset(
BinlogOffset offset, MySqlConnection connection) {
BinlogOffsetKind offsetKind = offset.getOffsetKind();
switch (offsetKind) {
case EARLIEST:
return BinlogOffset.ofBinlogFilePosition("", 0);
case TIMESTAMP:
// 遍历当前所有存在的binlogfile文件,取每个文件的文件头来判断时间
// 所以一定是当前整个文件的数据,也是按binlogfile文件名来读取数据的
return DebeziumUtils.findBinlogOffset(offset.getTimestampSec() * 1000, connection);
case LATEST:
return DebeziumUtils.currentBinlogOffset(connection);
default:
return offset;
}
}
public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection connection) {
MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
BinaryLogClient client =
new BinaryLogClient(
config.hostname(), config.port(), config.username(), config.password());
List<String> binlogFiles = new ArrayList<>();
JdbcConnection.ResultSetConsumer rsc =
rs -> {
while (rs.next()) {
String fileName = rs.getString(1);
long fileSize = rs.getLong(2);
if (fileSize > 0) {
binlogFiles.add(fileName);
}
}
};
try {
// 获取mysql系统内存在的binlog
connection.query("SHOW BINARY LOGS", rsc);
LOG.info("Total search binlog: {}", binlogFiles);
if (binlogFiles.isEmpty()) {
return BinlogOffset.ofBinlogFilePosition("", 0);
}
// 搜索最接近的binlog文件
String binlogName = searchBinlogName(client, targetMs, binlogFiles);
return BinlogOffset.ofBinlogFilePosition(binlogName, 0);
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
}
private static String searchBinlogName(
BinaryLogClient client, long targetMs, List<String> binlogFiles)
throws IOException, InterruptedException {
int startIdx = 0;
int endIdx = binlogFiles.size() - 1;
// 因为binlog文件名是递增的,同时时间也是递增的
// 以二分法进行查找
while (startIdx <= endIdx) {
int mid = startIdx + (endIdx - startIdx) / 2;
long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
if (midTs < targetMs) {
startIdx = mid + 1;
} else if (targetMs < midTs) {
endIdx = mid - 1;
} else {
return binlogFiles.get(mid);
}
}
return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
}
从以上的逻辑可以看到,当指定了timestamp时,会从最接近的那个binlog文件开始从头开始读取数据,那会不会多读很多数据呢?答案是否定的,当从找到的binlog文件中读取数据后,真正在处理的时候,会再判断一次当前的事件是否在指定的时间范围内,代码在 MySqlBinlogSplitReadTask.java
protected void handleEvent(
MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) {
// 当从binlog读取数据后,进行一次过滤
if (!eventFilter.test(event)) {
return;
}
super.handleEvent(partition, offsetContext, event);
// check do we need to stop for read binlog for snapshot split.
if (isBoundedRead()) {
final BinlogOffset currentBinlogOffset =
RecordUtils.getBinlogPosition(offsetContext.getOffset());
// reach the high watermark, the binlog reader should finished
if (currentBinlogOffset.isAtOrAfter(binlogSplit.getEndingOffset())) {
// send binlog end event
try {
signalEventDispatcher.dispatchWatermarkEvent(
binlogSplit,
currentBinlogOffset,
SignalEventDispatcher.WatermarkKind.BINLOG_END);
} catch (InterruptedException e) {
LOG.error("Send signal event error.", e);
errorHandler.setProducerThrowable(
new DebeziumException("Error processing binlog signal event", e));
}
// tell reader the binlog task finished
((StoppableChangeEventSourceContext) context).stopChangeEventSource();
}
}
}
eventFilter由BinlogSplitReader在创建MySqlBinlogSplitReadTask时处理。
private Predicate<Event> createEventFilter(BinlogOffset startingOffset) {
// 当是TIMESTAMP类型时,需要将小于指定时间的事件进行移除
if (BinlogOffsetKind.TIMESTAMP.equals(startingOffset.getOffsetKind())) {
long startTimestampSec = startingOffset.getTimestampSec();
return event ->
EventType.HEARTBEAT.equals(event.getHeader().getEventType())
|| event.getHeader().getTimestamp() >= startTimestampSec * 1000;
}
return event -> true;
}
猜你喜欢
- 2024-12-24 Java 8 新增6接口:Optional、Consumer等
- 2024-12-24 java小知识-纳秒 纳秒是什么
- 2024-12-24 弃用 Java 8,Apache Kafka 3.0 发布
- 2024-12-24 Sharding Sphere-JDBC从入门到实战,一顿饭的时间让你学懂
- 2024-12-24 Java虚拟机之 XX:+UseGCLogFileRotation 解析
- 2024-12-24 阿里Java三面:分布式延时任务方案解析,万字长文一篇点通你
- 2024-12-24 拯救Java应用:Arthas监控术,让你事半功倍
- 2024-12-24 java+uniapp实现微信JSSDK扫码功能
- 2024-12-24 filebeat收集K8S日志,写入自动创建的索引
- 2024-12-24 Java 动态调试技术原理及实践 java动态配置
你 发表评论:
欢迎- 04-27微服务部署架构设计详解(图文全面总结)
- 04-27Java微服务架构选型与对比:一场技术流派的巅峰对决
- 04-27微服务架构下Java的最佳实践
- 04-27Java微服务架构选型:优雅拆分与高效整合
- 04-27微服务架构下的Java代码拆分策略:像拼图一样构建系统
- 04-27微服务架构下的Java最佳实践
- 04-27微服务架构下Java的挑战与机遇
- 04-27微服务架构下Java事务管理的艺术
- 最近发表
- 标签列表
-
- java反编译工具 (77)
- java反射 (57)
- java接口 (61)
- java随机数 (63)
- java7下载 (59)
- java数据结构 (61)
- java 三目运算符 (65)
- java对象转map (63)
- Java继承 (69)
- java字符串替换 (60)
- 快速排序java (59)
- java并发编程 (58)
- java api文档 (60)
- centos安装java (57)
- java调用webservice接口 (61)
- java深拷贝 (61)
- 工厂模式java (59)
- java代理模式 (59)
- java.lang (57)
- java连接mysql数据库 (67)
- java重载 (68)
- java 循环语句 (66)
- java反序列化 (58)
- java时间函数 (60)
- java是值传递还是引用传递 (62)
本文暂时没有评论,来添加一个吧(●'◡'●)