Using the new (0.8.2) KafkaProducer interface: if the Kafka server
goes down, how can one stop further attempts to deliver messages that
have already been "sent" to the producer?

This is the TLDR of the post. The rest is detail...


Background
==========
We have a process in a large application that writes a lot of data to
Kafka. However, this data is not mission critical. When a problem
arises writing to Kafka, we want the application to continue working,
but to stop sending data to Kafka, allowing other tasks to
continue. The issue I have is handling messages that have been "sent"
to the producer but are waiting to go to Kafka. These messages remain
long after my processing is over, timing out, writing to the logs, and
preventing me from moving forward. I am looking for some way to tell
the client to stop forwarding messages to Kafka.
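
To be concrete, stopping *new* sends is the easy half: an atomic flag
checked before every send suffices. A minimal sketch of that guard
(the names kafkaEnabled and guardedSend are invented here for
illustration; disableEvenMoreMessagesBeingSent() is the method the
callback below calls):

    import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // One flag, flipped once, stops new records from entering the
    // producer's buffer. It does nothing for records already "sent".
    private final AtomicBoolean kafkaEnabled = new AtomicBoolean(true);

    private void disableEvenMoreMessagesBeingSent() {
        kafkaEnabled.set(false);
    }

    // All our application code sends through this wrapper.
    private Future<RecordMetadata> guardedSend(ProducerRecord<byte[], byte[]> record,
                                               Callback callback) {
        if (!kafkaEnabled.get()) {
            return null; // This data is not mission critical; drop it.
        }
        return producer.send(record, callback);
    }

The hard half is the messages already sitting in the producer's
buffer, which is what the rest of this post is about.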


The following code is the only thing I have been able to come up
with (based on the c892c08 git SHA). I have several issues with this
code:
(1) Too much knowledge of the KafkaProducer internals:
(1a) Getting and using the private ioThread member.
(1b) Knowing the "kafka-producer-network-thread" name.
(2) Using the deprecated Thread.stop(..) method.
(3) A general feeling of unease that this won't work when I most want
it to.

What I have so far
==================

        //
        // Closing the producer
        //
        // (lastFuture is the Future returned by our most recent
        // producer.send() call.)
        //
        try {
            lastFuture.get(60, TimeUnit.SECONDS); // Wait a minute.
        } catch (TimeoutException e) {
            // Do not close() a producer that cannot be "flushed" in time
            // (by get(timeout) on the last future).
            // We are afraid that close() will block for a long time if
            // kafka-producer-network-thread is alive but cannot write.
            // Instead try to kill the thread.
            killProducerIOThread();
            return;
        } catch (InterruptedException | ExecutionException e) {
            logger.error("Kafka exception thrown in future.get(timeout) (continuing)", e);
        }
        try {
            producer.close();
        } catch (KafkaException e) {
            logger.error("Kafka exception thrown in producer.close() (continuing)", e);
        }


    //
    // Callback
    //
    private class ErrorCallback implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) { // Not an error.
                return;
            }

            // Prevent producer.send() and producer.close().
            // Note: We may already be in producer.close().
            disableEvenMoreMessagesBeingSent();

            // Here it is useful to kill the "kafka-producer-network-thread"s.
            // This prevents the threads from continuing in the background and
            // spouting log messages for a long, long time.
            // The difficulty is that some of these callbacks run on our own
            // threads and some on kafka-producer-network-thread threads.
            // If this is a kafka-producer-network-thread, it just commits
            // thread suicide.
            // If we are in our own thread, we get hold of the private
            // "((KafkaProducer) producer).ioThread" and kill that. Ugly!!

            String threadName = Thread.currentThread().getName();
            if (threadName.equals("kafka-producer-network-thread")) {
                Thread.currentThread().interrupt();
                throw new ThreadDeath(); // Commit thread suicide.

            } else { // Presumably a qtask-worker-nn thread.
                killProducerIOThread();
            }
        }
    }

    private void killProducerIOThread() {
        try {
            // Reach into the producer's private "ioThread" field via
            // reflection (Apache Commons Lang FieldUtils) and stop it.
            Thread ioThread = (Thread) FieldUtils.readField(producer, "ioThread", true);
            ioThread.interrupt();
            ioThread.stop(new ThreadDeath());
        } catch (IllegalAccessException e) {
            // Nothing useful we can do here; swallow and give up.
        }
    }
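
The only less invasive workaround I can think of is to not kill the
I/O thread at all, but to push close() onto a throwaway daemon thread
and abandon it if it does not return in time (0.8.2's close() takes no
timeout). A minimal sketch of that idea (the method name and timeout
are arbitrary, and a stuck close() still leaks the thread):

    // Run close() on a daemon thread so the caller cannot be blocked
    // forever. If close() hangs because the broker is down, we time
    // out of join() and simply walk away from the thread.
    private void closeProducerWithTimeout(long timeoutMs) {
        Thread closer = new Thread(new Runnable() {
            public void run() {
                try {
                    producer.close(); // May block while the broker is down.
                } catch (KafkaException e) {
                    logger.error("Kafka exception thrown in producer.close() (continuing)", e);
                }
            }
        }, "kafka-producer-closer");
        closer.setDaemon(true); // Do not keep the JVM alive for this thread.
        closer.start();
        try {
            closer.join(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (closer.isAlive()) {
            logger.error("producer.close() did not return within " + timeoutMs
                    + " ms; abandoning it");
        }
    }

But this still leaves the network thread running and logging in the
background, so it does not really answer the question above.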



I posted a similar question to us...@kafka.apache.org. Only later did
I realize that it is more suitable for dev@kafka.apache.org. I
apologize for the double posting.

Andrew Stein
