When you read a KTable from a topic via
KStreamBuilder#table("my-table-topic"), you should set the log cleanup
policy to "compact" for "my-table-topic".
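
For reference, a minimal sketch using the standard Kafka CLI tools (the
ZooKeeper address, partition count, and replication factor below are
placeholders for your setup) -- the policy can be set at topic creation,
or changed on an existing topic:

```shell
# Create the input topic with log compaction enabled
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --topic my-table-topic --partitions 1 --replication-factor 1 \
    --config cleanup.policy=compact

# Or switch an existing topic over to compaction
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
    --entity-type topics --entity-name my-table-topic \
    --add-config cleanup.policy=compact
```

Note that compaction only removes older records for keys that have been
updated; the latest record per key is always retained.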


-Matthias

On 2/13/17 4:49 AM, Eno Thereska wrote:
> Hi Jon,
> 
> If I understand your question correctly:
> - any new KTables created by the DSL will automatically get the right policy. 
> You don't need to do anything special.
> - otherwise you'll have to set the policy on the Kafka topic. 
> 
> Eno
> 
>> On 13 Feb 2017, at 11:16, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>>
>> If I'm doing a KStream.leftJoin(KTable), how would I set this configuration
>> for just the KTable portion?
>>
>> IE I have
>>
>> KStream = KStreamBuilder.stream()
>> KTable = KStreamBuilder.table()
>>
>> ...
>> (join occurs.. data flows.. ppl are brought closer together.. there is
>> peace in the valley.. for me... )
>> ...
>>
>> KafkaStreams = new KafkaStreams(KStreamBuilder,
>> config_with_cleanup_policy_or_not?)
>> KafkaStreams.start()
>>
>> On Wed, Feb 8, 2017 at 12:30 PM, Eno Thereska <eno.there...@gmail.com>
>> wrote:
>>
>>> Yeah makes sense. I was looking at it from the point of view of keeping
>>> all data forever.
>>>
>>> Eno
>>>
>>>> On 8 Feb 2017, at 20:27, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>> Yes, that could happen if a key was not updated for a longer period than
>>>> topic retention time.
>>>>
>>>> If you want to force a changelog creation, you can do a dummy aggregate
>>>> instead of using KStreamBuilder#table()
>>>>
>>>>
>>>>> KTable table = builder.stream("topic").groupByKey().reduce(new Reducer() {
>>>>>     @Override
>>>>>     public Object apply(Object oldValue, Object newValue) {
>>>>>         return newValue;
>>>>>     }
>>>>> }, "someStoreName");
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 2/8/17 11:39 AM, Mathieu Fenniak wrote:
>>>>> I think there could be correctness implications... the default
>>>>> cleanup.policy of delete would mean that topic entries past the retention
>>>>> policy might have been removed.  If you scale up the application, new
>>>>> application instances won't be able to restore a complete table into their
>>>>> local state stores.  An operation like a join against that KTable would
>>>>> find no records where there should be records.
>>>>>
>>>>> Mathieu
>>>>>
>>>>>
>>>>> On Wed, Feb 8, 2017 at 12:15 PM, Eno Thereska <eno.there...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> If you fail to set the policy to compact, there shouldn't be any
>>>>>> correctness implications; however, your topics will grow larger than
>>>>>> necessary.
>>>>>>
>>>>>> Eno
>>>>>>
>>>>>>> On 8 Feb 2017, at 18:56, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>>>>>>>
>>>>>>> What are the ramifications of failing to do this?
>>>>>>>
>>>>>>> On Tue, Feb 7, 2017 at 9:16 PM, Matthias J. Sax <matth...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes, that is correct.
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>> On 2/7/17 6:39 PM, Mathieu Fenniak wrote:
>>>>>>>>> Hey kafka users,
>>>>>>>>>
>>>>>>>>> Is it correct that a Kafka topic that is used for a KTable should be
>>>>>>>>> set to cleanup.policy=compact?
>>>>>>>>>
>>>>>>>>> I've never noticed until today that the KStreamBuilder#table()
>>>>>>>>> documentation says: "However, no internal changelog topic is created
>>>>>>>>> since the original input topic can be used for recovery"... [1], which
>>>>>>>>> seems like it is only true if the topic is configured for compaction.
>>>>>>>>> Otherwise the original input topic won't necessarily contain the data
>>>>>>>>> necessary for recovery of the state store.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/kafka/blob/e108a8b4ed4512b021f9326cf079517523c83060/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java#L355
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Mathieu
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
> 
