专业的JAVA编程教程与资源

网站首页 > java教程 正文

快速处理Kafka反序列化错误(kafka自定义反序列化)

temp10 2024-11-04 14:06:40 java教程 18 ℃ 0 评论

事件驱动架构在各种业务场景中被许多组织成功使用了相当长一段时间。它们在性能、可伸缩性、可演进性和容错性方面表现出色,提供了良好的抽象和弹性。这些优势使它们成为应用需要实时或几乎实时响应时的良好选择。

在实现方面,对于标准消息传递,ActiveMQ 和 RabbitMQ 是不错的选择,而对于数据流处理,诸如 Apache Kafka 和 Redpanda 这样的平台更为合适。通常,当开发人员和架构师需要选择这两个方向中的一个时,他们会从多个角度进行分析和权衡——消息负载、数据流和使用、吞吐量以及解决方案拓扑结构。由于围绕这些方面的讨论可能会变得过于庞大和复杂,本文不会对其进行详细讨论。

快速处理Kafka反序列化错误(kafka自定义反序列化)

从概念上讲,事件驱动架构至少涉及三个主要角色:消息生产者、消息代理和消息消费者。简而言之,其目的是允许生产者和消费者以一种解耦和异步的方式进行通信,这是通过前面提到的消息代理来实现的。在乐观的情况下,生产者创建一条消息,将其发布到代理拥有的主题上,消费者从中读取消息,处理消息,并礼貌地提供回应。当消息被发送到主题时,生产者会对其进行序列化(编组),而消费者在从主题接收消息时会对其进行反序列化(解组)。

本文关注的是消费者在对接收到的消息进行反序列化时遇到问题,并提供进一步处理的方法。此类操作的一些示例可能包括构建默认消息或向消息代理发送反馈。开发人员有足够的创造力来决定这种行为,这取决于特定的实现用例。

设置

  • Java 21
  • Maven 3.9.2
  • Spring Boot – 版本 3.1.5
  • 运行在 Docker 中的 Redpanda 消息代理 – 镜像版本 23.2.15

Redpanda 是一种轻量级消息代理,被选择用于此概念验证,以便为读者提供一个与广泛使用的 Kafka 不同的选择。由于它与 Kafka 兼容,如果从一个服务提供商转移到另一个服务提供商,生产者和消费者的开发和配置都不需要做任何改变。

根据 Redpanda 文档,Docker 支持仅适用于开发和测试。对于这个项目的目的来说,这已经足够了;因此,设置了一个在 Docker 中运行的单个 Redpanda 消息代理。

有关如何完成最小设置的详细信息,请参见本文结尾的资源 1。

一旦运行起来,将使用以下命令创建一个名为 minifig 的主题:

>docker exec -it redpanda-0 rpk topic create minifig
TOPIC    STATUS
minifig  OK

如果检查集群,可以观察到创建了一个具有一个分区和一个副本的主题。

>docker exec -it redpanda-0 rpk cluster info
CLUSTER
=======
redpanda.581f9a24-3402-4a17-af28-63353a602421

BROKERS
=======
ID      HOST        PORT
0*      redpanda-0  9092

TOPICS
======
NAME                PARTITIONS  REPLICAS
__consumer_offsets  3           1
_schemas            1           1
minifig             1           1

实现

流程很简单:生产者向配置的主题发送请求,然后由消费者进一步读取。

一个请求代表着一个迷你人物,它由以下记录简单地建模:

public record Minifig(String id,
                      Size size,
                      String name) {

    public Minifig(Size size, String name) {
        this(UUID.randomUUID().toString(), size, name);
    }

    public enum Size {
        SMALL, MEDIUM, BIG;
    }
}

id 是迷你人物的唯一标识符,具有特定的 name,并且是特定的 size —— 小、中或大。

要配置生产者和消费者,至少需要以下属性(application.properties 文件):

# 消息代理的路径
broker.url=localhost:19092

# 代理主题的名称
topic.minifig=minifig

# 标识消费者组的唯一字符串
topic.minifig.group.id=group-0

为了发送消息,生产者需要一个 KafkaTemplate 实例。

@Configuration
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);

        return new KafkaTemplate<>(producerFactory);
    }
}

可以看到,在生产者配置中,选择了 StringSerializer 来对负载值进行编组。通常,JsonSerializer 对于生产者-消费者契约提供了更强大的支持。然而,这里的选择是有意为了增加消费者端的实验灵活性(后面会看到)。只是作为提醒,这个概念验证的兴趣在于对遇到的反序列化错误进行处理。

一旦消息到达 minifig 主题,就配置一个消费者来接收它们。

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Value("${topic.minifig.group.id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Minifig> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, Minifig.class.getPackageName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Minifig.class.getName());

        DefaultKafkaConsumerFactory<String, Minifig> defaultFactory = new DefaultKafkaConsumerFactory<>(props);

        ConcurrentKafkaListenerContainerFactory<String, Minifig> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(defaultFactory);
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }
}

KafkaListenerContainerFactory 接口负责为特定端点创建监听器容器。配置类上的 @EnableKafka 注解启用了容器中任何 Spring 管理的 bean 上的 @KafkaListener 注解的检测。因此,接下来开发实际的监听器(消息消费者)。

@Component
public class MinifigListener {

    private static final Logger LOG = LoggerFactory.getLogger(MinifigListener.class);

    @KafkaListener(topics = "${topic.minifig}", groupId = "${topic.minifig.group.id}")
    public void onReceive(@Payload Minifig minifig) {
        LOG.info("New minifig received - {}.", minifig);
    }
}

它的功能很简单。它只是记录从 minifig 主题读取的消息,这些消息是为配置的消费者组准备的。

如果应用程序已启动,并且消息代理已经运行,监听器就准备好接收消息了。

为了检查集成,使用以下简单的测试。由于监听器期望一个 Minifig,因此为方便起见创建了一个符合模板。

@SpringBootTest
class AppTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.minifig}")
    private String topic;

    final String template = "{" +
            "\"id\":\"%s\"," +
            "\"size\":\"%s\"," +
            "\"name\":\"%s\"" +
            "}";

    @Test
    void send_compliant() {
        final String minifig = String.format(template,
                UUID.randomUUID(), Minifig.Size.SMALL, "Spider-Man");

        kafkaTemplate.send(topic, minifig);
    }
}

运行测试时,会向代理发送一个“符合”消息,预期地,它会被本地消费者成功接收。

Redpanda 控制台可以帮助观察在代理级别发生的情况,特别是通过 minifig 主题流动的内容。

在上述情况下,消息是从生产者通过消息代理发送到消费者的。

在反序列化失败时进行恢复

在这个概念验证的特定情况下,假设迷你人物的类型可以是 SMALL、MEDIUM 或 BIG,与定义的 Type 枚举一致。如果生产者发送了一个未知类型的迷你人物,即与约定的契约有所偏离,那么消息基本上会被监听器拒绝,因为无法对负载进行反序列化。

为了模拟这种情况,运行以下测试。

@SpringBootTest
class AppTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.minifig}")
    private String topic;

    final String template = "{" +
            "\"id\":\"%s\"," +
            "\"size\":\"%s\"," +
            "\"name\":\"%s\"" +
            "}";

    @Test
    void send_non_compliant() {
        final String minifig = String.format(template,
                UUID.randomUUID(), "Unknown", "Spider-Man");

        kafkaTemplate.send(topic, minifig);
    }
}

消息到达主题,但没有到达 MinifigListener#onReceive() 方法。预期地,在对负载进行反序列化时出现了错误。可以通过深入查看堆栈跟踪来描述造成这种情况的原因。

Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data  from topic [minifig]
Caused by: com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize value of type `com.hcd.errhandlerdeserializer.domain.Minifig$Size` from String "Unknown": not one of the values accepted for Enum class: [BIG, MEDIUM, SMALL]
 at [Source: (byte[])"{"id":"fbc86874-55ac-4313-bbbb-0ed99341825a","size":"Unknown","name":"Spider-Man"}"; line: 1, column: 53] (through reference chain: com.hcd.errhandlerdeserializer.domain.Minifig["size"])

另一个方面是,消费者端不断尝试读取消息。至少从消费者的角度来看,这是不幸的,因为日志正在积累。

为了应对这种情况,用于解组载荷值的JsonDeserializer被装饰成ErrorHandlingDeserializer作为其实际委托。此外,ErrorHandlingDeserializer具有failedDeserializationFunction成员,根据其JavaDoc,在反序列化失败时提供替代机制。

新的消费者配置如下:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Minifig> kafkaListenerContainerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, Minifig.class.getPackageName());
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Minifig.class.getName());

    JsonDeserializer<Minifig> jsonDeserializer = new JsonDeserializer<>(Minifig.class);

    ErrorHandlingDeserializer<Minifig> valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer);
    valueDeserializer.setFailedDeserializationFunction(new MinifigFailedDeserializationFunction());

    DefaultKafkaConsumerFactory<String, Minifig> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(), valueDeserializer);

    ConcurrentKafkaListenerContainerFactory<String, Minifig> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(defaultFactory);
    factory.setCommonErrorHandler(new DefaultErrorHandler());
    return factory;
}

这里使用的failedDeserializationFunction是简单的,但目的是证明其实用性。

public class MinifigFailedDeserializationFunction implements Function<FailedDeserializationInfo, Minifig> {

    private static final Logger LOG = LoggerFactory.getLogger(MinifigFailedDeserializationFunction.class);

    @Override
    public Minifig apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception exception = failedDeserializationInfo.getException();
        LOG.info("Error deserializing minifig - {}", exception.getCause().getMessage());
        return new Minifig("Default");
    }
}

FailedDeserializationInfo实体(Function#apply()的输入)是在从反序列化异常中恢复时构建的,它封装了各种信息(这里,异常是被利用的)。

由于apply()方法的输出是实际的反序列化结果,可以根据目标行为返回null或适当的内容。

如果再次运行send_non_compliant()测试,反序列化异常将被处理,并返回默认值。此外,将调用MinifigListener,并有机会处理它。

INFO 30160 --- [ntainer#0-0-C-1] e.l.MinifigFailedDeserializationFunction : Error deserializing minifig - Cannot deserialize value of type `com.hcd.errhandlerdeserializer.domain.Minifig$Size` from String "Unknown": not one of the values accepted for Enum class: [BIG, MEDIUM, SMALL]
 at [Source: (byte[])"{"id":"f35a77bf-29e5-4f5c-b5de-cc674f22029f","size":"Unknown","name":"Spider-Man"}"; line: 1, column: 53] (through reference chain: com.hcd.errhandlerdeserializer.domain.Minifig["size"])
INFO 30160 --- [ntainer#0-0-C-1] c.h.e.listener.MinifigListener           : New minifig received - Minifig[id=null, size=SMALL, name=Undefined].

结论

配置Kafka生产者和消费者,并对其进行微调,以实现与使用的消息代理相符的期望性能,这并不总是直截了当的。在各个通信步骤中进行控制无疑是令人期待的,而且在未知情况下迅速行动有助于提供稳健且易于维护的解决方案。本文重点讨论了可能出现在Kafka消费者级别的反序列化问题,并提供了在处理不符合规范的载荷时具备备用方案的方法。

示例代码

  • err-handler-deserializer

资源

  1. Redpanda快速入门
  2. Spring for Apache Kafka

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表