Flink CDC | Reading MySQL from a Specified Timestamp

When configuring a MySQL source, Flink CDC offers several startup modes for choosing the starting position: INITIAL, EARLIEST_OFFSET, LATEST_OFFSET, SPECIFIC_OFFSETS, TIMESTAMP, and SNAPSHOT. Each maps to a StartupOptions factory method, sketched after the list below.

INITIAL: full snapshot followed by incremental binlog reading

EARLIEST_OFFSET: the earliest available binlog offset

LATEST_OFFSET: the latest binlog offset

SPECIFIC_OFFSETS: a specific binlog file and position

TIMESTAMP: a specific point in time

SNAPSHOT: full snapshot only
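
For reference, each of these modes corresponds to a factory method on StartupOptions. The following minimal sketch assumes the Flink CDC 2.x package prefix com.ververica.cdc (newer releases use org.apache.flink.cdc instead), and the availability of StartupOptions.snapshot() depends on the connector version:

import com.ververica.cdc.connectors.mysql.table.StartupOptions;

public class StartupModeExamples {
    public static void main(String[] args) {
        StartupOptions initial  = StartupOptions.initial();   // INITIAL: snapshot + binlog
        StartupOptions earliest = StartupOptions.earliest();  // EARLIEST_OFFSET
        StartupOptions latest   = StartupOptions.latest();    // LATEST_OFFSET
        // SPECIFIC_OFFSETS: a concrete binlog file name and position
        StartupOptions specific = StartupOptions.specificOffset("mysql-bin.000003", 4L);
        // TIMESTAMP: a point in time, in milliseconds since the epoch
        StartupOptions byTime = StartupOptions.timestamp(1735027633000L);
        // SNAPSHOT: snapshot only (check that your connector version provides this method)
        StartupOptions snapshot = StartupOptions.snapshot();
    }
}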

Source Code Analysis

When a CDC sync job is configured with this startup type, the mechanism inspects the list of binlog files that currently exist on the server. Because the files are ordered by name and their timestamps increase in the same order, the target file is ultimately located with a binary search.

public static void main(String[] args) {
    // Start reading from a given point in time (milliseconds since epoch).
    // The other required source options (hostname, tables, deserializer, ...)
    // are omitted here; only the startup option matters for this discussion.
    MySqlSource.<Data>builder()
            .startupOptions(StartupOptions.timestamp(System.currentTimeMillis()))
            .build();
}
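
A fuller sketch of wiring the TIMESTAMP startup mode into a job is shown below. The host, credentials, table names and timestamp are placeholders, and the com.ververica.cdc package prefix plus JsonDebeziumDeserializationSchema are assumptions tied to the Flink CDC version in use:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TimestampStartupJob {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source =
                MySqlSource.<String>builder()
                        .hostname("localhost")
                        .port(3306)
                        .databaseList("test_db")
                        .tableList("test_db.orders")
                        .username("root")
                        .password("password")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        // Start from a point in time rather than a snapshot or a binlog offset.
                        .startupOptions(StartupOptions.timestamp(1735027633000L))
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source").print();
        env.execute("Flink CDC timestamp startup");
    }
}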

When the CDC job's startup type is set to TIMESTAMP, the matching binlog file is resolved by the following method; see the class BinlogOffsetUtils.java for details.

public static BinlogOffset initializeEffectiveOffset(
        BinlogOffset offset, MySqlConnection connection) {
    BinlogOffsetKind offsetKind = offset.getOffsetKind();
    switch (offsetKind) {
        case EARLIEST:
            return BinlogOffset.ofBinlogFilePosition("", 0);
        case TIMESTAMP:
            // Walk the existing binlog files and read each file's header to get its timestamp,
            // so reading always covers whole binlog files, selected by file name.
            return DebeziumUtils.findBinlogOffset(offset.getTimestampSec() * 1000, connection);
        case LATEST:
            return DebeziumUtils.currentBinlogOffset(connection);
        default:
            return offset;
    }
}
public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection connection) {
    MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
    BinaryLogClient client =
            new BinaryLogClient(
                    config.hostname(), config.port(), config.username(), config.password());

    List<String> binlogFiles = new ArrayList<>();
    JdbcConnection.ResultSetConsumer rsc =
            rs -> {
                while (rs.next()) {
                    String fileName = rs.getString(1);
                    long fileSize = rs.getLong(2);
                    if (fileSize > 0) {
                        binlogFiles.add(fileName);
                    }
                }
            };

    try {
        // List the binlog files that currently exist on the MySQL server.
        connection.query("SHOW BINARY LOGS", rsc);
        LOG.info("Total search binlog: {}", binlogFiles);

        if (binlogFiles.isEmpty()) {
            return BinlogOffset.ofBinlogFilePosition("", 0);
        }
        // Binary-search for the binlog file closest to the target timestamp.
        String binlogName = searchBinlogName(client, targetMs, binlogFiles);
        return BinlogOffset.ofBinlogFilePosition(binlogName, 0);
    } catch (Exception e) {
        throw new FlinkRuntimeException(e);
    }
}

private static String searchBinlogName(
        BinaryLogClient client, long targetMs, List<String> binlogFiles)
        throws IOException, InterruptedException {
    int startIdx = 0;
    int endIdx = binlogFiles.size() - 1;
    // Binlog file names increase monotonically, and so do their timestamps,
    // which is what makes a binary search possible.
    while (startIdx <= endIdx) {
        int mid = startIdx + (endIdx - startIdx) / 2;
        long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
        if (midTs < targetMs) {
            startIdx = mid + 1;
        } else if (targetMs < midTs) {
            endIdx = mid - 1;
        } else {
            return binlogFiles.get(mid);
        }
    }

    return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
}
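
To see the fallback at the end of searchBinlogName concretely: when no file header matches the target timestamp exactly, the loop exits with endIdx pointing at the last file whose header timestamp is below the target, and that file is returned; if the target predates every file, the first file is used. Below is a self-contained sketch of the same selection logic with hypothetical file names and header timestamps:

public class BinlogSearchSketch {
    // Hypothetical header timestamps (ms) for mysql-bin.000001 ... mysql-bin.000004.
    static final long[] HEADER_TS = {1_000L, 2_000L, 3_000L, 4_000L};
    static final String[] FILES = {
        "mysql-bin.000001", "mysql-bin.000002", "mysql-bin.000003", "mysql-bin.000004"
    };

    static String searchBinlogName(long targetMs) {
        int startIdx = 0;
        int endIdx = FILES.length - 1;
        while (startIdx <= endIdx) {
            int mid = startIdx + (endIdx - startIdx) / 2;
            long midTs = HEADER_TS[mid]; // stands in for getBinlogTimestamp(...)
            if (midTs < targetMs) {
                startIdx = mid + 1;
            } else if (targetMs < midTs) {
                endIdx = mid - 1;
            } else {
                return FILES[mid];
            }
        }
        // No exact match: endIdx is the last file that starts before targetMs.
        return endIdx < 0 ? FILES[0] : FILES[endIdx];
    }

    public static void main(String[] args) {
        System.out.println(searchBinlogName(2_500L)); // mysql-bin.000002
        System.out.println(searchBinlogName(500L));   // mysql-bin.000001 (target older than all files)
    }
}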

As the logic above shows, when a timestamp is specified, reading starts from the beginning of the closest matching binlog file. Does that mean a lot of extra data is read? No: after events are pulled from that binlog file, each one is checked again during processing to see whether it falls within the specified time range. The code is in MySqlBinlogSplitReadTask.java.

protected void handleEvent(
        MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) {
    // Filter each event read from the binlog before handing it on.
    if (!eventFilter.test(event)) {
        return;
    }
    super.handleEvent(partition, offsetContext, event);
    // check do we need to stop for read binlog for snapshot split.
    if (isBoundedRead()) {
        final BinlogOffset currentBinlogOffset =
                RecordUtils.getBinlogPosition(offsetContext.getOffset());
        // reach the high watermark, the binlog reader should finished
        if (currentBinlogOffset.isAtOrAfter(binlogSplit.getEndingOffset())) {
            // send binlog end event
            try {
                signalEventDispatcher.dispatchWatermarkEvent(
                        binlogSplit,
                        currentBinlogOffset,
                        SignalEventDispatcher.WatermarkKind.BINLOG_END);
            } catch (InterruptedException e) {
                LOG.error("Send signal event error.", e);
                errorHandler.setProducerThrowable(
                        new DebeziumException("Error processing binlog signal event", e));
            }
            // tell reader the binlog task finished
            ((StoppableChangeEventSourceContext) context).stopChangeEventSource();
        }
    }
}

The eventFilter is set up by BinlogSplitReader when it creates the MySqlBinlogSplitReadTask.

private Predicate<Event> createEventFilter(BinlogOffset startingOffset) {
    // For the TIMESTAMP startup mode, drop events older than the specified start time.
    if (BinlogOffsetKind.TIMESTAMP.equals(startingOffset.getOffsetKind())) {
        long startTimestampSec = startingOffset.getTimestampSec();
        return event ->
                EventType.HEARTBEAT.equals(event.getHeader().getEventType())
                        || event.getHeader().getTimestamp() >= startTimestampSec * 1000;
    }
    return event -> true;
}
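
Here is a small sketch of how such a predicate behaves; note that binlog event headers carry millisecond timestamps while the offset stores seconds, hence the multiplication by 1000. The class and helper names below are hypothetical, and the sketch assumes the com.github.shyiko.mysql.binlog event classes referenced by the reader are on the classpath:

import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import java.util.function.Predicate;

public class EventFilterSketch {
    // Same shape as createEventFilter: keep heartbeats, drop events before the start time.
    static Predicate<Event> timestampFilter(long startTimestampSec) {
        return event ->
                EventType.HEARTBEAT.equals(event.getHeader().getEventType())
                        || event.getHeader().getTimestamp() >= startTimestampSec * 1000;
    }

    static Event eventAt(EventType type, long timestampMs) {
        EventHeaderV4 header = new EventHeaderV4();
        header.setEventType(type);
        header.setTimestamp(timestampMs);
        // The event body is not needed here; only the header is inspected by the filter.
        return new Event(header, null);
    }

    public static void main(String[] args) {
        Predicate<Event> filter = timestampFilter(1_700_000_000L); // start time in seconds
        System.out.println(filter.test(eventAt(EventType.WRITE_ROWS, 1_699_999_000_000L))); // false: too old
        System.out.println(filter.test(eventAt(EventType.WRITE_ROWS, 1_700_000_100_000L))); // true
        System.out.println(filter.test(eventAt(EventType.HEARTBEAT, 0L)));                  // true: heartbeats always pass
    }
}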
