Re: RocksDB Error on partition assignment

2017-07-28 Thread Sameer Kumar
Hi Guozhang,

I am using 10.2.1.

-Sameer.

On Sat, Jul 29, 2017 at 12:05 AM, Guozhang Wang  wrote:

> Sameer,
>
> This bug should be already fixed in trunk.
>
> Which version of Kafka Streams are you running with? We can consider
> backporting it and having a bug-fix release if it turns out to be a common
> issue.
>
>
> Guozhang
>
>
> On Fri, Jul 28, 2017 at 4:57 AM, Damian Guy  wrote:
>
> > It is due to a bug. You should set
> > StreamsConfig.STATE_DIR_CLEANUP_DELAY_MS_CONFIG to Long.MAX_VALUE -
> i.e.,
> > disabling it.
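A minimal sketch of that workaround in Java, assuming the 0.10.2.x Streams API (the constant is spelled here exactly as in the reply above; in some releases it appears as STATE_CLEANUP_DELAY_MS_CONFIG, i.e. the "state.cleanup.delay.ms" property):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // hypothetical application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");     // hypothetical broker list
// Push the periodic obsolete-state-directory cleanup out far enough that it never runs:
props.put(StreamsConfig.STATE_DIR_CLEANUP_DELAY_MS_CONFIG, Long.MAX_VALUE);
KafkaStreams streams = new KafkaStreams(builder, props);               // builder/topology as in the application (not shown in this thread)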
> >
> > On Fri, 28 Jul 2017 at 10:38 Sameer Kumar 
> wrote:
> >
> > > Hi,
> > >
> > > I am facing this error, no clue why this occurred. No other exception
> in
> > > stacktrace was found.
> > >
> > > Only thing different I did was I ran kafka streams jar on machine2 a
> > couple
> > > of mins after i ran it on machine1.
> > >
> > > Please search for this string in the log below:-
> > > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> > > LICSp-4-25k failed on partition assignment
> > >
> > >
> > > 2017-07-28 14:55:51 INFO  StateDirectory:213 - Deleting obsolete state
> > > directory 2_43 for task 2_43
> > > 2017-07-28 14:55:51 INFO  StateDirectory:213 - Deleting obsolete state
> > > directory 1_29 for task 1_29
> > > 2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
> > > directory 2_22 for task 2_22
> > > 2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
> > > directory 0_9 for task 0_9
> > > 2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
> > > directory 0_49 for task 0_49
> > > 2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
> > > directory 2_27 for task 2_27
> > > 2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
> > > directory 2_32 for task 2_32
> > > 2017-07-28 14:55:52 INFO  StreamThread:767 - stream-thread
> > [StreamThread-7]
> > > Committing all tasks because the commit interval 5000ms has elapsed
> > > 2017-07-28 14:55:52 INFO  StreamThread:805 - stream-thread
> > [StreamThread-7]
> > > Committing task StreamTask 0_1
> > > 2017-07-28 14:55:52 ERROR StreamThread:813 - stream-thread
> > [StreamThread-2]
> > > Failed to commit StreamTask 1_35 state:
> > > org.apache.kafka.streams.errors.ProcessorStateException: task [1_35]
> > Failed
> > > to flush state store lic3-deb-ci-25k
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.flush(ProcessorStateManager.java:337)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > StreamTask$1.run(StreamTask.java:72)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > measureLatencyNs(StreamsMetricsImpl.java:188)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > StreamTask.commit(StreamTask.java:280)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> > StreamThread.java:807)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
> > StreamThread.java:794)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
> > StreamThread.java:769)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:647)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:361)
> > > Caused by: org.apache.kafka.streams.errors.ProcessorStateException:
> > Error
> > > while executing flush from store lic3-deb-ci-25k-201707280900
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.RocksDBStore.
> > flushInternal(RocksDBStore.java:354)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.RocksDBStore.
> > flush(RocksDBStore.java:345)
> > > at
> > > org.apache.kafka.streams.state.internals.Segments.
> > flush(Segments.java:138)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.
> > flush(RocksDBSegmentedBytesStore.java:117)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.WrappedStateStore$
> > AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.
> > flush(MeteredSegmentedBytesStore.java:111)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.RocksDBWindowStore.flush(
> > RocksDBWindowStore.java:92)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.CachingWindowStore.flush(
> > CachingWindowStore.java:120)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.flush(ProcessorStateManager.java:335)
> > > ... 8 more
> > > Caused by: org.rocksdb.RocksDBException: s
> > > at org.rocksdb.RocksDB.flush(Native Method)
> > > at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
> > > at
> > >
> > > 

Open file descriptors on a broker

2017-07-28 Thread Sunil Parmar
Environment: CDH 5.7, Kafka 0.9 (Cloudera)

Our broker (per Cloudera Manager) is warning us about open file descriptors on
the cluster; it has around 17K file descriptors open. There is a configuration
in Cloudera Manager to change the warning and critical thresholds for the
number of file descriptors open at a given time. We could simply raise those
thresholds, but we're not sure that's the right way to deal with this warning.

Is there a way to (roughly) calculate what we should set these thresholds
(allowable open file descriptors) to? What affects the number of open file
descriptors: topic partitions, small batch sizes, the number of consumers, the
number of producers? What makes a broker keep a file open for read, write, or
both? Some insight into this would help.
We always see an upward trend in the number of file descriptors on the broker.
Does it ever go down? If so, when?
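As a rough way to watch usage against the limit, the JVM itself exposes the counts through the OperatingSystem MXBean (the same attributes can be read over JMX against a running broker). A minimal sketch, assuming a HotSpot/OpenJDK runtime on a Unix host:

import java.lang.management.ManagementFactory;
import com.sun.management.UnixOperatingSystemMXBean;

public class FdUsage {
    public static void main(String[] args) {
        UnixOperatingSystemMXBean os =
                (UnixOperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
        long open = os.getOpenFileDescriptorCount();   // descriptors currently open in this JVM
        long max  = os.getMaxFileDescriptorCount();    // ulimit as seen by this JVM
        System.out.printf("open=%d max=%d (%.1f%% used)%n", open, max, 100.0 * open / max);
    }
}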

Also, although we're using Kafka 0.9 right now, I noticed this bug,
https://issues.apache.org/jira/browse/KAFKA-3619, which appears to have been
found and fixed in 0.10. Can someone confirm whether this is an issue in 0.9?

Thanks,
Sunil Parmar


Re: Kafka streams regex match

2017-07-28 Thread Shekar Tippur
Damian,

Here is a public gist:
https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8

- Shekar

On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy  wrote:

> It might be easier if you make a github gist with your code. It is quite
> difficult to see what is happening in an email.
>
> Cheers,
> Damian
> On Fri, 28 Jul 2017 at 19:22, Shekar Tippur  wrote:
>
> > Thanks a lot Damien.
> > I am able to get to see if the join worked (using foreach). I tried to
> add
> > the logic to query the store after starting the streams:
> > Looks like the code is not getting there. Here is the modified code:
> >
> > KafkaStreams streams = new KafkaStreams(builder, props);
> >
> > streams.start();
> >
> >
> > parser.foreach(new ForeachAction() {
> > @Override
> > public void apply(String key, JsonNode value) {
> > System.out.println(key + ": " + value);
> > if (value == null){
> > System.out.println("null match");
> > ReadOnlyKeyValueStore keyValueStore =
> > null;
> > try {
> > keyValueStore =
> > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > QueryableStoreTypes.keyValueStore(), streams);
> > } catch (InterruptedException e) {
> > e.printStackTrace();
> > }
> >
> > KeyValueIterator  kviterator =
> > keyValueStore.range("test_nod","test_node");
> > }
> > }
> > });
> >
> >
> > On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy 
> wrote:
> >
> > > Hi,
> > > The store won't be queryable until after you have called
> streams.start().
> > > No stores have been created until the application is up and running and
> > > they are dependent on the underlying partitions.
> > >
> > > To check that a stateful operation has produced a result you would
> > normally
> > > add another operation after the join, i.e.,
> > > stream.join(other,...).foreach(..) or stream.join(other,...).to("
> topic")
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur  wrote:
> > >
> > > > One more thing.. How do we check if the stateful join operation
> > resulted
> > > in
> > > > a kstream of some value in it (size of kstream)? How do we check the
> > > > content of a kstream?
> > > >
> > > > - S
> > > >
> > > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur 
> > > wrote:
> > > >
> > > > > Damien,
> > > > >
> > > > > Thanks a lot for pointing out.
> > > > >
> > > > > I got a little further. I am kind of stuck with the sequencing.
> > Couple
> > > of
> > > > > issues:
> > > > > 1. I cannot initialise KafkaStreams before the parser.to().
> > > > > 2. Do I need to create a new KafkaStreams object when I create a
> > > > > KeyValueStore?
> > > > > 3. How do I initialize KeyValueIterator with  I
> > seem
> > > to
> > > > > get a error when I try:
> > > > > *KeyValueIterator  kviterator
> > > > > = keyValueStore.range("test_nod","test_node");*
> > > > >
> > > > > /// START CODE /
> > > > > //parser is a kstream as a result of join
> > > > > if (parser.toString().matches("null")){
> > > > >
> > > > > ReadOnlyKeyValueStore keyValueStore =
> > > > > null;
> > > > > KafkaStreams newstreams = new KafkaStreams(builder, props);
> > > > > try {
> > > > > keyValueStore =
> > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > > > QueryableStoreTypes.keyValueStore(), newstreams);
> > > > > } catch (InterruptedException e) {
> > > > > e.printStackTrace();
> > > > > }
> > > > > *KeyValueIterator kviterator
> > > > > = keyValueStore.range("test_nod","test_node");*
> > > > > }else {
> > > > >
> > > > > *parser.to (stringSerde, jsonSerde,
> > "parser");*}
> > > > >
> > > > > *KafkaStreams streams = new KafkaStreams(builder, props);*
> > > > > streams.start();
> > > > >
> > > > > /// END CODE /
> > > > >
> > > > > - S
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy  >
> > > > wrote:
> > > > > >
> > > > > > It is part of the ReadOnlyKeyValueStore interface:
> > > > > >
> > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > > > > main/java/org/apache/kafka/streams/state/
> ReadOnlyKeyValueStore.java
> > > > > >
> > > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur 
> > > wrote:
> > > > > >
> > > > > > > That's cool. This feature is a part of rocksdb object and not
> > > ktable?
> > > > > > >
> > > > > > > Sent from my iPhone
> > > > > > >
> > > > > > > > On Jul 27, 2017, at 07:57, Damian Guy 
> > > > wrote:
> > > > > > > >
> > > > > > > > Yes they can be strings,
> > > > > > > >
> > > > > > > > so you could do something like:
> > > > > > > > 

Re: Kafka streams regex match

2017-07-28 Thread Damian Guy
It might be easier if you make a github gist with your code. It is quite
difficult to see what is happening in an email.

Cheers,
Damian
On Fri, 28 Jul 2017 at 19:22, Shekar Tippur  wrote:

> Thanks a lot Damien.
> I am able to get to see if the join worked (using foreach). I tried to add
> the logic to query the store after starting the streams:
> Looks like the code is not getting there. Here is the modified code:
>
> KafkaStreams streams = new KafkaStreams(builder, props);
>
> streams.start();
>
>
> parser.foreach(new ForeachAction() {
> @Override
> public void apply(String key, JsonNode value) {
> System.out.println(key + ": " + value);
> if (value == null){
> System.out.println("null match");
> ReadOnlyKeyValueStore keyValueStore =
> null;
> try {
> keyValueStore =
> IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> QueryableStoreTypes.keyValueStore(), streams);
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
>
> KeyValueIterator  kviterator =
> keyValueStore.range("test_nod","test_node");
> }
> }
> });
>
>
> On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy  wrote:
>
> > Hi,
> > The store won't be queryable until after you have called streams.start().
> > No stores have been created until the application is up and running and
> > they are dependent on the underlying partitions.
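Outside of a test, the usual pattern for the waiting part is to retry streams.store(...) until the store becomes queryable, rather than using IntegrationTestUtils. A minimal sketch against the 0.10.x API, using the store name from this thread ("local-store") and assuming the usual Streams and Jackson imports:

static ReadOnlyKeyValueStore<String, JsonNode> waitForStore(KafkaStreams streams)
        throws InterruptedException {
    while (true) {
        try {
            return streams.store("local-store",
                    QueryableStoreTypes.<String, JsonNode>keyValueStore());
        } catch (InvalidStateStoreException e) {
            Thread.sleep(100);   // store not queryable yet, e.g. a rebalance is still in progress
        }
    }
}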
> >
> > To check that a stateful operation has produced a result you would
> normally
> > add another operation after the join, i.e.,
> > stream.join(other,...).foreach(..) or stream.join(other,...).to("topic")
> >
> > Thanks,
> > Damian
> >
> > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur  wrote:
> >
> > > One more thing.. How do we check if the stateful join operation
> resulted
> > in
> > > a kstream of some value in it (size of kstream)? How do we check the
> > > content of a kstream?
> > >
> > > - S
> > >
> > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur 
> > wrote:
> > >
> > > > Damien,
> > > >
> > > > Thanks a lot for pointing out.
> > > >
> > > > I got a little further. I am kind of stuck with the sequencing.
> Couple
> > of
> > > > issues:
> > > > 1. I cannot initialise KafkaStreams before the parser.to().
> > > > 2. Do I need to create a new KafkaStreams object when I create a
> > > > KeyValueStore?
> > > > 3. How do I initialize KeyValueIterator with  I
> seem
> > to
> > > > get a error when I try:
> > > > *KeyValueIterator  kviterator
> > > > = keyValueStore.range("test_nod","test_node");*
> > > >
> > > > /// START CODE /
> > > > //parser is a kstream as a result of join
> > > > if (parser.toString().matches("null")){
> > > >
> > > > ReadOnlyKeyValueStore keyValueStore =
> > > > null;
> > > > KafkaStreams newstreams = new KafkaStreams(builder, props);
> > > > try {
> > > > keyValueStore =
> > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > > QueryableStoreTypes.keyValueStore(), newstreams);
> > > > } catch (InterruptedException e) {
> > > > e.printStackTrace();
> > > > }
> > > > *KeyValueIterator kviterator
> > > > = keyValueStore.range("test_nod","test_node");*
> > > > }else {
> > > >
> > > > *parser.to (stringSerde, jsonSerde,
> "parser");*}
> > > >
> > > > *KafkaStreams streams = new KafkaStreams(builder, props);*
> > > > streams.start();
> > > >
> > > > /// END CODE /
> > > >
> > > > - S
> > > >
> > > >
> > > >
> > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy 
> > > wrote:
> > > > >
> > > > > It is part of the ReadOnlyKeyValueStore interface:
> > > > >
> > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > > > main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
> > > > >
> > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur 
> > wrote:
> > > > >
> > > > > > That's cool. This feature is a part of rocksdb object and not
> > ktable?
> > > > > >
> > > > > > Sent from my iPhone
> > > > > >
> > > > > > > On Jul 27, 2017, at 07:57, Damian Guy 
> > > wrote:
> > > > > > >
> > > > > > > Yes they can be strings,
> > > > > > >
> > > > > > > so you could do something like:
> > > > > > > store.range("test_host", "test_hosu");
> > > > > > >
> > > > > > > This would return an iterator containing all of the values
> > > > (inclusive)
> > > > > > from
> > > > > > > "test_host" -> "test_hosu".
> > > > > > >
> > > > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur  >
> > > > wrote:
> > > > > > >>
> > > > > > >> Can you please point me to an example? Can from and to be a
> > > string?
> > > > > > >>
> > > > > > >> Sent from my iPhone
> > > > > > >>
> > > > > 

Re: RocksDB Error on partition assignment

2017-07-28 Thread Guozhang Wang
Sameer,

This bug should be already fixed in trunk.

Which version of Kafka Streams are you running with? We can consider
backporting it and having a bug-fix release if it turns out to be a common issue.


Guozhang


On Fri, Jul 28, 2017 at 4:57 AM, Damian Guy  wrote:

> It is due to a bug. You should set
> StreamsConfig.STATE_DIR_CLEANUP_DELAY_MS_CONFIG to Long.MAX_VALUE - i.e.,
> disabling it.
>
> On Fri, 28 Jul 2017 at 10:38 Sameer Kumar  wrote:
>
> > Hi,
> >
> > I am facing this error, no clue why this occurred. No other exception in
> > stacktrace was found.
> >
> > Only thing different I did was I ran kafka streams jar on machine2 a
> couple
> > of mins after i ran it on machine1.
> >
> > Please search for this string in the log below:-
> > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> > LICSp-4-25k failed on partition assignment
> >
> >
> > 2017-07-28 14:55:51 INFO  StateDirectory:213 - Deleting obsolete state
> > directory 2_43 for task 2_43
> > 2017-07-28 14:55:51 INFO  StateDirectory:213 - Deleting obsolete state
> > directory 1_29 for task 1_29
> > 2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
> > directory 2_22 for task 2_22
> > 2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
> > directory 0_9 for task 0_9
> > 2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
> > directory 0_49 for task 0_49
> > 2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
> > directory 2_27 for task 2_27
> > 2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
> > directory 2_32 for task 2_32
> > 2017-07-28 14:55:52 INFO  StreamThread:767 - stream-thread
> [StreamThread-7]
> > Committing all tasks because the commit interval 5000ms has elapsed
> > 2017-07-28 14:55:52 INFO  StreamThread:805 - stream-thread
> [StreamThread-7]
> > Committing task StreamTask 0_1
> > 2017-07-28 14:55:52 ERROR StreamThread:813 - stream-thread
> [StreamThread-2]
> > Failed to commit StreamTask 1_35 state:
> > org.apache.kafka.streams.errors.ProcessorStateException: task [1_35]
> Failed
> > to flush state store lic3-deb-ci-25k
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(ProcessorStateManager.java:337)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamTask$1.run(StreamTask.java:72)
> > at
> >
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:188)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:280)
> > at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> StreamThread.java:807)
> > at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
> StreamThread.java:794)
> > at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(
> StreamThread.java:769)
> > at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:647)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> > Caused by: org.apache.kafka.streams.errors.ProcessorStateException:
> Error
> > while executing flush from store lic3-deb-ci-25k-201707280900
> > at
> >
> > org.apache.kafka.streams.state.internals.RocksDBStore.
> flushInternal(RocksDBStore.java:354)
> > at
> >
> > org.apache.kafka.streams.state.internals.RocksDBStore.
> flush(RocksDBStore.java:345)
> > at
> > org.apache.kafka.streams.state.internals.Segments.
> flush(Segments.java:138)
> > at
> >
> > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.
> flush(RocksDBSegmentedBytesStore.java:117)
> > at
> >
> > org.apache.kafka.streams.state.internals.WrappedStateStore$
> AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
> > at
> >
> > org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.
> flush(MeteredSegmentedBytesStore.java:111)
> > at
> >
> > org.apache.kafka.streams.state.internals.RocksDBWindowStore.flush(
> RocksDBWindowStore.java:92)
> > at
> >
> > org.apache.kafka.streams.state.internals.CachingWindowStore.flush(
> CachingWindowStore.java:120)
> > at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(ProcessorStateManager.java:335)
> > ... 8 more
> > Caused by: org.rocksdb.RocksDBException: s
> > at org.rocksdb.RocksDB.flush(Native Method)
> > at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
> > at
> >
> > org.apache.kafka.streams.state.internals.RocksDBStore.
> flushInternal(RocksDBStore.java:352)
> > ... 16 more
> > 2017-07-28 14:55:52 INFO  StreamThread:767 - stream-thread
> > [StreamThread-12] Committing all tasks because the commit interval 5000ms
> > has elapsed
> > 2017-07-28 14:55:52 INFO  StreamThread:390 - stream-thread
> [StreamThread-2]
> > Shutting down
> > 2017-07-28 14:55:52 INFO  StreamThread:805 - stream-thread
> > 

Re: Kafka streams regex match

2017-07-28 Thread Shekar Tippur
Thanks a lot, Damian.
I am able to see whether the join worked (using foreach). I tried to add
the logic to query the store after starting the streams, but it looks like
the code is not getting there. Here is the modified code:

KafkaStreams streams = new KafkaStreams(builder, props);

streams.start();

parser.foreach(new ForeachAction<String, JsonNode>() {
    @Override
    public void apply(String key, JsonNode value) {
        System.out.println(key + ": " + value);
        if (value == null) {
            System.out.println("null match");
            ReadOnlyKeyValueStore<String, JsonNode> keyValueStore = null;
            try {
                keyValueStore = IntegrationTestUtils.waitUntilStoreIsQueryable(
                        "local-store", QueryableStoreTypes.keyValueStore(), streams);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            KeyValueIterator<String, JsonNode> kviterator =
                    keyValueStore.range("test_nod", "test_node");
        }
    }
});


On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy  wrote:

> Hi,
> The store won't be queryable until after you have called streams.start().
> No stores have been created until the application is up and running and
> they are dependent on the underlying partitions.
>
> To check that a stateful operation has produced a result you would normally
> add another operation after the join, i.e.,
> stream.join(other,...).foreach(..) or stream.join(other,...).to("topic")
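For example, a minimal sketch of that check, assuming parser is the KStream produced by the join and stringSerde/jsonSerde are the serdes already used elsewhere in this thread:

// Print every record the join actually emits:
parser.foreach((key, value) -> System.out.println("joined: " + key + " -> " + value));

// ...or write the join result to a topic and inspect it with a console consumer
// ("parser-debug" is a hypothetical topic name):
parser.to(stringSerde, jsonSerde, "parser-debug");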
>
> Thanks,
> Damian
>
> On Thu, 27 Jul 2017 at 22:52 Shekar Tippur  wrote:
>
> > One more thing.. How do we check if the stateful join operation resulted
> in
> > a kstream of some value in it (size of kstream)? How do we check the
> > content of a kstream?
> >
> > - S
> >
> > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur 
> wrote:
> >
> > > Damien,
> > >
> > > Thanks a lot for pointing out.
> > >
> > > I got a little further. I am kind of stuck with the sequencing. Couple
> of
> > > issues:
> > > 1. I cannot initialise KafkaStreams before the parser.to().
> > > 2. Do I need to create a new KafkaStreams object when I create a
> > > KeyValueStore?
> > > 3. How do I initialize KeyValueIterator with  I seem
> to
> > > get a error when I try:
> > > *KeyValueIterator  kviterator
> > > = keyValueStore.range("test_nod","test_node");*
> > >
> > > /// START CODE /
> > > //parser is a kstream as a result of join
> > > if (parser.toString().matches("null")){
> > >
> > > ReadOnlyKeyValueStore keyValueStore =
> > > null;
> > > KafkaStreams newstreams = new KafkaStreams(builder, props);
> > > try {
> > > keyValueStore =
> > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > QueryableStoreTypes.keyValueStore(), newstreams);
> > > } catch (InterruptedException e) {
> > > e.printStackTrace();
> > > }
> > > *KeyValueIterator kviterator
> > > = keyValueStore.range("test_nod","test_node");*
> > > }else {
> > >
> > > *parser.to (stringSerde, jsonSerde, "parser");*}
> > >
> > > *KafkaStreams streams = new KafkaStreams(builder, props);*
> > > streams.start();
> > >
> > > /// END CODE /
> > >
> > > - S
> > >
> > >
> > >
> > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy 
> > wrote:
> > > >
> > > > It is part of the ReadOnlyKeyValueStore interface:
> > > >
> > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > > main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
> > > >
> > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur 
> wrote:
> > > >
> > > > > That's cool. This feature is a part of rocksdb object and not
> ktable?
> > > > >
> > > > > Sent from my iPhone
> > > > >
> > > > > > On Jul 27, 2017, at 07:57, Damian Guy 
> > wrote:
> > > > > >
> > > > > > Yes they can be strings,
> > > > > >
> > > > > > so you could do something like:
> > > > > > store.range("test_host", "test_hosu");
> > > > > >
> > > > > > This would return an iterator containing all of the values
> > > (inclusive)
> > > > > from
> > > > > > "test_host" -> "test_hosu".
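A minimal sketch of such a range scan, assuming a queryable store named "local-store" with String keys and JsonNode values (as in the code earlier in this thread), obtained after the application has started:

ReadOnlyKeyValueStore<String, JsonNode> store =
        streams.store("local-store", QueryableStoreTypes.<String, JsonNode>keyValueStore());
try (KeyValueIterator<String, JsonNode> range = store.range("test_host", "test_hosu")) {
    while (range.hasNext()) {
        KeyValue<String, JsonNode> entry = range.next();   // keys from "test_host" to "test_hosu", inclusive
        System.out.println(entry.key + " -> " + entry.value);
    }
}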
> > > > > >
> > > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur 
> > > wrote:
> > > > > >>
> > > > > >> Can you please point me to an example? Can from and to be a
> > string?
> > > > > >>
> > > > > >> Sent from my iPhone
> > > > > >>
> > > > > >>> On Jul 27, 2017, at 04:04, Damian Guy 
> > > wrote:
> > > > > >>>
> > > > > >>> Hi,
> > > > > >>>
> > > > > >>> You can't use a regex, but you could use a range query.
> > > > > >>> i.e, keyValueStore.range(from, to)
> > > > > >>>
> > > > > >>> Thanks,
> > > > > >>> Damian
> > > > > >>>
> > > > >  On Wed, 26 Jul 2017 at 22:34 Shekar Tippur  >
> > > wrote:
> > > > > 
> > > > >  Hello,
> > > > > 
> > > > >  I am able to get the 

Re: kafka-consumer-groups tool with SASL_PLAINTEXT

2017-07-28 Thread Vahid S Hashemian
Hi Gabriel,

I have yet to experiment with enabling SSL for Kafka.
However, there are some good documents out there that seem to cover it. 
Examples:
* https://www.confluent.io/blog/apache-kafka-security-authorization-authentication-encryption/
* http://coheigea.blogspot.com/2016/09/securing-apache-kafka-broker-part-i.html

Is there anything specific about the SSL and consumer groups that you are 
having issues with?

Thanks.
--Vahid




From:   Gabriel Machado 
To: users@kafka.apache.org
Date:   07/28/2017 08:40 AM
Subject:Re: kafka-consumer-groups tool with SASL_PLAINTEXT



Hi Vahid,

Do you know how to use the consumer-groups tool with SSL only (without SASL)?

Gabriel.


Le 24 juil. 2017 11:15 PM, "Vahid S Hashemian" 
a écrit :

Hi Meghana,

I did some experiments with SASL_PLAINTEXT and documented the results
here:
https://developer.ibm.com/opentech/2017/05/31/kafka-acls-in-practice/
I think it covers what you'd like to achieve. If not, please advise.

Thanks.
--Vahid




From:   Meghana Narasimhan 
To: users@kafka.apache.org
Date:   07/24/2017 01:56 PM
Subject:kafka-consumer-groups tool with SASL_PLAINTEXT



Hi,
What is the correct way to use the kafka-consumer-groups tool with
SASL_PLAINTEXT security enabled ?

The tool seems to work fine with PLAINTEXT port but not with
SASL_PLAINTEXT. Can it be configured to work with SASL_PLAINTEXT ? If so
what permissions have to enabled for it ?

Thanks,
Meghana






Socks proxy

2017-07-28 Thread Jay Allen
Hey guys,

We're trying to use the Java Kafka client, but it turns out it's not
SOCKS-proxy aware: the connection logic uses a SocketChannel, which does not
work with proxies:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/network/Selector.java

Any ideas how to use the official Java client with a socks proxy?

Thanks!
Jay


Re: kafka-consumer-groups tool with SASL_PLAINTEXT

2017-07-28 Thread Gabriel Machado
Hi Vahid,

Do you know how to use the consumer-groups tool with SSL only (without SASL)?

Gabriel.


Le 24 juil. 2017 11:15 PM, "Vahid S Hashemian" 
a écrit :

Hi Meghana,

I did some experiments with SASL_PLAINTEXT and documented the results
here:
https://developer.ibm.com/opentech/2017/05/31/kafka-acls-in-practice/
I think it covers what you'd like to achieve. If not, please advise.

Thanks.
--Vahid




From:   Meghana Narasimhan 
To: users@kafka.apache.org
Date:   07/24/2017 01:56 PM
Subject:kafka-consumer-groups tool with SASL_PLAINTEXT



Hi,
What is the correct way to use the kafka-consumer-groups tool with
SASL_PLAINTEXT security enabled ?

The tool seems to work fine with PLAINTEXT port but not with
SASL_PLAINTEXT. Can it be configured to work with SASL_PLAINTEXT ? If so
what permissions have to enabled for it ?

Thanks,
Meghana


Re: Kafka Streams state store issue on cluster

2017-07-28 Thread Damian Guy
Hmmm, I'm not sure that is going to work, as both nodes will have the same
setting for StreamsConfig.APPLICATION_SERVER_CONFIG, i.e., 0.0.0.0:7070
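A minimal sketch of the point above, assuming each instance can advertise a host name or IP that the other instances can actually reach (the addresses below are hypothetical):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-weblog-processing");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // hypothetical broker list
// Advertise a per-instance, externally reachable endpoint instead of 0.0.0.0:
// e.g. node 1 uses "10.0.0.11:7070" and node 2 uses "10.0.0.12:7070".
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "10.0.0.11:7070");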

On Fri, 28 Jul 2017 at 16:02 Debasish Ghosh 
wrote:

> The log file is a huge one. I can send it to you though. Before that let
> me confirm one point ..
>
> I set the APPLICATION_SERVER_CONFIG to
> s"${config.httpInterface}:${config.httpPort}". In my case the
> httpInterface is "0.0.0.0" and the port is set to 7070. Since the two
> instances start on different nodes, this should be ok - right ?
>
> regards.
>
> On Fri, Jul 28, 2017 at 8:18 PM, Damian Guy  wrote:
>
>> Do you have any logs that might help to work out what is going wrong?
>>
>> On Fri, 28 Jul 2017 at 14:16 Damian Guy  wrote:
>>
>>> The config looks ok to me
>>>
>>> On Fri, 28 Jul 2017 at 13:24 Debasish Ghosh 
>>> wrote:
>>>
 I am setting APPLICATION_SERVER_CONFIG, which is possibly what u r
 referring to. Just now I noticed that I may also need to set
 REPLICATION_FACTOR_CONFIG, which needs to be set to 2 (default is 1).
 Anything else that I may be missing ?


 regards.

 On Fri, Jul 28, 2017 at 5:46 PM, Debasish Ghosh <
 ghosh.debas...@gmail.com>
 wrote:

 > Hi Damien -
 >
 > I am not sure I understand what u mean .. I have the following set in
 the
 > application .. Do I need to set anything else at the host level ?
 > Environment variable ?
 >
 > val streamingConfig = {
 >   val settings = new Properties
 >   settings.put(StreamsConfig.APPLICATION_ID_CONFIG,
 > "kstream-weblog-processing")
 >   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
 config.brokers)
 >
 >   config.schemaRegistryUrl.foreach{ url =>
 >
  settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
 > url)
 >   }
 >
 >   settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
 > Serdes.ByteArray.getClass.getName)
 >   settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
 > Serdes.String.getClass.getName)
 >
 >   // setting offset reset to earliest so that we can re-run the
 demo
 > code with the same pre-loaded data
 >   // Note: To re-run the demo, you need to use the offset reset
 tool:
 >   // https://cwiki.apache.org/confluence/display/KAFKA/
 > Kafka+Streams+Application+Reset+Tool
 >   settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
 "earliest")
 >
 >   // need this for query service
 >   settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
 > s"${config.httpInterface}:${config.httpPort}")
 >
 >   // default is /tmp/kafka-streams
 >   settings.put(StreamsConfig.STATE_DIR_CONFIG,
 config.stateStoreDir)
 >
 >   // Set the commit interval to 500ms so that any changes are
 flushed
 > frequently and the summary
 >   // data are updated with low latency.
 >   settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")
 >
 >   settings
 > }
 >
 > Please explain a bit ..
 >
 > regards.
 >
 >
 > On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy 
 wrote:
 >
 >> Hi,
 >>
 >> Do you have the application.server property set appropriately for
 both
 >> hosts?
 >>
 >> The second stack trace is this bug:
 >> https://issues.apache.org/jira/browse/KAFKA-5556
 >>
 >> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh <
 ghosh.debas...@gmail.com>
 >> wrote:
 >>
 >> > Hi -
 >> >
 >> > In my Kafka Streams application, I have a state store resulting
 from a
 >> > stateful streaming topology. The environment is
 >> >
 >> >- Kafka 0.10.2.1
 >> >- It runs on a DC/OS cluster
 >> >- I am running Confluent-Kafka 3.2.2 on the cluster
 >> >- Each topic that I have has 2 partitions with replication
 factor = 2
 >> >- The application also has an associated http service that does
 >> >interactive queries on the state store
 >> >
 >> > The application runs fine when I invoke a single instance. I can
 use the
 >> > http endpoints to do queries and everything looks good. Problems
 surface
 >> > when I try to spawn another instance of the application. I use the
 same
 >> > APPLICATION_ID_CONFIG and the instance starts on a different node
 of the
 >> > cluster. The data consumption part works fine as the new instance
 also
 >> > starts consuming from the same topic as the first one. But when I
 try
 >> the
 >> > http query, the metadata fetch fails ..
 >> >
 >> > I have some code snippet like this as part of the query that tries
 to
 >> fetch
 >> > the metadata so that 

Re: Kafka Streams state store issue on cluster

2017-07-28 Thread Debasish Ghosh
The log file is a huge one. I can send it to you though. Before that let me
confirm one point ..

I set the APPLICATION_SERVER_CONFIG to
s"${config.httpInterface}:${config.httpPort}". In my case the httpInterface
is "0.0.0.0" and the port is set to 7070. Since the two instances start on
different nodes, this should be ok - right ?

regards.

On Fri, Jul 28, 2017 at 8:18 PM, Damian Guy  wrote:

> Do you have any logs that might help to work out what is going wrong?
>
> On Fri, 28 Jul 2017 at 14:16 Damian Guy  wrote:
>
>> The config looks ok to me
>>
>> On Fri, 28 Jul 2017 at 13:24 Debasish Ghosh 
>> wrote:
>>
>>> I am setting APPLICATION_SERVER_CONFIG, which is possibly what u r
>>> referring to. Just now I noticed that I may also need to set
>>> REPLICATION_FACTOR_CONFIG, which needs to be set to 2 (default is 1).
>>> Anything else that I may be missing ?
>>>
>>>
>>> regards.
>>>
>>> On Fri, Jul 28, 2017 at 5:46 PM, Debasish Ghosh <
>>> ghosh.debas...@gmail.com>
>>> wrote:
>>>
>>> > Hi Damien -
>>> >
>>> > I am not sure I understand what u mean .. I have the following set in
>>> the
>>> > application .. Do I need to set anything else at the host level ?
>>> > Environment variable ?
>>> >
>>> > val streamingConfig = {
>>> >   val settings = new Properties
>>> >   settings.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>> > "kstream-weblog-processing")
>>> >   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>> config.brokers)
>>> >
>>> >   config.schemaRegistryUrl.foreach{ url =>
>>> > settings.put(AbstractKafkaAvroSerDeConfig.
>>> SCHEMA_REGISTRY_URL_CONFIG,
>>> > url)
>>> >   }
>>> >
>>> >   settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>> > Serdes.ByteArray.getClass.getName)
>>> >   settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>> > Serdes.String.getClass.getName)
>>> >
>>> >   // setting offset reset to earliest so that we can re-run the
>>> demo
>>> > code with the same pre-loaded data
>>> >   // Note: To re-run the demo, you need to use the offset reset
>>> tool:
>>> >   // https://cwiki.apache.org/confluence/display/KAFKA/
>>> > Kafka+Streams+Application+Reset+Tool
>>> >   settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
>>> "earliest")
>>> >
>>> >   // need this for query service
>>> >   settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
>>> > s"${config.httpInterface}:${config.httpPort}")
>>> >
>>> >   // default is /tmp/kafka-streams
>>> >   settings.put(StreamsConfig.STATE_DIR_CONFIG,
>>> config.stateStoreDir)
>>> >
>>> >   // Set the commit interval to 500ms so that any changes are
>>> flushed
>>> > frequently and the summary
>>> >   // data are updated with low latency.
>>> >   settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")
>>> >
>>> >   settings
>>> > }
>>> >
>>> > Please explain a bit ..
>>> >
>>> > regards.
>>> >
>>> >
>>> > On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy 
>>> wrote:
>>> >
>>> >> Hi,
>>> >>
>>> >> Do you have the application.server property set appropriately for both
>>> >> hosts?
>>> >>
>>> >> The second stack trace is this bug:
>>> >> https://issues.apache.org/jira/browse/KAFKA-5556
>>> >>
>>> >> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh >> >
>>> >> wrote:
>>> >>
>>> >> > Hi -
>>> >> >
>>> >> > In my Kafka Streams application, I have a state store resulting
>>> from a
>>> >> > stateful streaming topology. The environment is
>>> >> >
>>> >> >- Kafka 0.10.2.1
>>> >> >- It runs on a DC/OS cluster
>>> >> >- I am running Confluent-Kafka 3.2.2 on the cluster
>>> >> >- Each topic that I have has 2 partitions with replication
>>> factor = 2
>>> >> >- The application also has an associated http service that does
>>> >> >interactive queries on the state store
>>> >> >
>>> >> > The application runs fine when I invoke a single instance. I can
>>> use the
>>> >> > http endpoints to do queries and everything looks good. Problems
>>> surface
>>> >> > when I try to spawn another instance of the application. I use the
>>> same
>>> >> > APPLICATION_ID_CONFIG and the instance starts on a different node
>>> of the
>>> >> > cluster. The data consumption part works fine as the new instance
>>> also
>>> >> > starts consuming from the same topic as the first one. But when I
>>> try
>>> >> the
>>> >> > http query, the metadata fetch fails ..
>>> >> >
>>> >> > I have some code snippet like this as part of the query that tries
>>> to
>>> >> fetch
>>> >> > the metadata so that I can locate the host to query on ..
>>> >> >
>>> >> > metadataService.streamsMetadataForStoreAndKey(store, hostKey,
>>> >> > stringSerializer) match {
>>> >> >   case Success(host) => {
>>> >> > // hostKey is on another instance. call the other instance
>>> to
>>> >> fetch
>>> >> > the data.
>>> >> > if (!thisHost(host)) {
>>> >> >

Re: Kafka Streams state store issue on cluster

2017-07-28 Thread Damian Guy
Do you have any logs that might help to work out what is going wrong?

On Fri, 28 Jul 2017 at 14:16 Damian Guy  wrote:

> The config looks ok to me
>
> On Fri, 28 Jul 2017 at 13:24 Debasish Ghosh 
> wrote:
>
>> I am setting APPLICATION_SERVER_CONFIG, which is possibly what u r
>> referring to. Just now I noticed that I may also need to set
>> REPLICATION_FACTOR_CONFIG, which needs to be set to 2 (default is 1).
>> Anything else that I may be missing ?
>>
>>
>> regards.
>>
>> On Fri, Jul 28, 2017 at 5:46 PM, Debasish Ghosh > >
>> wrote:
>>
>> > Hi Damien -
>> >
>> > I am not sure I understand what u mean .. I have the following set in
>> the
>> > application .. Do I need to set anything else at the host level ?
>> > Environment variable ?
>> >
>> > val streamingConfig = {
>> >   val settings = new Properties
>> >   settings.put(StreamsConfig.APPLICATION_ID_CONFIG,
>> > "kstream-weblog-processing")
>> >   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>> config.brokers)
>> >
>> >   config.schemaRegistryUrl.foreach{ url =>
>> >
>>  settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>> > url)
>> >   }
>> >
>> >   settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> > Serdes.ByteArray.getClass.getName)
>> >   settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> > Serdes.String.getClass.getName)
>> >
>> >   // setting offset reset to earliest so that we can re-run the demo
>> > code with the same pre-loaded data
>> >   // Note: To re-run the demo, you need to use the offset reset
>> tool:
>> >   // https://cwiki.apache.org/confluence/display/KAFKA/
>> > Kafka+Streams+Application+Reset+Tool
>> >   settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>> >
>> >   // need this for query service
>> >   settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
>> > s"${config.httpInterface}:${config.httpPort}")
>> >
>> >   // default is /tmp/kafka-streams
>> >   settings.put(StreamsConfig.STATE_DIR_CONFIG, config.stateStoreDir)
>> >
>> >   // Set the commit interval to 500ms so that any changes are
>> flushed
>> > frequently and the summary
>> >   // data are updated with low latency.
>> >   settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")
>> >
>> >   settings
>> > }
>> >
>> > Please explain a bit ..
>> >
>> > regards.
>> >
>> >
>> > On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy 
>> wrote:
>> >
>> >> Hi,
>> >>
>> >> Do you have the application.server property set appropriately for both
>> >> hosts?
>> >>
>> >> The second stack trace is this bug:
>> >> https://issues.apache.org/jira/browse/KAFKA-5556
>> >>
>> >> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh 
>> >> wrote:
>> >>
>> >> > Hi -
>> >> >
>> >> > In my Kafka Streams application, I have a state store resulting from
>> a
>> >> > stateful streaming topology. The environment is
>> >> >
>> >> >- Kafka 0.10.2.1
>> >> >- It runs on a DC/OS cluster
>> >> >- I am running Confluent-Kafka 3.2.2 on the cluster
>> >> >- Each topic that I have has 2 partitions with replication factor
>> = 2
>> >> >- The application also has an associated http service that does
>> >> >interactive queries on the state store
>> >> >
>> >> > The application runs fine when I invoke a single instance. I can use
>> the
>> >> > http endpoints to do queries and everything looks good. Problems
>> surface
>> >> > when I try to spawn another instance of the application. I use the
>> same
>> >> > APPLICATION_ID_CONFIG and the instance starts on a different node of
>> the
>> >> > cluster. The data consumption part works fine as the new instance
>> also
>> >> > starts consuming from the same topic as the first one. But when I try
>> >> the
>> >> > http query, the metadata fetch fails ..
>> >> >
>> >> > I have some code snippet like this as part of the query that tries to
>> >> fetch
>> >> > the metadata so that I can locate the host to query on ..
>> >> >
>> >> > metadataService.streamsMetadataForStoreAndKey(store, hostKey,
>> >> > stringSerializer) match {
>> >> >   case Success(host) => {
>> >> > // hostKey is on another instance. call the other instance to
>> >> fetch
>> >> > the data.
>> >> > if (!thisHost(host)) {
>> >> >   logger.warn(s"Key $hostKey is on another instance not on
>> >> $host -
>> >> > requerying ..")
>> >> >   httpRequester.queryFromHost[Long](host, path)
>> >> > } else {
>> >> >   // hostKey is on this instance
>> >> >   localStateStoreQuery.queryStateStore(streams, store,
>> hostKey)
>> >> > }
>> >> >   }
>> >> >   case Failure(ex) => Future.failed(ex)
>> >> > }
>> >> >
>> >> > and the metadataService.streamsMetadataForStoreAndKey has the
>> following
>> >> > call ..
>> >> >
>> >> > streams.metadataForKey(store, key, 

Re: kafka-consumer-groups tool with SASL_PLAINTEXT

2017-07-28 Thread Meghana Narasimhan
Thanks, Vahid! Nice documentation. All the tools were working fine except
for kafka-consumer-groups --list, which is what I was struggling to get
working. I realized I had missed the cluster permissions for the user. It
looks good now.

Thanks,
Meghana

On Mon, Jul 24, 2017 at 5:14 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> Hi Meghana,
>
> I did some experiments with SASL_PLAINTEXT and documented the results
> here:
> https://developer.ibm.com/opentech/2017/05/31/kafka-acls-in-practice/
> I think it covers what you'd like to achieve. If not, please advise.
>
> Thanks.
> --Vahid
>
>
>
>
> From:   Meghana Narasimhan 
> To: users@kafka.apache.org
> Date:   07/24/2017 01:56 PM
> Subject:kafka-consumer-groups tool with SASL_PLAINTEXT
>
>
>
> Hi,
> What is the correct way to use the kafka-consumer-groups tool with
> SASL_PLAINTEXT security enabled ?
>
> The tool seems to work fine with PLAINTEXT port but not with
> SASL_PLAINTEXT. Can it be configured to work with SASL_PLAINTEXT ? If so
> what permissions have to enabled for it ?
>
> Thanks,
> Meghana
>
>
>
>


Re: Kafka Streams state store issue on cluster

2017-07-28 Thread Damian Guy
The config looks ok to me

On Fri, 28 Jul 2017 at 13:24 Debasish Ghosh 
wrote:

> I am setting APPLICATION_SERVER_CONFIG, which is possibly what u r
> referring to. Just now I noticed that I may also need to set
> REPLICATION_FACTOR_CONFIG, which needs to be set to 2 (default is 1).
> Anything else that I may be missing ?
>
>
> regards.
>
> On Fri, Jul 28, 2017 at 5:46 PM, Debasish Ghosh 
> wrote:
>
> > Hi Damien -
> >
> > I am not sure I understand what u mean .. I have the following set in the
> > application .. Do I need to set anything else at the host level ?
> > Environment variable ?
> >
> > val streamingConfig = {
> >   val settings = new Properties
> >   settings.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > "kstream-weblog-processing")
> >   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> config.brokers)
> >
> >   config.schemaRegistryUrl.foreach{ url =>
> >
>  settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> > url)
> >   }
> >
> >   settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > Serdes.ByteArray.getClass.getName)
> >   settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > Serdes.String.getClass.getName)
> >
> >   // setting offset reset to earliest so that we can re-run the demo
> > code with the same pre-loaded data
> >   // Note: To re-run the demo, you need to use the offset reset tool:
> >   // https://cwiki.apache.org/confluence/display/KAFKA/
> > Kafka+Streams+Application+Reset+Tool
> >   settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
> >
> >   // need this for query service
> >   settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
> > s"${config.httpInterface}:${config.httpPort}")
> >
> >   // default is /tmp/kafka-streams
> >   settings.put(StreamsConfig.STATE_DIR_CONFIG, config.stateStoreDir)
> >
> >   // Set the commit interval to 500ms so that any changes are flushed
> > frequently and the summary
> >   // data are updated with low latency.
> >   settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")
> >
> >   settings
> > }
> >
> > Please explain a bit ..
> >
> > regards.
> >
> >
> > On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy 
> wrote:
> >
> >> Hi,
> >>
> >> Do you have the application.server property set appropriately for both
> >> hosts?
> >>
> >> The second stack trace is this bug:
> >> https://issues.apache.org/jira/browse/KAFKA-5556
> >>
> >> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh 
> >> wrote:
> >>
> >> > Hi -
> >> >
> >> > In my Kafka Streams application, I have a state store resulting from a
> >> > stateful streaming topology. The environment is
> >> >
> >> >- Kafka 0.10.2.1
> >> >- It runs on a DC/OS cluster
> >> >- I am running Confluent-Kafka 3.2.2 on the cluster
> >> >- Each topic that I have has 2 partitions with replication factor
> = 2
> >> >- The application also has an associated http service that does
> >> >interactive queries on the state store
> >> >
> >> > The application runs fine when I invoke a single instance. I can use
> the
> >> > http endpoints to do queries and everything looks good. Problems
> surface
> >> > when I try to spawn another instance of the application. I use the
> same
> >> > APPLICATION_ID_CONFIG and the instance starts on a different node of
> the
> >> > cluster. The data consumption part works fine as the new instance also
> >> > starts consuming from the same topic as the first one. But when I try
> >> the
> >> > http query, the metadata fetch fails ..
> >> >
> >> > I have some code snippet like this as part of the query that tries to
> >> fetch
> >> > the metadata so that I can locate the host to query on ..
> >> >
> >> > metadataService.streamsMetadataForStoreAndKey(store, hostKey,
> >> > stringSerializer) match {
> >> >   case Success(host) => {
> >> > // hostKey is on another instance. call the other instance to
> >> fetch
> >> > the data.
> >> > if (!thisHost(host)) {
> >> >   logger.warn(s"Key $hostKey is on another instance not on
> >> $host -
> >> > requerying ..")
> >> >   httpRequester.queryFromHost[Long](host, path)
> >> > } else {
> >> >   // hostKey is on this instance
> >> >   localStateStoreQuery.queryStateStore(streams, store,
> hostKey)
> >> > }
> >> >   }
> >> >   case Failure(ex) => Future.failed(ex)
> >> > }
> >> >
> >> > and the metadataService.streamsMetadataForStoreAndKey has the
> following
> >> > call ..
> >> >
> >> > streams.metadataForKey(store, key, serializer) match {
> >> >   case null => throw new IllegalArgumentException(s"Metadata for
> >> key
> >> > $key not found in $store")
> >> >   case metadata => new HostStoreInfo(metadata.host, metadata.port,
> >> > metadata.stateStoreNames.asScala.toSet)
> >> > }
> >> >
> >> > When I start 

Kafka Connect - Sink File - Header Record

2017-07-28 Thread mayank rathi
Hello All,

Is there a way to generate a sink file with a header record using Kafka Connect?

Thanks and Regards
MR


Re: Kafka Streams state store issue on cluster

2017-07-28 Thread Debasish Ghosh
I am setting APPLICATION_SERVER_CONFIG, which is possibly what you are
referring to. Just now I noticed that I may also need to set
REPLICATION_FACTOR_CONFIG, which needs to be set to 2 (the default is 1).
Anything else that I may be missing?
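A sketch of that setting in Java Properties form (the value 2 matches the topic replication factor mentioned earlier in this thread):

Properties settings = new Properties();
// Internal changelog and repartition topics are created with this many replicas,
// so local state can be restored even if one broker is lost:
settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);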


regards.

On Fri, Jul 28, 2017 at 5:46 PM, Debasish Ghosh 
wrote:

> Hi Damien -
>
> I am not sure I understand what u mean .. I have the following set in the
> application .. Do I need to set anything else at the host level ?
> Environment variable ?
>
> val streamingConfig = {
>   val settings = new Properties
>   settings.put(StreamsConfig.APPLICATION_ID_CONFIG,
> "kstream-weblog-processing")
>   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
>
>   config.schemaRegistryUrl.foreach{ url =>
> settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> url)
>   }
>
>   settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.ByteArray.getClass.getName)
>   settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String.getClass.getName)
>
>   // setting offset reset to earliest so that we can re-run the demo
> code with the same pre-loaded data
>   // Note: To re-run the demo, you need to use the offset reset tool:
>   // https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Streams+Application+Reset+Tool
>   settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
>
>   // need this for query service
>   settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
> s"${config.httpInterface}:${config.httpPort}")
>
>   // default is /tmp/kafka-streams
>   settings.put(StreamsConfig.STATE_DIR_CONFIG, config.stateStoreDir)
>
>   // Set the commit interval to 500ms so that any changes are flushed
> frequently and the summary
>   // data are updated with low latency.
>   settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")
>
>   settings
> }
>
> Please explain a bit ..
>
> regards.
>
>
> On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy  wrote:
>
>> Hi,
>>
>> Do you have the application.server property set appropriately for both
>> hosts?
>>
>> The second stack trace is this bug:
>> https://issues.apache.org/jira/browse/KAFKA-5556
>>
>> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh 
>> wrote:
>>
>> > Hi -
>> >
>> > In my Kafka Streams application, I have a state store resulting from a
>> > stateful streaming topology. The environment is
>> >
>> >- Kafka 0.10.2.1
>> >- It runs on a DC/OS cluster
>> >- I am running Confluent-Kafka 3.2.2 on the cluster
>> >- Each topic that I have has 2 partitions with replication factor = 2
>> >- The application also has an associated http service that does
>> >interactive queries on the state store
>> >
>> > The application runs fine when I invoke a single instance. I can use the
>> > http endpoints to do queries and everything looks good. Problems surface
>> > when I try to spawn another instance of the application. I use the same
>> > APPLICATION_ID_CONFIG and the instance starts on a different node of the
>> > cluster. The data consumption part works fine as the new instance also
>> > starts consuming from the same topic as the first one. But when I try
>> the
>> > http query, the metadata fetch fails ..
>> >
>> > I have some code snippet like this as part of the query that tries to
>> fetch
>> > the metadata so that I can locate the host to query on ..
>> >
>> > metadataService.streamsMetadataForStoreAndKey(store, hostKey,
>> > stringSerializer) match {
>> >   case Success(host) => {
>> > // hostKey is on another instance. call the other instance to
>> fetch
>> > the data.
>> > if (!thisHost(host)) {
>> >   logger.warn(s"Key $hostKey is on another instance not on
>> $host -
>> > requerying ..")
>> >   httpRequester.queryFromHost[Long](host, path)
>> > } else {
>> >   // hostKey is on this instance
>> >   localStateStoreQuery.queryStateStore(streams, store, hostKey)
>> > }
>> >   }
>> >   case Failure(ex) => Future.failed(ex)
>> > }
>> >
>> > and the metadataService.streamsMetadataForStoreAndKey has the following
>> > call ..
>> >
>> > streams.metadataForKey(store, key, serializer) match {
>> >   case null => throw new IllegalArgumentException(s"Metadata for
>> key
>> > $key not found in $store")
>> >   case metadata => new HostStoreInfo(metadata.host, metadata.port,
>> > metadata.stateStoreNames.asScala.toSet)
>> > }
>> >
>> > When I start the second instance, streams.metadataForKey returns null
>> for
>> > any key I pass .. Here's the relevant stack trace ..
>> >
>> > java.lang.IllegalArgumentException: Metadata for key mtc.clark.net not
>> > found in access-count-per-host
>> > at
>> >
>> > com.xx.fdp.sample.kstream.services.MetadataService.$anonfun$
>> 

Re: Kafka Streams state store issue on cluster

2017-07-28 Thread Debasish Ghosh
Hi Damian,

I am not sure I understand what you mean .. I have the following set in the
application .. Do I need to set anything else at the host level? An
environment variable?

val streamingConfig = {
  val settings = new Properties
  settings.put(StreamsConfig.APPLICATION_ID_CONFIG,
"kstream-weblog-processing")
  settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)

  config.schemaRegistryUrl.foreach{ url =>

settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, url)
  }

  settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.ByteArray.getClass.getName)
  settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String.getClass.getName)

  // setting offset reset to earliest so that we can re-run the demo
code with the same pre-loaded data
  // Note: To re-run the demo, you need to use the offset reset tool:
  //
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
  settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  // need this for query service
  settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
s"${config.httpInterface}:${config.httpPort}")

  // default is /tmp/kafka-streams
  settings.put(StreamsConfig.STATE_DIR_CONFIG, config.stateStoreDir)

  // Set the commit interval to 500ms so that any changes are flushed
frequently and the summary
  // data are updated with low latency.
  settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")

  settings
}

Please explain a bit ..

regards.


On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy  wrote:

> Hi,
>
> Do you have the application.server property set appropriately for both
> hosts?
>
> The second stack trace is this bug:
> https://issues.apache.org/jira/browse/KAFKA-5556
>
> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh 
> wrote:
>
> > Hi -
> >
> > In my Kafka Streams application, I have a state store resulting from a
> > stateful streaming topology. The environment is
> >
> >- Kafka 0.10.2.1
> >- It runs on a DC/OS cluster
> >- I am running Confluent-Kafka 3.2.2 on the cluster
> >- Each topic that I have has 2 partitions with replication factor = 2
> >- The application also has an associated http service that does
> >interactive queries on the state store
> >
> > The application runs fine when I invoke a single instance. I can use the
> > http endpoints to do queries and everything looks good. Problems surface
> > when I try to spawn another instance of the application. I use the same
> > APPLICATION_ID_CONFIG and the instance starts on a different node of the
> > cluster. The data consumption part works fine as the new instance also
> > starts consuming from the same topic as the first one. But when I try the
> > http query, the metadata fetch fails ..
> >
> > I have some code snippet like this as part of the query that tries to
> fetch
> > the metadata so that I can locate the host to query on ..
> >
> > metadataService.streamsMetadataForStoreAndKey(store, hostKey,
> > stringSerializer) match {
> >   case Success(host) => {
> > // hostKey is on another instance. call the other instance to
> fetch
> > the data.
> > if (!thisHost(host)) {
> >   logger.warn(s"Key $hostKey is on another instance not on $host
> -
> > requerying ..")
> >   httpRequester.queryFromHost[Long](host, path)
> > } else {
> >   // hostKey is on this instance
> >   localStateStoreQuery.queryStateStore(streams, store, hostKey)
> > }
> >   }
> >   case Failure(ex) => Future.failed(ex)
> > }
> >
> > and the metadataService.streamsMetadataForStoreAndKey has the following
> > call ..
> >
> > streams.metadataForKey(store, key, serializer) match {
> >   case null => throw new IllegalArgumentException(s"Metadata for key
> > $key not found in $store")
> >   case metadata => new HostStoreInfo(metadata.host, metadata.port,
> > metadata.stateStoreNames.asScala.toSet)
> > }
> >
> > When I start the second instance, streams.metadataForKey returns null for
> > any key I pass .. Here's the relevant stack trace ..
> >
> > java.lang.IllegalArgumentException: Metadata for key mtc.clark.net not
> > found in access-count-per-host
> > at
> >
> > com.xx.fdp.sample.kstream.services.MetadataService.$anonfun$
> streamsMetadataForStoreAndKey$1(MetadataService.scala:51)
> > at scala.util.Try$.apply(Try.scala:209)
> > at
> >
> > com.xx.fdp.sample.kstream.services.MetadataService.
> streamsMetadataForStoreAndKey(MetadataService.scala:46)
> > at
> >
> > com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchSummaryInfo(
> KeyValueFetcher.scala:36)
> > at
> >
> > com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchAccessCountSummary(
> KeyValueFetcher.scala:29)
> > at
> >
> > 

Re: Kafka Streams state store issue on cluster

2017-07-28 Thread Damian Guy
Hi,

Do you have the application.server property set appropriately for both
hosts?
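
For example, each instance needs to advertise its own reachable endpoint
(the host names and port below are just placeholders, not from your setup):

  Properties props = new Properties();
  // instance 1 advertises the host:port of its own http query service;
  // metadataForKey() can only route queries to endpoints advertised this way
  props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "host1.example.com:7070");
  // on instance 2 this would be e.g. "host2.example.com:7070"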

The second stack trace is this bug:
https://issues.apache.org/jira/browse/KAFKA-5556

On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh 
wrote:

> Hi -
>
> In my Kafka Streams application, I have a state store resulting from a
> stateful streaming topology. The environment is
>
>- Kafka 0.10.2.1
>- It runs on a DC/OS cluster
>- I am running Confluent-Kafka 3.2.2 on the cluster
>- Each topic that I have has 2 partitions with replication factor = 2
>- The application also has an associated http service that does
>interactive queries on the state store
>
> The application runs fine when I invoke a single instance. I can use the
> http endpoints to do queries and everything looks good. Problems surface
> when I try to spawn another instance of the application. I use the same
> APPLICATION_ID_CONFIG and the instance starts on a different node of the
> cluster. The data consumption part works fine as the new instance also
> starts consuming from the same topic as the first one. But when I try the
> http query, the metadata fetch fails ..
>
> I have some code snippet like this as part of the query that tries to fetch
> the metadata so that I can locate the host to query on ..
>
> metadataService.streamsMetadataForStoreAndKey(store, hostKey,
> stringSerializer) match {
>   case Success(host) => {
> // hostKey is on another instance. call the other instance to fetch
> the data.
> if (!thisHost(host)) {
>   logger.warn(s"Key $hostKey is on another instance not on $host -
> requerying ..")
>   httpRequester.queryFromHost[Long](host, path)
> } else {
>   // hostKey is on this instance
>   localStateStoreQuery.queryStateStore(streams, store, hostKey)
> }
>   }
>   case Failure(ex) => Future.failed(ex)
> }
>
> and the metadataService.streamsMetadataForStoreAndKey has the following
> call ..
>
> streams.metadataForKey(store, key, serializer) match {
>   case null => throw new IllegalArgumentException(s"Metadata for key
> $key not found in $store")
>   case metadata => new HostStoreInfo(metadata.host, metadata.port,
> metadata.stateStoreNames.asScala.toSet)
> }
>
> When I start the second instance, streams.metadataForKey returns null for
> any key I pass .. Here's the relevant stack trace ..
>
> java.lang.IllegalArgumentException: Metadata for key mtc.clark.net not
> found in access-count-per-host
> at
>
> com.xx.fdp.sample.kstream.services.MetadataService.$anonfun$streamsMetadataForStoreAndKey$1(MetadataService.scala:51)
> at scala.util.Try$.apply(Try.scala:209)
> at
>
> com.xx.fdp.sample.kstream.services.MetadataService.streamsMetadataForStoreAndKey(MetadataService.scala:46)
> at
>
> com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchSummaryInfo(KeyValueFetcher.scala:36)
> at
>
> com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchAccessCountSummary(KeyValueFetcher.scala:29)
> at
>
> com.xx.fdp.sample.kstream.http.WeblogDSLHttpService.$anonfun$routes$10(WeblogDSLHttpService.scala:41)
> at
>
> akka.http.scaladsl.server.directives.RouteDirectives.$anonfun$complete$1(RouteDirectives.scala:47)
> at
>
> akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
> at
>
> akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
> ...
>
> and following this exception I get another one which looks like an internal
> exception that stops the application ..
>
> 09:57:33.731 TKD [StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread -
> stream-thread [StreamThread-1] Streams application error during processing:
> java.lang.IllegalStateException: Attempt to retrieve exception from future
> which hasn't failed
> at
>
> org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
> at
>
> org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
> at
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
> at
>
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> 

Re: RocksDB Error on partition assignment

2017-07-28 Thread Damian Guy
It is due to a bug. You should set
StreamsConfig.STATE_DIR_CLEANUP_DELAY_MS_CONFIG to Long.MAX_VALUE - i.e.,
disabling it.
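
A minimal sketch of that workaround, assuming the streams configuration is
built from a java.util.Properties object called props:

  // disable the periodic state directory cleanup to work around the bug
  props.put(StreamsConfig.STATE_DIR_CLEANUP_DELAY_MS_CONFIG, Long.MAX_VALUE);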

On Fri, 28 Jul 2017 at 10:38 Sameer Kumar  wrote:

> Hi,
>
> I am facing this error, no clue why this occurred. No other exception in
> stacktrace was found.
>
> Only thing different I did was I ran kafka streams jar on machine2 a couple
> of mins after i ran it on machine1.
>
> Please search for this string in the log below:-
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> LICSp-4-25k failed on partition assignment
>
>
> 2017-07-28 14:55:51 INFO  StateDirectory:213 - Deleting obsolete state
> directory 2_43 for task 2_43
> 2017-07-28 14:55:51 INFO  StateDirectory:213 - Deleting obsolete state
> directory 1_29 for task 1_29
> 2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
> directory 2_22 for task 2_22
> 2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
> directory 0_9 for task 0_9
> 2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
> directory 0_49 for task 0_49
> 2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
> directory 2_27 for task 2_27
> 2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
> directory 2_32 for task 2_32
> 2017-07-28 14:55:52 INFO  StreamThread:767 - stream-thread [StreamThread-7]
> Committing all tasks because the commit interval 5000ms has elapsed
> 2017-07-28 14:55:52 INFO  StreamThread:805 - stream-thread [StreamThread-7]
> Committing task StreamTask 0_1
> 2017-07-28 14:55:52 ERROR StreamThread:813 - stream-thread [StreamThread-2]
> Failed to commit StreamTask 1_35 state:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_35] Failed
> to flush state store lic3-deb-ci-25k
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
> at
>
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error
> while executing flush from store lic3-deb-ci-25k-201707280900
> at
>
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
> at
>
> org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
> at
> org.apache.kafka.streams.state.internals.Segments.flush(Segments.java:138)
> at
>
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.flush(RocksDBSegmentedBytesStore.java:117)
> at
>
> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
> at
>
> org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.flush(MeteredSegmentedBytesStore.java:111)
> at
>
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.flush(RocksDBWindowStore.java:92)
> at
>
> org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:120)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
> ... 8 more
> Caused by: org.rocksdb.RocksDBException: s
> at org.rocksdb.RocksDB.flush(Native Method)
> at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
> at
>
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
> ... 16 more
> 2017-07-28 14:55:52 INFO  StreamThread:767 - stream-thread
> [StreamThread-12] Committing all tasks because the commit interval 5000ms
> has elapsed
> 2017-07-28 14:55:52 INFO  StreamThread:390 - stream-thread [StreamThread-2]
> Shutting down
> 2017-07-28 14:55:52 INFO  StreamThread:805 - stream-thread
> [StreamThread-12] Committing task StreamTask 1_32
> 2017-07-28 14:55:52 INFO  StreamThread:1075 - stream-thread
> [StreamThread-2] Closing task 0_0
> 2017-07-28 14:55:53 INFO  StreamThread:767 - stream-thread
> [StreamThread-15] Committing all tasks because the commit interval 5000ms
> has elapsed
> 2017-07-28 14:55:53 INFO  StreamThread:805 - stream-thread
> [StreamThread-15] Committing task StreamTask 0_32
> 2017-07-28 14:55:53 INFO  StreamThread:767 - stream-thread [StreamThread-5]
> Committing all tasks because the commit interval 5000ms has elapsed
> 2017-07-28 14:55:53 INFO  

Kafka Streams state store issue on cluster

2017-07-28 Thread Debasish Ghosh
Hi -

In my Kafka Streams application, I have a state store resulting from a
stateful streaming topology. The environment is

   - Kafka 0.10.2.1
   - It runs on a DC/OS cluster
   - I am running Confluent-Kafka 3.2.2 on the cluster
   - Each topic that I have has 2 partitions with replication factor = 2
   - The application also has an associated http service that does
   interactive queries on the state store

The application runs fine when I invoke a single instance. I can use the
http endpoints to do queries and everything looks good. Problems surface
when I try to spawn another instance of the application. I use the same
APPLICATION_ID_CONFIG and the instance starts on a different node of the
cluster. The data consumption part works fine as the new instance also
starts consuming from the same topic as the first one. But when I try the
http query, the metadata fetch fails ..

I have some code snippet like this as part of the query that tries to fetch
the metadata so that I can locate the host to query on ..

metadataService.streamsMetadataForStoreAndKey(store, hostKey,
  stringSerializer) match {
  case Success(host) => {
    // hostKey is on another instance. call the other instance to fetch the data.
    if (!thisHost(host)) {
      logger.warn(s"Key $hostKey is on another instance not on $host - requerying ..")
      httpRequester.queryFromHost[Long](host, path)
    } else {
      // hostKey is on this instance
      localStateStoreQuery.queryStateStore(streams, store, hostKey)
    }
  }
  case Failure(ex) => Future.failed(ex)
}

and the metadataService.streamsMetadataForStoreAndKey has the following
call ..

streams.metadataForKey(store, key, serializer) match {
  case null => throw new IllegalArgumentException(
    s"Metadata for key $key not found in $store")
  case metadata => new HostStoreInfo(metadata.host, metadata.port,
    metadata.stateStoreNames.asScala.toSet)
}

When I start the second instance, streams.metadataForKey returns null for
any key I pass .. Here's the relevant stack trace ..

java.lang.IllegalArgumentException: Metadata for key mtc.clark.net not
found in access-count-per-host
at
com.xx.fdp.sample.kstream.services.MetadataService.$anonfun$streamsMetadataForStoreAndKey$1(MetadataService.scala:51)
at scala.util.Try$.apply(Try.scala:209)
at
com.xx.fdp.sample.kstream.services.MetadataService.streamsMetadataForStoreAndKey(MetadataService.scala:46)
at
com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchSummaryInfo(KeyValueFetcher.scala:36)
at
com.xx.fdp.sample.kstream.http.KeyValueFetcher.fetchAccessCountSummary(KeyValueFetcher.scala:29)
at
com.xx.fdp.sample.kstream.http.WeblogDSLHttpService.$anonfun$routes$10(WeblogDSLHttpService.scala:41)
at
akka.http.scaladsl.server.directives.RouteDirectives.$anonfun$complete$1(RouteDirectives.scala:47)
at
akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
at
akka.http.scaladsl.server.StandardRoute$$anon$1.apply(StandardRoute.scala:19)
...

and following this exception I get another one which looks like an internal
exception that stops the application ..

09:57:33.731 TKD [StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread -
stream-thread [StreamThread-1] Streams application error during processing:
java.lang.IllegalStateException: Attempt to retrieve exception from future
which hasn't failed
at
org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
at
org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590)
at
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
at
org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
at
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
at
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
at
org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
at
org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
at
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

BTW the application runs fine when I have 2 instances running on the same
host (my laptop) or on 1 instance on the cluster. The problem surfaces only
when I start the second 

Limit of simultaneous consumers/clients?

2017-07-28 Thread Dr. Sven Abels
Hello,

We would like to use Kafka as a way to inform users about events on certain
topics. For this purpose, we want to develop Windows and Mac clients which
users would install on their desktop PCs.

We have a large number of users, so it's likely that there will be >10,000
clients running in parallel.

If I understand it correctly, Kafka uses sockets and the user clients would
maintain an active connection to Kafka. If this is correct, I wondered:

- What is the limit of clients that may run in parallel? Do 10,000 clients
mean 10,000 server connections? Would that be a problem for a typical
server?

- Can we solve this problem by simply running Kafka on several servers and
using something like round-robin DNS so that the clients connect to
different servers?

- We expect to send only a few messages each day. Messages should arrive
quickly (<30 seconds delay), but we don't need realtime. Considering this:
is Kafka still a good solution, or would it be better to have the clients
poll the server directly without Kafka?

Best regards,

Sven



RocksDB Error on partition assignment

2017-07-28 Thread Sameer Kumar
Hi,

I am facing this error and have no clue why it occurred. No other exception
was found in the stack trace.

The only thing I did differently was to run the Kafka Streams jar on machine2
a couple of minutes after I ran it on machine1.

Please search for this string in the log below:
org.apache.kafka.streams.processor.internals.StreamThread$1 for group
LICSp-4-25k failed on partition assignment


2017-07-28 14:55:51 INFO  StateDirectory:213 - Deleting obsolete state
directory 2_43 for task 2_43
2017-07-28 14:55:51 INFO  StateDirectory:213 - Deleting obsolete state
directory 1_29 for task 1_29
2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
directory 2_22 for task 2_22
2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
directory 0_9 for task 0_9
2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
directory 0_49 for task 0_49
2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
directory 2_27 for task 2_27
2017-07-28 14:55:52 INFO  StateDirectory:213 - Deleting obsolete state
directory 2_32 for task 2_32
2017-07-28 14:55:52 INFO  StreamThread:767 - stream-thread [StreamThread-7]
Committing all tasks because the commit interval 5000ms has elapsed
2017-07-28 14:55:52 INFO  StreamThread:805 - stream-thread [StreamThread-7]
Committing task StreamTask 0_1
2017-07-28 14:55:52 ERROR StreamThread:813 - stream-thread [StreamThread-2]
Failed to commit StreamTask 1_35 state:
org.apache.kafka.streams.errors.ProcessorStateException: task [1_35] Failed
to flush state store lic3-deb-ci-25k
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
at
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
at
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
at
org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
at
org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
at
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error
while executing flush from store lic3-deb-ci-25k-201707280900
at
org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
at
org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
at
org.apache.kafka.streams.state.internals.Segments.flush(Segments.java:138)
at
org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.flush(RocksDBSegmentedBytesStore.java:117)
at
org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
at
org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.flush(MeteredSegmentedBytesStore.java:111)
at
org.apache.kafka.streams.state.internals.RocksDBWindowStore.flush(RocksDBWindowStore.java:92)
at
org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:120)
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
... 8 more
Caused by: org.rocksdb.RocksDBException: s
at org.rocksdb.RocksDB.flush(Native Method)
at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
at
org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
... 16 more
2017-07-28 14:55:52 INFO  StreamThread:767 - stream-thread
[StreamThread-12] Committing all tasks because the commit interval 5000ms
has elapsed
2017-07-28 14:55:52 INFO  StreamThread:390 - stream-thread [StreamThread-2]
Shutting down
2017-07-28 14:55:52 INFO  StreamThread:805 - stream-thread
[StreamThread-12] Committing task StreamTask 1_32
2017-07-28 14:55:52 INFO  StreamThread:1075 - stream-thread
[StreamThread-2] Closing task 0_0
2017-07-28 14:55:53 INFO  StreamThread:767 - stream-thread
[StreamThread-15] Committing all tasks because the commit interval 5000ms
has elapsed
2017-07-28 14:55:53 INFO  StreamThread:805 - stream-thread
[StreamThread-15] Committing task StreamTask 0_32
2017-07-28 14:55:53 INFO  StreamThread:767 - stream-thread [StreamThread-5]
Committing all tasks because the commit interval 5000ms has elapsed
2017-07-28 14:55:53 INFO  StreamThread:805 - stream-thread [StreamThread-5]
Committing task StreamTask 2_31
2017-07-28 14:55:53 INFO  StreamThread:767 - stream-thread
[StreamThread-14] Committing all tasks because the commit interval 5000ms
has elapsed
2017-07-28 14:55:53 INFO  StreamThread:805 - stream-thread
[StreamThread-14] Committing task StreamTask 0_34
2017-07-28 14:55:53 INFO  StreamThread:805 - stream-thread
[StreamThread-14] Committing 

"client" and requests in request.timeout.ms

2017-07-28 Thread Stevo Slavić
Hello Apache Kafka community,

In the Consumer, Producer, AdminClient and Broker configuration documentation
there's a common config property, request.timeout.ms, whose descriptions share
this common part:
"The configuration controls the maximum amount of time the client will wait
for the response of a request. If the response is not received before the
timeout elapses the client will resend the request if necessary or fail the
request if retries are exhausted."

If I'm not mistaken, the "client" term in all the different request.timeout.ms
config property descriptions actually refers to NetworkClient, which is a
kind of leaky internal abstraction. There seems to be no mention of
NetworkClient on the Kafka documentation page. By its javadoc,
NetworkClient is:
"A network client for asynchronous request/response network i/o. This is an
internal class used to implement the user-facing producer and consumer
clients."
Since it's considered an internal class, maybe it could be moved into an
"internal" package like other internal classes.
More importantly, the NetworkClient javadoc (second sentence) is not entirely
true, since NetworkClient is used on the broker side too, e.g. to exchange
controlled shutdown requests/responses, which IMO has nothing to do with
"user-facing producer and consumer clients". Because the NetworkClient
abstraction is used on the broker side, there's a request.timeout.ms config
property not only for the producer/consumer but also in the broker configuration.

Can somebody please verify if my understanding of the current situation is
correct?

The Kafka documentation doesn't mention which requests are affected by tuning
each of the request.timeout.ms config properties, or how, if at all, the
different request timeouts are related.

Specifically, I'd like to lower the producer/consumer request timeout, so that
user-facing client requests like Produce/Fetch/Metadata are affected, but e.g.
controlled shutdown requests on the broker side are not. I'm not sure whether
the broker-side request timeout can be left unchanged, or whether there is a
combination/chain of client- and broker-side requests/responses that are
related, so that the request timeout settings have to be kept in sync. I guess
the client-side Produce request and the broker-side replica Fetch form such a
chain/dependency: depending on acks, a Produce cannot finish successfully until
enough replicas got the message. The Producer's "request.timeout.ms" description
explains the relationship with the Broker's "replica.lag.time.max.ms" (potential
produce failure or duplication of messages due to retries being negative
side-effects), but the relationship with the Broker's "request.timeout.ms" is
not covered. Similarly, a Consumer's Fetch request to the lead broker seems to
only retrieve messages replicated to the rest of the ISR set, so there is again
a kind of dependency on the replica Fetch. This time the dependency has a less
negative side-effect: it seems there could just be more empty reads if the
Consumer request timeout is lower than the Broker's, which is a tradeoff of
lower latency of individual requests vs. lower load / number of requests.
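
For illustration, what I have in mind is something like this (the 10 second
value is just a placeholder, and some client versions enforce lower bounds
relative to other timeouts):

  Properties producerProps = new Properties();
  producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000"); // placeholder

  Properties consumerProps = new Properties();
  consumerProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000"); // placeholder

  // broker-side request.timeout.ms in server.properties stays at its default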

Is there a reason that the producer/consumer request timeouts are set to the
same default value as the request timeout default on the broker side?

Additionally, is there a reason why there are no per-type-of-request timeout
config properties? Does a 30 sec timeout really make sense for all kinds of
requests, both user-facing and internal coordination requests? The new
AdminClient request timeout seems to be the only different one, with a 120 sec
default request timeout (the old Scala AdminClient seems to have a 5 sec
default request timeout).

On a related note, even after looking at ServerSocketTest I'm not sure what
happens with request handling on the broker side when a producer/consumer side
timeout occurs. How will the broker treat this: will it stop processing the
request and release any resources used, or will it continue to process the
request and consume resources until it is fully processed, so that when the
producer/consumer retries its timed-out request, the broker-side load just
multiplies with the number of retries?

Kind regards,
Stevo Slavic.


Re: Kafka streams regex match

2017-07-28 Thread Damian Guy
Hi,
The store won't be queryable until after you have called streams.start().
No stores are created until the application is up and running, since they
depend on the underlying partitions.

To check that a stateful operation has produced a result you would normally
add another operation after the join, i.e.,
stream.join(other,...).foreach(..) or stream.join(other,...).to("topic")
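
For example (sketch only; this assumes a KStream<String, String> called
"stream" and a KTable<String, String> called "table", and the output topic
name is made up):

  KStream<String, String> joined =
      stream.join(table, (streamValue, tableValue) -> streamValue + "|" + tableValue);

  // print each joined record to see whether the join produced any output
  joined.foreach((key, value) -> System.out.println("joined: " + key + " -> " + value));

  // or write the join result to a topic and inspect it with a console consumer
  joined.to(Serdes.String(), Serdes.String(), "joined-output");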

Thanks,
Damian

On Thu, 27 Jul 2017 at 22:52 Shekar Tippur  wrote:

> One more thing.. How do we check if the stateful join operation resulted in
> a kstream of some value in it (size of kstream)? How do we check the
> content of a kstream?
>
> - S
>
> On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur  wrote:
>
> > Damien,
> >
> > Thanks a lot for pointing out.
> >
> > I got a little further. I am kind of stuck with the sequencing. Couple of
> > issues:
> > 1. I cannot initialise KafkaStreams before the parser.to().
> > 2. Do I need to create a new KafkaStreams object when I create a
> > KeyValueStore?
> > 3. How do I initialize KeyValueIterator with  I seem to
> > get a error when I try:
> > *KeyValueIterator  kviterator
> > = keyValueStore.range("test_nod","test_node");*
> >
> > /// START CODE /
> > //parser is a kstream as a result of join
> > if (parser.toString().matches("null")){
> >
> > ReadOnlyKeyValueStore keyValueStore =
> > null;
> > KafkaStreams newstreams = new KafkaStreams(builder, props);
> > try {
> > keyValueStore =
> IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > QueryableStoreTypes.keyValueStore(), newstreams);
> > } catch (InterruptedException e) {
> > e.printStackTrace();
> > }
> > *KeyValueIterator kviterator
> > = keyValueStore.range("test_nod","test_node");*
> > }else {
> >
> > *parser.to (stringSerde, jsonSerde, "parser");*}
> >
> > *KafkaStreams streams = new KafkaStreams(builder, props);*
> > streams.start();
> >
> > /// END CODE /
> >
> > - S
> >
> >
> >
> > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy 
> wrote:
> > >
> > > It is part of the ReadOnlyKeyValueStore interface:
> > >
> > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
> > >
> > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur  wrote:
> > >
> > > > That's cool. This feature is a part of rocksdb object and not ktable?
> > > >
> > > > Sent from my iPhone
> > > >
> > > > > On Jul 27, 2017, at 07:57, Damian Guy 
> wrote:
> > > > >
> > > > > Yes they can be strings,
> > > > >
> > > > > so you could do something like:
> > > > > store.range("test_host", "test_hosu");
> > > > >
> > > > > This would return an iterator containing all of the values
> > (inclusive)
> > > > from
> > > > > "test_host" -> "test_hosu".
> > > > >
> > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur 
> > wrote:
> > > > >>
> > > > >> Can you please point me to an example? Can from and to be a
> string?
> > > > >>
> > > > >> Sent from my iPhone
> > > > >>
> > > > >>> On Jul 27, 2017, at 04:04, Damian Guy 
> > wrote:
> > > > >>>
> > > > >>> Hi,
> > > > >>>
> > > > >>> You can't use a regex, but you could use a range query.
> > > > >>> i.e, keyValueStore.range(from, to)
> > > > >>>
> > > > >>> Thanks,
> > > > >>> Damian
> > > > >>>
> > > >  On Wed, 26 Jul 2017 at 22:34 Shekar Tippur 
> > wrote:
> > > > 
> > > >  Hello,
> > > > 
> > > >  I am able to get the kstream to ktable join work. I have some
> use
> > > > cases
> > > >  where the key is not always a exact match.
> > > >  I was wondering if there is a way to lookup keys based on regex.
> > > > 
> > > >  For example,
> > > >  I have these entries for a ktable:
> > > >  test_host1,{ "source": "test_host", "UL1": "test1_l1" }
> > > > 
> > > >  test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
> > > > 
> > > >  test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
> > > > 
> > > >  blah,{ "source": "blah_host", "UL1": "blah_l3" }
> > > > 
> > > >  and this for a kstream:
> > > > 
> > > >  test_host,{ "source": "test_host", "custom": { "test ": {
> > > > >> "creation_time ":
> > > >  "1234 " } } }
> > > > 
> > > >  In this case, if the exact match does not work, I would like to
> > lookup
> > > >  ktable for all entries that contains "test_host*" in it and have
> > > >  application logic to determine what would be the best fit.
> > > > 
> > > >  Appreciate input.
> > > > 
> > > >  - Shekar
> > > > 
> > > > >>
> > > >
> >
>


Re: Can we use builder.stream twice in single application

2017-07-28 Thread Matthias J. Sax
You can do both in a single application via

KStream input = builder.stream("topic");
input.to("output-1");
input.to("output-2");

In general, if you reuse a KStream or KTable and apply multiple
operators (in the example above, two `to()` operators), the input will
be duplicated and sent to each operator (i.e., there can also be more than two).

Note that the data is not physically copied within the application, as
only references are copied: the records themselves are the same and are
just handed to all operators. Of course, when using `to()` the data
will be serialized in each `to()` and thus duplicated there.
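
For example (sketch only; the map/filter steps, serdes, and output topic
names are placeholders for whatever your two pipelines actually do):

  KStream<String, String> input =
      builder.stream(Serdes.String(), Serdes.String(), "advice-stream");

  // pipeline 1: transform and write to the first sink topic
  input.mapValues(value -> value.toUpperCase())
       .to(Serdes.String(), Serdes.String(), "output-1");

  // pipeline 2: filter and write to the second sink topic
  input.filter((key, value) -> value != null && !value.isEmpty())
       .to(Serdes.String(), Serdes.String(), "output-2");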


-Matthias

On 7/28/17 6:33 AM, Sachin Mittal wrote:
> Hi,
> I have a multiple high level streams DSL to execute.
> 
> In first it reads from a source topic processes the data and sends the data
> to a sink.
> 
> In second it again reads from same source topic processes the data and
> sends it to a different topic.
> 
> For now these two operations are independent.
> 
> Now I can start two java applications.
> 
> But I was wondering if I can do all in single java application.
> 
> So my code is like:
> -
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "advice");
> .
> 
> KStreamBuilder builder = new KStreamBuilder();
> .
> 
> //first stream
> builder.stream(Serdes.String(), valueSerde, "advice-stream")
> ...
> 
> //second steam
> builder.stream(Serdes.String(), valueSerde, "advice-stream")
> 
> 
> 
> final KafkaStreams streams = new KafkaStreams(builder, props);
> .
> streams.start();
> 
> Will this work?
> 
> What will be the output of the following command
> kafka-run-class.sh kafka.admin.ConsumerGroupCommand --new-consumer
> --bootstrap-server localhost:9092 --group advice --describe
> 
> Will it show me two offsets one for first stream and one for second as each
> may have different offsets.
> 
> What about rocksdb state stores?
> 
> Or this is not possible?
> 
> Thanks
> Sachin
> 


