Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

2022-12-12 Thread Matthias J. Sax

Glad to hear you figured it out (and that it's not a KS bug :)).

It's a contract in Kafka that serdes must do a `null <-> null` mapping. 
If that's not well documented / unclear, we should update the docs.


Maybe 
https://kafka.apache.org/33/documentation/streams/developer-guide/datatypes.html#implementing-custom-serdes 
and/or the JavaDocs of the corresponding interfaces?
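For illustration, a minimal serde sketch that honors the contract (assuming Jackson for JSON; `Foo` is a stand-in POJO, not something from this thread):

```java
import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical JSON serde; the important part is the null <-> null mapping.
public class FooJsonSerde implements Serializer<Foo>, Deserializer<Foo> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(final String topic, final Foo data) {
        if (data == null) {
            return null; // null object -> null bytes: the record is a real tombstone
        }
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (final IOException e) {
            throw new SerializationException(e);
        }
    }

    @Override
    public Foo deserialize(final String topic, final byte[] data) {
        if (data == null) {
            return null; // null bytes -> null object
        }
        try {
            return objectMapper.readValue(data, Foo.class);
        } catch (final IOException e) {
            throw new SerializationException(e);
        }
    }
}
```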



-Matthias



Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

2022-12-12 Thread Colt McNealy
Patrick—Glad you got it cleared up, and good find re: empty byte[] vs null.

KAFKA-7663 is very interesting. Just yesterday I thought of a use-case
where supplying the processor for the Global State Store would be
useful—what if you have an in-memory object that computes and caches
aggregations of what's in the state store. You can derive that view by
querying the state store, but such queries are expensive; it's far better
to have them cached in an in-memory POJO. But to keep that POJO up-to-date,
you need to be alerted every time an event comes into the state store.

I think the best way to implement that (if I were to submit a KIP) would be
to:

   - Deprecate the ability to add a processor
   - Add an optional "onChange" callback that is called every time a new
   record is processed.

There are lots of details to be ironed out; furthermore, this is a big API
change, so it would be slow to implement.
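To make that concrete, a purely hypothetical sketch of such a callback (none of these names exist in Kafka Streams today):

```java
// Hypothetical listener interface -- an API sketch, not part of Kafka Streams.
@FunctionalInterface
public interface GlobalStoreChangeListener<K, V> {

    // Invoked after Kafka Streams itself has applied a record to the global
    // store (both during normal processing and during restore); a null
    // newValue would signal a delete.
    void onChange(K key, V newValue);
}
```

Kafka Streams would keep applying records to the store itself (avoiding the restore inconsistency), while the callback keeps derived in-memory views, like the cached aggregations above, up to date.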

Colt McNealy
*Founder, LittleHorse.io*



Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

2022-12-12 Thread Patrick D’Addona
No, it does not encrypt the keys. And it works fine for a key like "bar"
where the latest record on the topic is not a tombstone.

But that got me thinking about how the values are actually written to and
read from Kafka, and I found the issue in my case: it was related to the
serializer not writing actual null values onto the topic, but empty
byte[] arrays instead.
The serializer I used looks like this:
```java
public byte[] serialize(final String s, final T data) {
    if (data == null) {
        // bug: encodes null as an empty array instead of null,
        // so no real tombstone ever reaches the topic
        return new byte[0];
    }
    try {
        return objectMapper.writeValueAsBytes(data);
    } catch (final IOException e) {
        throw new SerializationException(e);
    }
}
```
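For comparison, a variant that keeps the `null <-> null` contract would return null in that branch (a sketch of the same method):

```java
public byte[] serialize(final String s, final T data) {
    if (data == null) {
        return null; // not new byte[0]: only a null value is a real tombstone
    }
    try {
        return objectMapper.writeValueAsBytes(data);
    } catch (final IOException e) {
        throw new SerializationException(e);
    }
}
```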

And then during restore in 
`org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState`
```java
for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
    if (record.key() != null) {
        restoreRecords.add(recordConverter.convert(record));
    }
}
```
using
`org.apache.kafka.streams.state.internals.RecordConverters#RAW_TO_TIMESTAMED_INSTANCE`,
that empty array is turned into just an 8-byte timestamp:

```java
// it actually checks the value for `null`, but not for an empty array
final byte[] recordValue = rawValue == null ? null :
    ByteBuffer.allocate(8 + rawValue.length)
        .putLong(timestamp)
        .put(rawValue)
        .array();
```
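So for an empty (but non-null) value, the converted record value is just the 8-byte timestamp. A standalone illustration of that logic (re-created here for demonstration, not the actual Kafka code):

```java
import java.nio.ByteBuffer;

public class EmptyValueIllustration {
    public static void main(final String[] args) {
        final byte[] rawValue = new byte[0];   // what the buggy serializer wrote
        final long timestamp = 1670851200000L; // hypothetical record timestamp
        final byte[] recordValue = ByteBuffer.allocate(8 + rawValue.length)
                .putLong(timestamp)
                .put(rawValue)
                .array();
        // prints 8: the value is non-null, so the store sees a "put", not a "delete"
        System.out.println(recordValue.length);
    }
}
```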

That is then passed to
`org.apache.kafka.streams.state.internals.RocksDBStore.SingleColumnFamilyAccessor#addToBatch`
and gets "put" instead of "delete", because it is not null:
```java
public void addToBatch(final byte[] key,
                       final byte[] value,
                       final WriteBatch batch) throws RocksDBException {
    if (value == null) {
        batch.delete(columnFamily, key);
    } else {
        batch.put(columnFamily, key, value);
    }
}
```

That's why `store.get("foo")` returns null: it actually finds the empty
byte[] record, and my deserializer turns it into null.
So Colt McNealy is right, this is exactly the issue from KAFKA-7663. I just
did not see it until I realized that the values on the topic, which AKHQ
shows as "null" and which are treated like null everywhere in my application,
are not actual tombstones to RocksDB and the restore process.

So this thread can be closed, since there is nothing wrong in the way Kafka
Streams behaves here, just another implication of KAFKA-7663 that might
confuse users.
But it also has the chance to lead them onto very interesting deep dives into
Kafka Streams ;-)

Thanks for all the responses!
Kind regards,
Patrick



Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

2022-12-09 Thread Colt McNealy
Does the KeySerde that you provided to your store encrypt the keys? I've
never done so myself, but I've seen others report similar behavior (the
store iterator shows the correct values but store.get('foo') returns null)
in the Confluent Community slack. Here's a relevant message:

> "From the behaviour in your code snippet above, I would say that the key
is stored in encrypted form. The deserializer decrypts it correctly (thus
you see the key and value while iterating). But when you requests the key
individually you are passing it in plain text (the serializer might not be
encrypting) and it’s not found in the keystore."

I can't help too much beyond that; but you may want to look into that issue.

Colt McNealy
*Founder, LittleHorse.io*



Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

2022-12-08 Thread Patrick D’Addona
> In your case you also delete if the value is not null and if the value
> not-equals "deleteme", right? I.e., you use non-tombstone records as
> deletes, which is just not allowed/supported.

The "deleteme" String was only for testing, the issue also happens without it, 
i.e. if there is a "real" tombstone with `value == null` on the input topic.
I do use the input topic as a changelog for my global table. tombstones are 
sent directly to that topic from a kafka streams operation before the actual 
store.

> I cannot explain why all() and get(key) actually give you different
> results with respect to `key`. If a key is resurrected during a restore,
> both methods should return it. Not sure why `get(key)` returns `null`
> even if `all()` contains the key... I would rather expect that both
> return the resurrected key.

That's why I think this is different from KAFKA-7663.
The **foo.bar.globaltopic** topic currently looks like this:
|timestamp|key|value|
|2022-08-10T14:23:51.768|foo|foo|
|2022-08-10T14:23:51.836|foo|foo|
|2022-08-10T14:23:52.126|bar|bar|
|2022-08-10T14:23:52.398|foo|foo|
|2022-08-10T14:23:53.353|bar|bar|
|2022-08-10T14:23:53.098|foo||
|2022-08-10T14:23:54.367|bar|bar|

After I delete the kafka-streams.state.dir and restart the application, I get
store.get("foo") -> null
store.get("bar") -> "bar"
store.all() -> "foo" and "bar"

Hope that explains it better.

- Patrick

Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

2022-12-08 Thread Matthias J. Sax

> The way I see it, KAFKA-7663 says, "a global store will be exactly the input
> topic after restore, regardless of the processor"


Not sure what you mean by this? The issue the ticket describes is that
if you don't do a plain `put(key,value)` in your processor, stuff breaks
right now. (Note that `delete(key)` and `put(key,null)` are the same.)
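For instance, on a `KeyValueStore<String, String>` named `store`, these two calls have the same effect (sketch):

```java
store.delete(key);     // explicit delete
store.put(key, null);  // equivalent: a null value is a tombstone
```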



It's a known issue, bad API, and also bad documentation on our side, and 
I guess you can call it a bug if you wish. However, you can only use 
tombstones as deletes right now. Thus, what you do "wrong" is



if (record.value() == null == record.value().equals("deleteme")) {
    store.delete(record.key());
}


In your case you also delete if the value is not null and if the value
not-equals "deleteme", right? I.e., you use non-tombstone records as
deletes, which is just not allowed/supported.


The issue is that during restore only `null` values, i.e., actual
tombstones, are handled as deletes and thus, if you delete a key using a
non-tombstone record in your processor, this key can be resurrected
during restore.



I cannot explain why all() and get(key) actually give you different
results with respect to `key`. If a key is resurrected during a restore,
both methods should return it. Not sure why `get(key)` returns `null`
even if `all()` contains the key... I would rather expect that both
return the resurrected key.


Hope this helps.


-Matthias



Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

2022-12-08 Thread Patrick D’Addona
Hi,

I don't think this issue is exactly the same as KAFKA-7663.

The way I see it, KAFKA-7663 says, "a global store will be exactly the input
topic after restore, regardless of the processor".
My issue here is that the global store after restore is inconsistent with the
input topic and with the store itself, because it finds records with key "foo"
using **store.all()** that it cannot find via **store.get("foo")**.
**store.get()** is consistent with my input topic, where the tombstone is the
latest entry for the key "foo", reflecting the **delete("foo")** operation on
the store.
But still, looping over the store returns a record with "foo" as a key and a
non-null value.

If the store acts like a Map, where you can call **get(k)** and **put(k, v)**,
then looping over it should only find entries that actually exist and have a
value when using **get(k)**.
Restoring something that breaks this connection seems wrong, even if that
restore ignores the processor and writes directly to the store.
It should remove keys for which the last entry is a tombstone from the
**all()** iterator, regardless of whether the restore process uses a custom
processor as KAFKA-7663 wants, or simply reads the topic as it currently does.
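A sketch of the Map-like invariant I would expect, using the store types from my original message further down:

```java
// Expected invariant: every key yielded by all() can also be get().
try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
    while (all.hasNext()) {
        final KeyValue<String, ValueAndTimestamp<String>> entry = all.next();
        // After the restore described above, this fails for "foo":
        // all() yields the key, but get() returns null.
        assert store.get(entry.key) != null;
    }
}
```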

Kind Regards,
Patrick



Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

2022-12-08 Thread Colt McNealy
Hi Patrick,

Your issue is in fact identical to KAFKA-7663. As per that
issue/bug/discussion, if your processor does anything other than simply
pass records through, the results of initial processing vs. restoration are
different.

Global State Stores don't have a changelog topic (for example, in the
processor API, Global State Stores are only valid if the builder has
.withLoggingDisabled()). That's because the processor for the global store
runs on each of your N streams instances, and if the processor on each
instance published to the changelog, then each put/delete would be written
N times, which is wasteful.

The implication of this is that your input topic should be "like" a
changelog:
- Your input topic should NOT have limited retention, otherwise you'll lose
old data.
- Your input topic should ideally be compacted, if possible.

I agree that the API as it stands is highly confusing—why allow users to
provide a processor if it offers a way to "shoot oneself in one's foot?"

Changing that API would probably require a KIP. I don't quite have the
bandwidth to propose + implement such a KIP right now, but if you would
like to, feel free! (perhaps in the spring I may have time)

Your workaround (the init() method) is a good one. Another way to do it
might be to simply have a regular processing step that converts the input
topic into the true "changelog" format before you push it to a global store,
as sketched below.
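A rough sketch of that idea, assuming a hypothetical upstream topic "foo.bar.input" and reusing the "deleteme" marker from this thread:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class ChangelogNormalizer {

    public static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        // Turn the application-level delete marker into a real tombstone
        // (null value) before the global store consumes the topic.
        builder.stream("foo.bar.input", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(value -> "deleteme".equals(value) ? null : value)
               .to("foo.bar.globaltopic", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```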

Cheers,
Colt McNealy
*Founder, LittleHorse.io*



[Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

2022-12-08 Thread Patrick D’Addona
Hello,

I have a Quarkus application using **org.apache.kafka:kafka-streams:3.1.0** and found that
* when creating a global table using a compacted topic as input,
* entries that have been deleted at some point
* are then no longer returned when iterating over the store with **store.all()** - as expected,
* but after the pod restarts and its Kafka Streams state directory is deleted, after restoring from the topic using **org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState**,
* those formerly deleted records are once again returned by that store when using **store.all()** - not expected,
* however they return null using **store.get("foo")** - as expected.

This is somewhat similar to https://issues.apache.org/jira/browse/KAFKA-7663,
in that I would like to be able to modify this restore behaviour.
However, it is also different, because I think it is not documented anywhere
and it is unintuitive (to me), since it changes how the application behaves
after a restart even if the Kafka cluster itself was not changed, so I think
it's more of a bug than missing documentation.

Some more information: the topic is configured like this
```properties
cleanup.policy: compact
compression.type: producer
delete.retention.ms: 8640
max.compaction.lag.ms: 9223372036854776000
min.compaction.lag.ms: 0
retention.bytes: -1
retention.ms: 8640
```

I am adding the global store like so
```java
streamsBuilder.addGlobalStore(
    Stores.timestampedKeyValueStoreBuilder(
        Stores.persistentTimestampedKeyValueStore("foobar"),
        Serdes.String(),
        Serdes.String()),
    "foo.bar.globaltopic",
    Consumed.with(Serdes.String(), Serdes.String()),
    () -> new FooBarUpdateHandler(timeService)
);
```

and here is the definition of 'FooBarUpdateHandler'
```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Internal class handling partFamily updates.
 */
public class FooBarUpdateHandler implements Processor<String, String, Void, Void> {

    private static final Logger logger =
        LoggerFactory.getLogger(FooBarUpdateHandler.class);
    private TimestampedKeyValueStore<String, String> store;

    @Override
    public void init(final
            org.apache.kafka.streams.processor.api.ProcessorContext<Void, Void> context) {
        store = context.getStateStore("foobar");
    }

    @Override
    public void process(final Record<String, String> record) {

        // handle tombstones from input topic
        if (record.value() == null == record.value().equals("deleteme")) {
            store.delete(record.key());
        } else {
            store.put(
                record.key(),
                ValueAndTimestamp.make(
                    record.key(),
                    Instant.now().toEpochMilli()
                )
            );
        }

        // this is not relevant
        // it's only to show the issue when restarting and restoring the store
        final List<String> existingKeys = new ArrayList<>();
        try (final KeyValueIterator<String, ValueAndTimestamp<String>> all =
                store.all()) {
            all.forEachRemaining((r) -> {
                existingKeys.add(r.key);
            });
        }
        logger.info("Got {} records in the store, with keys {}",
            existingKeys.size(), String.join(",", existingKeys));
    }
}
```

My workaround is to add this to the 'init' method of the 'FooBarUpdateHandler'
```java
try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
    if (all == null) {
        return;
    }
    logger.info("Removing already deleted records from rocksdb representing the global store {}", storeName);
    all.forEachRemaining(r -> {
        if (r != null && r.key != null && store.get(r.key) == null) {
            store.delete(r.key);
        }
    });
}
```
Now it is again consistent across restarts.

Kind Regards,
Patrick


Patrick D’Addona
Senior Lead IT Architect


Mobile: +49 151 544 22 161
patrick.dadd...@maibornwolff.de
Theresienhöhe 13, 80339 München

MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
www.maibornwolff.de, Phone +49 89 544 253 000
USt-ID DE 129 299 525, Munich District Court HRB 98058
Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann,
Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.