Design aspects of resilient event-driven applications using Apache Kafka.

Kalpa Senanayake
15 min read · Dec 22, 2021

Introduction

This article is the first in a series that originated from lessons learned while designing resilient, fault-tolerant, event-driven applications that use Kafka as the eventing platform. It describes a set of design and implementation considerations I had to work through, and it intends to share these aspects with developers, architects, and system designers.

High-level topics

  1. Designing topics and partitions
  • Number of partitions in the topic
  • Replication factor
  • Data retention policy

2. Designing producer

  • Producer ack mode
  • Producer retry, message duplication
  • Order Guarantees and Number of requests in flight

3. Designing consumer

  • Consumers and Consumer Groups
  • Consumers rebalancing
  • Consumer offset and commit strategy
  • Consumer Concurrency
  • Consumer Resiliency and error handling

Designing topics and partitions

In Kafka, topics are the core piece when it comes to communication.
Producers and consumers connect via topics. Topics are a logical representation of an event group.

Partitions allow a topic to be worked on in parallel. Each topic can have multiple partitions, which are spread across the brokers.

Topics and partitions

One of the critical design decisions you will have to make early in the solution is the number of partitions for a given topic.

Usually, business teams or business analysts bring the requirement, and not in computer science terms. A non-functional requirement like this comes in the form of “The system needs to process 100,000 payments per second.”

Let’s do a back-of-the-envelope calculation to derive the number of partitions we need for such a system.

Size of an average message: ~1 KB
Maximum throughput required for the topic: 1 KB * 100,000 = 100 MB/s

Now you know the maximum throughput expected from the system.

The next question is to find any limitations on the consumer side of the system.

For example, you may have a consumer that invokes a downstream HTTP endpoint synchronously for additional data and can only process 10,000 messages per second due to the limitations of that downstream system.

Per-consumer throughput: 1 KB * 10,000 = 10 MB/s
Maximum throughput / per-consumer throughput = 100 MB/s / 10 MB/s = 10 partitions for the topic.

There will be ten consumers, one per partition, reading the data from the topic and processing it.

Partitions within a topic assigned to consumers in a consumer group

Not too many partitions

Another aspect of selecting the number of partitions is that when the number of partitions for a given topic is high, the ZooKeeper ensemble has to do more legwork to deliver the metadata. More metadata messages mean a higher load on the ZooKeeper instances.

Replication factor

Kafka comes with inbuilt fault-tolerance mechanisms. One of them is the ability to replicate partitions across the brokers in the cluster.

When we create a topic, Kafka assigns a leader broker for each of its partitions.
After this initial assignment, Kafka creates replicas of those partitions on the available non-leader (follower) brokers, based on the replication factor.

4 partitions with replication factor 3, across 4 brokers

The replication factor should be based on the degree of fault tolerance you may want to build into the solution.

A modern cloud-deployed Kafka cluster can have brokers across multiple availability zones. With three availability zones and a replication factor of three, you get three replicas of each partition (including the leader’s copy).

In the event of a lost broker

If we lose a broker, Kafka will assign another broker as the leader for the partitions handled by the lost broker.

However, if the number of in-sync replicas falls below the requested minimum (`min.insync.replicas`), producers will not be able to send events to this topic.

Data retention policy

Kafka is different from other popular message queueing systems in many ways. Kafka can keep messages for an extended period, even after consumers have read them.
Thus, we can consider Kafka a database in some respects.

The data retention policy is another critical decision: as the designer, you have to decide how long the business needs these events kept in Kafka.

The default retention period is seven days. This can be configured either as a global setting or per topic, based on the business use of the data in the topic.
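To make these three topic-level decisions concrete, here is a minimal sketch using Kafka’s Java `AdminClient`; the topic name, bootstrap servers, and the explicit seven-day retention value are assumptions for illustration only.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class PaymentTopicCreator {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Bootstrap servers are an assumption for this sketch.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "broker-1:9092,broker-2:9092,broker-3:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 10 partitions from the back-of-the-envelope calculation,
            // replication factor 3 to cover three availability zones.
            NewTopic paymentTopic = new NewTopic("payment-events", 10, (short) 3)
                    .configs(Map.of(
                            // Per-topic retention of 7 days, expressed in milliseconds.
                            TopicConfig.RETENTION_MS_CONFIG,
                            String.valueOf(7L * 24 * 60 * 60 * 1000)));

            // Blocks until the cluster confirms topic creation.
            admin.createTopics(Collections.singleton(paymentTopic)).all().get();
        }
    }
}
```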

Designing producer

Producer acknowledgment mode

When sending a message to a Kafka topic, we need to confirm the delivery of the message to the broker.
What do we consider sufficient to mark a send operation as successful in the given business context?

For example, a payment-intent message that creates a payment record from a customer account must be delivered to all partition replicas in the payment event topic, due to the high business criticality of its content.

However, the business may be satisfied with a fire-and-forget approach for non-critical announcement messages that appear in the notification centre of the customer dashboard.

There are three producer ack modes.

acks = 0 = NONE: The producer does not wait for acknowledgment from any Kafka broker.

Acks 0 (NONE)

The implication is that if the leader broker for the selected partition is down, the producer will lose the message.

acks = 1 = LEADER: The producer waits for an acknowledgment from the leader of the selected partition. The followers may not yet have the message in their replicas.

Acks 1 (LEADER)

acks = ALL: The leader collects write acknowledgments from all in-sync replicas before sending the ack back to the producer.

Acks -1 (ALL)

This “acks = all” mode is the strongest durability guarantee Kafka’s producer offers.
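As a sketch of how the acknowledgment mode is set in the Java producer client (the topic name, payload, and serializers here are assumptions for illustration):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PaymentIntentProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Wait for all in-sync replicas to acknowledge the write;
        // "1" or "0" would suit less critical messages.
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("payment-events", "customer-42", "{\"amount\": 100}");
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    // The send failed despite the broker-side guarantees; handle or log it.
                    exception.printStackTrace();
                }
            });
        }
    }
}
```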

Producer retry and message duplication

Another design aspect of producers is the retry strategy. You may have requirements that are bound to strong delivery guarantees.

Producers may fail to send messages to brokers due to various reasons, such as network failures.

In such cases, Kafka producers can retry sending the messages.

We can configure that using the following producer configurations.

retries: Set the number of retries.
default value = 0
retry.backoff.ms: Number of milliseconds to wait between the retry attempts.
default value = 100

The default values for these settings may not be suitable for your solution.

You may need your producer to retry; hence you may need to set a non-zero value for `retries`.
If you lose a broker, it may take some time for it to come back up and rejoin the rest of the nodes. Having too low a retry backoff value may make the producer give up its retry attempts too soon, before the broker comes back. It is good to adjust these values based on your test experience and craft them to fit your solution.

There are other implications of setting the `retries` value to > 0.

Suppose the producer sends a message to the broker, and the broker commits it. However, an error occurs while the acknowledgment travels from the broker back to the producer.

The producer retries and manages to deliver the message on the second attempt. Now you have a duplicate message.

Producer retry mechanism may create duplicate messages

To avoid message duplication, we can configure the producer to act as an idempotent producer. That guarantees that a message is delivered exactly once to the brokers.

enable.idempotence = true

Order Guarantees and number of requests in flight

If message ordering is essential for your business case but you have configured retries, you may end up with an at-least-once delivery guarantee. (No record written to Kafka is ever lost, but if errors occur during the send, there may be duplicates in the broker.)

max.in.flight.requests.per.connection: Controls the number of unacknowledged requests the client will send on a single connection before blocking.
default value = 5

Setting this to 1 means messages will be delivered to the broker in the same order they arrived at the producer.

If we have retries > 0, we should set this value to 1. Otherwise, there is a chance that retried messages will be delivered out of order.
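Putting the retry, idempotence, and in-flight settings together, a minimal producer configuration sketch might look like this; the concrete values are assumptions to tune against your own tests.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ReliableProducerConfig {

    static Properties reliableProducerProps() {
        Properties props = new Properties();
        // Retry failed sends instead of dropping the message.
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Wait longer than the default 100 ms between attempts,
        // giving a lost broker time to come back.
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 500);
        // De-duplicate messages that were retried after a lost acknowledgment.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Preserve ordering when retries are enabled, as discussed above.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        return props;
    }
}
```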

Partitioning Strategy

A well-designed Kafka producer should have a well-defined partition strategy according to the business context of the messages it produces.

Our partitioning strategy has implications for how the system processes messages. For example, messages related to customer orders in an eCommerce application need a strong ordering guarantee.

We cannot afford to send the customer a push notification with the status “DELIVERED” before “DISPATCHED.”

However, one key difference between message-oriented middleware like MQ and Kafka is that Kafka has no global order guarantee.

Order is only guaranteed within a partition.

Hence, adding messages to a partition in the correct order is essential in the above business context.

To understand partition strategy, let’s look at the journey of a producer record until it selects a destination partition.

Anatomy of a producer record

Kafka producers use a few partition strategies to determine the partitioning for a given record.

The most straightforward strategy is to provide a partition number in the producer record; the record then goes directly to the designated partition.

Suppose we provide a key that uniquely identifies a set of messages. In that case, the Kafka producer guarantees that those messages will be delivered, in order, to the same partition, selected according to the following formula.

partition = hash(unique_key) % number_of_partitions

One can also implement custom partitioning logic. In that case, the producer will look up the custom strategy and use it to determine the destination partition.

Selecting partitioning strategy for a producer record

It is important to note the implications of this key-based partition selection. The strategy works only as long as you do not need more partitions for your topic. If you decide to add more partitions later, there is no guarantee that records with the same key will be appended to the same partition, since `number_of_partitions` has changed.

Suppose your consumers are sensitive to such behaviour. In that case, you may need to consider a slightly more advanced approach and take the partitioning strategy into your own control, away from the default partitioner.

In the Java producer client, implementing the `org.apache.kafka.clients.producer.Partitioner` interface allows you to have a custom partitioning strategy.
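Here is a minimal sketch of such a custom partitioner, assuming a hypothetical rule that sends keyless records to partition 0 and otherwise hashes the key:

```java
import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class OrderAwarePartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (keyBytes == null) {
            // Hypothetical rule: keyless records always land on partition 0.
            return 0;
        }
        // Simple stable hash of the key bytes; the built-in partitioner uses murmur2.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {
        // Nothing to clean up in this sketch.
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Custom configuration could be read here.
    }
}
```

The producer is then pointed at this class through the `partitioner.class` property (`ProducerConfig.PARTITIONER_CLASS_CONFIG`).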

Designing consumer

Consumers and Consumer Groups

One of the key advantages of using Kafka as a messaging platform is its native ability to enable parallel processing of messages.

Consumers and consumer groups are at the heart of this ability.

As we discussed in the `Designing topics and partitions` section, you now know the expected throughput of the system and the number of partitions for the given topic.

Hence, it is easy to derive the number of consumers you need for your consumer group.

If you have ten partitions, you need ten consumers assigned to corresponding partitions to achieve the maximum throughput.

Of course, you may have fewer consumers, but that will not get you the throughput you aimed to achieve.

Distributed and parallel nature of consumption of events via consumer groups

In a cloud deployment environment like this, there can be 3–4 consumers in each availability zone.

Events in a Kafka topic are usually not consumed by only one application. Other applications/consumers may be interested in the same events, and it is recommended to have at least one consumer group per application.
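As a minimal sketch, each consuming application declares its own group via `group.id` before subscribing; the topic and group names are assumptions:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PaymentEventConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
        // One consumer group per consuming application.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-processor");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("payment-events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```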

Consumers rebalancing

When a new consumer is added to a consumer group, or an existing consumer is removed, the ownership of partitions moves from one consumer to another.

Moving partitions ownership is called consumer rebalance.

Since consumption is paused during the rebalance, there is no message loss.

After the rebalance, a consumer may or may not get assigned to the same partition as previously.

Consumer removal

A consumer group can remove a consumer from the group due to its unhealthiness.

The broker (acting as group coordinator) determines a consumer’s health from the heartbeats the consumer sends at periodic intervals.

Suppose a consumer fails to send heartbeats for the time defined in `session.timeout.ms`. The broker will then kick that consumer out of the consumer group and reassign the partitions handled by it to a healthy consumer, which initiates a consumer rebalance.

heartbeat.interval.ms: The amount of time used to ensure that the consumer’s session stays active and to facilitate rebalancing when consumers join or leave the group.
default value = 3000
session.timeout.ms: The amount of time a consumer can be out of contact while still considered alive.
default value = 45000

Setting `session.timeout.ms` higher than `heartbeat.interval.ms` is vital; otherwise, it may cause unnecessary rebalances when heartbeats arrive slower than usual due to intermittent high load on the consumer.
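As a sketch, both values are ordinary consumer properties; the numbers below simply restate the defaults mentioned above.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class RebalanceTuning {

    static Properties rebalanceProps() {
        Properties props = new Properties();
        // Heartbeats must fit several times within the session timeout.
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        // The consumer is considered dead, and a rebalance is triggered,
        // if no heartbeat arrives within this window.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);
        return props;
    }
}
```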

Consumer addition

A consumer group can add a new consumer to the group to handle an increased load, or when a consumer that was previously removed due to unhealthiness becomes healthy again and rejoins.

Consumer offset and commit strategy

The offset is the value that enables a consumer to operate independently, by representing the last read position the consumer reached in a partition within a topic.

There are a few simple concepts that you need to understand regarding the offset.

Last committed offset: The last record the consumer has processed and committed.

Current offset: The record the consumer is reading at the moment.

Log-end offset: The last record in the given partition.

Concepts of offset

As you see in the diagram, there is a gap between the last committed offset and the current offset.

If the consumer fails at the current offset, then after the rebalance the new consumer gets the metadata about the last committed offset from the broker and starts processing messages from the last committed offset. As a result, there will be duplicate message processing in your system.

There are a few strategies to handle this situation.

  1. Enabling the auto offset commit
enable.auto.commit = true
If true, the consumer's offset will be periodically committed in the background.

This configuration delegates to Kafka the responsibility of advancing the last committed offset to the current position.

Enabling auto-commit is a somewhat blind setup, since Kafka has no idea which conditions need to be satisfied before a message can be considered processed and safe to mark as committed.

But if you are OK with duplicate message processing, it may work for you.

2. Control how the offset is committed

If you need to control the commit operation, you can set

enable.auto.commit = false

and use the synchronous or asynchronous commit methods.

Using the `commitSync()` method blocks the consumer thread until it receives a response from the cluster. Your throughput may be lower, but the outcome is deterministic.

If you cannot afford reduced throughput, you can use the other alternative, `commitAsync()`.

The asynchronous commit strategy will not block the consumer thread, but the results are non-deterministic.
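A minimal sketch of a consumer with auto-commit disabled, committing synchronously after each processed batch; the topic, group id, and the processing step are placeholders:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-processor");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Take control of the commit ourselves.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("payment-events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // business logic placeholder
                }
                // Blocks until the cluster confirms the commit; swap in
                // consumer.commitAsync() if throughput matters more than certainty.
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println("Processing offset " + record.offset());
    }
}
```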

3. High-level client libraries to the rescue

Usually, the client library wrappers provide nice and easy APIs to deal with this while giving you the best of both worlds.

For example, Spring Kafka provides an ack mode called RECORD:

`https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.AckMode.html#RECORD`

RECORD mode commits the offset after the listener processes each record. It allows the developer to control how the processing should happen. If no errors occur during the processing, the consumer container will commit the offset.

The consumer will not commit the offset if there are errors, and the event will be re-delivered to the consumer on the next poll call.
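As a rough sketch of how this might be wired up with Spring Kafka (the bean wiring, topic, and group names here are assumptions, not the library’s only way of doing it):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;

@Configuration
class ListenerConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Commit the offset after each successfully processed record.
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        return factory;
    }
}

@Component
class PaymentListener {

    @KafkaListener(topics = "payment-events", groupId = "payment-processor")
    void onPayment(String payload) {
        // If this method throws, the offset is not committed and the record is re-delivered.
        System.out.println("Received: " + payload);
    }
}
```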

Consumer Concurrency

Kafka consumers operate in single-threaded mode; a consumer instance is not safe to share across multiple threads. Frameworks have implementations that hide this complexity from you.

Suppose you are deploying into a modern cloud-based environment with auto-scaling compute workloads, for example Kubernetes with horizontal pod autoscaling. Choosing process-based concurrency over thread-based concurrency will make your life easier in the production environment.

Process-based concurrency in a Kubernetes environment

Process-based concurrency is also easier to operate for a DevOps team that needs to support these event-driven applications in high-throughput messaging environments.

Consumer Deserialisation

These event-driven applications can act as consumers of various topics and message types. Even though Kafka is loosely typed about the value of a message, it is good to have typed consumers per topic, which keeps the separation of concerns in your design.

When it comes to deserializations, Kafka consumers offer many options.

String, JSON, and Avro, to name a few; you can choose which one to use based on your technical stack and the optimisation factors that matter to your solution.

Consumer Resiliency and error handling

Resiliency and error handling are the final aspects we will discuss in this article. We addressed producer error handling in the producer design section.

This section will discuss a few critical aspects of consumer error handling.

Deserialization errors

In Kafka’s world, these are called poison pills. Once you have an event/message that cannot be deserialized by the configured `value.deserializer`, it will keep causing errors until it is removed from the message flow.

There can be multiple reasons for this type of message: a malformed message sent by the producer, or an update to the consumer application that can no longer deserialize an older version of the message.

Either way, you will have to send the message to a dead letter topic (DLT) and commit the offset to the broker, and your team has to look for the root cause.
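Without a framework, one way to do this is to consume raw bytes, attempt deserialization yourself, and publish anything that fails to the dead letter topic before committing. A minimal sketch, where the `payment-events.DLT` topic name, the `PaymentEvent` type, and the deserialization step are assumptions:

```java
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PoisonPillHandler {

    private final KafkaConsumer<byte[], byte[]> consumer;   // configured with ByteArrayDeserializer
    private final KafkaProducer<byte[], byte[]> dltProducer; // configured with ByteArraySerializer

    PoisonPillHandler(KafkaConsumer<byte[], byte[]> consumer, KafkaProducer<byte[], byte[]> dltProducer) {
        this.consumer = consumer;
        this.dltProducer = dltProducer;
    }

    void run() {
        consumer.subscribe(Collections.singletonList("payment-events"));
        while (true) {
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(500))) {
                try {
                    PaymentEvent event = deserialize(record.value()); // may throw on a poison pill
                    process(event);
                } catch (RuntimeException poisonPill) {
                    // Park the raw bytes on the dead letter topic for later inspection.
                    dltProducer.send(new ProducerRecord<>("payment-events.DLT", record.key(), record.value()));
                }
            }
            // Commit either way, so the poison pill does not block the partition.
            consumer.commitSync();
        }
    }

    // Placeholders for the application's own deserialization and business logic.
    private PaymentEvent deserialize(byte[] value) { return new PaymentEvent(new String(value)); }
    private void process(PaymentEvent event) { System.out.println("Processed " + event.payload()); }

    record PaymentEvent(String payload) {}
}
```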

Errors from dependency systems

Event-driven applications may not be purely event-driven. You might end up calling systems that only expose HTTP request/response style integration as data providers.

For example, you may receive a payment intent with a PayId, but to resolve the PayId to an account number before sending it to the core banking service, you may have to call an HTTP endpoint.

And, as usual, the network is unreliable, and the endpoint fails with a read timeout.

Strategy 1: Send to Dead Letter Topic

You can use the dead letter topic straight away, but if the message processing is time-critical for the business, you may not be able to just forward it to the DLT and be done with it.

Forward the event to the DLT when an error occurs

Strategy 2: Send to Retry Topic

Send the message to a retry topic, and the main application can move on with other messages.

A retry application (it could be a replica of the main application, but subscribed to the retry topic instead of the main topic) then picks the message up and retries the processing.

Forward the event to the retry topic; the retry app picks it up

There could be a lag in message processing by the retry application. Hence, you could end up with a message-ordering problem if ordering is critical for your data sink.

If ordering is critical, you may have to improve this strategy to track the events sent to the retry topic.

Strategy 3: Retry the whole transaction with exponential backoff.

Some business applications prioritize ordering and processing guarantees over throughput.

In that case, you can use this strategy: retry the transaction with exponential backoff, which increases the time between retries with each attempt.

This must have an upper limit below `max.poll.interval.ms` (and `session.timeout.ms` on older clients); otherwise, the broker will kick the consumer out of the consumer group.

Retry with exponential backoff for the time interval between retries
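A minimal sketch of such a backoff loop around an unreliable downstream call; the initial delay, the cap, and the time budget derived from the consumer’s poll interval are all assumptions to tune for your own solution.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

public class ExponentialBackoffRetry {

    /**
     * Retries the given call with exponentially growing waits, giving up once the
     * total time spent would risk exceeding the consumer's max.poll.interval.ms.
     */
    static <T> T callWithBackoff(Callable<T> call, Duration budget) throws Exception {
        long backoffMs = 500;                                  // initial wait (assumption)
        long deadline = System.currentTimeMillis() + budget.toMillis();
        while (true) {
            try {
                return call.call();
            } catch (Exception transientFailure) {
                if (System.currentTimeMillis() + backoffMs >= deadline) {
                    throw transientFailure;                    // out of budget; let the caller decide (e.g. DLT)
                }
                Thread.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, 30_000);   // double the wait, capped at 30 s (assumption)
            }
        }
    }
}
```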

An exponential backoff strategy reduces your throughput and increases the backlog of messages in the source topic.

And you may have to deal with a heavy load on your consumers once the problem is resolved. Hence, you may have to consider building auto-scaling capabilities into your consumers to handle that.

Strategy 4: Retry the whole transaction infinitely

Some business cases are OK with a low level of throughput, but handling the DLT events may cost more.

As a result, the business may trade the throughput for cost. This strategy comes in handy in such scenarios.

This is a special application of strategy 3.

A few things are worth highlighting about this strategy.

  1. Infinite retry acts as natural backpressure for a slow consumer (blocking data stream is the extreme form of backpressure.)
  2. The application’s error handling requires careful consideration of downstream throughput, latency, and error rates to stop accidental infinite retry loops.
  3. The fixed interval should be able to accommodate any transactional aspects of the data processing.

Closing notes

This article went through some of the key design aspects and implementation details of Kafka topics, producers, and consumers.

Kafka is a versatile tool to build event-driven, data-streaming applications, and it contains lots of other approaches and configurations to support various solutions.

This article merely touches on a few of the vast aspects of the above topics. However, I believe it is a solid starting point for someone new to the Kafka ecosystem and can help put an initial design on the table and iterate on it per their business use case.

References

  1. https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
  2. https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
  3. https://www.confluent.io/blog/error-handling-patterns-in-kafka/
  4. https://www.pluralsight.com/courses/apache-kafka-getting-started
  5. https://kafka.apache.org/documentation/
  6. https://www.confluent.io/blog/kafka-rebalance-protocol-static-membership/
  7. https://www.confluent.io/resources/kafka-summit-2020/spring-kafka-beyond-the-basics-lessons-learned-on-our-kafka-journey/
