Hm, strange. It keeps appending records, even in the state store. The number of records grows with each run.
/Artur

On Thu, Dec 14, 2017 at 8:18 PM, Artur Mrozowski <art...@gmail.com> wrote:

> Ok I see, what was the default value before I changed it?
>
> On Thu, Dec 14, 2017 at 7:47 PM, Artur Mrozowski <art...@gmail.com> wrote:
>
>> Hi Guozhang,
>> thank you for the answer. Indeed the value is being populated now,
>> however the application behaves oddly and not how it used to. I suspect
>> that disabling caching by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0
>> has been persisted somehow.
>>
>> It seems as if log compaction has been disabled permanently. What I observe
>> now is rather log append. For each run the output will have more and more
>> duplicates.
>>
>> Normally I would have a hard time reproducing duplication with that number
>> of records, 3 in this case. I am trying to implement the same idea as you
>> described in KIP 150. Normally I would not observe duplicates until
>> aggregation in line 495
>>
>> https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L425
>>
>> I could get rid of a large number of duplicates using exactly once
>> semantics but not anymore. I run on version 0.11 of Kafka Streams. What do
>> you think could be causing it? Is version 1.0 more stable in this aspect?
>> Best regards
>> Artur
>>
>> On Thu, Dec 14, 2017 at 6:42 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>>> Artur,
>>>
>>> This is because Properties#getProperty() is expecting a String value, and
>>> hence 10 * 1024 * 1024L is not recognized; you can try "10485760".
>>>
>>>
>>> Guozhang
>>>
>>> On Wed, Dec 13, 2017 at 10:51 PM, Artur Mrozowski <art...@gmail.com>
>>> wrote:
>>>
>>> > Sure.
>>> >
>>> > Another observation I've made is that before I started modifying these
>>> > properties I could spot quite a few duplicates in the state store. Then I
>>> > applied exactly once semantics which removed most of the duplicates.
>>> > Finally I disabled cache by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0
>>> > which duplicates each record. Since then I've been trying to reenable it.
>>> >
>>> > StreamsConfig config = new StreamsConfig(getProperties());
>>> >
>>> > System.out.println(getProperties().getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
>>> >
>>> > System.out.println(getProperties().getProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG));
>>> >
>>> > System.out.println(getProperties().getProperty(StreamsConfig.STATE_DIR_CONFIG));
>>> >
>>> > exactly_once
>>> > null
>>> > /tmp/customerStoreLocal6
>>> >
>>> >
>>> > private static Properties getProperties() {
>>> >     Properties settings = new Properties();
>>> >     settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
>>> >     settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>>> >     settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
>>> >     settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>>> >     settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>>> >
>>> >     settings.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/customerStoreLocal6");
>>> >     settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
>>> >
>>> >     settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
>>> >     settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>> >     settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
>>> >     return settings;
>>> > }
>>> >
>>> > On Wed, Dec 13, 2017 at 11:53 PM, Guozhang Wang <wangg...@gmail.com>
>>> > wrote:
>>> >
>>> > > Could you show us the testing code snippet that shows how you set the
>>> > > configs and how you read from it for verification?
>>> > >
>>> > >
>>> > > Guozhang
>>> > >
>>> > > On Wed, Dec 13, 2017 at 1:07 PM, Artur Mrozowski <art...@gmail.com> wrote:
>>> > >
>>> > > > Hello Guozhang,
>>> > > >
>>> > > > I print out some values that I assign to StreamsConfig in the console, but
>>> > > > the CACHE_MAX_BYTES_BUFFERING_CONFIG is always null. I disabled caching by
>>> > > > setting it to 0 today, and it seems to have the expected effect.
>>> > > > But after this I am not able to assign any value to it, it is always null.
>>> > > >
>>> > > > Best Regards
>>> > > > Artur
>>> > > >
>>> > > > On Wed, Dec 13, 2017 at 5:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>> > > >
>>> > > > > Hello Artur,
>>> > > > >
>>> > > > > What do you mean exactly by "It simply returns null no matter what value I
>>> > > > > provide."?
>>> > > > >
>>> > > > >
>>> > > > > Guozhang
>>> > > > >
>>> > > > >
>>> > > > > On Wed, Dec 13, 2017 at 8:02 AM, Artur Mrozowski <art...@gmail.com> wrote:
>>> > > > >
>>> > > > > > Hi Bill,
>>> > > > > > No, but I'll be happy to generate it. How do I generate logs for
>>> > > > > > StreamsConfig?
>>> > > > > >
>>> > > > > > Thanks,
>>> > > > > > Artur
>>> > > > > >
>>> > > > > > On Wed, Dec 13, 2017 at 3:44 PM, Bill Bejeck <b...@confluent.io> wrote:
>>> > > > > >
>>> > > > > > > Hi Artur,
>>> > > > > > >
>>> > > > > > > Do you have any log files you can share for this issue?
>>> > > > > > >
>>> > > > > > > Thanks,
>>> > > > > > > Bill
>>> > > > > > >
>>> > > > > > > On Wed, Dec 13, 2017 at 8:15 AM, Artur Mrozowski <art...@gmail.com> wrote:
>>> > > > > > >
>>> > > > > > > > Actually I can see all other properties being set, except for
>>> > > > > > > > CACHE_MAX_BYTES_BUFFERING_CONFIG, which is null.
>>> > > > > > > > I use 0.11.0.2 Kafka Streams.
>>> > > > > > > > Has anyone encountered this issue?
>>> > > > > > > >
>>> > > > > > > > /Artur
>>> > > > > > > >
>>> > > > > > > > On Wed, Dec 13, 2017 at 1:11 PM, Artur Mrozowski <art...@gmail.com> wrote:
>>> > > > > > > >
>>> > > > > > > > > Hi,
>>> > > > > > > > > I played around with caching on Confluent platform 3.3 by first
>>> > > > > > > > > disabling it, i.e. setting it to zero. Now, it seems I cannot enable
>>> > > > > > > > > it again. It simply returns null no matter what value I provide.
>>> > > > > > > > >
>>> > > > > > > > > e.g.
>>> > > > > > > > > settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10*1024*1024L);
>>> > > > > > > > >
>>> > > > > > > > > How can I enable it again? It generates a lot of duplicates.
>>> > > > > > > > >
>>> > > > > > > > > Best Regards
>>> > > > > > > > > Artur
>>> > > > >
>>> > > > > --
>>> > > > > -- Guozhang
>>> > >
>>> > > --
>>> > > -- Guozhang
>>>
>>> --
>>> -- Guozhang
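
For anyone finding this thread later, here is a minimal sketch of the behaviour Guozhang describes: Properties#getProperty() only returns values that were stored as Strings, so a Long stored with put() reads back as null through getProperty() even though it is still present in the table. This uses only java.util.Properties; the class name PropertiesLookupDemo and its helper methods are made up for illustration, and the key is written out as a literal ("cache.max.bytes.buffering", the value of StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) so the snippet runs without Kafka on the classpath.

```java
import java.util.Properties;

// Hypothetical demo class, not part of Kafka or of the code in this thread.
public class PropertiesLookupDemo {
    // Literal value of StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
    static final String CACHE_KEY = "cache.max.bytes.buffering";

    // Mirrors the put(...) call from the thread: the value is stored as a java.lang.Long.
    static Properties withLongValue() {
        Properties p = new Properties();
        p.put(CACHE_KEY, 10 * 1024 * 1024L);
        return p;
    }

    // Stores the same number as a String, as suggested in the reply ("10485760").
    static Properties withStringValue() {
        Properties p = new Properties();
        p.setProperty(CACHE_KEY, String.valueOf(10 * 1024 * 1024L));
        return p;
    }

    public static void main(String[] args) {
        Properties asLong = withLongValue();
        // getProperty() ignores non-String values, so this prints "null".
        System.out.println(asLong.getProperty(CACHE_KEY));
        // get() returns the stored object regardless of type, so this prints "10485760".
        System.out.println(asLong.get(CACHE_KEY));

        Properties asString = withStringValue();
        // With a String value, getProperty() prints "10485760".
        System.out.println(asString.getProperty(CACHE_KEY));
    }
}
```

So the null in the console output says something about how the value was read back for verification, not necessarily about what the cache size was set to. Checking with get() instead of getProperty(), or storing the value as a String, should make the printout match what was put in.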