Apache Kafka producers write data to topics, and topics are made of partitions. Producers automatically know which broker and partition to write to based on your message, and if a Kafka broker in your cluster fails, the producers automatically recover from it. This resilience is a big part of why Kafka is so widely used today. In a diagram of a topic's partitions, the producer sits on the left-hand side and sends data into each of the partitions of the topic.
In this article, we are going to discuss two very important producer settings: retries and max.in.flight.requests.per.connection.
Producer Retries
When a producer hits a transient failure, developers are expected to handle the exception in the send callback; otherwise, the data will be lost. NotEnoughReplicasException is an example of a transient failure. So what is this exception?
What is the NotEnoughReplicasException in Kafka?
As you know, the Apache Kafka producer offers three acknowledgment (acks) modes:
- acks=0 (possible data loss)
- acks=1 (limited data loss)
- acks=all (no data loss)
When you use acks=all, you also have to use another setting called min.insync.replicas; the two must be used together. min.insync.replicas is a broker and topic setting: if you set it at the broker level, you can override it at the topic level. The most common setting for min.insync.replicas is
min.insync.replicas=2
That means that at least two brokers that are in-sync replicas, including the leader, must respond that they have the data; otherwise, you'll get an error. So if you use replication.factor=3, min.insync.replicas=2, and acks=all, you can tolerate only one broker going down; beyond that, the producer will get an exception on send. Here is a diagram to understand this concept.
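To make the arithmetic concrete, here is a minimal Python sketch of the rule described above. It is a toy model, not Kafka code: the helper names `tolerable_broker_failures` and `acks_all_write_accepted` are hypothetical and only encode the check that an acks=all write needs at least min.insync.replicas in-sync replicas, leader included.

```python
def tolerable_broker_failures(replication_factor: int, min_insync_replicas: int) -> int:
    """Replicas that can be lost while acks=all writes are still accepted (hypothetical helper)."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("min.insync.replicas must be between 1 and the replication factor")
    return replication_factor - min_insync_replicas


def acks_all_write_accepted(in_sync_replicas: int, min_insync_replicas: int) -> bool:
    """True if the leader would accept an acks=all write; the leader itself counts as in sync."""
    return in_sync_replicas >= min_insync_replicas


# replication.factor=3, min.insync.replicas=2
print(tolerable_broker_failures(3, 2))   # 1
print(acks_all_write_accepted(2, 2))     # True: one broker down, write accepted
print(acks_all_write_accepted(1, 2))     # False: two brokers down -> NotEnoughReplicas
```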
Suppose min.insync.replicas=2 and the topic has a replication factor of 3. Now assume that brokers 102 and 103 are both down, so only broker 101 is still online. The producer sends the data to the leader, and the leader tries to replicate it, but replication cannot happen because the other replicas are down. The number of in-sync replicas is now one, so broker 101 replies to the producer with "Exception: NotEnoughReplicas", meaning the data could not be written safely to the cluster. It is the role of the producer to retry until this write succeeds. So, for maximum safety and availability, use min.insync.replicas=2, acks=all, and a replication factor of at least 3. That is the NotEnoughReplicasException in Kafka.
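The scenario above can be sketched as a small simulation. This is a toy model under stated assumptions, not the Kafka client: `ToyPartitionLeader`, `send_with_retries`, and `NotEnoughReplicasError` are hypothetical names, the leader only performs the min.insync.replicas check, and "broker 102 recovers after the second failed attempt" is an invented event to show a retry eventually succeeding. A real producer also waits `retry.backoff.ms` between attempts, which is omitted here.

```python
class NotEnoughReplicasError(Exception):
    """Hypothetical stand-in for Kafka's NotEnoughReplicasException."""


class ToyPartitionLeader:
    """Toy leader (broker 101) that only reproduces the min.insync.replicas check."""

    def __init__(self, in_sync_brokers, min_insync_replicas):
        self.in_sync = set(in_sync_brokers)  # brokers currently in sync, leader included
        self.min_insync = min_insync_replicas
        self.log = []

    def append(self, record):
        if len(self.in_sync) < self.min_insync:
            raise NotEnoughReplicasError(f"only {len(self.in_sync)} in-sync replica(s)")
        self.log.append(record)


def send_with_retries(leader, record, retries, on_failure=None):
    """Try once, then retry up to `retries` more times; return the retries used."""
    for attempt in range(retries + 1):
        try:
            leader.append(record)
            return attempt
        except NotEnoughReplicasError:
            if on_failure is not None:
                on_failure(attempt)
    raise NotEnoughReplicasError(f"gave up after {retries} retries")


# Brokers 102 and 103 are down: only the leader (101) is in sync.
leader = ToyPartitionLeader(in_sync_brokers={101}, min_insync_replicas=2)


def broker_102_recovers(attempt):
    # Invented event: broker 102 rejoins the in-sync set after the second failure.
    if attempt == 1:
        leader.in_sync.add(102)


print(send_with_retries(leader, "order-1", retries=5, on_failure=broker_102_recovers))  # 2
```

With `retries=0` the same send fails immediately, which is why the producer either needs retries or a callback that handles the exception itself.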
For transient failures like this, the retries setting lets the producer retry failed sends automatically instead of leaving every exception for you to handle. In Kafka 2.0 and earlier, retries defaults to 0, which is why the callback checks whether the broker returned an exception; since Kafka 2.1, the default is Integer.MAX_VALUE. You can raise retries to a very high number, such as Integer.MAX_VALUE, so that the producer keeps retrying a send until it succeeds. However, with retries, messages can be written out of order: a failed batch is re-queued for sending while later batches may already have succeeded, producing a small inversion of messages. If you rely on key-based ordering, that can be a serious issue. Messages with the same key still go to the same partition, but they may no longer be in order. To control this behavior, there is a setting called max.in.flight.requests.per.connection.
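The reordering described above can be reproduced with a deterministic toy model. `simulate_partition_log` is a hypothetical helper, not part of any Kafka client, and it simplifies the real producer: batches are sent in rounds of at most `max_in_flight` unacknowledged requests, the broker handles each round in send order, and any batch listed in `fails_once` fails on its first attempt and is put back at the front of the send queue.

```python
from collections import deque


def simulate_partition_log(batches, fails_once, max_in_flight):
    """Return the order in which batches land in one partition (toy model)."""
    if max_in_flight < 1:
        raise ValueError("max_in_flight must be at least 1")
    queue = deque(batches)
    already_failed = set()
    log = []
    while queue:
        # Put up to max_in_flight unacknowledged batches on the wire at once.
        in_flight = [queue.popleft() for _ in range(min(max_in_flight, len(queue)))]
        to_retry = []
        for batch in in_flight:
            if batch in fails_once and batch not in already_failed:
                already_failed.add(batch)  # transient failure, e.g. NotEnoughReplicas
                to_retry.append(batch)
            else:
                log.append(batch)  # later batches in the same round still succeed
        # Re-queue failed batches at the front, keeping their relative order.
        queue.extendleft(reversed(to_retry))
    return log


same_key = ["k1-v1", "k1-v2", "k1-v3"]
print(simulate_partition_log(same_key, {"k1-v1"}, max_in_flight=5))  # ['k1-v2', 'k1-v3', 'k1-v1']
print(simulate_partition_log(same_key, {"k1-v1"}, max_in_flight=1))  # ['k1-v1', 'k1-v2', 'k1-v3']
```

With several requests in flight, the retried `k1-v1` lands after batches that were sent behind it; with a single request in flight, nothing can overtake it.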
What is max.in.flight.requests.per.connection in Kafka?
This setting controls how many unacknowledged requests the producer can have in flight at the same time on a single connection to a broker. By default it is 5, which explains why you can see some reordering; set it to 1 if you want to strictly ensure ordering. Setting it to 1 may reduce throughput. If you use Kafka version 0.11 or later for both brokers and clients, there is a better solution: the idempotent producer. If not, be careful: if you set retries to a high number and want to keep ordering, you absolutely need to set max.in.flight.requests.per.connection=1.
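The two options above can be written down as producer configuration. This is a sketch, assuming a client that accepts standard Kafka property names as string keys; `ordered_producer_config` is a hypothetical helper and `localhost:9092` is a placeholder address. The topic-side settings (replication factor 3, min.insync.replicas=2) live on the broker or topic, not in this producer dictionary.

```python
def ordered_producer_config(use_idempotent_producer: bool) -> dict:
    """Producer properties that keep per-partition ordering with retries (hypothetical helper)."""
    config = {
        "bootstrap.servers": "localhost:9092",  # placeholder broker address
        "acks": "all",                          # pair with min.insync.replicas=2 on the topic
        "retries": 2147483647,                  # Integer.MAX_VALUE
    }
    if use_idempotent_producer:
        # Brokers and clients >= 0.11: idempotence also requires acks=all and retries > 0.
        # max.in.flight.requests.per.connection is left at the client default here;
        # check your client version's documentation for the limit it allows.
        config["enable.idempotence"] = True
    else:
        # Older clusters: allow only one unacknowledged request per connection.
        config["max.in.flight.requests.per.connection"] = 1
    return config


print(ordered_producer_config(use_idempotent_producer=False)["max.in.flight.requests.per.connection"])  # 1
```

In the Java client, the same keys can be set on a `Properties` object passed to the `KafkaProducer` constructor. The trade-off is the one described above: the single in-flight request is safe everywhere but limits throughput, while the idempotent producer keeps ordering without that restriction on newer clusters.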