As you read the KTable from a topic via KStreamBuilder#table("my-table-topic"), you should set the log cleanup policy to "compact" for "my-table-topic".
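(For reference, a sketch of how that topic-level policy might be set with the standard Kafka CLI tools; the ZooKeeper address, partition count, and replication factor here are placeholder assumptions for a single-node test cluster, not values from this thread:)

```shell
# Create the topic with compaction enabled from the start
bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --topic my-table-topic \
  --partitions 1 --replication-factor 1 \
  --config cleanup.policy=compact

# Or switch an existing topic over to compaction
bin/kafka-configs.sh --alter \
  --zookeeper localhost:2181 \
  --entity-type topics --entity-name my-table-topic \
  --add-config cleanup.policy=compact
```

Note that cleanup.policy is a per-topic broker config, so it is set with these tools (or at topic creation), not through the Streams application's own configuration.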
-Matthias

On 2/13/17 4:49 AM, Eno Thereska wrote:
> Hi Jon,
>
> If I understand your question correctly:
> - any new KTables created by the DSL will automatically get the right policy.
>   You don't need to do anything special.
> - otherwise you'll have to set the policy on the Kafka topic.
>
> Eno
>
>> On 13 Feb 2017, at 11:16, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>>
>> If I'm doing a KStream.leftJoin(KTable) how would I set this configuration
>> for just the KTable portion?
>>
>> IE I have
>>
>> KStream = KStreamBuilder.stream()
>> KTable = KStreamBuilder.table()
>>
>> ...
>> (join occurs.. data flows.. ppl are brought closer together.. there is
>> peace in the valley.. for me... )
>> ...
>>
>> KafkaStreams = new KafkaStream(KStreamBuilder,
>> config_with_cleanup_policy_or_not?)
>> KafkaStream.start
>>
>> On Wed, Feb 8, 2017 at 12:30 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>>
>>> Yeah makes sense. I was looking at it from the point of view of keeping
>>> all data forever.
>>>
>>> Eno
>>>
>>>> On 8 Feb 2017, at 20:27, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>> Yes, that could happen if a key was not updated for a longer period than
>>>> topic retention time.
>>>>
>>>> If you want to force a changelog creation, you can do a dummy aggregate
>>>> instead of using KStreamBuilder#table()
>>>>
>>>>> KTable table = KStreamBuilder.stream("topic").groupByKey().reduce(new Reducer() {
>>>>>     @Override
>>>>>     public Object apply(Object oldValue, Object newValue) {
>>>>>         return newValue;
>>>>>     }
>>>>> }, "someStoreName");
>>>>
>>>> -Matthias
>>>>
>>>> On 2/8/17 11:39 AM, Mathieu Fenniak wrote:
>>>>> I think there could be correctness implications... the default
>>>>> cleanup.policy of delete would mean that topic entries past the retention
>>>>> policy might have been removed. If you scale up the application, new
>>>>> application instances won't be able to restore a complete table into its
>>>>> local state store. An operation like a join against that KTable would
>>>>> find no records where there should be records.
>>>>>
>>>>> Mathieu
>>>>>
>>>>> On Wed, Feb 8, 2017 at 12:15 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>
>>>>>> If you fail to set the policy to compact, there shouldn't be any
>>>>>> correctness implications, however your topics will grow larger than
>>>>>> necessary.
>>>>>>
>>>>>> Eno
>>>>>>
>>>>>>> On 8 Feb 2017, at 18:56, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>>>>>>>
>>>>>>> What are the ramifications of failing to do this?
>>>>>>>
>>>>>>> On Tue, Feb 7, 2017 at 9:16 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>
>>>>>>>> Yes, that is correct.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 2/7/17 6:39 PM, Mathieu Fenniak wrote:
>>>>>>>>> Hey kafka users,
>>>>>>>>>
>>>>>>>>> Is it correct that a Kafka topic that is used for a KTable should be
>>>>>>>>> set to cleanup.policy=compact?
>>>>>>>>>
>>>>>>>>> I've never noticed until today that the KStreamBuilder#table()
>>>>>>>>> documentation says: "However, no internal changelog topic is created
>>>>>>>>> since the original input topic can be used for recovery"... [1], which
>>>>>>>>> seems like it is only true if the topic is configured for compaction.
>>>>>>>>> Otherwise the original input topic won't necessarily contain the data
>>>>>>>>> necessary for recovery of the state store.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/kafka/blob/e108a8b4ed4512b021f9326cf079517523c83060/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java#L355
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Mathieu
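(Pulling the thread together: below is a minimal sketch combining Matthias's dummy-aggregate workaround with Jon's leftJoin scenario, against the pre-1.0 KStreamBuilder API the thread is using. The topic names besides "my-table-topic", the ValueJoiner, and the application id are illustrative assumptions; this requires the kafka-streams dependency and a running broker, so it is a sketch rather than a standalone runnable program.)

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Reducer;

public class TableJoinExample {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        // Stream side of the join ("events-topic" is a placeholder name).
        KStream<String, String> stream = builder.stream("events-topic");

        // Table side: instead of builder.table("my-table-topic"), force a
        // changelog with a dummy reduce that always keeps the newest value.
        // The state store gets a compacted internal changelog topic, so
        // recovery no longer depends on "my-table-topic" being compacted.
        KTable<String, String> table = builder.<String, String>stream("my-table-topic")
            .groupByKey()
            .reduce(new Reducer<String>() {
                @Override
                public String apply(String oldValue, String newValue) {
                    return newValue;
                }
            }, "someStoreName");

        // The join itself; this ValueJoiner is purely illustrative.
        KStream<String, String> joined =
            stream.leftJoin(table, (left, right) -> left + "/" + right);
        joined.to("output-topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "table-join-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Note: cleanup.policy is a topic-level broker config, so it cannot
        // be set through this Properties object; it belongs on the topic.

        new KafkaStreams(builder, props).start();
    }
}
```

As discussed above, with this workaround the application pays for an extra internal topic, but scaled-up instances can restore the full table even if "my-table-topic" uses the default delete policy.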