Consumer pause/resume & partition assignment race condition

2016-06-24 Thread Elias Levy
While performing some prototyping on 0.10.0.0 using the new client API I
noticed that some clients fail to drain their topic partitions.

The Kafka cluster is comprised of 3 nodes.  The topic in question has been
preloaded with messages.  The topic has 50 partitions.  The messages were
loaded without a key, so they should be spread in a round robin fashion.
The kafka-consumer-groups command shows that each partition has a
log-end-offset of 137, except for one partition at 136.

The worker is a simple single-threaded client.  As mentioned, it uses the
new consumer API.  The consumer is configured to fetch a single record at a
time by setting the max.poll.records config property to 1.  The worker
handles commits itself and sets enable.auto.commit to false.

The worker can take substantial time processing the messages.  To avoid
timing out the Kafka connection, the worker calls consumer.pause() with the
results of consumer.assignment() when it starts processing the message,
calls consumer.poll(0) at regular intervals while processing the message to
trigger heartbeats to Kafka, and calls consumer.resume() with the result of
a call to consumer.assignment() when it is done processing the message and
it has committed the offset for the message using consumer.commitSync().
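For illustration, here is a minimal sketch of that loop (hedged: it assumes
the Collection-based pause()/resume() overloads of later clients, whereas
0.10.0.0 uses varargs; the topic, group id, and doWorkChunk() are placeholders
of my own, not the original worker's code):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PausingWorker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "worker-group");               // placeholder
        props.put("enable.auto.commit", "false");
        props.put("max.poll.records", "1");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));  // placeholder topic

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // Stop fetching from everything currently assigned before the slow work.
                    // (Collection overload shown; 0.10.0.0 takes varargs, so convert with toArray.)
                    consumer.pause(consumer.assignment());
                    while (!doWorkChunk(record)) {
                        consumer.poll(0);   // keeps the group session alive; paused, so no records
                    }
                    consumer.commitSync();
                    // Fresh snapshot: the assignment may have changed during a rebalance.
                    consumer.resume(consumer.assignment());
                }
            }
        }
    }

    // Placeholder: does one slice of the long-running processing, returns true when done.
    private static boolean doWorkChunk(ConsumerRecord<String, String> record) {
        return true;
    }
}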

Note that when calling consumer.resume() I pass in the results of a fresh
call to consumer.assignment().  Passing in the results of the previous call
to consumer.assignment(), the ones used when calling consumer.pause(),
would result in an exception if partitions were reassigned while the worker
was processing the message, as may happen when workers join the consumer
group.  I presume this means a call to assignment() generates a call to the
consumer coordinator in the cluster to obtain the latest assignments rather
than returning a locally cached copy of assignments.

The test used four worker nodes running four workers each, for sixteen
total workers.

kafka-consumer-groups.sh shows that all partitions have been assigned to a
worker, and that the workers successfully processed most partitions, 29 out
of 50, to completion (lag is 0).

5 partitions appear to not have been processed at all, with unknown shown
for current-offset and lag, and 16 partitions have processed some messages
but not all.  In either case, the workers believe there are no more
messages to fetch.  When they call poll with a timeout, it eventually
returns with no messages.  The workers show no errors and continue to run.

That indicates to me that the workers and cluster disagree on partition
assignment.  Thus, the consumer is not asking for messages on partitions
the broker has assigned to it, and messages on those partitions are not
processed.

My guess is that partition assignments are being changed after my calls to
consumer.assignment() and consumer.resume().

Presumably I can solve this issue by implementing a
ConsumerRebalanceListener and updating the assignment I call resume() with
whenever onPartitionsRevoked and onPartitionsAssigned are called.
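A minimal sketch of such a listener (hedged: the class and method names below
are my own, and how pause state survives a rebalance depends on the client
version):

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Tracks the live assignment so pause()/resume() always see the latest partitions.
public class AssignmentTracker implements ConsumerRebalanceListener {
    private final Set<TopicPartition> current = new CopyOnWriteArraySet<>();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        current.removeAll(partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        current.addAll(partitions);
    }

    public Set<TopicPartition> snapshot() {
        return current;
    }
}

It would be registered via consumer.subscribe(topics, tracker), and
tracker.snapshot() is the set resume() would be called with.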

Ideally, the Consumer interface would allow you to call pause() and
resume() without a list of topic partitions, which would pause and resume
fetching from all assigned partitions, which the client is already keeping
track of.

Thoughts?  Suggestions?


Re: Kafka streams for out of order density aggregation

2016-06-24 Thread Matthias J. Sax
I just want to add something:

If I understand the question correctly, you are asking for a strong
ordering guarantee. I personally doubt that out-of-order on count-based
windows can be supported with strong consistency guarantee in an
efficient manner.

If a late record arrives for window X, the newest record from window X
must be removed and also added to the next window (which starts a
cascaded update of all windows up to "now" -- at least for
non-overlapping windows).

Let the input be A,B,C,D,E,F,G,... and let C be the late record. Thus,
you would initially build (e.g., 5-tuple) tumbling windows as follows:

ABDEF GHIJK LMNOP

On late arrival of C, you update as follows:

insert C in W1
move F from W1 to W2
move K from W2 to W3
(and so on up to "now")

=> ABCDE FGHIJ KLMNO P

This can be quite expensive, depending on the time lag of C.
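To make that cost concrete, here is a hedged sketch in plain Java (not Kafka
Streams code, just an in-memory list): a late insert into count-based tumbling
windows forces every window from the insertion point up to "now" to be rebuilt
and re-emitted.

import java.util.ArrayList;
import java.util.List;

public class TumblingCountWindows {
    // All records seen so far, in event order (a late record is inserted in place).
    private final List<String> records = new ArrayList<>();
    private final int size;

    public TumblingCountWindows(int size) {
        this.size = size;
    }

    // A record at event position 'pos' dirties every window from pos / size
    // onwards -- the cascading update described above.
    public void insert(int pos, String record) {
        records.add(pos, record);
        int firstDirtyWindow = pos / size;
        for (int w = firstDirtyWindow; w * size < records.size(); w++) {
            List<String> window =
                records.subList(w * size, Math.min((w + 1) * size, records.size()));
            emitUpdate(w, window);   // placeholder: re-aggregate / re-emit this window
        }
    }

    private void emitUpdate(int windowId, List<String> window) {
        System.out.println("window " + windowId + " => " + window);
    }

    public static void main(String[] args) {
        TumblingCountWindows windows = new TumblingCountWindows(5);
        String in = "ABDEFGHIJKLMNOP";                        // C arrives late
        for (int i = 0; i < in.length(); i++) {
            windows.insert(i, String.valueOf(in.charAt(i)));  // in-order arrivals
        }
        windows.insert(2, "C");   // late arrival of C triggers the full cascade
    }
}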

For a window hop of one tuple the cascading update can be terminated
earlier. (For other hop values, you should think carefully to get it right.)

Let the input be A,B,C,D,E,F,G,... and let C be the late record. Thus,
you would initially build the following windows:

ABDEF
 BDEFG
  DEFGH

On late arrival of C, you update as follows:

ABDEF   => ABCDE  (remove F, add C)
 BDEFG  =>  BCDEF (remove G, add C)
        =>   CDEFG (new window; terminates the update sequence)
  DEFGH =>    DEFGH (stays the same)


Hope this helps to understand the issue...


-Matthias


On 06/24/2016 12:21 AM, Guozhang Wang wrote:
> Hello Ryan,
> 
> On the DSL layer, there is currently no support for record windows yet; we
> are discussing adding such support in the future, maybe first session
> windows and then others.
> 
> On the Processor API layer, you can definitely implement this "record
> window" feature yourself by keeping track of the most recent 10 records in
> a state store: upon each new incoming record, delete the oldest record,
> insert the new record, and re-compute the count. For your specific case you
> do not need to re-scan all 10 records; you can just derive the new count as
> 
> new count = old count + (new record > 100 ? 1 : 0) - (evicted record > 100 ? 1 : 0)
> 
> 
> Guozhang
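As a hedged illustration of that update rule (plain Java rather than Processor
API code; in a real processor the deque and the running count would live in a
state store rather than in fields):

import java.util.ArrayDeque;
import java.util.Deque;

// Keeps "how many of the last N values exceed a threshold" up to date
// incrementally, using the old-count plus/minus adjustment described above.
public class LastNDensity {
    private final Deque<Long> lastN = new ArrayDeque<>();
    private final int n;
    private final long threshold;
    private int count;   // values currently in lastN that exceed the threshold

    public LastNDensity(int n, long threshold) {
        this.n = n;
        this.threshold = threshold;
    }

    public int onRecord(long value) {
        if (lastN.size() == n) {
            long evicted = lastN.removeFirst();
            if (evicted > threshold) count--;
        }
        lastN.addLast(value);
        if (value > threshold) count++;
        return count;
    }
}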
> 
> 
> On Thu, Jun 23, 2016 at 1:55 PM, Ryan Thompson wrote:
> 
>> Hello,
>>
>> Say I have a stream, and want to determine whether or not a given "density"
>> of records matches a given condition.  For example, let's say I want to know
>> how many of the last 10 records have a numerical value greater than 100.
>>
>> Does the kafka streams DSL (or processor API) provide a way to do this type
>> of aggregation in a way that supports out of order messages?
>>
>> I can't use a time window based aggregation here because my window is based
>> on a quantity of records (in this case, the last 10) rather than time.
>> However, I still want to know about the last 10 records regardless of what
>> order they arrive.
>>
>> Thanks,
>> Ryan
>>
> 
> 
> 



signature.asc
Description: OpenPGP digital signature


Re: log4j setting for embedded kafka server

2016-06-24 Thread Guozhang Wang
Siyuan,

log4j.properties only gets read by the kafka-run-class.sh scripts:

KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/config/tools-log4j.properties


If you start the server within your Java application, you need to pass the
"log4j.configuration" system property to the JVM yourself.


Guozhang


On Fri, Jun 24, 2016 at 10:27 AM, hsy...@gmail.com  wrote:

> Hi guys,
>
> I start the server programmatically in my application using the
> KafkaServerStartable.startup() method, and in the log4j.properties settings
> I add this:
>
> log4j.logger.org.apache.kafka=WARN
> log4j.logger.kafka=WARN
>
> But I always get INFO logs. Do you guys know how to enforce the log level
> here? Thanks!
>



-- 
-- Guozhang


Re: issue in SimpleConsumerDemo

2016-06-24 Thread Guozhang Wang
Did you run it multiple times without cleaning the committed offsets?
That can be a common root cause of seeing fewer messages.


Guozhang

On Fri, Jun 24, 2016 at 2:51 AM, hengheng0h...@163.com <
hengheng0h...@163.com> wrote:

> hi,
> I got an issue when I run SimpleConsumerDemo (source:
> kafka/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java).
> I only got 2 messages when I set fetchSize to 100.
>
> thanks!
>
> --
> hengheng0h...@163.com
>



-- 
-- Guozhang


Producer Properties

2016-06-24 Thread Chris Barlock
I started porting our code from Kafka 0.8.2.1 to 0.10.0.0 and found my 
producer code blowing up because of some changes to the config.  For 
example, metadata.broker.list is now bootstrap.servers.  I discovered the 
ProducerConfig class which has, at least, some of the config keys.  Before 
I screw this up, I'd like some confirmation of the right mappings for 
these config parameters in our 0.8.2.1 code:

serializer.class        Maybe value.serializer = VALUE_SERIALIZER_CLASS_CONFIG?
key.serializer.class    Maybe key.serializer = KEY_SERIALIZER_CLASS_CONFIG?
producer.type           Not in ProducerConfig
batch.num.messages      Not in ProducerConfig, unless maybe batch.size = BATCH_SIZE_CONFIG?
queue.buffering.max.ms  Not in ProducerConfig
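For what it's worth, a hedged sketch of what the new-producer equivalents
commonly look like (value.serializer/key.serializer replace the old serializer
classes; producer.type has no counterpart because the new producer is always
asynchronous; batch.size is measured in bytes rather than messages; and
linger.ms is the closest analog to queue.buffering.max.ms -- treat the broker
address and values below as placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class NewProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // was metadata.broker.list
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");    // was key.serializer.class
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");    // was serializer.class
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");                 // bytes, not messages
        props.put(ProducerConfig.LINGER_MS_CONFIG, "5");                      // rough analog of queue.buffering.max.ms
        // producer.type: no equivalent -- the new producer is always async;
        // block on send(...).get() if synchronous behaviour is needed.

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}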

Thanks!

Chris



Re: Kafka producer metadata issue

2016-06-24 Thread Shekar Tippur
Interesting. So if we introduce a sleep after the first send then it
produces properly?

Here is my log. Clearly there is a conn reset.

[2016-06-24 13:42:48,620] ERROR Closing socket for /127.0.0.1 because of
error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:197)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
        at kafka.utils.Utils$.read(Utils.scala:380)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Processor.read(SocketServer.scala:444)
        at kafka.network.Processor.run(SocketServer.scala:340)
        at java.lang.Thread.run(Thread.java:745)

On Fri, Jun 24, 2016 at 1:28 PM, Fumo, Vincent
wrote:

> I'm seeing similar with the v9 producer. Here is some test code:
>
> @Test
> public void test1() throws InterruptedException {
> Producer<String, String> producer = createProducer(BROKER_DEV);
> producer.send(new ProducerRecord<>(TOPIC, "value"));
> producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
> producer.send(new ProducerRecord<>(TOPIC, "key3", "value3"));
> }
>
> @Test
> public void test2() throws InterruptedException {
> Producer<String, String> producer = createProducer(BROKER_DEV);
> producer.send(new ProducerRecord<>(TOPIC, "value"));
> Thread.sleep(10L);
> producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
> producer.send(new ProducerRecord<>(TOPIC, "key3", "value3"));
> }
>
>
> public Producer<String, String> createProducer(String broker) {
>
> if (StringUtils.isBlank(broker)) {
> return null;
> }
>
> Properties props = new Properties();
>
> props.put("bootstrap.servers", broker);
> props.put("key.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
> props.put("value.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
> props.put("producer.type", "async");
>
> props.put("max.block.ms", "500");
> props.put("acks", "all");
> props.put("retries", "0");
> props.put("batch.size", "1638");
> props.put("linger.ms", "1");
> props.put("buffer.memory", "33554432");
> props.put("compression.type", "gzip");
> props.put("client.id", "testClientId");
>
> return new KafkaProducer<>(props);
> }
>
> /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper
> mer-arch-zk-01d.something.com:2181 --property print.key=true --topic test
>
> note that when I run test1() I get nothing posted to the topic at all.
> Here is the log produced ::
>
> 16:15:44.527 [main] INFO  o.a.k.c.producer.ProducerConfig - ProducerConfig
> values:
> compression.type = gzip
> metric.reporters = []
> metadata.max.age.ms = 30
> metadata.fetch.timeout.ms = 6
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> bootstrap.servers = [cmp-arch-kafka-01d.something.com:9092]
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> buffer.memory = 33554432
> timeout.ms = 3
> key.serializer = class
> org.apache.kafka.common.serialization.StringSerializer
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> block.on.buffer.full = false
> ssl.key.password = null
> max.block.ms = 500
> sasl.kerberos.min.time.before.relogin = 6
> connections.max.idle.ms = 54
> ssl.truststore.password = null
> max.in.flight.requests.per.connection = 5
> metrics.num.samples = 2
> client.id = testClientId
> ssl.endpoint.identification.algorithm = null
> ssl.protocol = TLS
> request.timeout.ms = 3
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> acks = all
> batch.size = 1638
> ssl.keystore.location = null
> receive.buffer.bytes = 32768
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = PLAINTEXT
> retries = 0
> max.request.size = 1048576
> value.serializer = class
> org.apache.kafka.common.serialization.StringSerializer
> ssl.truststore.location = null
> ssl.keystore.password = null
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 3
> partitioner.class = class
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
> send.buffer.bytes = 131072
> linger.ms = 1
>
> 16:15:44.544 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
> with name bufferpool-wait-time
> 16:15:44.595 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor

Re: Kafka producer metadata issue

2016-06-24 Thread Fumo, Vincent
I'm seeing similar with the v9 producer. Here is some test code:

@Test
public void test1() throws InterruptedException {
Producer<String, String> producer = createProducer(BROKER_DEV);
producer.send(new ProducerRecord<>(TOPIC, "value"));
producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
producer.send(new ProducerRecord<>(TOPIC, "key3", "value3"));
}

@Test
public void test2() throws InterruptedException {
Producer<String, String> producer = createProducer(BROKER_DEV);
producer.send(new ProducerRecord<>(TOPIC, "value"));
Thread.sleep(10L);
producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
producer.send(new ProducerRecord<>(TOPIC, "key3", "value3"));
}


public Producer<String, String> createProducer(String broker) {

if (StringUtils.isBlank(broker)) {
return null;
}

Properties props = new Properties();

props.put("bootstrap.servers", broker);
props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
props.put("producer.type", "async");

props.put("max.block.ms", "500");
props.put("acks", "all");
props.put("retries", "0");
props.put("batch.size", "1638");
props.put("linger.ms", "1");
props.put("buffer.memory", "33554432");
props.put("compression.type", "gzip");
props.put("client.id", "testClientId");

return new KafkaProducer<>(props);
}
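One hedged note on the test code above: send() on the new producer is
asynchronous, so if the JVM exits before the client's background sender has
flushed its batches, records can be dropped silently. That may or may not be
the whole story here, but the tests never call flush() or close(). A sketch of
the safer shape (reusing the test's own createProducer/TOPIC/BROKER_DEV;
flush() and close() are standard producer methods, and the get() on the first
send is just one way to make it synchronous):

@Test
public void test1WithFlush() throws Exception {
    // Same helper and constants as the test code above.
    Producer<String, String> producer = createProducer(BROKER_DEV);
    try {
        producer.send(new ProducerRecord<>(TOPIC, "value")).get();   // block until acked (optional)
        producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
        producer.send(new ProducerRecord<>(TOPIC, "key3", "value3"));
        producer.flush();    // push anything still sitting in the batch buffer
    } finally {
        producer.close();    // close() also flushes outstanding records
    }
}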

/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 
mer-arch-zk-01d.something.com:2181 --property print.key=true --topic test

note that when I run test1() I get nothing posted to the topic at all. Here is 
the log produced ::

16:15:44.527 [main] INFO  o.a.k.c.producer.ProducerConfig - ProducerConfig 
values: 
compression.type = gzip
metric.reporters = []
metadata.max.age.ms = 30
metadata.fetch.timeout.ms = 6
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [cmp-arch-kafka-01d.something.com:9092]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 3
key.serializer = class 
org.apache.kafka.common.serialization.StringSerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
max.block.ms = 500
sasl.kerberos.min.time.before.relogin = 6
connections.max.idle.ms = 54
ssl.truststore.password = null
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
client.id = testClientId
ssl.endpoint.identification.algorithm = null
ssl.protocol = TLS
request.timeout.ms = 3
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
acks = all
batch.size = 1638
ssl.keystore.location = null
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
retries = 0
max.request.size = 1048576
value.serializer = class 
org.apache.kafka.common.serialization.StringSerializer
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 3
partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
send.buffer.bytes = 131072
linger.ms = 1

16:15:44.544 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name bufferpool-wait-time
16:15:44.595 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name buffer-exhausted-records
16:15:44.599 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster 
metadata version 1 to Cluster(nodes = [Node(-1, 
cmp-arch-kafka-01d.something.com, 9092)], partitions = [])
16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name connections-closed:client-id-testClientId
16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name connections-created:client-id-testClientId
16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name bytes-sent-received:client-id-testClientId
16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name bytes-sent:client-id-testClientId
16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name bytes-received:client-id-testClientId
16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name select-time:client-id-testClientId
16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name io-time:client-id-testClientId
16:15:44.620 [main] DEBUG o.a.k

Re: Kafka producer metadata issue

2016-06-24 Thread Shekar Tippur
I just see this in the kafka.log file:

[2016-06-24 13:27:14,346] INFO Closing socket connection to /127.0.0.1.
(kafka.network.Processor)

On Fri, Jun 24, 2016 at 1:05 PM, Shekar Tippur  wrote:

> Hello,
>
> I have a simple Kafka producer directly taken off of
>
>
> https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
>
> I have changed the bootstrap.servers property.
>
> props.put("bootstrap.servers", "localhost:9092");
>
> I don't see any events added to the test topic.
>
> console-producer works fine with broker localhost:9092.
>
> I see that if I change props.put("metadata.fetch.timeout.ms", 100);
>
> the wait reduces, but I still don't see any events in the topic.
>
> Can someone please explain what could be going on?
>
> - Shekar
>
>


Kafka producer metadata issue

2016-06-24 Thread Shekar Tippur
Hello,

I have a simple Kafka producer directly taken off of

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

I have changed the bootstrap.servers property.

props.put("bootstrap.servers", "localhost:9092");

I don't see any events added to the test topic.

console-producer works fine with broker localhost:9092.

I see that if I change props.put("metadata.fetch.timeout.ms", 100);

the wait reduces, but I still don't see any events in the topic.

Can someone please explain what could be going on?

- Shekar


[kafka-connect] multiple or single clusters?

2016-06-24 Thread noah
I'm having some trouble figuring out the right way to run Kafka Connect in
production. We will have multiple sink connectors that we need to remain
running indefinitely and have at least once semantics (with as little
duplication as possible) so it seems clear that we need to run in
distributed mode so that our offsets are durable and we can scale up by
adding new distributed mode instances of Connect.

What isn't clear is the best way to run multiple, heterogeneous
connectors in distributed mode. It looks like every instance of Connect
will read the config/status topics and take on some number of tasks (and
that tasks can't be assigned to specific running instances of Connect.) It
also looks like it is only possible to configure 1 key and value converter
per Connect instance. So if I need two different conversion strategies, I'd
need to either write a custom converter that can figure it out, or run
multiple Connect clusters, each with their own set of config+offset+status
topics.
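For concreteness, a hedged sketch of what two isolated distributed-mode worker
configs could look like (all names below are placeholders, and these are not
complete worker configs): each Connect cluster gets its own group.id, internal
topics, and converters.

# worker-avro.properties (hypothetical cluster 1)
bootstrap.servers=broker1:9092
group.id=connect-avro
config.storage.topic=connect-avro-configs
offset.storage.topic=connect-avro-offsets
status.storage.topic=connect-avro-status
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
# (the AvroConverter would also need its schema registry settings)

# worker-json.properties (hypothetical cluster 2)
bootstrap.servers=broker1:9092
group.id=connect-json
config.storage.topic=connect-json-configs
offset.storage.topic=connect-json-offsets
status.storage.topic=connect-json-status
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter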

Is that right? Worst case, I need another set of N distributed Connect
instances per sink/source, which ends up being a lot of topics to manage.
What does a real-world Connect topology look like?


Re: log.retention.bytes

2016-06-24 Thread Alex Loddengaard
Hi Dave,

log.retention.bytes is per partition. If you change it after the topic was
created, you'll see the behavior you expect -- namely that the new value is
used when the log is cleaned. The frequency that the log is cleaned is
controlled by log.retention.check.interval.ms, with a default value of 5
minutes.
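For reference, a hedged broker-config sketch (the values are arbitrary
examples; a per-topic override would use the topic-level retention.bytes
config instead of the broker-wide setting):

# server.properties (broker) -- enforced per partition, checked on each
# log.retention.check.interval.ms tick
log.retention.bytes=1073741824
log.retention.check.interval.ms=300000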

Hope this helps!

Alex

On Fri, Jun 24, 2016 at 9:19 AM, Tauzell, Dave  wrote:

> Is the log.retention.bytes setting per partition or for the whole topic?
>   If I change it after a topic has been created do the changes apply to the
> existing topics?
>
> Thanks,
>Dave
>
> This e-mail and any files transmitted with it are confidential, may
> contain sensitive information, and are intended solely for the use of the
> individual or entity to whom they are addressed. If you have received this
> e-mail in error, please notify the sender by reply e-mail immediately and
> destroy all copies of the e-mail and any attachments.
>


log4j setting for embedded kafka server

2016-06-24 Thread hsy...@gmail.com
Hi guys,

I start the server programmatically in my application using the
KafkaServerStartable.startup() method, and in the log4j.properties settings
I add this:

log4j.logger.org.apache.kafka=WARN
log4j.logger.kafka=WARN

But I always get INFO logs. Do you guys know how to enforce the log level
here? Thanks!


log.retention.bytes

2016-06-24 Thread Tauzell, Dave
Is the log.retention.bytes setting per partition or for the whole topic?  If I
change it after a topic has been created, do the changes apply to the existing
topics?

Thanks,
   Dave

This e-mail and any files transmitted with it are confidential, may contain 
sensitive information, and are intended solely for the use of the individual or 
entity to whom they are addressed. If you have received this e-mail in error, 
please notify the sender by reply e-mail immediately and destroy all copies of 
the e-mail and any attachments.


issue in SimpleConsumerDemo

2016-06-24 Thread hengheng0h...@163.com
hi,
I got an issue when I run SimpleConsumerDemo (source:
kafka/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java).
I only got 2 messages when I set fetchSize to 100.

thanks!



hengheng0h...@163.com


RE: Setting max fetch size for the console consumer

2016-06-24 Thread Tauzell, Dave
Thanks!  I also had to pass  --consumer.config=/etc/kafka/consumer.properties  
to the command line consumer.
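For anyone else hitting this, a hedged sketch of the settings involved (the
values are arbitrary; fetch.message.max.bytes applies to the old
zookeeper-based consumer the console consumer uses here, and the broker-side
limits need to be at least as large as the largest message):

# consumer.properties (old consumer, passed with --consumer.config)
fetch.message.max.bytes=15728640

# server.properties (broker side)
message.max.bytes=15728640
replica.fetch.max.bytes=15728640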

-Dave

Dave Tauzell | Senior Software Engineer | Surescripts
O: 651.855.3042 | www.surescripts.com |   dave.tauz...@surescripts.com
Connect with us: Twitter I LinkedIn I Facebook I YouTube


-Original Message-
From: Ben Stopford [mailto:b...@confluent.io] 
Sent: Friday, June 24, 2016 8:41 AM
To: users@kafka.apache.org
Subject: Re: Setting max fetch size for the console consumer

It’s actually more than one setting: 
http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message 


B
> On 24 Jun 2016, at 14:31, Tauzell, Dave  wrote:
> 
> How do I set the maximum fetch size for the console consumer?
> 
> I'm getting this error when doing some testing with large messages:
> 
> kafka.common.MessageSizeTooLargeException: Found a message larger than the 
> maximum fetch size of this consumer on topic replicated_twice partition 28 at 
> fetch offset 11596. Increase the fetch size, or decrease the maximum message 
> size the broker will allow.
> 
> 
> 
> Dave Tauzell | Senior Software Engineer | Surescripts
> O: 651.855.3042 | www.surescripts.com |   
> dave.tauz...@surescripts.com
> Connect with us: Twitter I 
> LinkedIn I 
> Facebook I 
> YouTube
> 
> 
> This e-mail and any files transmitted with it are confidential, may contain 
> sensitive information, and are intended solely for the use of the individual 
> or entity to whom they are addressed. If you have received this e-mail in 
> error, please notify the sender by reply e-mail immediately and destroy all 
> copies of the e-mail and any attachments.



Re: Setting max fetch size for the console consumer

2016-06-24 Thread Ben Stopford
It’s actually more than one setting: 
http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message 


B
> On 24 Jun 2016, at 14:31, Tauzell, Dave  wrote:
> 
> How do I set the maximum fetch size for the console consumer?
> 
> I'm getting this error when doing some testing with large messages:
> 
> kafka.common.MessageSizeTooLargeException: Found a message larger than the 
> maximum fetch size of this consumer on topic replicated_twice partition 28 at 
> fetch offset 11596. Increase the fetch size, or decrease the maximum message 
> size the broker will allow.
> 
> 
> 
> Dave Tauzell | Senior Software Engineer | Surescripts
> O: 651.855.3042 | www.surescripts.com |   
> dave.tauz...@surescripts.com
> Connect with us: Twitter I 
> LinkedIn I 
> Facebook I 
> YouTube
> 
> 
> This e-mail and any files transmitted with it are confidential, may contain 
> sensitive information, and are intended solely for the use of the individual 
> or entity to whom they are addressed. If you have received this e-mail in 
> error, please notify the sender by reply e-mail immediately and destroy all 
> copies of the e-mail and any attachments.



Setting max fetch size for the console consumer

2016-06-24 Thread Tauzell, Dave
How do I set the maximum fetch size for the console consumer?

I'm getting this error when doing some testing with large messages:

kafka.common.MessageSizeTooLargeException: Found a message larger than the 
maximum fetch size of this consumer on topic replicated_twice partition 28 at 
fetch offset 11596. Increase the fetch size, or decrease the maximum message 
size the broker will allow.



Dave Tauzell | Senior Software Engineer | Surescripts
O: 651.855.3042 | www.surescripts.com |   
dave.tauz...@surescripts.com
Connect with us: Twitter I 
LinkedIn I 
Facebook I 
YouTube


This e-mail and any files transmitted with it are confidential, may contain 
sensitive information, and are intended solely for the use of the individual or 
entity to whom they are addressed. If you have received this e-mail in error, 
please notify the sender by reply e-mail immediately and destroy all copies of 
the e-mail and any attachments.


Re: is kafka the right choice

2016-06-24 Thread Ben Stopford
correction: elevates => alleviates

> On 24 Jun 2016, at 11:13, Ben Stopford  wrote:
> 
> Kafka uses a long poll. So requests effectively block on the server, if
> there is insufficient data available. This elevates many of the issues
> associated with traditional polling approaches.
> 
> Service-based applications often require directed channels to do request 
> response. People do use Kafka in this way, Philippe gave a good example 
> below.  You just need to be aware that, should you have a lot of services 
> that need to interact, it could involve creating a lot of topics.  Kafka 
> topics are persistent and generally long lived. They shouldn’t be considered 
> ephemeral imho. 
> 
> B
> 
> 
>> On 23 Jun 2016, at 17:35, Philippe Derome wrote:
>> 
>> See Keyhole Software blog and particularly John Boardman's presentation of
>> sample app with responsive web client using WebSockets connecting to a
>> netty embedded web server that itself uses producer and consumer clients
>> with a Kafka infrastructure (@johnwboardman). On first look, it seems like
>> a valid approach. Behind the web server are services that are Kafka apps
>> interacting with external web APIs.
>> 
>> Anecdotally quite a few companies post jobs with Kafka playing a role in a
>> micro architecture solution.
>> 
>> I'll now let experts speak...
>> On 23 Jun 2016 11:47 a.m., "Pranay Suresh" wrote:
>> 
>>> Hey Kafka experts,
>>> 
>>> After having read Jay Kreps awesome Kafka reading(
>>> 
>>> https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying
>>>  
>>> 
>>> )
>>> I have a doubt.
>>> 
>>> For communication between browsers (let's say collaborative editing, chat
>>> etc.) is Kafka the right choice? Especially given that Kafka consumers are
>>> designed to pull, rather than a callback-style push. For low latency,
>>> possibly ephemeral data/events, is Kafka a good choice? Can I have a
>>> browser open a socket into a webserver and each request initiate a consumer
>>> to consume from kafka (by polling?) OR is Kafka designed and/or meant to be
>>> used for a separate use case?
>>> 
>>> Any feedback is appreciated. Let the bashing begin!
>>> 
>>> Many Thanks,
>>> pranay
>>> 
> 



kafka lost data when use scala API to send data.

2016-06-24 Thread DuanSky
Hello With Respect,
  Here I ran into a problem when using the Scala API to send/receive data
to/from Kafka brokers. I wrote very simple producer and consumer code (just
like the official examples). The code using the Java API works correctly, but
the code using the Scala API appears to lose data. Here are the details.


Config: I downloaded the kafka_2.11-0.10.0.0.tgz binary and started it in
single-node mode: just one broker and one ZooKeeper, with the default
configuration.


Problem:
(1) Java API Test
  I wrote a simple consumer and producer program with the Java API first. The
producer code is like this:
code A
void produce() {
    int messageNo = 1;
    while (messageNo <= Config.count) {
        for (String topic : KafkaConfig.topics.split(",")) {
            String key = String.valueOf(messageNo);
            String data = topic + "-" + new Date();
            producer.send(new KeyedMessage<String, String>(topic, key, data));
            System.out.println(topic + "#" + key + "#" + data);
        }
        messageNo++;
    }
}
--
The consumer code is like this:
code B
void consume() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    for (String topic : Config.topics.split(",")) {
        topicCountMap.put(topic, new Integer(1));
    }

    final Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

    for (final String topic : Config.topics.split(",")) {
        new Thread(new Runnable() {
            public void run() {
                int count = 0;
                KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
                ConsumerIterator<String, String> it = stream.iterator();
                while (it.hasNext()) {
                    count++;
                    MessageAndMetadata<String, String> message = it.next();
                    System.out.println(count + "#" + message.topic() + ":" +
                            message.key() + ":" + message.message());
                }
            }
        }).start();
    }
}
--
As I change Config.count (the number of messages per topic; here I use two
topics, a and b), the consumer receives the same number of messages as were
produced, no matter what the count is. So the Java API is correct. But when I
do the same thing using the Scala API, some data appears to be lost when sent
to the Kafka brokers.


(2) Scala API Test
  I wrote a simple producer program with the Scala API; part of it looks like
this:
--
code C
def main(args:Array[String]): Unit ={
  val producer = {
    // Kafka producer connection properties
val props = new util.HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.ACKS_CONFIG,"-1")

new KafkaProducer[String, String](props)
  }

  (1 to Config.count).foreach(key => {
Config.topics.split(",").foreach( topic =>{
  val data = topic+new Date().toString
  val message = new ProducerRecord[String,String](topic, key+"", data)
  producer.send(message)
  System.out.println(topic + "#" + key + "#" + data)
})
  })

}
--
Steps:
(1) First I start a consumer node waiting to receive data (using code B above).
(2) Then I start a producer node to produce data (using code C above).

Problem:
 I found that when I run code C, the producer sends data very fast: sending 100
messages takes no more than 1 second, while the producer using the Java API
(code A above) is nowhere near as fast. The consumer (code B) receives only 34
messages for topic a and 34 for topic b and then just sits there, even though I
produced 100 messages per topic. I have changed the number of messages I send
to Kafka, but no matter how much data I want to send, the consumer only
receives about half of it, sometimes less. The more messages I send, the more
data is lost. [By "lost" I mean: did the messages fail to reach the brokers, or
did they reach the brokers but never get consumed? What I observe is that I
receive part of the data and then nothing more is consumed.]
Try:
  I found that the Scala API producer sends data too fast, so I added
Thread.sleep(time) af

Re: is kafka the right choice

2016-06-24 Thread Ben Stopford
Kafka uses a long poll. So requests effectively block on the server, if there
is insufficient data available. This elevates many of the issues associated
with traditional polling approaches. 

Service-based applications often require directed channels to do request 
response. People do use Kafka in this way, Philippe gave a good example below.  
You just need to be aware that, should you have a lot of services that need to 
interact, it could involve creating a lot of topics.  Kafka topics are 
persistent and generally long lived. They shouldn’t be considered ephemeral 
imho. 

B


> On 23 Jun 2016, at 17:35, Philippe Derome  wrote:
> 
> See Keyhole Software blog and particularly John Boardman's presentation of
> sample app with responsive web client using WebSockets connecting to a
> netty embedded web server that itself uses producer and consumer clients
> with a Kafka infrastructure (@johnwboardman). On first look, it seems like
> a valid approach. Behind the web server are services that are Kafka apps
> interacting with external web APIs.
> 
> Anecdotally quite a few companies post jobs with Kafka playing a role in a
> micro architecture solution.
> 
> I'll now let experts speak...
> On 23 Jun 2016 11:47 a.m., "Pranay Suresh"  wrote:
> 
>> Hey Kafka experts,
>> 
>> After having read Jay Kreps awesome Kafka reading(
>> 
>> https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying
>> )
>> I have a doubt.
>> 
>> For communication between browsers (let's say collaborative editing, chat
>> etc.) is Kafka the right choice? Especially given that Kafka consumers are
>> designed to pull, rather than a callback-style push. For low latency,
>> possibly ephemeral data/events, is Kafka a good choice? Can I have a
>> browser open a socket into a webserver and each request initiate a consumer
>> to consume from kafka (by polling?) OR is Kafka designed and/or meant to be
>> used for a separate use case?
>> 
>> Any feedback is appreciated. Let the bashing begin!
>> 
>> Many Thanks,
>> pranay
>>