For one of my clients, I ended up not using the splitter in Camel and instead 
used a custom processor that creates an Iterator<byte[]>.  This works with the 
updates to camel-kafka that are included in 2.17.3.   In my tests, using the 
camel splitter like you have would get about 5K-10K msg/sec; with this, I get 
about 200K.   However, within camel it stays a single message, so anything in 
the camel route that needs to look at each line wouldn't really work.   



from("file://…….")
    .process(new Processor() {
        public void process(Exchange exchange) throws Exception {
            InputStream ins = exchange.getIn().getBody(InputStream.class);
            exchange.getIn().setBody(new SplitterIterator(ins));
        }
    })
    .to("kafka:brokerAddr?topic=messages"
        + "&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer"
        + "&keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");
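
The idea is that the whole file stays a single exchange, so Camel pays its 
routing overhead only once, while (with the 2.17.3 updates) the kafka producer 
walks the Iterator body and sends each element as its own record, so the 
native producer can still batch as usual.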

// Needs java.io.ByteArrayOutputStream, java.io.IOException,
// java.io.InputStream and java.util.Iterator imports.
class SplitterIterator implements Iterator<byte[]> {
    final InputStream stream;
    byte[] next;

    SplitterIterator(InputStream i) {
        stream = i;
        next = readNext();
    }

    // Reads up to (and consumes) the next '\n'; returns null only at end of
    // stream, so an empty line doesn't terminate the iteration early.
    private byte[] readNext() {
        try {
            int v = stream.read();
            if (v == -1) {
                return null;
            }
            ByteArrayOutputStream bout = new ByteArrayOutputStream();
            while (v != -1 && v != '\n') {
                bout.write(v);
                v = stream.read();
            }
            return bout.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    @Override
    public byte[] next() {
        byte[] tmp = next;
        next = readNext();
        return tmp;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}
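
A quick way to sanity-check the iterator outside of Camel (the sample data is 
made up; needs java.io.ByteArrayInputStream and 
java.nio.charset.StandardCharsets as well):

    Iterator<byte[]> it = new SplitterIterator(
            new ByteArrayInputStream("a\nb\n\nc".getBytes(StandardCharsets.UTF_8)));
    while (it.hasNext()) {
        // prints "a", "b", "" and "c"; the empty line comes through
        // as an empty record rather than ending the iteration
        System.out.println(new String(it.next(), StandardCharsets.UTF_8));
    }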


> On Aug 1, 2016, at 4:38 AM, Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:
> 
> Hi Camel Gurus,
> 
> I've run into some performance issues with the camel-kafka component while
> migrating from 2.17.0 to 2.17.1 and then to 2.17.2.
> 
> The camel route is pretty simple and looks like this:
> 
> from("file:/var/lib/app/input")
>    .split().tokenize("\n").streaming()
>        .to("direct:kafka");
> from("direct:kafka")
>    .to("kafka:brokerAddr?topic=messages");
> 
> The first issue, with camel 2.17.0, was the possibility of losing messages
> <https://github.com/apache/camel/blob/camel-2.17.0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L101>.
> Kafka's native producer buffers the messages
> <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L468>,
> and if the kafka broker is unavailable, the buffered messages can be lost
> when the route is restarted. Although messages could be lost, the
> performance was pretty good (~10K rps) thanks to the producer's buffering.
> 
> The second issue, with camel 2.17.1, was that the performance of the kafka
> producer degraded tremendously (up to 100 times) because of blocking on
> every message
> <https://github.com/apache/camel/blob/camel-2.17.1/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L100>
> (although in that case no messages are lost).
> 
> The third issue, with camel 2.17.2 (even though camel started using async
> callbacks
> <https://github.com/apache/camel/blob/camel-2.17.2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L180>),
> was that the performance was still pretty poor, because kafka's native
> producer was never able to buffer more than a single message (due to the
> synchronous direct endpoint).
> 
> I was able to figure out two workarounds for these issues:
> 
> - using a seda endpoint instead of the direct one (kafka's native producer
> can then buffer the messages, but it is still possible to lose messages,
> because of the nature of seda)
> 
> - using an aggregator with the direct endpoint (the route then becomes more
> complicated than it needs to be, the aggregator adds unnecessary delays,
> and why do we need an extra aggregator for batching at all if kafka's
> native producer already does the buffering/batching?)
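
(For reference, that first variant would look something like this; just a 
sketch with the direct endpoint swapped for seda, I haven't benchmarked this 
exact route:

from("file:/var/lib/app/input")
    .split().tokenize("\n").streaming()
        .to("seda:kafka");
from("seda:kafka")
    .to("kafka:brokerAddr?topic=messages");

The in-memory seda queue is what lets the native producer batch, but it is 
also exactly where messages sit when the route stops, hence the possible 
loss.)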
> 
> So the question is - is there any possibility to allow kafka's native
> producer buffer more than a single message not using aggregator eip and not
> lose the messages as it can happen with intermediate seda endpoint?
> 
> Kind Regards,
> Sergey

-- 
Daniel Kulp
dk...@apache.org - http://dankulp.com/blog
Talend Community Coder - http://coders.talend.com