How does offsets.retention.minutes work

2017-03-15 Thread tao xiao
Hi team,

I know that Kafka deletes the offsets for a consumer group after a period of
time (configured by offsets.retention.minutes) if the consumer group is
inactive for this amount of time. I want to understand the definition of
"inactive". I came across this post[1], and it suggests that a group that has
committed no offsets for this amount of time is considered inactive. Does that
mean that even if I have a consumer group that is connected to Kafka all the
time, but no offset is committed for that amount of time, the offsets will
still be deleted?




[1]
http://stackoverflow.com/questions/39131465/how-does-an-offset-expire-for-an-apache-kafka-consumer-group
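
For reference, the broker-side setting in question lives in server.properties. A sketch, assuming the 0.9/0.10 defaults (1440 minutes, i.e. 24 hours; check your own config):

    # server.properties
    offsets.retention.minutes=1440
    # how often the broker checks for expired offsets
    offsets.retention.check.interval.ms=600000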


Need kafka client v0.9.0.2 / fix for KAFKA-3594

2017-03-15 Thread Phil Adams
I'm currently using the kafka-clients v0.9.0.1 library along with the
v0.9.0.1 kafka broker.
Unfortunately, we've run into this problem:
https://issues.apache.org/jira/browse/KAFKA-3594
According to the JIRA, this has been fixed in v0.9.0.2 but I can't find a
pre-built kafka-clients library for this version.

What is the recommended way to get a fix for this problem given that I need
to stick with the 0.9.0.1 broker?

My understanding is that the v0.10.x kafka-clients libraries will not
inter-operate with a v0.9.x broker.
I'm happy to download source and build myself if that's the only way to get
the defect fix.

Thanks in advance,

-- 
Phil Adams


Re: restart Kafka Streams application takes around 5 minutes

2017-03-15 Thread Guozhang Wang
Are you building with a released version of Kafka or with a build from
Kafka trunk? There are a few fixes we have made post-0.10.2 in trunk that
have largely reduced the rebalance latency, so I'd recommend trying a
build from Kafka trunk for testing if possible.


Guozhang

On Wed, Mar 15, 2017 at 10:46 AM, Tianji Li  wrote:

> It seems independent of the RocksDB size. It also took 5 minutes when
> there were 375 MB this morning...
>
> On Wed, Mar 15, 2017 at 9:13 AM, Sachin Mittal  wrote:
>
> > RocksDB state store initialization may be taking up that time.
> > What's the size of your RocksDB state directory? Partitioning the
> > source topic, increasing the number of threads/instances processing the
> > source, and reducing the time window of aggregation may help in reducing
> > the startup time.
> >
> >
> >
> > On Wed, Mar 15, 2017 at 6:36 PM, Tianji Li  wrote:
> >
> > > Hi there,
> > >
> > > In the experiments I am doing now, if I restart the streams
> application,
> > I
> > > have to wait for around 5 minutes for some reason.
> > >
> > > I can see something in the Kafka logs:
> > >
> > > [2017-03-15 08:36:18,118] INFO [GroupCoordinator 0]: Preparing to
> > > restabilize group xxx-test25 with old generation 2
> > > (kafka.coordinator.GroupCoordinator)
> > > [2017-03-15 08:41:08,449] INFO [GroupCoordinator 0]: Stabilized group
> > > xxx-test25 generation 3 (kafka.coordinator.GroupCoordinator)
> > > [2017-03-15 08:41:08,473] INFO [GroupCoordinator 0]: Assignment
> received
> > > from leader for group xxx-test25 for generation 3
> > > (kafka.coordinator.GroupCoordinator)
> > >
> > > What is happening in these 5 minutes? How to reduce it?
> > >
> > > Thanks
> > > Tianji
> > >
> >
>



-- 
-- Guozhang


Re: Kafka Stream: RocksDBKeyValueStoreSupplier performance

2017-03-15 Thread Eno Thereska
Tianji,

A couple of things:

- for now could you use RocksDb without the cache? I've opened a JIRA to verify 
why it's slower with the cache: 
https://issues.apache.org/jira/browse/KAFKA-4904

- you can tune the RocksDb performance further by increasing "its" cache (yes, 
RocksDb has a separate cache and its size is set to quite small by default). 
Look at this question on how to do that with the RocksDbConfigSetter: 
https://groups.google.com/forum/#!topic/confluent-platform/RgkaUy1TUno. This 
might be a bit too much to start with, but it's possible. You'd have to set the 
blockCacheSize option, for example as done in the openDb call in 
RocksDbStore.java (see the sketch just after this list).


- in summary, I'd recommend you use RocksDb as is, since 7 minutes vs. 5 minutes 
is a reasonable difference.
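
A minimal sketch of that second point, using the RocksDBConfigSetter hook available since 0.10.1 (the class name and the 100 MB figure are only illustrative, not recommendations):

    import java.util.Map;
    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.Options;

    public class CustomRocksDBConfig implements RocksDBConfigSetter {
        @Override
        public void setConfig(String storeName, Options options, Map<String, Object> configs) {
            // enlarge RocksDB's own block cache (separate from the Streams record cache)
            BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
            tableConfig.setBlockCacheSize(100 * 1024 * 1024L);
            options.setTableFormatConfig(tableConfig);
        }
    }

    // registered through the streams configuration, e.g.:
    // props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);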

However, the real performance test will be when you actually enable logging, right? 
You might want RocksDb to be backed up to Kafka for fault tolerance.

Finally, make sure to use 0.10.2, the latest release.

Thanks
Eno


> On 15 Mar 2017, at 18:14, Tianji Li  wrote:
> 
> Hi Eno,
> 
> Rocksdb without caching took around 7 minutes.
> 
> Tianji
> 
> 
> On Wed, Mar 15, 2017 at 9:40 AM, Eno Thereska 
> wrote:
> 
>> Tianji,
>> 
>> Could you provide a third data point, running with RocksDb, but without
>> caching, i.e:
>> 
>>> StateStoreSupplier stateStoreSupplier = Stores.create(storeName)
>>>   .withKeys(stringSerde)
>>>   .withValues(avroSerde)
>>>   .persistent()
>>>   .disableLogging()
>>>   .build();
>> 
>> 
>> Thanks
>> Eno
>> 
>> 
>>> On 15 Mar 2017, at 13:02, Tianji Li  wrote:
>>> 
>>> Hi there,
>>> 
>>> It seems that the RocksDB state store is quite slow in my case and I
>> wonder
>>> if I did anything wrong.
>>> 
>>> I have a topic that I groupBy() and then aggregate() 50 times. That is, I
>>> will create 50 result topics and a lot more changelog and repartition
>>> topics.
>>> 
>>> There are a few things that are weird and here I report one, which is the
>>> State store speed.
>>> 
>>> If I use:
>>> 
>>> StateStoreSupplier stateStoreSupplier = Stores.create(storeName)
>>>   .withKeys(stringSerde)
>>>   .withValues(avroSerde)
>>>   .inMemory()
>>>   .build();
>>> 
>>> Then processing 1 million records takes around 5 minutes on my development
>>> computer.
>>> 
>>> If I use:
>>> 
>>> StateStoreSupplier stateStoreSupplier = Stores.create(storeName)
>>>   .withKeys(stringSerde)
>>>   .withValues(avroSerde)
>>>   .persistent()
>>>   .disableLogging()
>>>   .enableCaching()
>>>   .build();
>>> 
>>> Processing the same 1 million records takes around 10 minutes.
>>> 
>>> I believe in the first case, the changelog is backed up to Kafka and in the
>>> second case, only RocksDB is used.
>>> 
>>> But why is RocksDB so slow?
>>> 
>>> Eventually, I am hoping to do windowed aggregation and it seems I have to
>>> use RocksDB, but given the performance, I am hesitating.
>>> 
>>> Thanks
>>> Tianji
>> 
>> 



Re: Kafka Stream: RocksDBKeyValueStoreSupplier performance

2017-03-15 Thread Tianji Li
Hi Eno,

Rocksdb without caching took around 7 minutes.

Tianji


On Wed, Mar 15, 2017 at 9:40 AM, Eno Thereska 
wrote:

> Tianji,
>
> Could you provide a third data point, running with RocksDb, but without
> caching, i.e:
>
> > StateStoreSupplier stateStoreSupplier = Stores.create(storeName)
> >.withKeys(stringSerde)
> >.withValues(avroSerde)
> >.persistent()
> >.disableLogging()
> >.build();
>
>
> Thanks
> Eno
>
>
> > On 15 Mar 2017, at 13:02, Tianji Li  wrote:
> >
> > Hi there,
> >
> > It seems that the RocksDB state store is quite slow in my case and I
> wonder
> > if I did anything wrong.
> >
> > I have a topic that I groupBy() and then aggregate() 50 times. That is, I
> > will create 50 result topics and a lot more changelog and repartition
> > topics.
> >
> > There are a few things that are weird and here I report one, which is the
> > State store speed.
> >
> > If I use:
> >
> >  StateStoreSupplier stateStoreSupplier = Stores.create(storeName)
> >.withKeys(stringSerde)
> >.withValues(avroSerde)
> >.inMemory()
> >.build();
> >
> > Then processing 1 million records takes around 5 minutes on my development
> > computer.
> >
> > If I use:
> >
> >  StateStoreSupplier stateStoreSupplier = Stores.create(storeName)
> >.withKeys(stringSerde)
> >.withValues(avroSerde)
> >.persistent()
> >.disableLogging()
> >.enableCaching()
> >.build();
> >
> > Processing the same 1 million records takes around 10 minutes.
> >
> > I believe in the first case, the changelog is backed up to Kafka and in the
> > second case, only RocksDB is used.
> >
> > But why is RocksDB so slow?
> >
> > Eventually, I am hoping to do windowed aggregation and it seems I have to
> > use RocksDB, but given the performance, I am hesitating.
> >
> > Thanks
> > Tianji
>
>


Re: Not Serializable Result Error

2017-03-15 Thread Michael Noll
Hi Armaan,

> org.apache.spark.SparkException: Job aborted due to stage failure:
>Task 0.0 in stage 0.0 (TID 0) had a not serializable result:
org.apache.kafka.clients.consumer.ConsumerRecord

perhaps you should ask that question on the Spark mailing list, which
should increase your chances of getting a good response for this Spark
error.  You should also share the Spark and Kafka versions you use.

-Michael



On Fri, Mar 10, 2017 at 7:34 PM, Armaan Esfahani <
armaan.esfah...@advancedopen.com> wrote:

> Hello, I have been trying to setup a SMACK stack to learn the basics of
> Kafka Streams and Spark, yet I keep coming across the following error:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0.0 in stage 0.0 (TID 0) had a not serializable result:
> org.apache.kafka.clients.consumer.ConsumerRecord
>
>
>
> I have a “Tweet” object which is a simple POJO with a Date and String that
> then has a Serializer and Deserializer class.
>
>
>
> I have tested creating an object, serializing it to a local file, then
> reading it with the deserializer, and it works fine; however, over the stream
> it fails.
>
>
>
> To read the data from the kafka stream, I have set up an input stream
> using the following code:
>
>
>
> Map<String, Object> kafkaParams = new HashMap<>();
>
> kafkaParams.put("bootstrap.servers", brokers);
>
> kafkaParams.put("key.deserializer", StringDeserializer.class);
>
> kafkaParams.put("value.deserializer", TweetDeserializer.class);
>
> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>
>
>
> JavaInputDStream<ConsumerRecord<String, Tweet>> tweets =
> KafkaUtils.createDirectStream(
>
> jssc,
>
> LocationStrategies.PreferConsistent(),
>
> ConsumerStrategies.Subscribe(topicsSet,
> kafkaParams)
>
> );
>
>
>
> To send a sample object to Kafka, I have the following for testing:
>
>
>
> Properties props = new Properties();
>
> props.put("bootstrap.servers", "192.168.194.194:9092");
>
> props.put("key.serializer", "org.apache.kafka.common.serialization.
> StringSerializer");
>
> props.put("value.serializer", "com.armaanaki.smack.tweet.
> TweetSerializer");
>
>
>
> final KafkaProducer<String, Tweet> kafkaProducer = new
> KafkaProducer<String, Tweet>(props);
>
>
>
> ProducerRecord<String, Tweet> record = new ProducerRecord<String, Tweet>("tweets1", "1", new Tweet(new Date(), "Test"));
>
> kafkaProducer.send(record);
>
>
>
>
>
> Can anyone explain my error? Thanks!
>
>
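
A likely cause of this specific error (a sketch of the usual workaround, not a definitive diagnosis of your setup): ConsumerRecord does not implement java.io.Serializable, so any Spark operation that has to ship the records between JVMs fails. Mapping each record to its payload first avoids serializing ConsumerRecord itself:

    // import org.apache.spark.streaming.api.java.JavaDStream;
    // assumes the JavaInputDStream<ConsumerRecord<String, Tweet>> named "tweets" from the
    // snippet above; Tweet itself must be Serializable (or registered with Kryo)
    JavaDStream<Tweet> tweetValues = tweets.map(record -> record.value());
    tweetValues.foreachRDD(rdd -> System.out.println("batch size: " + rdd.count()));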


Javadoc for org.apache.kafka.common.requests package

2017-03-15 Thread Afshartous, Nick

Hi,


I'm trying to use kafka-clients-0.10.1.0.jar and don't see Javadoc for the 
classes in package


org.apache.kafka.common.requests


Am I looking in the right place here?

https://kafka.apache.org/0100/javadoc/index.html


Thanks for any info,

--

Nick


Re: restart Kafka Streams application takes around 5 minutes

2017-03-15 Thread Tianji Li
It seems independent of the RocksDB size. It also took 5 minutes when
there were 375 MB this morning...

On Wed, Mar 15, 2017 at 9:13 AM, Sachin Mittal  wrote:

> RocksDB state store initialization may be taking up that time.
> What's the size of your RocksDB state directory? Partitioning the
> source topic, increasing the number of threads/instances processing the
> source, and reducing the time window of aggregation may help in reducing the
> startup time.
>
>
>
> On Wed, Mar 15, 2017 at 6:36 PM, Tianji Li  wrote:
>
> > Hi there,
> >
> > In the experiments I am doing now, if I restart the streams application,
> I
> > have to wait for around 5 minutes for some reason.
> >
> > I can see something in the Kafka logs:
> >
> > [2017-03-15 08:36:18,118] INFO [GroupCoordinator 0]: Preparing to
> > restabilize group xxx-test25 with old generation 2
> > (kafka.coordinator.GroupCoordinator)
> > [2017-03-15 08:41:08,449] INFO [GroupCoordinator 0]: Stabilized group
> > xxx-test25 generation 3 (kafka.coordinator.GroupCoordinator)
> > [2017-03-15 08:41:08,473] INFO [GroupCoordinator 0]: Assignment received
> > from leader for group xxx-test25 for generation 3
> > (kafka.coordinator.GroupCoordinator)
> >
> > What is happening in these 5 minutes? How to reduce it?
> >
> > Thanks
> > Tianji
> >
>


Re: Offset commit request failing

2017-03-15 Thread Robert Quinlivan
I should also mention that this error was seen on broker version 0.10.1.1.
I found that this condition sounds somewhat similar to KAFKA-4362, but that
issue was submitted in 0.10.1.1, so they appear to be different issues.

On Wed, Mar 15, 2017 at 11:11 AM, Robert Quinlivan 
wrote:

> Good morning,
>
> I'm hoping for some help understanding the expected behavior for an offset
> commit request and why this request might fail on the broker.
>
> *Context:*
>
> For context, my configuration looks like this:
>
>- Three brokers
>- Consumer offsets topic replication factor set to 3
>- Auto commit enabled
>- The user application topic, which I will call "my_topic", has a
>replication factor of 3 as well and 800 partitions
>- 4000 consumers attached in consumer group "my_group"
>
>
> *Issue:*
>
> When I attach the consumers, the coordinator logs the following error
> message repeatedly for each generation:
>
> ERROR [Group Metadata Manager on Broker 0]: Appending metadata message for
> group my_group generation 2066 failed due to org.apache.kafka.common.
> errors.RecordTooLargeException, returning UNKNOWN error code to the
> client (kafka.coordinator.GroupMetadataManager)
>
> *Observed behavior:*
>
> The consumer group does not stay connected long enough to consume
> messages. It is effectively stuck in a rebalance loop and the "my_topic"
> data has become unavailable.
>
>
> *Investigation:*
>
> Following the Group Metadata Manager code, it looks like the broker is
> writing to a cache after it writes an Offset Commit Request to the log
> file. If this cache write fails, the broker then logs this error and
> returns an error code in the response. In this case, the error from the
> cache is MESSAGE_TOO_LARGE, which is logged as a RecordTooLargeException.
> However, the broker then sets the error code to UNKNOWN on the Offset
> Commit Response.
>
> It seems that the issue is the size of the metadata in the Offset Commit
> Request. I have the following questions:
>
>1. What is the size limit for this request? Are we exceeding the size
>which is causing this request to fail?
>2. If this is an issue with metadata size, what would cause abnormally
>large metadata?
>3. How is this cache used within the broker?
>
>
> Thanks in advance for any insights you can provide.
>
> Regards,
> Robert Quinlivan
> Software Engineer, Signal
>



-- 
Robert Quinlivan
Software Engineer, Signal


Help with SASL configuration for Zookeeper on the Microsoft AD.

2017-03-15 Thread Shrikant Patel
Hi

Does anyone have experience with securing the Kafka-to-ZooKeeper connection and 
setting up SASL against a Microsoft AD account?

We created the keytabs and principals for Kafka and ZK following 
https://www.confluent.io/blog/apache-kafka-security-authorization-authentication-encryption/

We see these principals in our AD. When ZK and Kafka are launched they are able 
to connect to the Kerberos/AD server using their individual keytabs. But when 
Kafka tries to request a service ticket for ZK from Kerberos, it fails with the 
error below.

>>>KRBError:
 sTime is Fri Feb 10 11:48:41 CST 2017 1486748921000
 suSec is 282568
 error code is 7
 error Message is Server not found in Kerberos database
 sname is zk/.x@xxx.com
 msgType is 30

(Per https://issues.apache.org/jira/browse/ZOOKEEPER-1811, we have set 
zookeeper.sasl.client.username so that zk is used as the ZooKeeper service name.)

It seems the issue is that we may not have set up the SPN (service principal name) 
correctly, or linked the user account/keytab to the SPN.

We have spent a good amount of time with our IT/AD team on this. We are ready to 
provide some monetary incentive to anyone who helps us resolve this issue.

Thanks,
Shri



Re: Performance and Encryption

2017-03-15 Thread Hans Jespersen
You are correct that a Kafka broker is not just writing to one file. Jay Kreps 
wrote a great blog post with lots of links to even greater detail on the topic 
of Kafka and disk write performance. Still a good read many years later.

https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
 


-hans


> On Mar 15, 2017, at 7:51 AM, Nicolas MOTTE  wrote:
> 
> Ok that makes sense, thanks !
> 
> The next question I have regarding performance is about the way Kafka writes 
> to the data files.
> I often hear Kafka is very performant because it writes in an append-only 
> fashion.
> So even with a hard disk (not an SSD) we get great performance because it 
> writes sequentially.
> 
> I could understand that if Kafka were only writing to one file.
> But in reality it's writing to N files, N being the number of partitions 
> hosted by the broker.
> So even though it appends the data to each file, overall I assume it is not 
> writing sequentially on the disk.
> 
> Am I wrong?
> 
> -Original Message-
> From: Tauzell, Dave [mailto:dave.tauz...@surescripts.com] 
> Sent: 08 March 2017 22:09
> To: users@kafka.apache.org
> Subject: RE: Performance and Encryption
> 
> I think because the producer batches messages which could be for different 
> topics.
> 
> -Dave
> 
> -Original Message-
> From: Nicolas MOTTE [mailto:nicolas.mo...@amadeus.com]
> Sent: Wednesday, March 8, 2017 2:41 PM
> To: users@kafka.apache.org
> Subject: Performance and Encryption
> 
> Hi everyone,
> 
> I understand one of the reasons why Kafka is performant is its use of zero-copy.
> 
> I often hear that when encryption is enabled, Kafka has to copy the data 
> into user space to decode the message, so it has a big impact on performance.
> 
> If that is true, I don't get why the message has to be decoded by Kafka. I 
> would assume that whether the message is encrypted or not, Kafka simply 
> receives it, appends it to the file, and when a consumer wants to read it, it 
> simply reads at the right offset...
> 
> Also I'm wondering if it's the case if we don't use keys (pure queuing system 
> with key=null).
> 
> Cheers
> Nico
> 
> 



Offset commit request failing

2017-03-15 Thread Robert Quinlivan
Good morning,

I'm hoping for some help understanding the expected behavior for an offset
commit request and why this request might fail on the broker.

*Context:*

For context, my configuration looks like this:

   - Three brokers
   - Consumer offsets topic replication factor set to 3
   - Auto commit enabled
   - The user application topic, which I will call "my_topic", has a
   replication factor of 3 as well and 800 partitions
   - 4000 consumers attached in consumer group "my_group"


*Issue:*

When I attach the consumers, the coordinator logs the following error
message repeatedly for each generation:

ERROR [Group Metadata Manager on Broker 0]: Appending metadata message for
group my_group generation 2066 failed due to
org.apache.kafka.common.errors.RecordTooLargeException, returning UNKNOWN
error code to the client (kafka.coordinator.GroupMetadataManager)

*Observed behavior:*

The consumer group does not stay connected long enough to consume messages.
It is effectively stuck in a rebalance loop and the "my_topic" data has
become unavailable.


*Investigation:*

Following the Group Metadata Manager code, it looks like the broker is
writing to a cache after it writes an Offset Commit Request to the log
file. If this cache write fails, the broker then logs this error and
returns an error code in the response. In this case, the error from the
cache is MESSAGE_TOO_LARGE, which is logged as a RecordTooLargeException.
However, the broker then sets the error code to UNKNOWN on the Offset
Commit Response.

It seems that the issue is the size of the metadata in the Offset Commit
Request. I have the following questions:

   1. What is the size limit for this request? Are we exceeding the size
   which is causing this request to fail?
   2. If this is an issue with metadata size, what would cause abnormally
   large metadata?
   3. How is this cache used within the broker?


Thanks in advance for any insights you can provide.

Regards,
Robert Quinlivan
Software Engineer, Signal


RE: Streams 0.10.2.0 + RocksDB + Avro

2017-03-15 Thread Adrian McCague
Hi Damian,

That is the SerDe we are using; agreed, that looks like a good modification to 
make here for a state-store version.
I would add that it is a good idea to include the record type as well, since an 
edit of the topology arrangement could still lead to issues down the line.

Thank you for your response, I will create a JIRA in due course.
Adrian

-Original Message-
From: Damian Guy [mailto:damian@gmail.com] 
Sent: 15 March 2017 15:00
To: users@kafka.apache.org
Subject: Re: Streams 0.10.2.0 + RocksDB + Avro

Hi Adrian,

The state store names are local names hence don't have the applicationId 
prefix, i.e., they are laid out on disk like so:
/state.dir/applicationId/task/state-store-name. Their corresponding change-logs 
are prefixed with the applicationId.

However, i can see in the case of the schema-registry that this would cause an 
issue as it will use the state-store name when serializing and deserializing 
the keys and values.

Which Avro Serde are you using?

There is an example serde here:
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/utils/SpecificAvroSerializer.java

You could potentially use something like this, but on line 57 prefix the topic 
name with the applicationId (if it doesn't already have it).

Would you mind raising a JIRA for this? It seems like an issue other people are 
yet to encounter.

Thanks,
Damian


On Wed, 15 Mar 2017 at 13:05 Adrian McCague  wrote:

> Hi all,
>
> We are getting collisions with subject names in our schema registry 
> due to state stores that are holding Avro events:
>
> "KSTREAM-JOINOTHER-07-store-value",
>   "KSTREAM-JOINOTHER-06-store-value",
>   "KSTREAM-JOINOTHER-05-store-value",
>   "KSTREAM-OUTEROTHER-05-store-value",
>   "KSTREAM-OUTEROTHER-06-store-value",
>   "KSTREAM-JOINTHIS-05-store-value",
>   "KSTREAM-JOINTHIS-04-store-value",
>   "KSTREAM-JOINTHIS-06-store-value",
>
> As you can see these are not prepended by their app id so any 
> similarly constructed topology in another Streams application, that 
> has different events will lead to conflicting schemas. Is this a bug, 
> intended or user error?
>
> I can see an advantage of the schema registry getting involved here 
> but not if this happens, I see a few work arounds but unsure if any 
> are recommended or there is something I have missed:
>
>   1.  Use an ObjectSerde / Serializable (though then lose the Avro 
> compatibility goodness)
>   2.  Create a modified SpecificAvroSerde that includes the schema 
> with each message
>   3.  Anyway to influence the subject name?
>   4.  Some form of local persistent schema registry for this purpose
>
> I am not sure if anything changed with how Serdes were passed to 
> RocksDB - I intend to deepdive into the source later at my next opportunity.
> I did notice non avro messages in our topologies started throwing 
> exceptions (not instances of SpecificRecord) since we moved to 
> 0.10.2.0 so I suspect there was a plain Object serializer and now they 
> use the topology defaults if you do not supply a serde.
>
> Thanks
> Adrian
>


Re: Streams 0.10.2.0 + RocksDB + Avro

2017-03-15 Thread Damian Guy
Hi Adrian,

The state store names are local names hence don't have the applicationId
prefix, i.e., they are laid out on disk like so:
/state.dir/applicationId/task/state-store-name. Their corresponding
change-logs are prefixed with the applicationId.

However, I can see in the case of the schema registry that this would cause
an issue, as it will use the state-store name when serializing and
deserializing the keys and values.

Which Avro Serde are you using?

There is an example serde here:
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/utils/SpecificAvroSerializer.java

You could potentially use something like this, but on line 57 prefix the
topic name with the applicationId (if it doesn't already have it).
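
As a sketch of that idea (the class and constructor below are made up for illustration, not part of the library), a wrapper serializer could rewrite the topic/subject name before delegating to the schema-registry-aware serializer:

    import java.util.Map;
    import org.apache.kafka.common.serialization.Serializer;

    public class ApplicationIdPrefixingSerializer<T> implements Serializer<T> {
        private final Serializer<T> inner;
        private final String applicationId;

        public ApplicationIdPrefixingSerializer(Serializer<T> inner, String applicationId) {
            this.inner = inner;
            this.applicationId = applicationId;
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            inner.configure(configs, isKey);
        }

        @Override
        public byte[] serialize(String topic, T data) {
            // prefix with the application id so state-store-derived subjects
            // no longer collide across Streams applications
            String prefixed = topic.startsWith(applicationId) ? topic : applicationId + "-" + topic;
            return inner.serialize(prefixed, data);
        }

        @Override
        public void close() {
            inner.close();
        }
    }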

Would you mind raising a JIRA for this? It seems like an issue other people
are yet to encounter.

Thanks,
Damian


On Wed, 15 Mar 2017 at 13:05 Adrian McCague  wrote:

> Hi all,
>
> We are getting collisions with subject names in our schema registry due to
> state stores that are holding Avro events:
>
> "KSTREAM-JOINOTHER-07-store-value",
>   "KSTREAM-JOINOTHER-06-store-value",
>   "KSTREAM-JOINOTHER-05-store-value",
>   "KSTREAM-OUTEROTHER-05-store-value",
>   "KSTREAM-OUTEROTHER-06-store-value",
>   "KSTREAM-JOINTHIS-05-store-value",
>   "KSTREAM-JOINTHIS-04-store-value",
>   "KSTREAM-JOINTHIS-06-store-value",
>
> As you can see these are not prepended by their app id so any similarly
> constructed topology in another Streams application, that has different
> events will lead to conflicting schemas. Is this a bug, intended or user
> error?
>
> I can see an advantage of the schema registry getting involved here but
> not if this happens, I see a few work arounds but unsure if any are
> recommended or there is something I have missed:
>
>   1.  Use an ObjectSerde / Serializable (though then lose the Avro
> compatibility goodness)
>   2.  Create a modified SpecificAvroSerde that includes the schema with
> each message
>   3.  Anyway to influence the subject name?
>   4.  Some form of local persistent schema registry for this purpose
>
> I am not sure if anything changed with how Serdes were passed to RocksDB -
> I intend to deepdive into the source later at my next opportunity.
> I did notice non avro messages in our topologies started throwing
> exceptions (not instances of SpecificRecord) since we moved to 0.10.2.0 so
> I suspect there was a plain Object serializer and now they use the topology
> defaults if you do not supply a serde.
>
> Thanks
> Adrian
>


RE: Performance and Encryption

2017-03-15 Thread Nicolas MOTTE
Ok that makes sense, thanks !

The next question I have regarding performance is about the way Kafka writes to 
the data files.
I often hear Kafka is very performant because it writes in an append-only 
fashion.
So even with a hard disk (not an SSD) we get great performance because it writes 
sequentially.

I could understand that if Kafka were only writing to one file.
But in reality it's writing to N files, N being the number of partitions hosted 
by the broker.
So even though it appends the data to each file, overall I assume it is not 
writing sequentially on the disk.

Am I wrong?

-Original Message-
From: Tauzell, Dave [mailto:dave.tauz...@surescripts.com] 
Sent: 08 March 2017 22:09
To: users@kafka.apache.org
Subject: RE: Performance and Encryption

I think because the producer batches messages which could be for different 
topics.

-Dave

-Original Message-
From: Nicolas MOTTE [mailto:nicolas.mo...@amadeus.com]
Sent: Wednesday, March 8, 2017 2:41 PM
To: users@kafka.apache.org
Subject: Performance and Encryption

Hi everyone,

I understand one of the reasons why Kafka is performant is its use of zero-copy.

I often hear that when encryption is enabled, Kafka has to copy the data 
into user space to decode the message, so it has a big impact on performance.

If that is true, I don't get why the message has to be decoded by Kafka. I would 
assume that whether the message is encrypted or not, Kafka simply receives it, 
appends it to the file, and when a consumer wants to read it, it simply reads 
at the right offset...

Also I'm wondering if it's the case if we don't use keys (pure queuing system 
with key=null).

Cheers
Nico




Re: Trying to use Kafka Stream

2017-03-15 Thread Mina Aslani
Hi Eno,

Great finding! You were right! I had to change KAFKA_ADVERTISED_LISTENERS
to be PLAINTEXT://$(docker-machine ip <machine-name>):<port>
to make it work from the IDE. Step 2 (pointing to <docker-machine-ip>:<port>
in my stream app) was already done.

Later, I'll try using the CLI as mentioned here
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L55-L62
as pointed out by Michael.

Thank you very much for your time and your prompt responses,
really appreciate it!

Have a wonderful day.

Best regards,
Mina

On Wed, Mar 15, 2017 at 4:38 AM, Eno Thereska 
wrote:

> Hi Mina,
>
> It might be that you need to set this property on the Kafka broker config
> file (server.properties):
> advertised.listeners=PLAINTEXT://your.host.name:9092  your.host.name:9092>
>
>
> The problem might be this: within docker you run Kafka and Kafka’s address
> is localhost:9092. Great. Then say you have another container or are
> running the streams app on your local laptop. If you point streams to
> localhost:9092 that is “not” where Kafka is running. So you need to point
> your streams app at the address of the container. That’s the second step.
> The first step is to have Kafka advertise that address to the streams app
> and that you do by setting the address above. Example:
>
> advertised.listeners=PLAINTEXT://123.45.67:9092
> 
>
> Then when you run the streams app you pass in 123.45.67:9092
> .
>
> Thanks
> Eno
>
> > On Mar 15, 2017, at 5:14 AM, Mina Aslani  wrote:
> >
> > Hi,
> > I just checked streams-wordcount-output topic using below command
> >
> > docker run \
> >
> >  --net=host \
> >
> >  --rm \
> >
> >  confluentinc/cp-kafka:3.2.0 \
> >
> >  kafka-console-consumer --bootstrap-server localhost:9092 \
> >
> >  --topic streams-wordcount-output \
> >
> >  --from-beginning \
> >
> >  --formatter kafka.tools.DefaultMessageFormatter \
> >
> >  --property print.key=true \
> >
> >  --property key.deserializer=org.apache.ka
> > fka.common.serialization.StringDeserializer \
> >
> >  --property value.deserializer=org.apache.
> > kafka.common.serialization.LongDeserializer
> >
> >
> > and it returns
> >
> > all 1
> > lead 1
> > to 1
> > hello 1
> > streams 2
> > join 1
> > kafka 3
> > summit 1
> >
> > Please note above result is when I tried  http://docs.confluent.i
> > o/3.2.0/streams/quickstart.html#goal-of-this-quickstart and in
> > docker-machine  ran /usr/bin/kafka-run-class
> org.apache.kafka.streams.examp
> > les.wordcount.WordCountDemo.
> >
> > How come running same program out of docker-machine does not output to
> the
> > output topic?
> > Should I make the program as jar and deploy to docker-machine and run it
> > using ./bin/kafka-run-class?
> >
> > Best regards,
> > Mina
> >
> >
> >
> > On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani 
> wrote:
> >
> >> I even tried http://docs.confluent.io/3.2.0/streams/quickstart.html#
> goal-
> >> of-this-quickstart
> >>
> >> and in docker-machine  ran /usr/bin/kafka-run-class
> >> org.apache.kafka.streams.examples.wordcount.WordCountDemo
> >>
> >> Running
> >>
> >> docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> >> kafka-console-consumer --bootstrap-server localhost:9092 --topic
> >> streams-wordcount-output --new-consumer --from-beginning
> >>
> >> shows 8 blank messages
> >>
> >> Is there any setting/configuration should be done as running the class
> in
> >> the docker-machine and running program outside the docker-machine does
> not
> >> return expected result!
> >>
> >> On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani 
> wrote:
> >>
> >>> And the port for kafka is 29092 and for zookeeper 32181.
> >>>
> >>> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani 
> >>> wrote:
> >>>
>  Hi,
> 
>  I forgot to add in my previous email 2 questions.
> 
>  To setup my env, shall I use https://raw.githubusercont
>  ent.com/confluentinc/cp-docker-images/master/examples/kafka-
>  single-node/docker-compose.yml instead or is there any other
>  docker-compose.yml (version 2 or 3) which is suggested to setup env?
> 
>  How can I check "whether streams (that is just an app) can reach
> Kafka"?
> 
>  Regards,
>  Mina
> 
>  On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani 
>  wrote:
> 
> > Hi Eno,
> >
> > Sorry! That is a typo!
> >
> > I have a docker-machine with different containers (setup as directed
> @
> > http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
> )
> >
> > docker ps --format "{{.Image}}: {{.Names}}"
> >
> > confluentinc/cp-kafka-connect:3.2.0: kafka-connect
> >
> > confluentinc/cp-enterprise-control-center:3.2.0: control-center
> >
> > confluentinc/cp-kafka-rest:3.2.0: kafka-rest
> >
> > 

Consumer group stable state

2017-03-15 Thread Igor Velichko

Hi all,

Is it possible to determine on the client's side when the consumer group's state 
becomes stable in the group coordinator? From what I see (Kafka 0.9.0.1), 
when a consumer subscribes to a topic, it's not immediately ready to 
receive messages because of the rebalancing process in the broker. So I'd 
like to know when my consumer is completely initialized and is able to 
receive messages. It would be especially helpful for integration tests 
with an in-memory Kafka cluster.
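
The 0.9 consumer API has no explicit "group is stable" call, but one common approach (a sketch; topic, group, and timeouts below are placeholders) is to treat the first onPartitionsAssigned() callback as the readiness signal, since it fires once the rebalance that admitted this member has completed:

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class ReadyWhenAssigned {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder
            props.put("group.id", "test-group");                // placeholder
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            final CountDownLatch assigned = new CountDownLatch(1);
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("test-topic"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // fires after the join/sync round completes, i.e. the group is
                    // stable from this member's point of view
                    assigned.countDown();
                }
            });

            // poll() drives the group protocol; loop until an assignment arrives
            while (assigned.getCount() > 0) {
                consumer.poll(100);
            }
            // ... the consumer is now ready to receive messages for its partitions
        }
    }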



--
Best regards,
Igor Velichko



Re: Kafka connection to start from latest offset

2017-03-15 Thread Stephen Durfey
Yes, it is pretty coarse. I have a pull request open for supporting
overriding those settings at the connector level; I'm just waiting for it
to be pulled.

So, if any committers are interested in reviewing/pulling it for me, that
would be great :)

https://github.com/apache/kafka/pull/2548
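
For reference, the worker-level override described in the quoted message below amounts to something like this in the Connect worker's properties file (a sketch; the file name depends on how you launch Connect):

    # e.g. connect-distributed.properties
    consumer.auto.offset.reset=latest
    # producer-side settings can be overridden the same way, for example:
    # producer.compression.type=snappy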

On Wed, Mar 15, 2017 at 8:42 AM, Aaron Niskode-Dossett <
aniskodedoss...@etsy.com.invalid> wrote:

> Thank you Stephen!  That's a very coarse setting, as you note, since it's
> at the worker level, but I'll take it.
>
> -Aaron
>
> On Tue, Mar 14, 2017 at 8:07 PM Stephen Durfey  wrote:
>
> > Producer and consumer overrides used by the connect worker can be
> > overridden by prefixing the specific kafka config with either 'producer.'
> > or 'consumer.'. So, you should be able to set
> > 'consumer.auto.offset.reset=latest' in your worker config to do that.
> >
> >
> > http://docs.confluent.io/3.0.0/connect/userguide.html?
> highlight=override%20configuration#overriding-producer-consumer-settings
> >
> > On Tue, Mar 14, 2017 at 7:47 PM, Aaron Niskode-Dossett <
> > aniskodedoss...@etsy.com.invalid> wrote:
> >
> > > Is it possible to start a kafka connect instance that reads from the
> > > *latest* offset as opposed to the earliest?  I suppose this would be the
> > > equivalent of passing auto.offset.reset=latest to a kafka consumer.
> > >
> > > More generally, is this something that specific implementations of the
> > > kafka connect API would have to be responsible for handling or should
> it
> > > exposed through the connect API?
> > >
> > > Thanks, Aaron
> > >
> >
>


Re: Kafka connection to start from latest offset

2017-03-15 Thread Aaron Niskode-Dossett
Thank you Stephen!  That's a very coarse setting, as you note, since it's
at the worker level, but I'll take it.

-Aaron

On Tue, Mar 14, 2017 at 8:07 PM Stephen Durfey  wrote:

> Producer and consumer overrides used by the connect worker can be
> overridden by prefixing the specific kafka config with either 'producer.'
> or 'consumer.'. So, you should be able to set
> 'consumer.auto.offset.reset=latest' in your worker config to do that.
>
>
> http://docs.confluent.io/3.0.0/connect/userguide.html?highlight=override%20configuration#overriding-producer-consumer-settings
>
> On Tue, Mar 14, 2017 at 7:47 PM, Aaron Niskode-Dossett <
> aniskodedoss...@etsy.com.invalid> wrote:
>
> > Is it possible to start a kafka connect instance that reads from the
> > *latest* offset as opposed to the earliest?  I suppose this would be the
> > equivalent of passing auto.offset.reset=latest to a kafka consumer.
> >
> > More generally, is this something that specific implementations of the
> > kafka connect API would have to be responsible for handling or should it
> > exposed through the connect API?
> >
> > Thanks, Aaron
> >
>


Re: restart Kafka Streams application takes around 5 minutes

2017-03-15 Thread Sachin Mittal
RocksDB state store initialization may be taking up that time.
What's the size of your RocksDB state directory? Partitioning the
source topic, increasing the number of threads/instances processing the
source, and reducing the time window of aggregation may help in reducing the
startup time.



On Wed, Mar 15, 2017 at 6:36 PM, Tianji Li  wrote:

> Hi there,
>
> In the experiments I am doing now, if I restart the streams application, I
> have to wait for around 5 minutes for some reason.
>
> I can see something in the Kafka logs:
>
> [2017-03-15 08:36:18,118] INFO [GroupCoordinator 0]: Preparing to
> restabilize group xxx-test25 with old generation 2
> (kafka.coordinator.GroupCoordinator)
> [2017-03-15 08:41:08,449] INFO [GroupCoordinator 0]: Stabilized group
> xxx-test25 generation 3 (kafka.coordinator.GroupCoordinator)
> [2017-03-15 08:41:08,473] INFO [GroupCoordinator 0]: Assignment received
> from leader for group xxx-test25 for generation 3
> (kafka.coordinator.GroupCoordinator)
>
> What is happening in these 5 minutes? How to reduce it?
>
> Thanks
> Tianji
>


restart Kafka Streams application takes around 5 minutes

2017-03-15 Thread Tianji Li
Hi there,

In the experiments I am doing now, if I restart the streams application, I
have to wait for around 5 minutes for some reason.

I can see something in the Kafka logs:

[2017-03-15 08:36:18,118] INFO [GroupCoordinator 0]: Preparing to
restabilize group xxx-test25 with old generation 2
(kafka.coordinator.GroupCoordinator)
[2017-03-15 08:41:08,449] INFO [GroupCoordinator 0]: Stabilized group
xxx-test25 generation 3 (kafka.coordinator.GroupCoordinator)
[2017-03-15 08:41:08,473] INFO [GroupCoordinator 0]: Assignment received
from leader for group xxx-test25 for generation 3
(kafka.coordinator.GroupCoordinator)

What is happening in these 5 minutes? How to reduce it?

Thanks
Tianji


Streams 0.10.2.0 + RocksDB + Avro

2017-03-15 Thread Adrian McCague
Hi all,

We are getting collisions with subject names in our schema registry due to 
state stores that are holding Avro events:

"KSTREAM-JOINOTHER-07-store-value",
  "KSTREAM-JOINOTHER-06-store-value",
  "KSTREAM-JOINOTHER-05-store-value",
  "KSTREAM-OUTEROTHER-05-store-value",
  "KSTREAM-OUTEROTHER-06-store-value",
  "KSTREAM-JOINTHIS-05-store-value",
  "KSTREAM-JOINTHIS-04-store-value",
  "KSTREAM-JOINTHIS-06-store-value",

As you can see, these are not prefixed with their application id, so any similarly 
constructed topology in another Streams application that has different events 
will lead to conflicting schemas. Is this a bug, intended behaviour, or user error?

I can see an advantage of the schema registry getting involved here, but not if 
this happens. I see a few workarounds but am unsure if any are recommended or 
whether there is something I have missed:

  1.  Use an ObjectSerde / Serializable (though then lose the Avro 
compatibility goodness)
  2.  Create a modified SpecificAvroSerde that includes the schema with each 
message
  3.  Any way to influence the subject name?
  4.  Some form of local persistent schema registry for this purpose

I am not sure if anything changed with how Serdes were passed to RocksDB; I 
intend to deep-dive into the source at my next opportunity.
I did notice non-Avro messages in our topologies started throwing exceptions 
(not instances of SpecificRecord) since we moved to 0.10.2.0, so I suspect there 
was a plain Object serializer and now they use the topology defaults if you do 
not supply a serde.

Thanks
Adrian


Kafka Stream: RocksDBKeyValueStoreSupplier performance

2017-03-15 Thread Tianji Li
Hi there,

It seems that the RocksDB state store is quite slow in my case and I wonder
if I did anything wrong.

I have a topic that I groupBy() and then aggregate() 50 times. That is, I
will create 50 result topics and a lot more changelog and repartition
topics.

There are a few things that are weird and here I report one, which is the
State store speed.

If I use:

  StateStoreSupplier stateStoreSupplier = Stores.create(storeName)
.withKeys(stringSerde)
.withValues(avroSerde)
.inMemory()
.build();

Then processing 1 million records takes around 5 minutes on my development
computer.

If I use:

  StateStoreSupplier stateStoreSupplier = Stores.create(storeName)
.withKeys(stringSerde)
.withValues(avroSerde)
.persistent()
.disableLogging()
.enableCaching()
.build();

Processing the same 1 million records takes around 10 minutes.

I believe in the first case, the changelog is backed up to Kafka and in the
second case, only RocksDB is used.

But why is RocksDB so slow?

Eventually, I am hoping to do windowed aggregation and it seems I have to
use RocksDB, but given the performance, I am hesitating.

Thanks
Tianji


Re: Trying to use Kafka Stream

2017-03-15 Thread Michael Noll
Ah, I see.

> However, running the program (e.g. https://github.com/
confluentinc/examples/blob/3.2.x/kafka-streams/src/main/
java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181) in
my IDE was not and still is not working.

Another thing to try is to run the program above from the CLI, not from
your IDE (perhaps your IDE setup is wonky).
That's described in step 3 of the program's usage instructions [1].

[1]
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L55-L62



On Wed, Mar 15, 2017 at 12:56 PM, Mina Aslani  wrote:

> Hi Michael,
>
> I was aware that the output should be written in a kafka topic not the
> console.
>
> To understand if streams can reach the kafka as Eno asked in earlier email
> I found http://docs.confluent.io/3.2.0/streams/quickstart.html
> #goal-of-this-quickstart and went through the steps mentioned and ran
> /usr/bin/kafka-run-class
> org.apache.kafka.streams.examples.wordcount.WordCountDemo which works.
>
> However, running the program (e.g. https://github.com/
> confluentinc/examples/blob/3.2.x/kafka-streams/src/main/
> java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181)
> in my IDE was not and still is not working.
>
> Best regards,
> Mina
>
>
> On Wed, Mar 15, 2017 at 4:43 AM, Michael Noll 
> wrote:
>
> > Mina,
> >
> > in your original question you wrote:
> >
> > > However, I do not see the word count when I try to run below example.
> > Looks like that it does not connect to Kafka.
> >
> > The WordCount demo example writes its output to Kafka only --  it *does
> > not* write any results to the console/STDOUT.
> >
> > From what I can tell the WordCount example ran correctly because, in your
> > latest email, you showed the output of the console consumer (which *does*
> > write to the console), and that output is a list of words and counts:
> >
> > > all 1
> > > lead 1
> > > to 1
> > > hello 1
> > > streams 2
> > > join 1
> > > kafka 3
> > > summit 1
> >
> > In other words, I think everything you did was correct, and Kafka too was
> > working correctly.  You were simply unaware that the WordCount example
> does
> > not write its output to the console.
> >
> > Best,
> > Michael
> >
> >
> >
> >
> >
> > On Wed, Mar 15, 2017 at 6:14 AM, Mina Aslani 
> wrote:
> >
> > > Hi,
> > > I just checked streams-wordcount-output topic using below command
> > >
> > > docker run \
> > >
> > >   --net=host \
> > >
> > >   --rm \
> > >
> > >   confluentinc/cp-kafka:3.2.0 \
> > >
> > >   kafka-console-consumer --bootstrap-server localhost:9092 \
> > >
> > >   --topic streams-wordcount-output \
> > >
> > >   --from-beginning \
> > >
> > >   --formatter kafka.tools.DefaultMessageFormatter \
> > >
> > >   --property print.key=true \
> > >
> > >   --property key.deserializer=org.apache.ka
> > > fka.common.serialization.StringDeserializer \
> > >
> > >   --property value.deserializer=org.apache.
> > > kafka.common.serialization.LongDeserializer
> > >
> > >
> > > and it returns
> > >
> > > all 1
> > > lead 1
> > > to 1
> > > hello 1
> > > streams 2
> > > join 1
> > > kafka 3
> > > summit 1
> > >
> > > Please note above result is when I tried  http://docs.confluent.i
> > > o/3.2.0/streams/quickstart.html#goal-of-this-quickstart and in
> > > docker-machine  ran /usr/bin/kafka-run-class
> > org.apache.kafka.streams.examp
> > > les.wordcount.WordCountDemo.
> > >
> > > How come running same program out of docker-machine does not output to
> > the
> > > output topic?
> > > Should I make the program as jar and deploy to docker-machine and run
> it
> > > using ./bin/kafka-run-class?
> > >
> > > Best regards,
> > > Mina
> > >
> > >
> > >
> > > On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani 
> > > wrote:
> > >
> > > > I even tried http://docs.confluent.io/3.2.0/streams/quickstart.html#
> > > goal-
> > > > of-this-quickstart
> > > >
> > > > and in docker-machine  ran /usr/bin/kafka-run-class
> > > > org.apache.kafka.streams.examples.wordcount.WordCountDemo
> > > >
> > > > Running
> > > >
> > > > docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> > > > kafka-console-consumer --bootstrap-server localhost:9092 --topic
> > > > streams-wordcount-output --new-consumer --from-beginning
> > > >
> > > > shows 8 blank messages
> > > >
> > > > Is there any setting/configuration should be done as running the
> class
> > in
> > > > the docker-machine and running program outside the docker-machine
> does
> > > not
> > > > return expected result!
> > > >
> > > > On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani 
> > > wrote:
> > > >
> > > >> And the port for kafka is 29092 and for zookeeper 32181.
> > > >>
> > > >> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani 
> > > >> wrote:
> > > >>
> > > >>> Hi,
> > > >>>
> > > >>> I 

Re: Trying to use Kafka Stream

2017-03-15 Thread Mina Aslani
Hi Michael,

I was aware that the output should be written in a kafka topic not the
console.

To understand if streams can reach the kafka as Eno asked in earlier email
I found http://docs.confluent.io/3.2.0/streams/quickstart.html
#goal-of-this-quickstart and went through the steps mentioned and ran
/usr/bin/kafka-run-class
org.apache.kafka.streams.examples.wordcount.WordCountDemo which works.

However, running the program (e.g. https://github.com/
confluentinc/examples/blob/3.2.x/kafka-streams/src/main/
java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181)
in my IDE was not and still is not working.

Best regards,
Mina


On Wed, Mar 15, 2017 at 4:43 AM, Michael Noll  wrote:

> Mina,
>
> in your original question you wrote:
>
> > However, I do not see the word count when I try to run below example.
> Looks like that it does not connect to Kafka.
>
> The WordCount demo example writes its output to Kafka only --  it *does
> not* write any results to the console/STDOUT.
>
> From what I can tell the WordCount example ran correctly because, in your
> latest email, you showed the output of the console consumer (which *does*
> write to the console), and that output is a list of words and counts:
>
> > all 1
> > lead 1
> > to 1
> > hello 1
> > streams 2
> > join 1
> > kafka 3
> > summit 1
>
> In other words, I think everything you did was correct, and Kafka too was
> working correctly.  You were simply unaware that the WordCount example does
> not write its output to the console.
>
> Best,
> Michael
>
>
>
>
>
> On Wed, Mar 15, 2017 at 6:14 AM, Mina Aslani  wrote:
>
> > Hi,
> > I just checked streams-wordcount-output topic using below command
> >
> > docker run \
> >
> >   --net=host \
> >
> >   --rm \
> >
> >   confluentinc/cp-kafka:3.2.0 \
> >
> >   kafka-console-consumer --bootstrap-server localhost:9092 \
> >
> >   --topic streams-wordcount-output \
> >
> >   --from-beginning \
> >
> >   --formatter kafka.tools.DefaultMessageFormatter \
> >
> >   --property print.key=true \
> >
> >   --property key.deserializer=org.apache.ka
> > fka.common.serialization.StringDeserializer \
> >
> >   --property value.deserializer=org.apache.
> > kafka.common.serialization.LongDeserializer
> >
> >
> > and it returns
> >
> > all 1
> > lead 1
> > to 1
> > hello 1
> > streams 2
> > join 1
> > kafka 3
> > summit 1
> >
> > Please note above result is when I tried  http://docs.confluent.i
> > o/3.2.0/streams/quickstart.html#goal-of-this-quickstart and in
> > docker-machine  ran /usr/bin/kafka-run-class
> org.apache.kafka.streams.examp
> > les.wordcount.WordCountDemo.
> >
> > How come running same program out of docker-machine does not output to
> the
> > output topic?
> > Should I make the program as jar and deploy to docker-machine and run it
> > using ./bin/kafka-run-class?
> >
> > Best regards,
> > Mina
> >
> >
> >
> > On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani 
> > wrote:
> >
> > > I even tried http://docs.confluent.io/3.2.0/streams/quickstart.html#
> > goal-
> > > of-this-quickstart
> > >
> > > and in docker-machine  ran /usr/bin/kafka-run-class
> > > org.apache.kafka.streams.examples.wordcount.WordCountDemo
> > >
> > > Running
> > >
> > > docker run --net=host --rm confluentinc/cp-kafka:3.2.0
> > > kafka-console-consumer --bootstrap-server localhost:9092 --topic
> > > streams-wordcount-output --new-consumer --from-beginning
> > >
> > > shows 8 blank messages
> > >
> > > Is there any setting/configuration should be done as running the class
> in
> > > the docker-machine and running program outside the docker-machine does
> > not
> > > return expected result!
> > >
> > > On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani 
> > wrote:
> > >
> > >> And the port for kafka is 29092 and for zookeeper 32181.
> > >>
> > >> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani 
> > >> wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> I forgot to add in my previous email 2 questions.
> > >>>
> > >>> To setup my env, shall I use https://raw.githubusercont
> > >>> ent.com/confluentinc/cp-docker-images/master/examples/kafka-
> > >>> single-node/docker-compose.yml instead or is there any other
> > >>> docker-compose.yml (version 2 or 3) which is suggested to setup env?
> > >>>
> > >>> How can I check "whether streams (that is just an app) can reach
> > Kafka"?
> > >>>
> > >>> Regards,
> > >>> Mina
> > >>>
> > >>> On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani 
> > >>> wrote:
> > >>>
> >  Hi Eno,
> > 
> >  Sorry! That is a typo!
> > 
> >  I have a docker-machine with different containers (setup as
> directed @
> >  http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickst
> art.html)
> > 
> >  docker ps --format "{{.Image}}: {{.Names}}"
> > 
> >  confluentinc/cp-kafka-connect:3.2.0: kafka-connect
> > 
> >  

Re: Lost ISR when upgrading kafka from 0.10.0.1 to any newer version like 0.10.1.0 or 0.10.2.0

2017-03-15 Thread Ismael Juma
Looking at the output you pasted, broker `0` was the one being upgraded? A
few things to check:

1. Does broker `0` connect to the other brokers after the restart?
2. Is broker `0` able to connect to zookeeper?
3. Does everything look OK in the controller and state-change logs on the
controller node?
4. Did you allow enough time for the restarted broker to rejoin the ISR?

Ismael

On Tue, Mar 14, 2017 at 1:37 PM, Thomas KIEFFER <
thomas.kief...@olamobile.com.invalid> wrote:

> Yes, I've set the inter.broker.protocol.version=0.10.0 before restarting
> each broker on a previous update. Clusters currently run with this config.
>
> On 03/14/2017 12:34 PM, Ismael Juma wrote:
>
> So, to double-check, you set inter.broker.protocol.version=0.10.0 before
> bouncing each broker?
>
> On Tue, Mar 14, 2017 at 11:22 AM, Thomas KIEFFER 
>  wrote:
>
>
> Hello Ismael,
>
> Thank you for your feedback.
>
> Yes, I've done these changes on a previous upgrade and set them accordingly
> with the new version when trying to do the upgrade.
>
> inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0,
> 0.10.0 or 0.10.1).
> log.message.format.version=CURRENT_KAFKA_VERSION (See potential
> performance impact following the upgrade for the details on what this
> configuration does.)
> On 03/14/2017 11:26 AM, Ismael Juma wrote:
>
> Hi Thomas,
>
> Did you follow the instructions: https://kafka.apache.org/documentation/#upgrade
>
> Ismael
>
> On Mon, Mar 13, 2017 at 9:43 AM, Thomas KIEFFER 
>   
> wrote:
>
>
> I'm trying to perform an upgrade of 2 Kafka clusters of 5 instances. When
> I'm doing the switch between 0.10.0.1 and 0.10.1.0 or 0.10.2.0, I saw that
> the ISR is lost when I upgrade one instance. I haven't yet found anything
> relevant about this problem; the logs seem just fine.
> e.g.
>
> kafka-topics.sh --describe --zookeeper kazoo002.#.prv --topic redirects
> Topic: redirects  PartitionCount: 6  ReplicationFactor: 2  Configs: retention.bytes=10737418240
>   Topic: redirects  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
>   Topic: redirects  Partition: 1  Leader: 2  Replicas: 2,0  Isr: 2
>   Topic: redirects  Partition: 2  Leader: 1  Replicas: 0,1  Isr: 1
>   Topic: redirects  Partition: 3  Leader: 1  Replicas: 1,0  Isr: 1
>   Topic: redirects  Partition: 4  Leader: 2  Replicas: 2,1  Isr: 2,1
>   Topic: redirects  Partition: 5  Leader: 2  Replicas: 0,2  Isr: 2
>
> It runs with Zookeeper 3.4.6.
>
> As those clusters are in production, I didn't try to migrate more than 1
> instance after spotting this ISR problem, and then rollback to the original
> version 0.10.0.1.
>
> Any update about this would be gratefully received.
>

Re: Trying to use Kafka Stream

2017-03-15 Thread Eno Thereska
Hi Mina,

It might be that you need to set this property on the Kafka broker config file 
(server.properties):
advertised.listeners=PLAINTEXT://your.host.name:9092



The problem might be this: within docker you run Kafka and Kafka’s address is 
localhost:9092. Great. Then say you have another container or are running the 
streams app on your local laptop. If you point streams to localhost:9092 that 
is “not” where Kafka is running. So you need to point your streams app at the 
address of the container. That’s the second step. The first step is to have 
Kafka advertise that address to the streams app and that you do by setting the 
address above. Example:

advertised.listeners=PLAINTEXT://123.45.67:9092 

Then when you run the streams app you pass in 123.45.67:9092.
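
For example, if Kafka runs in a container started from the cp-docker-images
quickstart, the advertised listener can be set through the container's
environment variables. A minimal sketch (the IP 192.168.99.100 is only a
placeholder for your docker-machine/host IP, and the ports should match your
setup):

docker run -d \
  --net=host \
  --name=kafka \
  -e KAFKA_ZOOKEEPER_CONNECT=localhost:32181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.99.100:29092 \
  confluentinc/cp-kafka:3.2.0

The streams app would then use 192.168.99.100:29092 as its bootstrap.servers.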

Thanks
Eno

> On Mar 15, 2017, at 5:14 AM, Mina Aslani  wrote:
> 
> Hi,
> I just checked streams-wordcount-output topic using below command
> 
> docker run \
> 
>  --net=host \
> 
>  --rm \
> 
>  confluentinc/cp-kafka:3.2.0 \
> 
>  kafka-console-consumer --bootstrap-server localhost:9092 \
> 
>  --topic streams-wordcount-output \
> 
>  --from-beginning \
> 
>  --formatter kafka.tools.DefaultMessageFormatter \
> 
>  --property print.key=true \
> 
>  --property key.deserializer=org.apache.ka
> fka.common.serialization.StringDeserializer \
> 
>  --property value.deserializer=org.apache.
> kafka.common.serialization.LongDeserializer
> 
> 
> and it returns
> 
> all 1
> lead 1
> to 1
> hello 1
> streams 2
> join 1
> kafka 3
> summit 1
> 
> Please note above result is when I tried  http://docs.confluent.i
> o/3.2.0/streams/quickstart.html#goal-of-this-quickstart and in
> docker-machine  ran /usr/bin/kafka-run-class org.apache.kafka.streams.examp
> les.wordcount.WordCountDemo.
> 
> How come running same program out of docker-machine does not output to the
> output topic?
> Should I make the program as jar and deploy to docker-machine and run it
> using ./bin/kafka-run-class?
> 
> Best regards,
> Mina
> 
> 
> 
> On Tue, Mar 14, 2017 at 11:11 PM, Mina Aslani  wrote:
> 
>> I even tried http://docs.confluent.io/3.2.0/streams/quickstart.html#goal-
>> of-this-quickstart
>> 
>> and in docker-machine  ran /usr/bin/kafka-run-class
>> org.apache.kafka.streams.examples.wordcount.WordCountDemo
>> 
>> Running
>> 
>> docker run --net=host --rm confluentinc/cp-kafka:3.2.0
>> kafka-console-consumer --bootstrap-server localhost:9092 --topic
>> streams-wordcount-output --new-consumer --from-beginning
>> 
>> shows 8 blank messages
>> 
>> Is there any setting/configuration should be done as running the class in
>> the docker-machine and running program outside the docker-machine does not
>> return expected result!
>> 
>> On Tue, Mar 14, 2017 at 9:56 PM, Mina Aslani  wrote:
>> 
>>> And the port for kafka is 29092 and for zookeeper 32181.
>>> 
>>> On Tue, Mar 14, 2017 at 9:06 PM, Mina Aslani 
>>> wrote:
>>> 
 Hi,
 
 I forgot to add in my previous email 2 questions.
 
 To setup my env, shall I use https://raw.githubusercont
 ent.com/confluentinc/cp-docker-images/master/examples/kafka-
 single-node/docker-compose.yml instead or is there any other
 docker-compose.yml (version 2 or 3) which is suggested to setup env?
 
 How can I check "whether streams (that is just an app) can reach Kafka"?
 
 Regards,
 Mina
 
 On Tue, Mar 14, 2017 at 9:00 PM, Mina Aslani 
 wrote:
 
> Hi Eno,
> 
> Sorry! That is a typo!
> 
> I have a docker-machine with different containers (setup as directed @
> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html)
> 
> docker ps --format "{{.Image}}: {{.Names}}"
> 
> confluentinc/cp-kafka-connect:3.2.0: kafka-connect
> 
> confluentinc/cp-enterprise-control-center:3.2.0: control-center
> 
> confluentinc/cp-kafka-rest:3.2.0: kafka-rest
> 
> confluentinc/cp-schema-registry:3.2.0: schema-registry
> 
> confluentinc/cp-kafka:3.2.0: kafka
> 
> confluentinc/cp-zookeeper:3.2.0: zookeeper
> 
> I used example @ https://github.com/confluent
> inc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confl
> uent/examples/streams/WordCountLambdaExample.java#L178-L181 and
> followed the same steps.
> 
> When I run below command in docker-machine, I see the messages in
> TextLinesTopic.
> 
> docker run --net=host --rm confluentinc/cp-kafka:3.2.0 
> kafka-console-consumer
> --bootstrap-server localhost:29092 --topic TextLinesTopic --new-consumer
> --from-beginning
> 
> hello kafka streams
> 
> all streams lead to kafka
> 
> join kafka summit
> 
> test1
> 
> test2
> 
> test3
> 
> test4
> 
> Running above command for WordsWithCountsTopic 

Re: Trying to use Kafka Stream

2017-03-15 Thread Michael Noll
Mina,

in your original question you wrote:

> However, I do not see the word count when I try to run below example.
> Looks like that it does not connect to Kafka.

The WordCount demo example writes its output to Kafka only --  it *does
not* write any results to the console/STDOUT.

From what I can tell the WordCount example ran correctly because, in your
latest email, you showed the output of the console consumer (which *does*
write to the console), and that output is a list of words and counts:

> all 1
> lead 1
> to 1
> hello 1
> streams 2
> join 1
> kafka 3
> summit 1

In other words, I think everything you did was correct, and Kafka too was
working correctly.  You were simply unaware that the WordCount example does
not write its output to the console.
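
For reference, the relevant part of such a topology looks roughly like the
sketch below. This is a simplified word count against the 0.10.2.x Streams API,
not the exact shipped WordCountDemo; the topic names follow the quickstart and
the bootstrap address is a placeholder:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class WordCountSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-sketch");
        // Must point at the broker's advertised address (placeholder below).
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:29092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, String> textLines = builder.stream("TextLinesTopic");

        final KTable<String, Long> counts = textLines
            // Split each text line into lowercase words.
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
            // Re-key the stream by word and count occurrences per word.
            .groupBy((key, word) -> word)
            .count("Counts");

        // The counts are written back to a Kafka topic, not to STDOUT,
        // which is why you only see them via the console consumer.
        counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");

        final KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}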

Best,
Michael





On Wed, Mar 15, 2017 at 6:14 AM, Mina Aslani  wrote:

> Hi,
> I just checked streams-wordcount-output topic using below command
>
> docker run \
>
>   --net=host \
>
>   --rm \
>
>   confluentinc/cp-kafka:3.2.0 \
>
>   kafka-console-consumer --bootstrap-server localhost:9092 \
>
>   --topic streams-wordcount-output \
>
>   --from-beginning \
>
>   --formatter kafka.tools.DefaultMessageFormatter \
>
>   --property print.key=true \
>
>   --property key.deserializer=org.apache.ka
> fka.common.serialization.StringDeserializer \
>
>   --property value.deserializer=org.apache.
> kafka.common.serialization.LongDeserializer
>
>
> and it returns
>
> all 1
> lead 1
> to 1
> hello 1
> streams 2
> join 1
> kafka 3
> summit 1
>
> Please note above result is when I tried  http://docs.confluent.i
> o/3.2.0/streams/quickstart.html#goal-of-this-quickstart and in
> docker-machine  ran /usr/bin/kafka-run-class org.apache.kafka.streams.examp
> les.wordcount.WordCountDemo.
>
> How come running same program out of docker-machine does not output to the
> output topic?
> Should I make the program as jar and deploy to docker-machine and run it
> using ./bin/kafka-run-class?
>
> Best regards,
> Mina
>

Re: Kafka Retention Policy to Indefinite

2017-03-15 Thread Joe San
>
> I am saying that replication quotas will mitigate one of the potential
> downsides of setting an infinite retention policy.


I was just interested in all of the potential downsides! Could you
please point me to documentation that has more information on this?

On Tue, Mar 14, 2017 at 7:07 PM, Hans Jespersen  wrote:

> I am saying that replication quotas will mitigate one of the potential
> downsides of setting an infinite retention policy.
>
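
(For reference, the simplest way to apply such a throttle is when moving
partitions with the reassignment tool; the JSON file name and the 50 MB/s
value below are placeholders. The throttle only limits replication traffic
between brokers, not normal produce/consume traffic.)

kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute \
  --reassignment-json-file reassign.json --throttle 50000000
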
> There is no clear set yes/no best practice rule for setting an extremely
> large retention policy. It is clearly a valid configuration and there are
> people who run this way.
>
> The issues have more to do with the amount of data you expect to be stored
> over the life of the system. If you have a Kafka cluster with petabytes of
> data in it and a consumer comes along and blindly consumes from the
> beginning, they will be getting a lot of data. So much so that this might
> be considered an anti-pattern because their apps might not behave as they
> expect and the network bandwidth used by lots of clients operating this way
> may be considered bad practice.
>
> Another way to avoid collecting too much data is to use compacted topics,
> which are a special kind of topic that keeps the latest value for each key
> forever, but removes the older messages with the same key in order to
> reduce the total amount of messages stored.
>
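
(For reference, a compacted topic is just a regular topic created with
cleanup.policy=compact; the topic name, partition count and ZooKeeper address
below are placeholders.)

kafka-topics.sh --create --zookeeper localhost:2181 --topic latest-prices \
  --partitions 1 --replication-factor 1 --config cleanup.policy=compact
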
> How much data do you expect to store in your largest topic over the life of
> the cluster?
>
> -hans
>
>
>
>
>
> /**
>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>  * h...@confluent.io (650)924-2670
>  */
>
> On Tue, Mar 14, 2017 at 10:36 AM, Joe San  wrote:
>
> > So that means with replication quotas, I can set the retention policy to
> be
> > infinite?
> >
> > On Tue, Mar 14, 2017 at 6:25 PM, Hans Jespersen 
> wrote:
> >
> > > You might want to use the new replication quotas mechanism (i.e.
> network
> > > throttling) to make sure that replication traffic doesn't negatively
> > impact
> > > your production traffic.
> > >
> > > See for details:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 73+Replication+Quotas
> > >
> > > This feature was added in 0.10.1
> > >
> > > -hans
> > >
> > > /**
> > >  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> > >  * h...@confluent.io (650)924-2670
> > >  */
> > >
> > > On Tue, Mar 14, 2017 at 10:09 AM, Joe San 
> > wrote:
> > >
> > > > Dear Kafka Users,
> > > >
> > > > What are the arguments against setting the retention policy on a Kafka
> > > > topic to infinite? I was in an interesting discussion with one of my
> > > > colleagues where he was suggesting to set the retention policy for a
> > > topic
> > > > to be indefinite.
> > > >
> > > > So how does this play up when adding new broker partitions? Say, I
> have
> > > > accumulated in my topic some gigabytes of data and now I realize
> that I
> > > > have to scale up by adding another partition. Now is this going to
> pose
> > > me
> > > > a problem? The partition rebalance has to happen and I'm not sure
> what
> > > the
> > > > implications are with rebalancing a partition that has gigabytes of
> > data.
> > > >
> > > > Any thoughts on this?
> > > >
> > > > Thanks and Regards,
> > > > Jothi
> > > >
> > >
> >
>