Re: Kafka streams regex match

2017-08-08 Thread Shekar Tippur
I am running this on a Mac laptop. I am using the defaults.

Sent from my iPhone

> On Aug 8, 2017, at 03:11, Damian Guy  wrote:
> 
> Hi Shekar, that warning is expected during rebalances and should generally
> resolve itself.
> How many threads/app instances are you running?
> It is impossible to tell what is happening without the full logs.
> 
> Thanks,
> Damian
> 
>> On Mon, 7 Aug 2017 at 22:46 Shekar Tippur  wrote:
>> 
>> Damien,
>> 
>> Thanks for pointing out the error. I had tried a different version of
>> initializing the store.
>> 
>> Now that I am able to compile, I started to get the below error. I looked
>> up other suggestions for the same error and followed up to upgrade Kafka to
>> 0.11.0.0 version. I still get this error :/
>> 
>> [2017-08-07 14:40:41,264] WARN stream-thread
>> [streams-pipe-b67a7ffa-5535-4311-8886-ad6362617dc5-StreamThread-1] Could
>> not create task 0_0. Will retry:
>> (org.apache.kafka.streams.processor.internals.StreamThread)
>> 
>> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock
>> the state directory for task 0_0
>> 
>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99)
>>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:111)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>> 
>>> On Fri, Aug 4, 2017 at 4:16 PM, Shekar Tippur  wrote:
>>> 
>>> Damian,
>>> 
>>> I am getting a syntax error. I have responded on gist.
>>> Appreciate any inputs.
>>> 
>>> - Shekar
>>> 
>>> On Sat, Jul 29, 2017 at 1:57 AM, Damian Guy 
>> wrote:
>>> 
 Hi,
 
 I left a comment on your gist.
 
 Thanks,
 Damian
 
> On Fri, 28 Jul 2017 at 21:50 Shekar Tippur  wrote:
> 
> Damien,
> 
> Here is a public gist:
> https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
> 
> - Shekar
> 
> On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy 
 wrote:
> 
>> It might be easier if you make a github gist with your code. It is
 quite
>> difficult to see what is happening in an email.
>> 
>> Cheers,
>> Damian
>> On Fri, 28 Jul 2017 at 19:22, Shekar Tippur 
 wrote:
>> 
>>> Thanks a lot Damien.
>>> I am able to get to see if the join worked (using foreach). I
>> tried
 to
>> add
>>> the logic to query the store after starting the streams:
>>> Looks like the code is not getting there. Here is the modified
>> code:
>>> 
>>> KafkaStreams streams = new KafkaStreams(builder, props);
>>> 
>>> streams.start();
>>> 
>>> 
>>> parser.foreach(new ForeachAction() {
>>>@Override
>>>public void apply(String key, JsonNode value) {
>>>System.out.println(key + ": " + value);
>>>if (value == null){
>>>System.out.println("null match");
>>>ReadOnlyKeyValueStore keyValueStore =
>>>null;
>>>try {
>>>keyValueStore =
>>> IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
>>> QueryableStoreTypes.keyValueStore(), streams);
>>>} catch (Interrup

Re: Kafka streams regex match

2017-08-08 Thread Damian Guy
Hi Shekar, that warning is expected during rebalances and should generally
resolve itself.
How many threads/app instances are you running?
It is impossible to tell what is happening without the full logs.

Thanks,
Damian
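
For reference, the number of stream threads and the location of the state directory are both ordinary Kafka Streams settings. The sketch below is only an illustration under assumptions: it uses plain string keys (`num.stream.threads`, `state.dir`, `application.id`, `bootstrap.servers`) so that it runs without the Kafka jars, the broker address and the directory name are made up, and `streams-pipe` is taken from the thread name in the log. With the defaults there is one stream thread and the state lives under /tmp/kafka-streams, so a second instance of the same application on the same machine would contend for the same task directory lock. Whether that is what is happening here cannot be told from the warning alone.

```java
import java.util.Properties;

// Sketch only: plain string keys are used so the example needs no Kafka jars.
final class StreamsConfigSketch {

    // Builds the properties that would be handed to the KafkaStreams constructor.
    static Properties buildProps(String applicationId, String stateDir, int threads) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", "localhost:9092");            // assumed local broker
        props.put("num.stream.threads", Integer.toString(threads));  // default is 1
        props.put("state.dir", stateDir);                            // default is /tmp/kafka-streams
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical per-instance directory, so two local instances do not share task locks.
        Properties props = buildProps("streams-pipe", "/tmp/kafka-streams-instance-1", 1);
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Giving each local instance its own `state.dir` is one way to rule out lock contention between instances; it does not change the warning that can appear briefly during a normal rebalance.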

On Mon, 7 Aug 2017 at 22:46 Shekar Tippur  wrote:

> Damien,
>
> Thanks for pointing out the error. I had tried a different version of
> initializing the store.
>
> Now that I am able to compile, I started to get the below error. I looked
> up other suggestions for the same error and followed up to upgrade Kafka to
> 0.11.0.0 version. I still get this error :/
>
> [2017-08-07 14:40:41,264] WARN stream-thread
> [streams-pipe-b67a7ffa-5535-4311-8886-ad6362617dc5-StreamThread-1] Could
> not create task 0_0. Will retry:
> (org.apache.kafka.streams.processor.internals.StreamThread)
>
> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock
> the state directory for task 0_0
>
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:111)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>
> On Fri, Aug 4, 2017 at 4:16 PM, Shekar Tippur  wrote:
>
> > Damian,
> >
> > I am getting a syntax error. I have responded on gist.
> > Appreciate any inputs.
> >
> > - Shekar
> >
> > On Sat, Jul 29, 2017 at 1:57 AM, Damian Guy 
> wrote:
> >
> >> Hi,
> >>
> >> I left a comment on your gist.
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Fri, 28 Jul 2017 at 21:50 Shekar Tippur  wrote:
> >>
> >> > Damien,
> >> >
> >> > Here is a public gist:
> >> > https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
> >> >
> >> > - Shekar
> >> >
> >> > On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy 
> >> wrote:
> >> >
> >> > > It might be easier if you make a github gist with your code. It is
> >> quite
> >> > > difficult to see what is happening in an email.
> >> > >
> >> > > Cheers,
> >> > > Damian
> >> > > On Fri, 28 Jul 2017 at 19:22, Shekar Tippur 
> >> wrote:
> >> > >
> >> > > > Thanks a lot Damien.
> >> > > > I am able to get to see if the join worked (using foreach). I
> tried
> >> to
> >> > > add
> >> > > > the logic to query the store after starting the streams:
> >> > > > Looks like the code is not getting there. Here is the modified
> code:
> >> > > >
> >> > > > KafkaStreams streams = new KafkaStreams(builder, props);
> >> > > >
> >> > > > streams.start();
> >> > > >
> >> > > >
> >> > > > parser.foreach(new ForeachAction() {
> >> > > > @Override
> >> > > > public void apply(String key, JsonNode value) {
> >> > > > System.out.println(key + ": " + value);
> >> > > > if (value == null){
> >> > > > System.out.println("null match");
> >> > > > ReadOnlyKeyValueStore keyValueStore =
> >> > > > null;
> >> > > > try {
> >> > > > keyValueStore =
> >> > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> >> > > > QueryableStoreTypes.keyValueStore(), streams);
> >> > > > } catch (InterruptedException e) {
> >> > > > e.printStackTrace();
> >> > > > }
> >> > > >
> >> > > > KeyValueIterator  kviterator =
> >> > > > keyValueStore.range("test_nod","test_

Re: Kafka streams regex match

2017-08-07 Thread Shekar Tippur
Damian,

Thanks for pointing out the error. I had tried a different way of
initializing the store.

Now that the code compiles, I have started getting the error below. I looked
up suggestions for the same error and, following them, upgraded Kafka to
version 0.11.0.0. I still get this error :/

[2017-08-07 14:40:41,264] WARN stream-thread
[streams-pipe-b67a7ffa-5535-4311-8886-ad6362617dc5-StreamThread-1] Could
not create task 0_0. Will retry:
(org.apache.kafka.streams.processor.internals.StreamThread)

org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock
the state directory for task 0_0

    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:111)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)

On Fri, Aug 4, 2017 at 4:16 PM, Shekar Tippur  wrote:

> Damian,
>
> I am getting a syntax error. I have responded on gist.
> Appreciate any inputs.
>
> - Shekar
>
> On Sat, Jul 29, 2017 at 1:57 AM, Damian Guy  wrote:
>
>> Hi,
>>
>> I left a comment on your gist.
>>
>> Thanks,
>> Damian
>>
>> On Fri, 28 Jul 2017 at 21:50 Shekar Tippur  wrote:
>>
>> > Damien,
>> >
>> > Here is a public gist:
>> > https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
>> >
>> > - Shekar
>> >
>> > On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy 
>> wrote:
>> >
>> > > It might be easier if you make a github gist with your code. It is
>> quite
>> > > difficult to see what is happening in an email.
>> > >
>> > > Cheers,
>> > > Damian
>> > > On Fri, 28 Jul 2017 at 19:22, Shekar Tippur 
>> wrote:
>> > >
>> > > > Thanks a lot Damien.
>> > > > I am able to get to see if the join worked (using foreach). I tried
>> to
>> > > add
>> > > > the logic to query the store after starting the streams:
>> > > > Looks like the code is not getting there. Here is the modified code:
>> > > >
>> > > > KafkaStreams streams = new KafkaStreams(builder, props);
>> > > >
>> > > > streams.start();
>> > > >
>> > > >
>> > > > parser.foreach(new ForeachAction() {
>> > > > @Override
>> > > > public void apply(String key, JsonNode value) {
>> > > > System.out.println(key + ": " + value);
>> > > > if (value == null){
>> > > > System.out.println("null match");
>> > > > ReadOnlyKeyValueStore keyValueStore =
>> > > > null;
>> > > > try {
>> > > > keyValueStore =
>> > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
>> > > > QueryableStoreTypes.keyValueStore(), streams);
>> > > > } catch (InterruptedException e) {
>> > > > e.printStackTrace();
>> > > > }
>> > > >
>> > > > KeyValueIterator  kviterator =
>> > > > keyValueStore.range("test_nod","test_node");
>> > > > }
>> > > > }
>> > > > });
>> > > >
>> > > >
>> > > > On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy 
>> > > wrote:
>> > > >
>> > > > > Hi,
>> > > > > The store won't be queryable until after you have called
>> > > streams.start().
>> > > > > No stores have been created until the application is up and
>> running
>> > and
>> > > > > they are dependent on the underlying partitions.
>> > > > >
>> > > > > To check that a stateful operation has produced a result you would
>> > > > normally
>> > > > > add another operation after the join, i.e

Re: Kafka streams regex match

2017-08-04 Thread Shekar Tippur
Damian,

I am getting a syntax error. I have responded on the gist.
I would appreciate any input.

- Shekar

On Sat, Jul 29, 2017 at 1:57 AM, Damian Guy  wrote:

> Hi,
>
> I left a comment on your gist.
>
> Thanks,
> Damian
>
> On Fri, 28 Jul 2017 at 21:50 Shekar Tippur  wrote:
>
> > Damien,
> >
> > Here is a public gist:
> > https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
> >
> > - Shekar
> >
> > On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy 
> wrote:
> >
> > > It might be easier if you make a github gist with your code. It is
> quite
> > > difficult to see what is happening in an email.
> > >
> > > Cheers,
> > > Damian
> > > On Fri, 28 Jul 2017 at 19:22, Shekar Tippur  wrote:
> > >
> > > > Thanks a lot Damien.
> > > > I am able to get to see if the join worked (using foreach). I tried
> to
> > > add
> > > > the logic to query the store after starting the streams:
> > > > Looks like the code is not getting there. Here is the modified code:
> > > >
> > > > KafkaStreams streams = new KafkaStreams(builder, props);
> > > >
> > > > streams.start();
> > > >
> > > >
> > > > parser.foreach(new ForeachAction() {
> > > > @Override
> > > > public void apply(String key, JsonNode value) {
> > > > System.out.println(key + ": " + value);
> > > > if (value == null){
> > > > System.out.println("null match");
> > > > ReadOnlyKeyValueStore keyValueStore =
> > > > null;
> > > > try {
> > > > keyValueStore =
> > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > > QueryableStoreTypes.keyValueStore(), streams);
> > > > } catch (InterruptedException e) {
> > > > e.printStackTrace();
> > > > }
> > > >
> > > > KeyValueIterator  kviterator =
> > > > keyValueStore.range("test_nod","test_node");
> > > > }
> > > > }
> > > > });
> > > >
> > > >
> > > > On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy 
> > > wrote:
> > > >
> > > > > Hi,
> > > > > The store won't be queryable until after you have called
> > > streams.start().
> > > > > No stores have been created until the application is up and running
> > and
> > > > > they are dependent on the underlying partitions.
> > > > >
> > > > > To check that a stateful operation has produced a result you would
> > > > normally
> > > > > add another operation after the join, i.e.,
> > > > > stream.join(other,...).foreach(..) or stream.join(other,...).to("
> > > topic")
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur 
> > wrote:
> > > > >
> > > > > > One more thing.. How do we check if the stateful join operation
> > > > resulted
> > > > > in
> > > > > > a kstream of some value in it (size of kstream)? How do we check
> > the
> > > > > > content of a kstream?
> > > > > >
> > > > > > - S
> > > > > >
> > > > > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <
> ctip...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Damien,
> > > > > > >
> > > > > > > Thanks a lot for pointing out.
> > > > > > >
> > > > > > > I got a little further. I am kind of stuck with the sequencing.
> > > > Couple
> > > > > of
> > > > > > > issues:
> > > > > > > 1. I cannot initialise KafkaStreams before the parser.to().
> > > > > > > 2. Do I need to create a new KafkaStreams object when I create
> a
> > > > > > > KeyValueStore?
> > > > > > > > 3. How do I initialize KeyValueIterator with <String, JsonNode>? I
> > > > > > > > seem to get an error when I try:
> > > > > > > > KeyValueIterator<String, JsonNode> kviterator
> > > > > > > > = keyValueStore.range("test_nod","test_node");
> > > > > > >
> > > > > > > /// START CODE /
> > > > > > > //parser is a kstream as a result of join
> > > > > > > if (parser.toString().matches("null")){
> > > > > > >
> > > > > > > ReadOnlyKeyValueStore keyValueStore =
> > > > > > > null;
> > > > > > > KafkaStreams newstreams = new KafkaStreams(builder, props);
> > > > > > > try {
> > > > > > > keyValueStore =
> > > > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > > > > > QueryableStoreTypes.keyValueStore(), newstreams);
> > > > > > > } catch (InterruptedException e) {
> > > > > > > e.printStackTrace();
> > > > > > > }
> > > > > > > *KeyValueIterator kviterator
> > > > > > > = keyValueStore.range("test_nod","test_node");*
> > > > > > > }else {
> > > > > > >
> > > > > > > *parser.to (stringSerde, jsonSerde,
> > > > "parser");*}
> > > > > > >
> > > > > > > *KafkaStreams streams = new KafkaStreams(builder, props);*
> > > > > > > streams.start();
> > > > > > >
> > > > > > > /// END CODE /
> > > > > > >
> > > > > > > - S
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <
> > damian@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > It is part of the ReadOnlyKeyValueStore interfa

Re: Kafka streams regex match

2017-07-29 Thread Damian Guy
Hi,

I left a comment on your gist.

Thanks,
Damian

On Fri, 28 Jul 2017 at 21:50 Shekar Tippur  wrote:

> Damien,
>
> Here is a public gist:
> https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
>
> - Shekar
>
> On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy  wrote:
>
> > It might be easier if you make a github gist with your code. It is quite
> > difficult to see what is happening in an email.
> >
> > Cheers,
> > Damian
> > On Fri, 28 Jul 2017 at 19:22, Shekar Tippur  wrote:
> >
> > > Thanks a lot Damien.
> > > I am able to get to see if the join worked (using foreach). I tried to
> > add
> > > the logic to query the store after starting the streams:
> > > Looks like the code is not getting there. Here is the modified code:
> > >
> > > KafkaStreams streams = new KafkaStreams(builder, props);
> > >
> > > streams.start();
> > >
> > >
> > > parser.foreach(new ForeachAction() {
> > > @Override
> > > public void apply(String key, JsonNode value) {
> > > System.out.println(key + ": " + value);
> > > if (value == null){
> > > System.out.println("null match");
> > > ReadOnlyKeyValueStore keyValueStore =
> > > null;
> > > try {
> > > keyValueStore =
> > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > QueryableStoreTypes.keyValueStore(), streams);
> > > } catch (InterruptedException e) {
> > > e.printStackTrace();
> > > }
> > >
> > > KeyValueIterator  kviterator =
> > > keyValueStore.range("test_nod","test_node");
> > > }
> > > }
> > > });
> > >
> > >
> > > On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy 
> > wrote:
> > >
> > > > Hi,
> > > > The store won't be queryable until after you have called
> > streams.start().
> > > > No stores have been created until the application is up and running
> and
> > > > they are dependent on the underlying partitions.
> > > >
> > > > To check that a stateful operation has produced a result you would
> > > normally
> > > > add another operation after the join, i.e.,
> > > > stream.join(other,...).foreach(..) or stream.join(other,...).to("
> > topic")
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur 
> wrote:
> > > >
> > > > > One more thing.. How do we check if the stateful join operation
> > > resulted
> > > > in
> > > > > a kstream of some value in it (size of kstream)? How do we check
> the
> > > > > content of a kstream?
> > > > >
> > > > > - S
> > > > >
> > > > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur 
> > > > wrote:
> > > > >
> > > > > > Damien,
> > > > > >
> > > > > > Thanks a lot for pointing out.
> > > > > >
> > > > > > I got a little further. I am kind of stuck with the sequencing.
> > > Couple
> > > > of
> > > > > > issues:
> > > > > > 1. I cannot initialise KafkaStreams before the parser.to().
> > > > > > 2. Do I need to create a new KafkaStreams object when I create a
> > > > > > KeyValueStore?
> > > > > > > 3. How do I initialize KeyValueIterator with <String, JsonNode>? I
> > > > > > > seem to get an error when I try:
> > > > > > > KeyValueIterator<String, JsonNode> kviterator
> > > > > > > = keyValueStore.range("test_nod","test_node");
> > > > > >
> > > > > > /// START CODE /
> > > > > > //parser is a kstream as a result of join
> > > > > > if (parser.toString().matches("null")){
> > > > > >
> > > > > > ReadOnlyKeyValueStore keyValueStore =
> > > > > > null;
> > > > > > KafkaStreams newstreams = new KafkaStreams(builder, props);
> > > > > > try {
> > > > > > keyValueStore =
> > > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > > > > QueryableStoreTypes.keyValueStore(), newstreams);
> > > > > > } catch (InterruptedException e) {
> > > > > > e.printStackTrace();
> > > > > > }
> > > > > > *KeyValueIterator kviterator
> > > > > > = keyValueStore.range("test_nod","test_node");*
> > > > > > }else {
> > > > > >
> > > > > > *parser.to (stringSerde, jsonSerde,
> > > "parser");*}
> > > > > >
> > > > > > *KafkaStreams streams = new KafkaStreams(builder, props);*
> > > > > > streams.start();
> > > > > >
> > > > > > /// END CODE /
> > > > > >
> > > > > > - S
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <
> damian@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > It is part of the ReadOnlyKeyValueStore interface:
> > > > > > >
> > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > > > > > main/java/org/apache/kafka/streams/state/
> > ReadOnlyKeyValueStore.java
> > > > > > >
> > > > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur 
> > > > wrote:
> > > > > > >
> > > > > > > > That's cool. This feature is a part of rocksdb object and not
> > > > ktable?
> > > > > > > >
> > > > > > > > Sent from my iPhone
> > > > > > > >
> > > > > > > > > On Jul 27, 2017, at 07:57, 

Re: Kafka streams regex match

2017-07-28 Thread Shekar Tippur
Damian,

Here is a public gist:
https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8

- Shekar

On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy  wrote:

> It might be easier if you make a github gist with your code. It is quite
> difficult to see what is happening in an email.
>
> Cheers,
> Damian
> On Fri, 28 Jul 2017 at 19:22, Shekar Tippur  wrote:
>
> > Thanks a lot Damien.
> > I am able to get to see if the join worked (using foreach). I tried to
> add
> > the logic to query the store after starting the streams:
> > Looks like the code is not getting there. Here is the modified code:
> >
> > KafkaStreams streams = new KafkaStreams(builder, props);
> >
> > streams.start();
> >
> >
> > parser.foreach(new ForeachAction() {
> > @Override
> > public void apply(String key, JsonNode value) {
> > System.out.println(key + ": " + value);
> > if (value == null){
> > System.out.println("null match");
> > ReadOnlyKeyValueStore keyValueStore =
> > null;
> > try {
> > keyValueStore =
> > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > QueryableStoreTypes.keyValueStore(), streams);
> > } catch (InterruptedException e) {
> > e.printStackTrace();
> > }
> >
> > KeyValueIterator  kviterator =
> > keyValueStore.range("test_nod","test_node");
> > }
> > }
> > });
> >
> >
> > On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy 
> wrote:
> >
> > > Hi,
> > > The store won't be queryable until after you have called
> streams.start().
> > > No stores have been created until the application is up and running and
> > > they are dependent on the underlying partitions.
> > >
> > > To check that a stateful operation has produced a result you would
> > normally
> > > add another operation after the join, i.e.,
> > > stream.join(other,...).foreach(..) or stream.join(other,...).to("
> topic")
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur  wrote:
> > >
> > > > One more thing.. How do we check if the stateful join operation
> > resulted
> > > in
> > > > a kstream of some value in it (size of kstream)? How do we check the
> > > > content of a kstream?
> > > >
> > > > - S
> > > >
> > > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur 
> > > wrote:
> > > >
> > > > > Damien,
> > > > >
> > > > > Thanks a lot for pointing out.
> > > > >
> > > > > I got a little further. I am kind of stuck with the sequencing.
> > Couple
> > > of
> > > > > issues:
> > > > > 1. I cannot initialise KafkaStreams before the parser.to().
> > > > > 2. Do I need to create a new KafkaStreams object when I create a
> > > > > KeyValueStore?
> > > > > > 3. How do I initialize KeyValueIterator with <String, JsonNode>? I
> > > > > > seem to get an error when I try:
> > > > > > KeyValueIterator<String, JsonNode> kviterator
> > > > > > = keyValueStore.range("test_nod","test_node");
> > > > >
> > > > > /// START CODE /
> > > > > //parser is a kstream as a result of join
> > > > > if (parser.toString().matches("null")){
> > > > >
> > > > > ReadOnlyKeyValueStore keyValueStore =
> > > > > null;
> > > > > KafkaStreams newstreams = new KafkaStreams(builder, props);
> > > > > try {
> > > > > keyValueStore =
> > > > IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > > > QueryableStoreTypes.keyValueStore(), newstreams);
> > > > > } catch (InterruptedException e) {
> > > > > e.printStackTrace();
> > > > > }
> > > > > *KeyValueIterator kviterator
> > > > > = keyValueStore.range("test_nod","test_node");*
> > > > > }else {
> > > > >
> > > > > *parser.to (stringSerde, jsonSerde,
> > "parser");*}
> > > > >
> > > > > *KafkaStreams streams = new KafkaStreams(builder, props);*
> > > > > streams.start();
> > > > >
> > > > > /// END CODE /
> > > > >
> > > > > - S
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy  >
> > > > wrote:
> > > > > >
> > > > > > It is part of the ReadOnlyKeyValueStore interface:
> > > > > >
> > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > > > > main/java/org/apache/kafka/streams/state/
> ReadOnlyKeyValueStore.java
> > > > > >
> > > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur 
> > > wrote:
> > > > > >
> > > > > > > That's cool. This feature is a part of rocksdb object and not
> > > ktable?
> > > > > > >
> > > > > > > Sent from my iPhone
> > > > > > >
> > > > > > > > On Jul 27, 2017, at 07:57, Damian Guy 
> > > > wrote:
> > > > > > > >
> > > > > > > > Yes they can be strings,
> > > > > > > >
> > > > > > > > so you could do something like:
> > > > > > > > store.range("test_host", "test_hosu");
> > > > > > > >
> > > > > > > > This would return an iterator containing all of the values
> > > > > (inclusive)
> > > > > > > from
> > > > > > > > "test_host" -> "test_hosu".
> > > > > > > >
> > > > > > > >> On Thu, 27 Jul 20

Re: Kafka streams regex match

2017-07-28 Thread Damian Guy
It might be easier if you make a GitHub gist with your code. It is quite
difficult to see what is happening in an email.

Cheers,
Damian
On Fri, 28 Jul 2017 at 19:22, Shekar Tippur  wrote:

> Thanks a lot Damien.
> I am able to get to see if the join worked (using foreach). I tried to add
> the logic to query the store after starting the streams:
> Looks like the code is not getting there. Here is the modified code:
>
> KafkaStreams streams = new KafkaStreams(builder, props);
>
> streams.start();
>
>
> parser.foreach(new ForeachAction<String, JsonNode>() {
>     @Override
>     public void apply(String key, JsonNode value) {
>         System.out.println(key + ": " + value);
>         if (value == null) {
>             System.out.println("null match");
>             ReadOnlyKeyValueStore<String, JsonNode> keyValueStore = null;
>             try {
>                 keyValueStore = IntegrationTestUtils.waitUntilStoreIsQueryable(
>                         "local-store", QueryableStoreTypes.keyValueStore(), streams);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>
>             KeyValueIterator<String, JsonNode> kviterator =
>                     keyValueStore.range("test_nod", "test_node");
>         }
>     }
> });
>
>
> On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy  wrote:
>
> > Hi,
> > The store won't be queryable until after you have called streams.start().
> > No stores have been created until the application is up and running and
> > they are dependent on the underlying partitions.
> >
> > To check that a stateful operation has produced a result you would
> normally
> > add another operation after the join, i.e.,
> > stream.join(other,...).foreach(..) or stream.join(other,...).to("topic")
> >
> > Thanks,
> > Damian
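
The point above, that a store only becomes queryable some time after streams.start(), is usually handled with a poll-until-ready loop. The sketch below shows that pattern in plain Java so that it runs without the Kafka jars. `waitUntilAvailable` and the fake lookup are hypothetical stand-ins and not part of any Kafka API. In a real application the lookup would presumably wrap the call that fetches the store and treat "not ready yet" as an empty result.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Sketch only: a generic poll-until-ready helper, not a Kafka Streams API.
final class WaitForStoreSketch {

    // Calls `lookup` repeatedly until it yields a value or `timeoutMs` has elapsed.
    static <T> T waitUntilAvailable(Supplier<Optional<T>> lookup, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            Optional<T> result = lookup.get();
            if (result.isPresent()) {
                return result.get();
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new IllegalStateException("not available after " + timeoutMs + " ms");
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for a store that only becomes available on the third attempt.
        AtomicInteger attempts = new AtomicInteger();
        Supplier<Optional<String>> lookup = () ->
                attempts.incrementAndGet() < 3 ? Optional.<String>empty() : Optional.of("local-store");
        String store = waitUntilAvailable(lookup, 1000, 10);
        System.out.println(store + " ready after " + attempts.get() + " attempts");
    }
}
```

The waiting happens in the calling thread, after the application has been started, rather than inside the stream-processing callback itself.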
> >
> > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur  wrote:
> >
> > > One more thing.. How do we check if the stateful join operation
> resulted
> > in
> > > a kstream of some value in it (size of kstream)? How do we check the
> > > content of a kstream?
> > >
> > > - S
> > >
> > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur 
> > wrote:
> > >
> > > > Damien,
> > > >
> > > > Thanks a lot for pointing out.
> > > >
> > > > I got a little further. I am kind of stuck with the sequencing.
> Couple
> > of
> > > > issues:
> > > > 1. I cannot initialise KafkaStreams before the parser.to().
> > > > 2. Do I need to create a new KafkaStreams object when I create a
> > > > KeyValueStore?
> > > > 3. How do I initialize KeyValueIterator with <String, JsonNode>? I
> > > > seem to get an error when I try:
> > > > KeyValueIterator<String, JsonNode> kviterator
> > > > = keyValueStore.range("test_nod","test_node");
> > > >
> > > > /// START CODE /
> > > > // parser is a KStream resulting from the join
> > > > if (parser.toString().matches("null")) {
> > > >     ReadOnlyKeyValueStore keyValueStore = null;
> > > >     KafkaStreams newstreams = new KafkaStreams(builder, props);
> > > >     try {
> > > >         keyValueStore = IntegrationTestUtils.waitUntilStoreIsQueryable(
> > > >                 "local-store", QueryableStoreTypes.keyValueStore(), newstreams);
> > > >     } catch (InterruptedException e) {
> > > >         e.printStackTrace();
> > > >     }
> > > >     KeyValueIterator kviterator
> > > >             = keyValueStore.range("test_nod", "test_node");
> > > > } else {
> > > >     parser.to(stringSerde, jsonSerde, "parser");
> > > > }
> > > >
> > > > KafkaStreams streams = new KafkaStreams(builder, props);
> > > > streams.start();
> > > >
> > > > /// END CODE /
> > > >
> > > > - S
> > > >
> > > >
> > > >
> > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy 
> > > wrote:
> > > > >
> > > > > It is part of the ReadOnlyKeyValueStore interface:
> > > > >
> > > > > https://github.com/apache/kafka/blob/trunk/streams/src/
> > > > main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
> > > > >
> > > > > On Thu, 27 Jul 2017 at 17:17 Shekar Tippur 
> > wrote:
> > > > >
> > > > > > That's cool. This feature is a part of rocksdb object and not
> > ktable?
> > > > > >
> > > > > > Sent from my iPhone
> > > > > >
> > > > > > > On Jul 27, 2017, at 07:57, Damian Guy 
> > > wrote:
> > > > > > >
> > > > > > > Yes they can be strings,
> > > > > > >
> > > > > > > so you could do something like:
> > > > > > > store.range("test_host", "test_hosu");
> > > > > > >
> > > > > > > This would return an iterator containing all of the values
> > > > (inclusive)
> > > > > > from
> > > > > > > "test_host" -> "test_hosu".
> > > > > > >
> > > > > > >> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur  >
> > > > wrote:
> > > > > > >>
> > > > > > >> Can you please point me to an example? Can from and to be a
> > > string?
> > > > > > >>
> > > > > > >> Sent from my iPhone
> > > > > > >>
> > > > > > >>> On Jul 27, 2017, at 04:04, Damian Guy 
> > > > wrote:
> > > > > > >>>
> > > > > > >>> Hi,
> > > > > > >>>
> > > > > > >>> You can't use a regex, but you could use a range query.
> > > > > > >>> i.e, keyValueStore.range(from, to)
> > > > 

Re: Kafka streams regex match

2017-07-28 Thread Shekar Tippur
Thanks a lot, Damian.
I can now see whether the join worked (using foreach). I then tried to add
the logic to query the store after starting the streams, but it looks like
the code never gets there. Here is the modified code:

KafkaStreams streams = new KafkaStreams(builder, props);

streams.start();

parser.foreach(new ForeachAction<String, JsonNode>() {
    @Override
    public void apply(String key, JsonNode value) {
        System.out.println(key + ": " + value);
        if (value == null) {
            System.out.println("null match");
            ReadOnlyKeyValueStore<String, JsonNode> keyValueStore = null;
            try {
                keyValueStore = IntegrationTestUtils.waitUntilStoreIsQueryable(
                        "local-store", QueryableStoreTypes.keyValueStore(), streams);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            KeyValueIterator<String, JsonNode> kviterator =
                    keyValueStore.range("test_nod", "test_node");
        }
    }
});
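
One thing that may be worth checking in the range call above: if the intent is to find every key that starts with "test_node", then range("test_nod", "test_node") is probably too narrow, because a key such as "test_node1" sorts after "test_node". The sketch below is only an illustration under assumptions: it uses a java.util.TreeMap as a stand-in for the store, the keys are made up, and upperBoundFor is a hypothetical helper that assumes plain ASCII keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class PrefixBoundsSketch {
    // Hypothetical helper: an upper bound that sorts after every ASCII key
    // starting with the given prefix.
    static String upperBoundFor(String prefix) {
        return prefix + Character.MAX_VALUE;
    }

    // Stand-in for a range(from, to) lookup with both bounds inclusive.
    static List<String> scan(NavigableMap<String, String> table, String from, String to) {
        return new ArrayList<>(table.subMap(from, true, to, true).keySet());
    }

    // Made-up keys, for illustration only.
    static NavigableMap<String, String> sampleTable() {
        NavigableMap<String, String> table = new TreeMap<>();
        table.put("other", "x");
        table.put("test_nod", "a");
        table.put("test_node", "b");
        table.put("test_node1", "c");
        table.put("test_node2", "d");
        return table;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> table = sampleTable();
        // Narrow range: stops at "test_node" itself.
        System.out.println(scan(table, "test_nod", "test_node"));
        // Prefix range: also picks up "test_node1" and "test_node2".
        System.out.println(scan(table, "test_node", upperBoundFor("test_node")));
    }
}
```

With the sample keys, the first call yields only "test_nod" and "test_node", while the second also includes the two longer keys.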



Re: Kafka streams regex match

2017-07-28 Thread Damian Guy
Hi,
The store won't be queryable until after you have called streams.start().
No stores are created until the application is up and running, and
they depend on the underlying partitions.

To check that a stateful operation has produced a result, you would normally
add another operation after the join, e.g.,
stream.join(other,...).foreach(..) or stream.join(other,...).to("topic")

Thanks,
Damian
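
Because the store only becomes queryable some time after streams.start(), callers usually retry the lookup until it succeeds. The following is a minimal sketch of such a retry loop using only the JDK. The helper name waitUntilAvailable and the use of IllegalStateException as the "not ready yet" signal are assumptions for illustration; in Kafka Streams the lookup would be streams.store(...), which throws InvalidStateStoreException while the store is unavailable.

```java
import java.util.function.Supplier;

final class StoreRetrySketch {
    // Hypothetical helper: calls lookup until it stops throwing
    // IllegalStateException (our stand-in for "store not ready"),
    // sleeping between attempts and giving up after maxAttempts.
    static <T> T waitUntilAvailable(Supplier<T> lookup, int maxAttempts, long sleepMillis) {
        IllegalStateException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (IllegalStateException notReady) {
                last = notReady;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        throw new IllegalStateException(
                "not available after " + maxAttempts + " attempts", last);
    }
}
```

The retry has to run after streams.start() has been called, and against the same KafkaStreams instance that was started.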



Re: Kafka streams regex match

2017-07-27 Thread Shekar Tippur
One more thing: how do we check whether the stateful join operation
produced a KStream with any records in it (the size of the KStream)? How do
we check the contents of a KStream?

- S



Re: Kafka streams regex match

2017-07-27 Thread Shekar Tippur
Damian,

Thanks a lot for pointing that out.

I got a little further, but I am stuck on the sequencing. A couple of
issues:
1. I cannot initialise KafkaStreams before the parser.to().
2. Do I need to create a new KafkaStreams object when I create a
KeyValueStore?
3. How do I initialize a KeyValueIterator with <String, JsonNode>? I seem to
get an error when I try:
    KeyValueIterator<String, JsonNode> kviterator
            = keyValueStore.range("test_nod", "test_node");

/// START CODE /
// parser is a KStream resulting from the join
if (parser.toString().matches("null")) {
    ReadOnlyKeyValueStore<String, JsonNode> keyValueStore = null;
    KafkaStreams newstreams = new KafkaStreams(builder, props);
    try {
        keyValueStore = IntegrationTestUtils.waitUntilStoreIsQueryable(
                "local-store", QueryableStoreTypes.keyValueStore(), newstreams);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    KeyValueIterator<String, JsonNode> kviterator
            = keyValueStore.range("test_nod", "test_node");
} else {
    parser.to(stringSerde, jsonSerde, "parser");
}

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

/// END CODE /

- S




Re: Kafka streams regex match

2017-07-27 Thread Damian Guy
It is part of the ReadOnlyKeyValueStore interface:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java



Re: Kafka streams regex match

2017-07-27 Thread Shekar Tippur
That's cool. So this feature is part of the RocksDB store object and not the KTable?



Re: Kafka streams regex match

2017-07-27 Thread Damian Guy
Yes, they can be strings,

so you could do something like:
store.range("test_host", "test_hosu");

This would return an iterator over all of the entries whose keys fall between
"test_host" and "test_hosu" (inclusive).

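To make the inclusive, lexicographic semantics of such a range call concrete, here is a minimal sketch that uses a java.util.TreeMap as a stand-in for the store. That substitution is an assumption for illustration, as are the class and method names; a real store orders keys by their serialized bytes, which matches String ordering for plain ASCII keys like the ones in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class RangeSemanticsSketch {
    // Stand-in for store.range(from, to): both bounds are inclusive and
    // keys are compared as strings.
    static List<String> keysInRange(NavigableMap<String, String> table, String from, String to) {
        return new ArrayList<>(table.subMap(from, true, to, true).keySet());
    }

    // The KTable keys from the original question, with shortened values.
    static NavigableMap<String, String> sampleTable() {
        NavigableMap<String, String> table = new TreeMap<>();
        table.put("test_host1", "test1_l1");
        table.put("test_host2", "test2_l2");
        table.put("test_host3", "test3_l3");
        table.put("blah", "blah_l3");
        return table;
    }

    public static void main(String[] args) {
        // Every key starting with "test_host" sorts before "test_hosu",
        // so the three test_host* keys are returned and "blah" is not.
        System.out.println(keysInRange(sampleTable(), "test_host", "test_hosu"));
    }
}
```

The upper bound "test_hosu" works because 'u' is the character right after 't', so every key that begins with "test_host" sorts below it.
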


Re: Kafka streams regex match

2017-07-27 Thread Shekar Tippur
Can you please point me to an example? Can from and to be strings?



Re: Kafka streams regex match

2017-07-27 Thread Damian Guy
Hi,

You can't use a regex, but you could use a range query.
i.e., keyValueStore.range(from, to)

Thanks,
Damian
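
One way to approximate a regex lookup with a range query is to use the range as a coarse prefix filter and then apply the regex to the candidate keys in application code. The sketch below illustrates that idea with a java.util.TreeMap standing in for the store. The class and method names, the pattern, and the extra key "test_host_backup" are all made up for illustration, and the prefix upper bound assumes ASCII keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.regex.Pattern;

final class RegexOverRangeSketch {
    // Step 1: range scan over keys that start with prefix (coarse filter).
    // Step 2: keep only the keys that fully match the regex (fine filter).
    static List<String> matchingKeys(NavigableMap<String, String> table,
                                     String prefix, Pattern pattern) {
        List<String> hits = new ArrayList<>();
        String upper = prefix + Character.MAX_VALUE; // assumes ASCII keys
        for (String key : table.subMap(prefix, true, upper, true).keySet()) {
            if (pattern.matcher(key).matches()) {
                hits.add(key);
            }
        }
        return hits;
    }

    static NavigableMap<String, String> sampleTable() {
        NavigableMap<String, String> table = new TreeMap<>();
        table.put("test_host1", "test1_l1");
        table.put("test_host2", "test2_l2");
        table.put("test_host3", "test3_l3");
        table.put("test_host_backup", "made-up entry");
        table.put("blah", "blah_l3");
        return table;
    }

    public static void main(String[] args) {
        Pattern numbered = Pattern.compile("test_host\\d+");
        System.out.println(matchingKeys(sampleTable(), "test_host", numbered));
    }
}
```

The range keeps the scan limited to keys sharing the prefix, and the regex then drops candidates such as "test_host_backup" that share the prefix but do not fit the pattern. Picking the best fit among the remaining keys is left to application logic.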

On Wed, 26 Jul 2017 at 22:34 Shekar Tippur  wrote:

> Hello,
>
> I am able to get the KStream-to-KTable join to work. I have some use cases
> where the key is not always an exact match.
> I was wondering if there is a way to look up keys based on a regex.
>
> For example,
> I have these entries for a ktable:
> test_host1,{ "source": "test_host", "UL1": "test1_l1" }
>
> test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
>
> test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
>
> blah,{ "source": "blah_host", "UL1": "blah_l3" }
>
> and this for a kstream:
>
> test_host,{ "source": "test_host", "custom": { "test ": { "creation_time ":
> "1234 " } } }
>
> In this case, if the exact match does not work, I would like to look up
> all entries in the KTable whose key matches "test_host*" and have
> application logic determine which one is the best fit.
>
> Appreciate input.
>
> - Shekar
>