I am running this on a Mac laptop, using the default settings.

Sent from my iPhone
> On Aug 8, 2017, at 03:11, Damian Guy <damian....@gmail.com> wrote:
>
> Hi Shekar, that warning is expected during rebalances and should generally
> resolve itself.
> How many threads/app instances are you running?
> It is impossible to tell what is happening without the full logs.
>
> Thanks,
> Damian
>
>> On Mon, 7 Aug 2017 at 22:46 Shekar Tippur <ctip...@gmail.com> wrote:
>>
>> Damian,
>>
>> Thanks for pointing out the error. I had tried a different version of
>> initializing the store.
>>
>> Now that I am able to compile, I have started to get the error below. I looked
>> up other suggestions for the same error and, following them, upgraded Kafka to
>> version 0.11.0.0. I still get this error :/
>>
>> [2017-08-07 14:40:41,264] WARN stream-thread [streams-pipe-b67a7ffa-5535-4311-8886-ad6362617dc5-StreamThread-1] Could not create task 0_0. Will retry: (org.apache.kafka.streams.processor.internals.StreamThread)
>> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the state directory for task 0_0
>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99)
>>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:111)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>>
>>> On Fri, Aug 4, 2017 at 4:16 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>>>
>>> Damian,
>>>
>>> I am getting a syntax error. I have responded on the gist.
>>> I would appreciate any input.
>>>
>>> - Shekar
>>>
>>> On Sat, Jul 29, 2017 at 1:57 AM, Damian Guy <damian....@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I left a comment on your gist.
>>>>
>>>> Thanks,
>>>> Damian
>>>>
>>>>> On Fri, 28 Jul 2017 at 21:50 Shekar Tippur <ctip...@gmail.com> wrote:
>>>>>
>>>>> Damian,
>>>>>
>>>>> Here is a public gist:
>>>>> https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
>>>>>
>>>>> - Shekar
>>>>>
>>>>> On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy <damian....@gmail.com> wrote:
>>>>>
>>>>>> It might be easier if you make a GitHub gist with your code. It is quite
>>>>>> difficult to see what is happening in an email.
>>>>>>
>>>>>> Cheers,
>>>>>> Damian
>>>>>>
>>>>>> On Fri, 28 Jul 2017 at 19:22, Shekar Tippur <ctip...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks a lot, Damian.
>>>>>>> I am able to see that the join worked (using foreach). I tried to add
>>>>>>> the logic to query the store after starting the streams.
>>>>>>> It looks like the code is not getting there. Here is the modified code:
>>>>>>>
>>>>>>> KafkaStreams streams = new KafkaStreams(builder, props);
>>>>>>>
>>>>>>> streams.start();
>>>>>>>
>>>>>>> parser.foreach(new ForeachAction<String, JsonNode>() {
>>>>>>>     @Override
>>>>>>>     public void apply(String key, JsonNode value) {
>>>>>>>         System.out.println(key + ": " + value);
>>>>>>>         if (value == null){
>>>>>>>             System.out.println("null match");
>>>>>>>             ReadOnlyKeyValueStore<String, Long> keyValueStore = null;
>>>>>>>             try {
>>>>>>>                 keyValueStore = IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
>>>>>>>                     QueryableStoreTypes.keyValueStore(), streams);
>>>>>>>             } catch (InterruptedException e) {
>>>>>>>                 e.printStackTrace();
>>>>>>>             }
>>>>>>>
>>>>>>>             KeyValueIterator kviterator = keyValueStore.range("test_nod","test_node");
>>>>>>>         }
>>>>>>>     }
>>>>>>> });
>>>>>>>
>>>>>>> On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> The store won't be queryable until after you have called streams.start().
>>>>>>>> No stores are created until the application is up and running, and
>>>>>>>> they are dependent on the underlying partitions.
>>>>>>>>
>>>>>>>> To check that a stateful operation has produced a result you would normally
>>>>>>>> add another operation after the join, i.e.,
>>>>>>>> stream.join(other,...).foreach(..) or stream.join(other,...).to("topic")
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Damian
>>>>>>>>
>>>>>>>> On Thu, 27 Jul 2017 at 22:52 Shekar Tippur <ctip...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> One more thing: how do we check whether the stateful join operation resulted in
>>>>>>>>> a kstream with some values in it (size of kstream)? How do we check the
>>>>>>>>> content of a kstream?
>>>>>>>>>
>>>>>>>>> - S
>>>>>>>>>
>>>>>>>>> On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Damian,
>>>>>>>>>>
>>>>>>>>>> Thanks a lot for pointing that out.
>>>>>>>>>>
>>>>>>>>>> I got a little further. I am kind of stuck with the sequencing. A couple of
>>>>>>>>>> issues:
>>>>>>>>>> 1. I cannot initialise KafkaStreams before the parser.to().
>>>>>>>>>> 2. Do I need to create a new KafkaStreams object when I create a
>>>>>>>>>> KeyValueStore?
>>>>>>>>>> 3. How do I initialize KeyValueIterator with <String, JsonNode>? I seem to
>>>>>>>>>> get an error when I try:
>>>>>>>>>> KeyValueIterator<String, JsonNode> kviterator
>>>>>>>>>>     = keyValueStore.range("test_nod","test_node");
>>>>>>>>>>
>>>>>>>>>> /////// START CODE /////////
>>>>>>>>>> //parser is a kstream as a result of join
>>>>>>>>>> if (parser.toString().matches("null")){
>>>>>>>>>>
>>>>>>>>>>     ReadOnlyKeyValueStore<String, Long> keyValueStore = null;
>>>>>>>>>>     KafkaStreams newstreams = new KafkaStreams(builder, props);
>>>>>>>>>>     try {
>>>>>>>>>>         keyValueStore = IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
>>>>>>>>>>             QueryableStoreTypes.keyValueStore(), newstreams);
>>>>>>>>>>     } catch (InterruptedException e) {
>>>>>>>>>>         e.printStackTrace();
>>>>>>>>>>     }
>>>>>>>>>>     KeyValueIterator kviterator
>>>>>>>>>>         = keyValueStore.range("test_nod","test_node");
>>>>>>>>>> } else {
>>>>>>>>>>     parser.to(stringSerde, jsonSerde, "parser");
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> KafkaStreams streams = new KafkaStreams(builder, props);
>>>>>>>>>> streams.start();
>>>>>>>>>>
>>>>>>>>>> /////// END CODE /////////
>>>>>>>>>>
>>>>>>>>>> - S
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> It is part of the ReadOnlyKeyValueStore interface:
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
>>>>>>>>>>>
>>>>>>>>>>> On Thu, 27 Jul 2017 at 17:17 Shekar Tippur <ctip...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> That's cool. Is this feature part of the RocksDB object and not the KTable?
>>>>>>>>>>>>
>>>>>>>>>>>> Sent from my iPhone
>>>>>>>>>>>>
>>>>>>>>>>>>> On Jul 27, 2017, at 07:57, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, they can be strings,
>>>>>>>>>>>>>
>>>>>>>>>>>>> so you could do something like:
>>>>>>>>>>>>> store.range("test_host", "test_hosu");
>>>>>>>>>>>>>
>>>>>>>>>>>>> This would return an iterator containing all of the values (inclusive) from
>>>>>>>>>>>>> "test_host" -> "test_hosu".
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, 27 Jul 2017 at 14:48 Shekar Tippur <ctip...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Can you please point me to an example? Can from and to be strings?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Sent from my iPhone
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Jul 27, 2017, at 04:04, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> You can't use a regex, but you could use a range query,
>>>>>>>>>>>>>>> i.e., keyValueStore.range(from, to)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, 26 Jul 2017 at 22:34 Shekar Tippur <ctip...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am able to get the kstream to ktable join to work. I have some use cases
>>>>>>>>>>>>>>>> where the key is not always an exact match.
>>>>>>>>>>>>>>>> I was wondering if there is a way to look up keys based on a regex.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For example,
>>>>>>>>>>>>>>>> I have these entries for a ktable:
>>>>>>>>>>>>>>>> test_host1,{ "source": "test_host", "UL1": "test1_l1" }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> test_host2,{ "source": "test_host2", "UL1": "test2_l2" }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> test_host3,{ "source": "test_host3", "UL1": "test3_l3" }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> blah,{ "source": "blah_host", "UL1": "blah_l3" }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> and this for a kstream:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> test_host,{ "source": "test_host", "custom": { "test ": { "creation_time ": "1234 " } } }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In this case, if the exact match does not work, I would like to look up
>>>>>>>>>>>>>>>> all ktable entries whose key contains "test_host*" and have
>>>>>>>>>>>>>>>> application logic determine what would be the best fit.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I would appreciate any input.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> - Shekar
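
Editorial note on the range query discussed above: the sketch below illustrates how an inclusive range can serve as a prefix lookup. It is not Kafka Streams code. A java.util.TreeMap stands in for the state store, on the assumption that the keys are plain ASCII strings so that the map's ordering matches the store's ordering; the class name and both helper methods are hypothetical. It also shows why range("test_nod", "test_node") from the earlier messages would likely miss a key such as "test_node1": that key sorts after the upper bound "test_node".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class PrefixRangeSketch {

    // Hypothetical helper: builds an upper bound for a prefix by incrementing
    // its last character, e.g. "test_host" -> "test_hosu".
    static String upperBound(String prefix) {
        if (prefix.isEmpty()) {
            throw new IllegalArgumentException("prefix must not be empty");
        }
        int last = prefix.length() - 1;
        return prefix.substring(0, last) + (char) (prefix.charAt(last) + 1);
    }

    // Scans the inclusive range [prefix, upperBound(prefix)] and keeps only keys
    // that start with the prefix, since the upper bound itself may exist as a key.
    static List<String> keysWithPrefix(NavigableMap<String, String> table, String prefix) {
        List<String> matches = new ArrayList<>();
        for (String key : table.subMap(prefix, true, upperBound(prefix), true).keySet()) {
            if (key.startsWith(prefix)) {
                matches.add(key);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Keys taken from the ktable example in the thread; values shortened.
        NavigableMap<String, String> table = new TreeMap<>();
        table.put("test_host1", "test1_l1");
        table.put("test_host2", "test2_l2");
        table.put("test_host3", "test3_l3");
        table.put("blah", "blah_l3");

        // prints [test_host1, test_host2, test_host3]
        System.out.println(keysWithPrefix(table, "test_host"));
    }
}
```

With a real store, the same idea would presumably become a call to range(prefix, upperBound(prefix)) followed by the application logic that picks the best fit among the returned entries.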