I am running this on a mac laptop. I am using defaults.

Sent from my iPhone

> On Aug 8, 2017, at 03:11, Damian Guy <damian....@gmail.com> wrote:
> 
> Hi Shekar, that warning is expected during rebalances and should generally
> resolve itself.
> How many threads/app instances are you running?
> It is impossible to tell what is happening with the full logs.
> 
> Thanks,
> Damian
> 
>> On Mon, 7 Aug 2017 at 22:46 Shekar Tippur <ctip...@gmail.com> wrote:
>> 
>> Damien,
>> 
>> Thanks for pointing out the error. I had tried a different version of
>> initializing the store.
>> 
>> Now that I am able to compile, I started to get the below error. I looked
>> up other suggestions for the same error and followed up to upgrade Kafka to
>> 0.11.0.0 version. I still get this error :/
>> 
>> [2017-08-07 14:40:41,264] WARN stream-thread
>> [streams-pipe-b67a7ffa-5535-4311-8886-ad6362617dc5-StreamThread-1] Could
>> not create task 0_0. Will retry:
>> (org.apache.kafka.streams.processor.internals.StreamThread)
>> 
>> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock
>> the state directory for task 0_0
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:111)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>> 
>> at
>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>> 
>> at
>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>> 
>>> On Fri, Aug 4, 2017 at 4:16 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>>> 
>>> Damian,
>>> 
>>> I am getting a syntax error. I have responded on gist.
>>> Appreciate any inputs.
>>> 
>>> - Shekar
>>> 
>>> On Sat, Jul 29, 2017 at 1:57 AM, Damian Guy <damian....@gmail.com>
>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> I left a comment on your gist.
>>>> 
>>>> Thanks,
>>>> Damian
>>>> 
>>>>> On Fri, 28 Jul 2017 at 21:50 Shekar Tippur <ctip...@gmail.com> wrote:
>>>>> 
>>>>> Damien,
>>>>> 
>>>>> Here is a public gist:
>>>>> https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
>>>>> 
>>>>> - Shekar
>>>>> 
>>>>> On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy <damian....@gmail.com>
>>>> wrote:
>>>>> 
>>>>>> It might be easier if you make a github gist with your code. It is
>>>> quite
>>>>>> difficult to see what is happening in an email.
>>>>>> 
>>>>>> Cheers,
>>>>>> Damian
>>>>>> On Fri, 28 Jul 2017 at 19:22, Shekar Tippur <ctip...@gmail.com>
>>>> wrote:
>>>>>> 
>>>>>>> Thanks a lot Damien.
>>>>>>> I am able to get to see if the join worked (using foreach). I
>> tried
>>>> to
>>>>>> add
>>>>>>> the logic to query the store after starting the streams:
>>>>>>> Looks like the code is not getting there. Here is the modified
>> code:
>>>>>>> 
>>>>>>> KafkaStreams streams = new KafkaStreams(builder, props);
>>>>>>> 
>>>>>>> streams.start();
>>>>>>> 
>>>>>>> 
>>>>>>> parser.foreach(new ForeachAction<String, JsonNode>() {
>>>>>>>    @Override
>>>>>>>    public void apply(String key, JsonNode value) {
>>>>>>>        System.out.println(key + ": " + value);
>>>>>>>        if (value == null){
>>>>>>>            System.out.println("null match");
>>>>>>>            ReadOnlyKeyValueStore<String, Long> keyValueStore =
>>>>>>>                    null;
>>>>>>>            try {
>>>>>>>                keyValueStore =
>>>>>>> IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
>>>>>>> QueryableStoreTypes.keyValueStore(), streams);
>>>>>>>            } catch (InterruptedException e) {
>>>>>>>                e.printStackTrace();
>>>>>>>            }
>>>>>>> 
>>>>>>>            KeyValueIterator  kviterator =
>>>>>>> keyValueStore.range("test_nod","test_node");
>>>>>>>        }
>>>>>>>    }
>>>>>>> });
>>>>>>> 
>>>>>>> 
>>>>>>> On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <
>> damian....@gmail.com>
>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> The store won't be queryable until after you have called
>>>>>> streams.start().
>>>>>>>> No stores have been created until the application is up and
>>>> running
>>>>> and
>>>>>>>> they are dependent on the underlying partitions.
>>>>>>>> 
>>>>>>>> To check that a stateful operation has produced a result you
>> would
>>>>>>> normally
>>>>>>>> add another operation after the join, i.e.,
>>>>>>>> stream.join(other,...).foreach(..) or
>> stream.join(other,...).to("
>>>>>> topic")
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Damian
>>>>>>>> 
>>>>>>>> On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ctip...@gmail.com>
>>>>> wrote:
>>>>>>>> 
>>>>>>>>> One more thing.. How do we check if the stateful join
>> operation
>>>>>>> resulted
>>>>>>>> in
>>>>>>>>> a kstream of some value in it (size of kstream)? How do we
>> check
>>>>> the
>>>>>>>>> content of a kstream?
>>>>>>>>> 
>>>>>>>>> - S
>>>>>>>>> 
>>>>>>>>> On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <
>>>> ctip...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Damien,
>>>>>>>>>> 
>>>>>>>>>> Thanks a lot for pointing out.
>>>>>>>>>> 
>>>>>>>>>> I got a little further. I am kind of stuck with the
>>>> sequencing.
>>>>>>> Couple
>>>>>>>> of
>>>>>>>>>> issues:
>>>>>>>>>> 1. I cannot initialise KafkaStreams before the parser.to().
>>>>>>>>>> 2. Do I need to create a new KafkaStreams object when I
>>>> create a
>>>>>>>>>> KeyValueStore?
>>>>>>>>>> 3. How do I initialize KeyValueIterator with <String,
>>>> JsonNode> I
>>>>>>> seem
>>>>>>>> to
>>>>>>>>>> get a error when I try:
>>>>>>>>>> *KeyValueIterator <String,JsonNode> kviterator
>>>>>>>>>> = keyValueStore.range("test_nod","test_node");*
>>>>>>>>>> 
>>>>>>>>>> /////// START CODE /////////
>>>>>>>>>> //parser is a kstream as a result of join
>>>>>>>>>> if (parser.toString().matches("null")){
>>>>>>>>>> 
>>>>>>>>>>    ReadOnlyKeyValueStore<String, Long> keyValueStore =
>>>>>>>>>>            null;
>>>>>>>>>>    KafkaStreams newstreams = new KafkaStreams(builder,
>>>> props);
>>>>>>>>>>    try {
>>>>>>>>>>        keyValueStore =
>>>>>>>>> IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
>>>>>>>>>> QueryableStoreTypes.keyValueStore(), newstreams);
>>>>>>>>>>    } catch (InterruptedException e) {
>>>>>>>>>>        e.printStackTrace();
>>>>>>>>>>    }
>>>>>>>>>> *    KeyValueIterator kviterator
>>>>>>>>>> = keyValueStore.range("test_nod","test_node");*
>>>>>>>>>> }else {
>>>>>>>>>> 
>>>>>>>>>> *    parser.to <http://parser.to>(stringSerde, jsonSerde,
>>>>>>> "parser");*}
>>>>>>>>>> 
>>>>>>>>>> *KafkaStreams streams = new KafkaStreams(builder, props);*
>>>>>>>>>> streams.start();
>>>>>>>>>> 
>>>>>>>>>> /////// END CODE /////////
>>>>>>>>>> 
>>>>>>>>>> - S
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <
>>>>> damian....@gmail.com
>>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> It is part of the ReadOnlyKeyValueStore interface:
>>>>>>>>>>> 
>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/
>>>>>>>>>> main/java/org/apache/kafka/streams/state/
>>>>>> ReadOnlyKeyValueStore.java
>>>>>>>>>>> 
>>>>>>>>>>> On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <
>>>> ctip...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> That's cool. This feature is a part of rocksdb object
>> and
>>>> not
>>>>>>>> ktable?
>>>>>>>>>>>> 
>>>>>>>>>>>> Sent from my iPhone
>>>>>>>>>>>> 
>>>>>>>>>>>>> On Jul 27, 2017, at 07:57, Damian Guy <
>>>>> damian....@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Yes they can be strings,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> so you could do something like:
>>>>>>>>>>>>> store.range("test_host", "test_hosu");
>>>>>>>>>>>>> 
>>>>>>>>>>>>> This would return an iterator containing all of the
>>>> values
>>>>>>>>>> (inclusive)
>>>>>>>>>>>> from
>>>>>>>>>>>>> "test_host" -> "test_hosu".
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <
>>>>>> ctip...@gmail.com
>>>>>>>> 
>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Can you please point me to an example? Can from and
>> to
>>>> be
>>>>> a
>>>>>>>>> string?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Sent from my iPhone
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Jul 27, 2017, at 04:04, Damian Guy <
>>>>>> damian....@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> You can't use a regex, but you could use a range
>>>> query.
>>>>>>>>>>>>>>> i.e, keyValueStore.range(from, to)
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <
>>>>>>> ctip...@gmail.com
>>>>>>>>> 
>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I am able to get the kstream to ktable join work. I
>>>> have
>>>>>>> some
>>>>>>>>> use
>>>>>>>>>>>> cases
>>>>>>>>>>>>>>>> where the key is not always a exact match.
>>>>>>>>>>>>>>>> I was wondering if there is a way to lookup keys
>>>> based
>>>>> on
>>>>>>>> regex.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> For example,
>>>>>>>>>>>>>>>> I have these entries for a ktable:
>>>>>>>>>>>>>>>> test_host1,{ "source": "test_host", "UL1":
>>>> "test1_l1" }
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> test_host2,{ "source": "test_host2", "UL1":
>>>> "test2_l2" }
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> test_host3,{ "source": "test_host3", "UL1":
>>>> "test3_l3" }
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> and this for a kstream:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> test_host,{ "source": "test_host", "custom": {
>> "test
>>>> ":
>>>>> {
>>>>>>>>>>>>>> "creation_time ":
>>>>>>>>>>>>>>>> "1234 " } } }
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> In this case, if the exact match does not work, I
>>>> would
>>>>>> like
>>>>>>>> to
>>>>>>>>>> lookup
>>>>>>>>>>>>>>>> ktable for all entries that contains "test_host*"
>> in
>>>> it
>>>>>> and
>>>>>>>> have
>>>>>>>>>>>>>>>> application logic to determine what would be the
>> best
>>>>> fit.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Appreciate input.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> - Shekar
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 

Reply via email to