For one of my clients, I ended up not using the splitter in Camel and instead used a custom processor that creates an Iterator<byte[]>. This will work with the updates to camel-kafka that are included in 2.17.3. In my tests, using the Camel splitter the way you have it gets about 5K-10K msg/sec. With the iterator, I get about 200K msg/sec. However, within Camel the body stays a single message, so anything in the Camel route that needs to look at each line wouldn't really work.
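The iterator idea can be tried on its own, without Camel or Kafka on the classpath. What follows is only a minimal sketch under that assumption: the names LineSplitDemo, LineIterator and splitLines are hypothetical and exist just for illustration. It shows how an Iterator<byte[]> can lazily hand out one '\n'-delimited record at a time from an InputStream. In this sketch a blank line yields an empty array rather than ending the iteration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical demo class: illustrates the iterator idea only, no Camel or Kafka involved.
public class LineSplitDemo {

    /** Lazily yields one byte[] per '\n'-delimited line of the stream. */
    static final class LineIterator implements Iterator<byte[]> {
        private final InputStream in;
        private byte[] next;

        LineIterator(InputStream in) {
            this.in = in;
            this.next = readNext(); // read one record ahead so hasNext() is cheap
        }

        private byte[] readNext() {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try {
                int b = in.read();
                if (b == -1) {
                    return null; // end of stream: nothing left to hand out
                }
                while (b != -1 && b != '\n') {
                    buf.write(b);
                    b = in.read();
                }
                return buf.toByteArray(); // may be empty for a blank line
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        @Override
        public boolean hasNext() {
            return next != null;
        }

        @Override
        public byte[] next() {
            if (next == null) {
                throw new NoSuchElementException();
            }
            byte[] current = next;
            next = readNext();
            return current;
        }
    }

    /** Convenience helper for the demo: drains the iterator into a list of strings. */
    static List<String> splitLines(String text) {
        InputStream in = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8));
        Iterator<byte[]> it = new LineIterator(in);
        List<String> out = new ArrayList<>();
        while (it.hasNext()) {
            out.add(new String(it.next(), StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(splitLines("first\n\nthird\n"));
    }
}
```

Reading byte by byte is only reasonable when the underlying stream is buffered; for a raw file stream, wrapping it in a BufferedInputStream before handing it to the iterator is probably worth it.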
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

from("file://…….")
    .process(new Processor() {
        public void process(Exchange exchange) throws Exception {
            // Replace the file body with a lazy iterator over its lines
            InputStream ins = exchange.getIn().getBody(InputStream.class);
            exchange.getIn().setBody(new SplitterIterator(ins));
        }
    })
    .to("kafka:brokerAddr?topic=messages"
        + "&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer"
        + "&keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");

class SplitterIterator implements Iterator<byte[]> {
    final InputStream stream;
    byte[] next;

    SplitterIterator(InputStream i) {
        stream = i;
        next = readNext();
    }

    // Reads up to the next '\n' (exclusive); returns null at end of stream
    private byte[] readNext() {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        try {
            int v = stream.read();
            while (v != -1 && v != '\n') {
                bout.write(v);
                v = stream.read();
            }
            // Only stop at end of stream, so a blank line does not end the iteration early
            if (v == -1 && bout.size() == 0) {
                return null;
            }
            return bout.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public boolean hasNext() {
        return next != null;
    }

    public byte[] next() {
        byte[] tmp = next;
        next = readNext();
        return tmp;
    }

    @Override
    public void remove() {
    }
}

> On Aug 1, 2016, at 4:38 AM, Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:
>
> Hi Camel Gurus,
>
> I've faced with some performance issues of camel-kafka component during
> migrating it from 2.17.0 then to 2.17.1 and then to 2.17.2.
>
> The camel route is pretty simple and looks like this
>
> from("file:/var/lib/app/input")
>     .split().simple("\n").streaming()
>     .to("direct:kafka");
> from("direct:kafka")
>     .to("kafka:brokerAddr?topic=messages");
>
> The first issue with camel 2.17.0 was the possibility of losing messages
> <https://github.com/apache/camel/blob/camel-2.17.0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L101>.
> Kafka's native producer is buffering the messages
> <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L468>
> and if kafka broker is unavailable then the messages can be lost when the
> route is restarted. Although the messages can be lost, the performance was
> pretty good (~10K rps) due to kafka's producer buffering.
>
> The second issue with camel 2.17.1 was that the performance of kafka
> producer degraded tremendously (up to 100 times) because of blocking on
> every message
> <https://github.com/apache/camel/blob/camel-2.17.1/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L100>
> (although in that case no message losing occurs).
>
> The third issue with camel 2.17.2 (although camel started using async
> callbacks
> <https://github.com/apache/camel/blob/camel-2.17.2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L180>)
> was that the performance was still pretty poor because kafka's native
> producer was not able to buffer more than a single message (because of
> synchronous direct endpoint).
>
> The two solutions for the mentioned issues I was able to figure out:
>
> - using seda endpoint instead of direct one (then kafka's native producer
> is able to buffer the messages, but there is still a possibility to lose
> messages (because of nature of seda))
>
> - using aggregator with direct endpoint (then the route becomes more
> complicated than it is expected to be, aggregator adds additional not
> necessary delays and why at all we need additional aggregator for batching
> if the kafka's native producer already does buffering/batching?)
>
> So the question is - is there any possibility to allow kafka's native
> producer buffer more than a single message not using aggregator eip and not
> lose the messages as it can happen with intermediate seda endpoint?
>
> Kind Regards,
> Sergey

-- 
Daniel Kulp
dk...@apache.org - http://dankulp.com/blog
Talend Community Coder - http://coders.talend.com