> On Jun 2, 2017, at 2:21 PM, Matthias J. Sax <[email protected]> wrote:
>
> Hi,
>
> If you want to populate a GlobalKTable you can only do this by reading a
> topic. So the short answer to your headline is: no, you cannot suppress
> the intermediate topic.
Bummer! Maybe this is an opt-in optimization to consider later.
>
> However, I am wondering what the purpose of your secondary index is, and
> why you are using a GlobalKTable for it. Maybe you can elaborate a
> little bit?
I elaborated on this a bit in the other thread. I was trying to keep separate
problems separate, but maybe I just made things more confusing!
tl;dr: the user requests values knowing only K, but there is actually a
"hidden composite key" D that is relevant to the partitioning strategy.
The GlobalKTable lets you look up K -> D and then query the right local
KTable (K,D) -> V.
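To make the two-step lookup concrete, here is a minimal plain-Java model (not Kafka Streams code): one map stands in for the replicated K -> D GlobalKTable, and a list of maps stands in for the partitioned local (K,D) -> V stores. The class and method names (CompositeKeyLookup, partitionFor) and the hash-of-D partitioning scheme are hypothetical assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical in-memory model of the lookup described above:
 * a replicated K -> D index (GlobalKTable role) plus partitioned
 * local stores keyed by the composite (K, D) (local KTable role).
 */
class CompositeKeyLookup {
    // Composite key (K, D); partitioning is decided by D alone.
    static final class CompositeKey {
        final String k;
        final String d;
        CompositeKey(String k, String d) { this.k = k; this.d = d; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof CompositeKey)) return false;
            CompositeKey other = (CompositeKey) o;
            return k.equals(other.k) && d.equals(other.d);
        }
        @Override public int hashCode() { return Objects.hash(k, d); }
    }

    // K -> D: in Kafka this would be the GlobalKTable, identical on every instance.
    private final Map<String, String> globalIndex = new HashMap<>();
    // One (K, D) -> V map per partition: the local KTable shards.
    private final List<Map<CompositeKey, String>> partitions = new ArrayList<>();

    public CompositeKeyLookup(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) partitions.add(new HashMap<>());
    }

    // Assumed partitioning scheme: hash of D only, so all keys for one D co-locate.
    int partitionFor(String d) {
        return Math.floorMod(d.hashCode(), partitions.size());
    }

    public void put(String k, String d, String v) {
        globalIndex.put(k, d);
        partitions.get(partitionFor(d)).put(new CompositeKey(k, d), v);
    }

    /** Callers know only K: resolve D from the global index, then read the right shard. */
    public String get(String k) {
        String d = globalIndex.get(k);
        if (d == null) return null;
        return partitions.get(partitionFor(d)).get(new CompositeKey(k, d));
    }
}
```

The point of the model is that a caller never needs to know D up front: the replicated index supplies it, and D alone decides which shard to query.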
>
> I am also wondering about this code snippet:
>
>>> builder.stream(mainTopic)
>>> .mapValues(...)
>>> .to(secondaryIndex1)
>
> Shouldn't it be .map(), transforming (k,v) ->
> (v.getSecondaryKey1(),k)? Just for my understanding of what you are doing.
>
In this case, the "externally visible" K needs additional information about
the destination D so that it can be partitioned correctly. So the code looks
like:
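// TODO: sucks that this materializes an intermediate topic
msgStream
    // keep the key K; replace the value with the resolved destination D
    // (null values pass through as null so deletes propagate)
    .mapValues(v -> v == null ? null :
            v.getResolvedDestination().toString())
    .to(Serdes.String(), Serdes.String(), DEST_INDEX);
// read DEST_INDEX back as a GlobalKTable K -> D; the store name reuses the topic name
builder.globalTable(Serdes.String(), Serdes.String(), DEST_INDEX,
        DEST_INDEX);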
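For comparison, here is what the two operators would emit for the same input record, sketched as plain Java functions rather than the Kafka Streams API. mapValues keeps the record key K and only replaces the value (which is why the snippet above yields a K -> D index), while map may change the key, giving the K1 -> K index Matthias describes. The Value type and its getSecondaryKey1() accessor are hypothetical stand-ins.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

class RekeyVsMapValues {
    // Hypothetical value type standing in for the real V.
    static final class Value {
        final String secondaryKey1;
        Value(String secondaryKey1) { this.secondaryKey1 = secondaryKey1; }
        String getSecondaryKey1() { return secondaryKey1; }
    }

    // Models mapValues(v -> v.getSecondaryKey1()): key stays K, so the topic holds K -> K1.
    static Map.Entry<String, String> mapValuesStyle(String k, Value v) {
        return new SimpleEntry<>(k, v.getSecondaryKey1());
    }

    // Models map((k, v) -> KeyValue.pair(v.getSecondaryKey1(), k)): re-keyed, so the topic holds K1 -> K.
    static Map.Entry<String, String> mapStyle(String k, Value v) {
        return new SimpleEntry<>(v.getSecondaryKey1(), k);
    }
}
```

Because map changes the key, a downstream repartition or a write to a topic partitioned on the new key is what places the index entries correctly; mapValues never moves a record.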
>
> -Matthias
>
>
> On 6/2/17 12:28 PM, Steven Schlansker wrote:
>> Hi everyone, another question for the list :)
>>
>> I'm creating a cluster of KTables (and GlobalKTables) based off the same
>> input stream K,V.
>>
>> It has a number of secondary indices (think of an RDBMS):
>> K1 -> K
>> K2 -> K
>> etc
>>
>> These are all derived from trivial mappings on my main stream, which also
>> feeds the K -> V StateStore. Think one-liners like v -> v.getSecondaryKey1().
>>
>> Currently, for each one it seems that I have to do
>>
>> builder.stream(mainTopic)
>> .mapValues(...)
>> .to(secondaryIndex1)
>>
>> builder.globalTable(secondaryIndex1, secondaryIndexStore1);
>>
>> Unfortunately the intermediate "secondaryIndex1" topic is of relatively
>> low value. If my state stores are lost, I already have to read through
>> mainTopic to recover the main state store. While it's doing that, I'd
>> much rather it rebuild the GlobalKTable instance from that data directly.
>> Then I could skip having this index in Kafka at all; it's entirely
>> redundant. The data is already loaded and deserialized for the benefit of
>> another Processor.
>>
>> Any thoughts? Happy Friday,
>> Steven
>>
>
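The rebuild Steven asks for (one replay of mainTopic feeding every secondary index, with no intermediate topics) can be modeled outside Kafka as a single pass over the main records with one extractor function per index. This is a plain-Java sketch under that assumption, not something Kafka Streams offers for GlobalKTables, which per the reply above can only be populated from a topic; SinglePassIndexRebuild and the extractor map are hypothetical names.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class SinglePassIndexRebuild {
    /**
     * Replays the main (K, V) records once and rebuilds every secondary
     * index Ki -> K from the same already-deserialized values.
     */
    static <K, V> Map<String, Map<Object, K>> rebuild(
            List<Map.Entry<K, V>> mainTopicRecords,
            Map<String, Function<V, ?>> extractors) {
        Map<String, Map<Object, K>> indices = new LinkedHashMap<>();
        for (String name : extractors.keySet()) indices.put(name, new HashMap<>());
        for (Map.Entry<K, V> rec : mainTopicRecords) {
            V v = rec.getValue();
            // Tombstones are skipped; this sketch does not remove stale index
            // entries for deleted or updated keys, which a real rebuild would need to.
            if (v == null) continue;
            for (Map.Entry<String, Function<V, ?>> e : extractors.entrySet()) {
                indices.get(e.getKey()).put(e.getValue().apply(v), rec.getKey());
            }
        }
        return indices;
    }
}
```

The design choice being illustrated is that each value is deserialized once and fanned out to every index, instead of each index re-reading its own copy from a dedicated topic.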
