Hi Daniel,

Thanks a lot for the suggestion! I'll give it a try.

Now, while with batch consumers everything is pretty clear, with event-driven consumers it is not. For example, the AMQP and JMS implementations can prefetch a number of messages, but cannot batch them out of the box. So to make the camel-kafka producer send messages obtained from event-driven consumers in batches, the batches have to be prepared manually, e.g. by means of the Aggregator EIP as in the sketch below, even though the downstream component (camel-kafka) supports batching.
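To make it concrete, this is roughly the kind of route I mean. It is a sketch only: the JMS endpoint and the completion settings are made-up placeholders, and GroupedBodyAggregationStrategy (which collects the incoming bodies into a single List) is just one way to do the grouping, assuming it is available in your Camel version:

    from("jms:queue:events")                                       // placeholder event-driven consumer
        .aggregate(constant(true), new GroupedBodyAggregationStrategy())
            .completionSize(500)                                   // flush a batch after 500 messages...
            .completionTimeout(1000)                               // ...or after 1s of inactivity
        .to("kafka:brokerAddr?topic=messages");

So every event-driven consumer has to pay for an extra aggregation step, plus the latency of the completion timeout, just to build the batches that kafka's native producer could have built itself.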
On Mon, Aug 1, 2016 at 3:22 PM, Daniel Kulp <dk...@apache.org> wrote:

> For one of my clients, I ended up not using the splitter in Camel and
> instead used a custom processor that creates an Iterator<byte[]>. This
> works with the updates to camel-kafka that are included in 2.17.3. For my
> tests, using the camel splitter like you have would get about 5K-10K
> msg/sec. With this, I get about 200K. However, within camel it stays a
> single message, so anything in the camel route that needs to look at each
> line wouldn't really work.
>
> from("file://…….")
>     .process(new Processor() {
>         public void process(Exchange exchange) throws Exception {
>             InputStream ins = exchange.getIn().getBody(InputStream.class);
>             exchange.getIn().setBody(new SplitterIterator(ins));
>         }
>     })
>     .to("kafka:brokerAddr?topic=messages"
>         + "&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer"
>         + "&keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");
>
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.io.InputStream;
> import java.util.Iterator;
>
> class SplitterIterator implements Iterator<byte[]> {
>     final InputStream stream;
>     byte[] next;
>
>     SplitterIterator(InputStream i) {
>         stream = i;
>         next = readNext();    // read ahead so hasNext() can be answered
>     }
>
>     private byte[] readNext() {
>         ByteArrayOutputStream bout = new ByteArrayOutputStream();
>         try {
>             int v = stream.read();
>             if (v == -1) {
>                 return null;  // end of stream, iteration is done
>             }
>             while (v != -1 && v != '\n') {
>                 bout.write(v);
>                 v = stream.read();
>             }
>             return bout.toByteArray();  // empty lines yield an empty byte[]
>         } catch (IOException e) {
>             throw new RuntimeException(e);
>         }
>     }
>
>     public boolean hasNext() {
>         return next != null;
>     }
>
>     public byte[] next() {
>         byte[] tmp = next;
>         next = readNext();
>         return tmp;
>     }
>
>     @Override
>     public void remove() {
>         // read-only iterator
>     }
> }
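Just to check that I understand how this gets consumed: if I read the 2.17.3 changes correctly, camel-kafka drains the iterator and hands each byte[] to kafka's native producer, so exercising the class standalone would look something like this (my own snippet, not from your mail; ByteArrayInputStream comes from java.io):

    InputStream ins = new ByteArrayInputStream("line1\nline2\nline3\n".getBytes());
    Iterator<byte[]> lines = new SplitterIterator(ins);
    while (lines.hasNext()) {
        System.out.println(new String(lines.next()));  // line1, line2, line3
    }

And since the whole file stays a single camel message, the native producer is free to buffer and batch on its own, which would explain the jump to ~200K msg/sec.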
> On Aug 1, 2016, at 4:38 AM, Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:
>
> > Hi Camel Gurus,
> >
> > I've run into some performance issues with the camel-kafka component
> > while migrating from 2.17.0 to 2.17.1 and then to 2.17.2.
> >
> > The camel route is pretty simple and looks like this:
> >
> > from("file:/var/lib/app/input")
> >     .split().tokenize("\n").streaming()
> >     .to("direct:kafka");
> > from("direct:kafka")
> >     .to("kafka:brokerAddr?topic=messages");
> >
> > The first issue, with camel 2.17.0, was the possibility of losing messages
> > <https://github.com/apache/camel/blob/camel-2.17.0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L101>.
> > Kafka's native producer buffers the messages
> > <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L468>,
> > and if the kafka broker is unavailable, the messages can be lost when the
> > route is restarted. Although messages could be lost, the performance was
> > pretty good (~10K rps) thanks to the producer's buffering.
> >
> > The second issue, with camel 2.17.1, was that the performance of the
> > kafka producer degraded tremendously (up to 100 times) because of
> > blocking on every message
> > <https://github.com/apache/camel/blob/camel-2.17.1/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L100>
> > (although in that case no messages are lost).
> >
> > The third issue, with camel 2.17.2 (although camel started using async
> > callbacks
> > <https://github.com/apache/camel/blob/camel-2.17.2/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java#L180>),
> > was that the performance was still pretty poor, because kafka's native
> > producer was never able to buffer more than a single message (the direct
> > endpoint is synchronous).
> >
> > I was able to figure out two workarounds for these issues:
> >
> > - using a seda endpoint instead of the direct one (kafka's native
> > producer is then able to buffer the messages, but there is still a
> > possibility of losing messages, because of the nature of seda);
> >
> > - using an aggregator with the direct endpoint (the route then becomes
> > more complicated than it needs to be, the aggregator adds unnecessary
> > delays, and why do we need an additional aggregator for batching at all
> > if kafka's native producer already does buffering/batching?).
> >
> > So the question is: is there any way to let kafka's native producer
> > buffer more than a single message, without using the aggregator EIP and
> > without losing messages as can happen with an intermediate seda endpoint?
> >
> > Kind Regards,
> > Sergey
>
> --
> Daniel Kulp
> dk...@apache.org - http://dankulp.com/blog
> Talend Community Coder - http://coders.talend.com
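For reference, the seda-based workaround Sergey describes above would look roughly like this. It is a sketch only: the queue size and the blockWhenFull setting are illustrative, and anything still sitting in the seda queue when the route stops is lost, which is exactly the delivery-guarantee problem he points out:

    from("file:/var/lib/app/input")
        .split().tokenize("\n").streaming()
        .to("seda:kafka?size=10000&blockWhenFull=true");  // asynchronous hand-off

    from("seda:kafka")
        .to("kafka:brokerAddr?topic=messages");

The asynchronous hand-off is what lets kafka's native producer fill its internal buffer again instead of waiting for each message to complete.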