Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records
Glad to hear you figured it out (and that it's not a KS bug :)). It's a contract in Kafka that serdes must do a `null <-> null` mapping. If that's not well documented / unclear, we should update the docs. Maybe https://kafka.apache.org/33/documentation/streams/developer-guide/datatypes.html#implementing-custom-serdes and/or the JavaDocs of the corresponding interfaces?

-Matthias

On 12/12/22 9:44 AM, Colt McNealy wrote:

Patrick—Glad you got it cleared up, and good find re: empty byte[] vs null. KAFKA-7663 is very interesting.

Just yesterday I thought of a use-case where supplying the processor for the Global State Store would be useful—what if you have an in-memory object that computes and caches aggregations of what's in the state store? You can derive that view by querying the state store, but such queries are expensive; it's far better to have them cached in an in-memory POJO. But to keep that POJO up-to-date, you need to be alerted every time an event comes into the state store.

I think the best way to implement that (if I were to submit a KIP) would be to:
- Deprecate the ability to add a processor
- Add an optional "onChange" callback that is called every time a new record is processed.

There are lots of details to be ironed out; furthermore, this is a big API change, so it would be slow to implement.

Colt McNealy
*Founder, LittleHorse.io*

On Mon, Dec 12, 2022 at 3:30 AM Patrick D’Addona wrote:

No, it does not encrypt the keys. And it works fine for keys like "bar" where the latest record on the topic is not a tombstone.

But that got me thinking about how the values are actually written and read from Kafka, and I found the issue in my case: it was related to the serializer not writing actual "null" values onto the topic, but empty byte[] arrays instead.

The serializer I used looks like this

```java
public byte[] serialize(final String s, final T data) {
    if (data == null) {
        return new byte[0];
    }
    try {
        return objectMapper.writeValueAsBytes(data);
    } catch (final IOException e) {
        throw new SerializationException(e);
    }
}
```

And then during restore in `org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState`

```java
for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
    if (record.key() != null) {
        restoreRecords.add(recordConverter.convert(record));
    }
}
```

using `org.apache.kafka.streams.state.internals.RecordConverters#RAW_TO_TIMESTAMED_INSTANCE`, that empty array is turned into an 8-byte timestamp prefix plus the empty payload. It actually checks the value for `null`, but not for an empty array:

```java
final byte[] recordValue = rawValue == null ? null :
    ByteBuffer.allocate(8 + rawValue.length)
        .putLong(timestamp)
        .put(rawValue)
        .array();
```

That is then passed to `org.apache.kafka.streams.state.internals.RocksDBStore.SingleColumnFamilyAccessor#addToBatch` and gets "put" instead of "delete" because it's not null:

```java
public void addToBatch(final byte[] key,
                       final byte[] value,
                       final WriteBatch batch) throws RocksDBException {
    if (value == null) {
        batch.delete(columnFamily, key);
    } else {
        batch.put(columnFamily, key, value);
    }
}
```

That's why `store.get("foo")` gives "null": it actually finds the empty `byte[]` record and my deserializer turns it into null. So Colt McNealy is right, this is exactly the issue from KAFKA-7663. I just did not see it until I found that the values on the topic, which akhq shows as "null" in the visualization and which are treated like null everywhere in my application, are not actually real tombstones to RocksDB and the restore process.

So this thread can be closed, since there is nothing wrong in the way Kafka Streams behaves here, just another implication of KAFKA-7663 that might confuse users. But it also has the chance to lead them onto very interesting deep dives into Kafka Streams ;-)

Thanks for all the responses!
Kind regards,
Patrick

From: Colt McNealy
Sent: Saturday, December 10, 2022 00:33
To: dev@kafka.apache.org
Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

Does the KeySerde that you provided to your store encrypt the keys? I've never done so myself, but I've seen others report similar behavior (the store iterator shows the correct values but store.get('foo') returns null) in the Confluent Community slack. Here's a relevant message:

"From the behaviour in your code snippet above, I would say that the key is stored in encrypted form. The deserializer decrypts it correctly (thus you see the key and value while iterating). But when you requests the key individually you are passing it in plain text (the serializer might not be encrypting) and it’s not
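The `null <-> null` serde contract that Matthias mentions can be sketched without any Kafka dependency. This is a hypothetical, minimal illustration (the real interfaces are `org.apache.kafka.common.serialization.Serializer`/`Deserializer`; `NullSafeSerde` and its `String`-based payload are assumptions for the example):

```java
import java.nio.charset.StandardCharsets;

// Minimal sketch of the serde contract: null must map to null in both
// directions, so a tombstone survives the round trip as a real tombstone.
public class NullSafeSerde {

    // Serializer side: a null object must become a null byte[],
    // NOT an empty byte[0] (an empty array is a regular value to the
    // broker, to RocksDB, and to the restore path).
    public static byte[] serialize(final String data) {
        if (data == null) {
            return null; // tombstone stays a tombstone
        }
        return data.getBytes(StandardCharsets.UTF_8);
    }

    // Deserializer side: null bytes must come back as a null object.
    public static String deserialize(final byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        assert serialize(null) == null;
        assert deserialize(null) == null;
        assert "bar".equals(deserialize(serialize("bar")));
    }
}
```

The key point is the first branch of `serialize`: returning `null` rather than `new byte[0]` is exactly the difference that caused the resurrected records in this thread.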
Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records
Patrick—Glad you got it cleared up, and good find re: empty byte[] vs null. KAFKA-7663 is very interesting.

Just yesterday I thought of a use-case where supplying the processor for the Global State Store would be useful—what if you have an in-memory object that computes and caches aggregations of what's in the state store? You can derive that view by querying the state store, but such queries are expensive; it's far better to have them cached in an in-memory POJO. But to keep that POJO up-to-date, you need to be alerted every time an event comes into the state store.

I think the best way to implement that (if I were to submit a KIP) would be to:
- Deprecate the ability to add a processor
- Add an optional "onChange" callback that is called every time a new record is processed.

There are lots of details to be ironed out; furthermore, this is a big API change, so it would be slow to implement.

Colt McNealy
*Founder, LittleHorse.io*

On Mon, Dec 12, 2022 at 3:30 AM Patrick D’Addona wrote:

> No, it does not encrypt the keys. And it works fine for keys like "bar"
> where the latest record on the topic is not a tombstone.
>
> But that got me thinking about how the values are actually written and
> read from Kafka and I found the issue in my case, it was related to the
> serializer not writing actual "null" values onto the topic, but empty
> byte[] arrays instead.
>
> The serializer I used looks like this
>
> ```java
> public byte[] serialize(final String s, final T data) {
>     if (data == null) {
>         return new byte[0];
>     }
>     try {
>         return objectMapper.writeValueAsBytes(data);
>     } catch (final IOException e) {
>         throw new SerializationException(e);
>     }
> }
> ```
>
> And then during restore in
> `org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState`
>
> ```java
> for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
>     if (record.key() != null) {
>         restoreRecords.add(recordConverter.convert(record));
>     }
> }
> ```
>
> using `org.apache.kafka.streams.state.internals.RecordConverters#RAW_TO_TIMESTAMED_INSTANCE`,
> that empty array is turned into an 8-byte timestamp prefix plus the empty payload.
> It actually checks the value for `null`, but not for an empty array:
>
> ```java
> final byte[] recordValue = rawValue == null ? null :
>     ByteBuffer.allocate(8 + rawValue.length)
>         .putLong(timestamp)
>         .put(rawValue)
>         .array();
> ```
>
> That is then passed to
> `org.apache.kafka.streams.state.internals.RocksDBStore.SingleColumnFamilyAccessor#addToBatch`
> and gets "put" instead of "delete" because it's not null:
>
> ```java
> public void addToBatch(final byte[] key,
>                        final byte[] value,
>                        final WriteBatch batch) throws RocksDBException {
>     if (value == null) {
>         batch.delete(columnFamily, key);
>     } else {
>         batch.put(columnFamily, key, value);
>     }
> }
> ```
>
> That's why `store.get("foo")` gives "null": it actually finds the
> empty `byte[]` record and my deserializer turns it into null.
> So Colt McNealy is right, this is exactly the issue from KAFKA-7663, I
> just did not see it until I found that the values on the topic, which akhq
> shows as "null" in the visualization and which are treated like null everywhere
> in my application, are not actually real tombstones to RocksDB and the
> restore process.
>
> So this thread can be closed, since there is nothing wrong in the way Kafka
> Streams behaves here, just another implication of KAFKA-7663 that might
> confuse users.
> But it also has the chance to lead them onto very interesting deep dives into
> Kafka Streams ;-)
>
> Thanks for all the responses!
> Kind regards,
> Patrick
>
> From: Colt McNealy
> Sent: Saturday, December 10, 2022 00:33
> To: dev@kafka.apache.org
> Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally
> restores previously deleted records
>
> Does the KeySerde that you provided to your store encrypt the keys? I've
> never done so myself, but I've seen others report similar behavior (the
> store iterator shows the correct values but store.get('foo') returns null)
> in the Confluent Community slack. Here's a relevant message:
>
> "From the behaviour in your code snippet above, I would say that the key
> is stored in encrypted form. The deserializer decrypts it correctly (thus
> you see the key and value while iterating). But when you requests the key
> individually you are passing it in plain text (the serializer might not be
> encrypting) and it’s not found
Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records
No, it does not encrypt the keys. And it works fine for keys like "bar" where the latest record on the topic is not a tombstone.

But that got me thinking about how the values are actually written and read from Kafka, and I found the issue in my case: it was related to the serializer not writing actual "null" values onto the topic, but empty byte[] arrays instead.

The serializer I used looks like this

```java
public byte[] serialize(final String s, final T data) {
    if (data == null) {
        return new byte[0];
    }
    try {
        return objectMapper.writeValueAsBytes(data);
    } catch (final IOException e) {
        throw new SerializationException(e);
    }
}
```

And then during restore in `org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState`

```java
for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
    if (record.key() != null) {
        restoreRecords.add(recordConverter.convert(record));
    }
}
```

using `org.apache.kafka.streams.state.internals.RecordConverters#RAW_TO_TIMESTAMED_INSTANCE`, that empty array is turned into an 8-byte timestamp prefix plus the empty payload. It actually checks the value for `null`, but not for an empty array:

```java
final byte[] recordValue = rawValue == null ? null :
    ByteBuffer.allocate(8 + rawValue.length)
        .putLong(timestamp)
        .put(rawValue)
        .array();
```

That is then passed to `org.apache.kafka.streams.state.internals.RocksDBStore.SingleColumnFamilyAccessor#addToBatch` and gets "put" instead of "delete" because it's not null:

```java
public void addToBatch(final byte[] key,
                       final byte[] value,
                       final WriteBatch batch) throws RocksDBException {
    if (value == null) {
        batch.delete(columnFamily, key);
    } else {
        batch.put(columnFamily, key, value);
    }
}
```

That's why `store.get("foo")` gives "null": it actually finds the empty `byte[]` record and my deserializer turns it into null.

So Colt McNealy is right, this is exactly the issue from KAFKA-7663. I just did not see it until I found that the values on the topic, which akhq shows as "null" in the visualization and which are treated like null everywhere in my application, are not actually real tombstones to RocksDB and the restore process.

So this thread can be closed, since there is nothing wrong in the way Kafka Streams behaves here, just another implication of KAFKA-7663 that might confuse users. But it also has the chance to lead them onto very interesting deep dives into Kafka Streams ;-)

Thanks for all the responses!
Kind regards,
Patrick

From: Colt McNealy
Sent: Saturday, December 10, 2022 00:33
To: dev@kafka.apache.org
Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

Does the KeySerde that you provided to your store encrypt the keys? I've never done so myself, but I've seen others report similar behavior (the store iterator shows the correct values but store.get('foo') returns null) in the Confluent Community slack. Here's a relevant message:

> "From the behaviour in your code snippet above, I would say that the key is stored in encrypted form. The deserializer decrypts it correctly (thus you see the key and value while iterating). But when you requests the key individually you are passing it in plain text (the serializer might not be encrypting) and it’s not found in the keystore."

I can't help too much beyond that; but you may want to look into that issue.

Colt McNealy
*Founder, LittleHorse.io*

On Thu, Dec 8, 2022 at 11:51 PM Patrick D’Addona wrote:

> > In your case you also delete if the value is not null and if the value
> > not-equals "deleteme", right? Ie, you use non-tombstone records as deletes
> > what is just not allowed/supported.
>
> The "deleteme" String was only for testing, the issue also happens without
> it, i.e. if there is a "real" tombstone with `value == null` on the input
> topic.
> I do use the input topic as a changelog for my global table. Tombstones
> are sent directly to that topic from a Kafka Streams operation before the
> actual store.
>
> > I cannot explain why all() and get(key) actually give you different
> > result with respect to `key`. If a key is resurrected during a restore,
> > both method should return it. Not sure why `get(key)` returns `null`
> > even if `all()` contains the key... I would rather expect that both
> > return the resurrected key.
>
> That's why I think this is different from KAFKA-7663.
> The **foo.bar.globaltopic** topic currently looks like this
>
> |timestamp|key|value|
> |2022-08-10T14:23:51.768|foo|foo|
> |2022-08-10T14:23:51.836|foo|foo|
> |2022-08-10T14:23:52.126|bar|bar|
> |2022-08-10T14:23:52.398|foo|foo|
> |2022-08-10T14:23:53.353|b
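The core of the restore problem described in this thread can be reproduced without Kafka at all. The following sketch mimics (in simplified, dependency-free form, so `TombstoneDemo` and its helper names are assumptions, not Kafka API) the relevant logic of `RecordConverters` and `RocksDBStore.SingleColumnFamilyAccessor#addToBatch`: an empty byte[] value becomes a non-null 8-byte value (just the timestamp), so the write path takes the `put` branch instead of `delete`:

```java
import java.nio.ByteBuffer;

// Mimics the restore path, simplified: only a *null* raw value stays
// null; an empty array is prefixed with the 8-byte timestamp and
// becomes a real value, which the store then "puts" instead of deleting.
public class TombstoneDemo {

    // Analogous to RecordConverters' raw-to-timestamped conversion.
    static byte[] toTimestamped(final byte[] rawValue, final long timestamp) {
        return rawValue == null ? null :
            ByteBuffer.allocate(8 + rawValue.length)
                .putLong(timestamp)
                .put(rawValue)
                .array();
    }

    // Analogous to addToBatch: put vs delete is decided purely on null-ness.
    static String action(final byte[] value) {
        return value == null ? "delete" : "put";
    }

    public static void main(String[] args) {
        final byte[] realTombstone = toTimestamped(null, 123L);
        final byte[] fakeTombstone = toTimestamped(new byte[0], 123L);

        assert "delete".equals(action(realTombstone));
        // the "empty" value is now 8 bytes long and gets put,
        // resurrecting the key during restore
        assert fakeTombstone.length == 8;
        assert "put".equals(action(fakeTombstone));
    }
}
```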
Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records
Does the KeySerde that you provided to your store encrypt the keys? I've never done so myself, but I've seen others report similar behavior (the store iterator shows the correct values but store.get('foo') returns null) in the Confluent Community slack. Here's a relevant message:

> "From the behaviour in your code snippet above, I would say that the key is stored in encrypted form. The deserializer decrypts it correctly (thus you see the key and value while iterating). But when you requests the key individually you are passing it in plain text (the serializer might not be encrypting) and it’s not found in the keystore."

I can't help too much beyond that; but you may want to look into that issue.

Colt McNealy
*Founder, LittleHorse.io*

On Thu, Dec 8, 2022 at 11:51 PM Patrick D’Addona wrote:

> > In your case you also delete if the value is not null and if the value
> > not-equals "deleteme", right? Ie, you use non-tombstone records as deletes
> > what is just not allowed/supported.
>
> The "deleteme" String was only for testing, the issue also happens without
> it, i.e. if there is a "real" tombstone with `value == null` on the input
> topic.
> I do use the input topic as a changelog for my global table. Tombstones
> are sent directly to that topic from a Kafka Streams operation before the
> actual store.
>
> > I cannot explain why all() and get(key) actually give you different
> > result with respect to `key`. If a key is resurrected during a restore,
> > both method should return it. Not sure why `get(key)` returns `null`
> > even if `all()` contains the key... I would rather expect that both
> > return the resurrected key.
>
> That's why I think this is different from KAFKA-7663.
> The **foo.bar.globaltopic** topic currently looks like this
>
> |timestamp|key|value|
> |2022-08-10T14:23:51.768|foo|foo|
> |2022-08-10T14:23:51.836|foo|foo|
> |2022-08-10T14:23:52.126|bar|bar|
> |2022-08-10T14:23:52.398|foo|foo|
> |2022-08-10T14:23:53.353|bar|bar|
> |2022-08-10T14:23:53.098|foo||
> |2022-08-10T14:23:54.367|bar|bar|
>
> After I delete the kafka-streams.state.dir and restart the application, I get
>
> store.get("foo") -> null
> store.get("bar") -> "bar"
> store.all() -> "foo" and "bar"
>
> Hope that explains it better.
>
> - Patrick
>
> Patrick D’Addona
> Senior Lead IT Architect
>
> Mobile: +49 151 544 22 161
> patrick.dadd...@maibornwolff.de
> Theresienhöhe 13, 80339 München
>
> MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
> www.maibornwolff.de, Phone +49 89 544 253 000
> USt-ID DE 129 299 525, Munich District Court HRB 98058
> Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann,
> Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.
>
> From: Matthias J. Sax
> Sent: Friday, December 9, 2022 01:11
> To: dev@kafka.apache.org
> Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally
> restores previously deleted records
>
> > The way I see it, KAFKA-7663 says, "a global store will be exactly the
> > input topic after restore, regardless of the processor"
>
> Not sure what you mean by this? The issue the tickets describe is that
> if you don't do a plain `put(key,value)` in your processor, stuff breaks
> right now. (Note that `delete(key)` and `put(key,null)` is the same.)
>
> It's a known issue, bad API, and also bad documentation on our side, and
> I guess you can call it a bug if you wish. However, you can only use
> tombstones as deletes right now.
> Thus, what you do "wrong" is
>
> ```java
> if (record.value() == null == record.value().equals("deleteme")) {
>     store.delete(record.key());
> }
> ```
>
> In your case you also delete if the value is not null and if the value
> not-equals "deleteme", right? Ie, you use non-tombstone records as
> deletes, which is just not allowed/supported.
>
> The issue is that during restore only `null` values, ie, actual
> tombstones, are handled as deletes and thus, if you delete a key using a
> non-tombstone record in your processor, this key can be resurrected
> during restore.
>
> I cannot explain why all() and get(key) actually give you different
> results with respect to `key`. If a key is resurrected during a restore,
> both methods should return it. Not sure why `get(key)` returns `null`
> even if `all()` contains the key... I would rather expect that both
> return the resurrected key.
>
> Hope this helps.
>
> -Matthias
>
> On 12/8/22 12:00 PM,
Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records
> In your case you also delete if the value is not null and if the value
> not-equals "deleteme", right? Ie, you use non-tombstone records as deletes
> what is just not allowed/supported.

The "deleteme" String was only for testing; the issue also happens without it, i.e. if there is a "real" tombstone with `value == null` on the input topic. I do use the input topic as a changelog for my global table. Tombstones are sent directly to that topic from a Kafka Streams operation before the actual store.

> I cannot explain why all() and get(key) actually give you different
> result with respect to `key`. If a key is resurrected during a restore,
> both method should return it. Not sure why `get(key)` returns `null`
> even if `all()` contains the key... I would rather expect that both
> return the resurrected key.

That's why I think this is different from KAFKA-7663. The **foo.bar.globaltopic** topic currently looks like this

|timestamp|key|value|
|2022-08-10T14:23:51.768|foo|foo|
|2022-08-10T14:23:51.836|foo|foo|
|2022-08-10T14:23:52.126|bar|bar|
|2022-08-10T14:23:52.398|foo|foo|
|2022-08-10T14:23:53.353|bar|bar|
|2022-08-10T14:23:53.098|foo||
|2022-08-10T14:23:54.367|bar|bar|

After I delete the kafka-streams.state.dir and restart the application, I get

store.get("foo") -> null
store.get("bar") -> "bar"
store.all() -> "foo" and "bar"

Hope that explains it better.

- Patrick

Patrick D’Addona
Senior Lead IT Architect

Mobile: +49 151 544 22 161
patrick.dadd...@maibornwolff.de
Theresienhöhe 13, 80339 München

MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
www.maibornwolff.de, Phone +49 89 544 253 000
USt-ID DE 129 299 525, Munich District Court HRB 98058
Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann, Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.

From: Matthias J. Sax
Sent: Friday, December 9, 2022 01:11
To: dev@kafka.apache.org
Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

> The way I see it, KAFKA-7663 says, "a global store will be exactly the input
> topic after restore, regardless of the processor"

Not sure what you mean by this? The issue the tickets describe is that if you don't do a plain `put(key,value)` in your processor, stuff breaks right now. (Note that `delete(key)` and `put(key,null)` is the same.)

It's a known issue, bad API, and also bad documentation on our side, and I guess you can call it a bug if you wish. However, you can only use tombstones as deletes right now. Thus, what you do "wrong" is

```java
if (record.value() == null == record.value().equals("deleteme")) {
    store.delete(record.key());
}
```

In your case you also delete if the value is not null and if the value not-equals "deleteme", right? Ie, you use non-tombstone records as deletes, which is just not allowed/supported.

The issue is that during restore only `null` values, ie, actual tombstones, are handled as deletes and thus, if you delete a key using a non-tombstone record in your processor, this key can be resurrected during restore.

I cannot explain why all() and get(key) actually give you different results with respect to `key`. If a key is resurrected during a restore, both methods should return it. Not sure why `get(key)` returns `null` even if `all()` contains the key... I would rather expect that both return the resurrected key.

Hope this helps.

-Matthias

On 12/8/22 12:00 PM, Patrick D’Addona wrote:

> Hi,
>
> I don't think this issue is exactly the same as KAFKA-7663.
>
> The way I see it, KAFKA-7663 says, "a global store will be exactly the input
> topic after restore, regardless of the processor".
> My issue here is that the global store after restore is inconsistent with
> the input topic and the store itself.
> Because it finds records with key "foo" using **store.all()** that it can not
> find via **store.get("foo")**.
> The **store.get()** is consistent with my input topic, where the tombstone is
> the latest entry for the key "foo", reflecting the **delete("foo")**
> operation on the store.
> But still, looping over the store returns a record with "foo" as a key and a
> non-null value.
>
> If the store acts like a Map, where you can call **get(k)** and **put(k, v)**,
> then looping over it should only find entries that actually exist and
> have a value when using **get(k)**.
> Restoring something that breaks this connection seems wrong, even if that
> restoring ignores the processor and directly writes to the store.
> It should remove keys, for which the last entry is a tombstone, from the
Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records
> The way I see it, KAFKA-7663 says, "a global store will be exactly the
> input topic after restore, regardless of the processor"

Not sure what you mean by this? The issue the tickets describe is that if you don't do a plain `put(key,value)` in your processor, stuff breaks right now. (Note that `delete(key)` and `put(key,null)` is the same.)

It's a known issue, bad API, and also bad documentation on our side, and I guess you can call it a bug if you wish. However, you can only use tombstones as deletes right now. Thus, what you do "wrong" is

```java
if (record.value() == null == record.value().equals("deleteme")) {
    store.delete(record.key());
}
```

In your case you also delete if the value is not null and if the value not-equals "deleteme", right? Ie, you use non-tombstone records as deletes, which is just not allowed/supported.

The issue is that during restore only `null` values, ie, actual tombstones, are handled as deletes and thus, if you delete a key using a non-tombstone record in your processor, this key can be resurrected during restore.

I cannot explain why all() and get(key) actually give you different results with respect to `key`. If a key is resurrected during a restore, both methods should return it. Not sure why `get(key)` returns `null` even if `all()` contains the key... I would rather expect that both return the resurrected key.

Hope this helps.

-Matthias

On 12/8/22 12:00 PM, Patrick D’Addona wrote:

Hi,

I don't think this issue is exactly the same as KAFKA-7663.

The way I see it, KAFKA-7663 says, "a global store will be exactly the input topic after restore, regardless of the processor". My issue here is that the global store after restore is inconsistent with the input topic and the store itself. Because it finds records with key "foo" using **store.all()** that it can not find via **store.get("foo")**. The **store.get()** is consistent with my input topic, where the tombstone is the latest entry for the key "foo", reflecting the **delete("foo")** operation on the store. But still, looping over the store returns a record with "foo" as a key and a non-null value.

If the store acts like a Map, where you can call **get(k)** and **put(k, v)**, then looping over it should only find entries that actually exist and have a value when using **get(k)**. Restoring something that breaks this connection seems wrong, even if that restoring ignores the processor and directly writes to the store. It should remove keys for which the last entry is a tombstone from the **all()** iterator, regardless of whether the restore process uses a custom processor as KAFKA-7663 wants, or simply reads the topic as it currently does.

Kind Regards,
Patrick

From: Colt McNealy
Sent: Thursday, December 8, 2022 17:54
To: patrick.dadd...@maibornwolff.de.invalid
Cc: dev@kafka.apache.org
Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

Hi Patrick,

Your issue is in fact identical to KAFKA-7663. As per that issue/bug/discussion, if your processor does anything other than simply pass-through records, the results of initial processing vs restoration are different.

Global State Stores don't have a changelog topic (for example, in the processor API, Global State Stores are only valid if the builder has .withLoggingDisabled()). That's because the processor for the global store runs on each of your N streams instances, and if the processor on each instance published to the changelog, then each put/delete would be written N times, which is wasteful.

The implications of this are that your input topic should be "like" a changelog:
- Your input topic should NOT have limited retention, otherwise you'll lose old data.
- Your input topic should ideally be compacted if possible.

I agree that the API as it stands is highly confusing—why allow users to provide a processor if it offers a way to "shoot oneself in one's foot"? Changing that API would probably require a KIP. I don't quite have the bandwidth to propose + implement such a KIP right now, but if you would like to, feel free! (Perhaps in the spring I may have time.)

Your workaround (the init() method) is a good one. Another way to do it might be to simply have a regular processing step which converts the input topic into the true "changelog" format before you push it to a global store.

Cheers,
Colt McNealy
*Founder, LittleHorse.io*

On Thu, Dec 8, 2022 at 8:41 AM Patrick D’Addona wrote:

> Hello,
>
> I have a quarkus application using **org.apache.kafka:kafka-streams:3.1.0** and found that
>
> * when creating a global table using a compacted topic as input
> * entries that have been deleted at some point
> * are then no longer returned when iterating over the store with **store.all()**
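Matthias's rule above, that only real tombstones may be treated as deletes in a global-store processor, can be sketched without Kafka. This is a hedged, dependency-free illustration (`GlobalStoreRule` and the `"deleteme"` sentinel from this thread are used only to show what is *not* supported):

```java
// Decides how a global-store processor may handle a record. Only a null
// value (a real tombstone) is safe to treat as a delete, because the
// restore path bypasses the processor and applies raw topic records
// directly: null -> delete, anything else -> put.
public class GlobalStoreRule {

    enum Action { DELETE, PUT }

    // Supported: delete only on tombstones; restore behaves identically.
    static Action supported(final String value) {
        return value == null ? Action.DELETE : Action.PUT;
    }

    // NOT supported: deleting on a sentinel like "deleteme" diverges from
    // restore, which would "put" that record and resurrect the key.
    static Action unsupported(final String value) {
        return (value == null || "deleteme".equals(value))
            ? Action.DELETE : Action.PUT;
    }

    public static void main(String[] args) {
        assert supported(null) == Action.DELETE;
        assert supported("deleteme") == Action.PUT;       // restore agrees
        assert unsupported("deleteme") == Action.DELETE;  // restore disagrees
    }
}
```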
Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records
Hi,

I don't think this issue is exactly the same as KAFKA-7663.

The way I see it, KAFKA-7663 says, "a global store will be exactly the input topic after restore, regardless of the processor". My issue here is that the global store after restore is inconsistent with the input topic and the store itself. Because it finds records with key "foo" using **store.all()** that it can not find via **store.get("foo")**. The **store.get()** is consistent with my input topic, where the tombstone is the latest entry for the key "foo", reflecting the **delete("foo")** operation on the store. But still, looping over the store returns a record with "foo" as a key and a non-null value.

If the store acts like a Map, where you can call **get(k)** and **put(k, v)**, then looping over it should only find entries that actually exist and have a value when using **get(k)**. Restoring something that breaks this connection seems wrong, even if that restoring ignores the processor and directly writes to the store. It should remove keys for which the last entry is a tombstone from the **all()** iterator, regardless of whether the restore process uses a custom processor as KAFKA-7663 wants, or simply reads the topic as it currently does.

Kind Regards,
Patrick

From: Colt McNealy
Sent: Thursday, December 8, 2022 17:54
To: patrick.dadd...@maibornwolff.de.invalid
Cc: dev@kafka.apache.org
Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

Hi Patrick,

Your issue is in fact identical to KAFKA-7663. As per that issue/bug/discussion, if your processor does anything other than simply pass-through records, the results of initial processing vs restoration are different.

Global State Stores don't have a changelog topic (for example, in the processor API, Global State Stores are only valid if the builder has .withLoggingDisabled()). That's because the processor for the global store runs on each of your N streams instances, and if the processor on each instance published to the changelog, then each put/delete would be written N times, which is wasteful.

The implications of this are that your input topic should be "like" a changelog:
- Your input topic should NOT have limited retention, otherwise you'll lose old data.
- Your input topic should ideally be compacted if possible.

I agree that the API as it stands is highly confusing—why allow users to provide a processor if it offers a way to "shoot oneself in one's foot"? Changing that API would probably require a KIP. I don't quite have the bandwidth to propose + implement such a KIP right now, but if you would like to, feel free! (Perhaps in the spring I may have time.)

Your workaround (the init() method) is a good one. Another way to do it might be to simply have a regular processing step which converts the input topic into the true "changelog" format before you push it to a global store.

Cheers,
Colt McNealy
*Founder, LittleHorse.io*

On Thu, Dec 8, 2022 at 8:41 AM Patrick D’Addona wrote:

> Hello,
>
> I have a quarkus application using **org.apache.kafka:kafka-streams:3.1.0** and found that
>
> * when creating a global table using a compacted topic as input
> * entries that have been deleted at some point
> * are then no longer returned when iterating over the store with **store.all()** - as expected
> * but after the pod restarts and its kafka streams state directory is deleted, after restoring from the topic using **org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState**
> * those formerly deleted records are once again returned by that store when using **store.all()** - not expected
> * however they return null, using **store.get("foo")** - as expected
>
> This is somewhat similar to https://issues.apache.org/jira/browse/KAFKA-7663, in that I would like to be able to modify this restore behaviour.
> However it is also different, because I think it is not documented anywhere and it is unintuitive (to me) - since it changes how the application behaves after restarting it, even if the kafka cluster itself was not changed - so I think it's more of a bug than missing documentation.
>
> Some more information, the topic is configured like this
>
> ```properties
> cleanup.policy: compact
> compression.type: producer
> delete.retention.ms: 8640
> max.compaction.lag.ms: 9223372036854776000
> min.compaction.lag.ms: 0
> retention.bytes: -1
> retention.ms: 8640
> ```
>
> I am adding the global store like so
>
> ```java
> streamsBuilder.addGlobalStore(
>     Stores.timestampedKeyValueStoreBuilder(
>         Stores.persistentTimestampedKeyValueStore("foobar"),
>         Serdes.String(),
> ```
Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records
Hi Patrick,

Your issue is in fact identical to KAFKA-7663. As per that issue/bug/discussion, if your processor does anything other than simply pass records through, the results of initial processing vs. restoration are different. Global State Stores don't have a changelog topic (for example, in the Processor API, Global State Stores are only valid if the builder has `.withLoggingDisabled()`). That's because the processor for the global store runs on each of your N Streams instances, and if the processor on each instance published to a changelog, then each put/delete would be written N times, which is wasteful.

The implication is that your input topic should be "like" a changelog:

- Your input topic should NOT have limited retention, otherwise you'll lose old data.
- Your input topic should ideally be compacted, if possible.

I agree that the API as it stands is highly confusing: why allow users to provide a processor if it offers a way to "shoot oneself in the foot"? Changing that API would probably require a KIP. I don't quite have the bandwidth to propose and implement such a KIP right now, but if you would like to, feel free! (Perhaps in the spring I may have time.)

Your workaround (the init() method) is a good one. Another way to do it might be to simply have a regular processing step which converts the input topic into the true "changelog" format before you push it to a global store.
Cheers,
Colt McNealy
*Founder, LittleHorse.io*

On Thu, Dec 8, 2022 at 8:41 AM Patrick D’Addona wrote:

> Hello,
>
> I have a quarkus application using **org.apache.kafka:kafka-streams:3.1.0** and found that
>
> * when creating a global table using a compacted topic as input
> * entries that have been deleted at some point
> * are then no longer returned when iterating over the store with **store.all()** - as expected
> * but after the pod restarts and its kafka streams state directory is deleted, after restoring from the topic using **org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState**
> * those formerly deleted records are once again returned by that store when using **store.all()** - not expected
> * however they return null, using **store.get("foo")** - as expected
>
> This is somewhat similar to https://issues.apache.org/jira/browse/KAFKA-7663, in that I would like to be able to modify this restore behaviour.
> However it is also different, because I think it is not documented anywhere and it is unintuitive (to me) - since it changes how the application behaves after restarting it even if the kafka cluster itself was not changed - so I think it's more of a bug than missing documentation.
>
> Some more information, the topic is configured like this
>
> ```
> cleanup.policy: compact
> compression.type: producer
> delete.retention.ms: 8640
> max.compaction.lag.ms: 9223372036854776000
> min.compaction.lag.ms: 0
> retention.bytes: -1
> retention.ms: 8640
> ```
>
> I am adding the global store like so
>
> ```java
> streamsBuilder.addGlobalStore(
>     Stores.timestampedKeyValueStoreBuilder(
>         Stores.persistentTimestampedKeyValueStore("foobar"),
>         Serdes.String(),
>         Serdes.String()),
>     "foo.bar.globaltopic",
>     Consumed.with(Serdes.String(), Serdes.String()),
>     () -> new FooBarUpdateHandler(timeService)
> );
> ```
>
> and here is the definition of 'FooBarUpdateHandler'
>
> ```java
> import java.time.Instant;
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.kafka.streams.processor.api.Processor;
> import org.apache.kafka.streams.processor.api.Record;
> import org.apache.kafka.streams.state.KeyValueIterator;
> import org.apache.kafka.streams.state.TimestampedKeyValueStore;
> import org.apache.kafka.streams.state.ValueAndTimestamp;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> /**
>  * Internal class handling partFamily updates.
>  */
> public class FooBarUpdateHandler implements Processor<String, String, Void, Void> {
>
>     private static final Logger logger = LoggerFactory.getLogger(FooBarUpdateHandler.class);
>     private TimestampedKeyValueStore<String, String> store;
>
>     @Override
>     public void init(final org.apache.kafka.streams.processor.api.ProcessorContext<Void, Void> context) {
>         store = context.getStateStore("foobar");
>     }
>
>     @Override
>     public void process(final Record<String, String> record) {
>
>         // handle tombstones from input topic
>         if (record.value() == null || record.value().equals("deleteme")) {
>             store.delete(record.key());
>         } else {
>             store.put(
>                 record.key(),
>                 ValueAndTimestamp.make(
>                     record.key(),
>                     Instant.now().toEpochMilli()
>                 )
>             );
>         }
>
>         // this is not relevant
>         // it's only to show the issue when restarting and restoring the
> ```
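Colt's suggested alternative above (a regular processing step that converts the input topic into true "changelog" format before it feeds the global store) can be sketched as a small value-normalization function. This is only an illustrative sketch under stated assumptions: the class name is made up, "deleteme" is the sentinel used by the processor in this thread, and the actual Streams wiring (e.g. a `mapValues()` step writing to the global store's input topic) is omitted:

```java
// Illustrative sketch (hypothetical class name): normalize application-level
// delete markers into real tombstones, so the global store's input topic
// behaves like a true changelog and log compaction can drop deleted keys.
public class ChangelogNormalizer {

    static String toChangelogValue(final String value) {
        // "deleteme" is the sentinel value used in this thread's processor
        if (value == null || "deleteme".equals(value)) {
            return null; // a real tombstone, not an application-level marker
        }
        return value;
    }

    public static void main(final String[] args) {
        System.out.println(toChangelogValue("deleteme")); // prints "null"
        System.out.println(toChangelogValue("keep"));     // prints "keep"
    }
}
```

With values normalized this way before they reach the input topic, no custom processor is needed on the global store itself, so restoration (which bypasses the processor) sees exactly the same tombstones as live processing.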
[Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records
Hello,

I have a quarkus application using **org.apache.kafka:kafka-streams:3.1.0** and found that

* when creating a global table using a compacted topic as input
* entries that have been deleted at some point
* are then no longer returned when iterating over the store with **store.all()** - as expected
* but after the pod restarts and its kafka streams state directory is deleted, after restoring from the topic using **org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState**
* those formerly deleted records are once again returned by that store when using **store.all()** - not expected
* however they return null, using **store.get("foo")** - as expected

This is somewhat similar to https://issues.apache.org/jira/browse/KAFKA-7663, in that I would like to be able to modify this restore behaviour. However it is also different, because I think it is not documented anywhere and it is unintuitive (to me) - since it changes how the application behaves after restarting it even if the kafka cluster itself was not changed - so I think it's more of a bug than missing documentation.
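As the follow-ups in this thread work out, the asymmetry described above comes from the value conversion applied during restore: only a strictly null raw value stays null, while an empty byte[] gets an 8-byte timestamp prepended and becomes a non-null value, so RocksDB performs a put instead of a delete. A standalone plain-Java sketch of that conversion logic (the class name is made up; it mirrors the `RecordConverters` snippet quoted elsewhere in the thread, outside any Kafka dependency):

```java
import java.nio.ByteBuffer;

// Standalone sketch mirroring the raw-to-timestamped value conversion used
// when restoring a timestamped store: null stays null (a delete in RocksDB),
// but an empty byte[] becomes an 8-byte timestamp value (a put in RocksDB).
public class RestoreConversionSketch {

    static byte[] convert(final long timestamp, final byte[] rawValue) {
        return rawValue == null
            ? null
            : ByteBuffer.allocate(8 + rawValue.length)
                .putLong(timestamp)
                .put(rawValue)
                .array();
    }

    public static void main(final String[] args) {
        System.out.println(convert(42L, null));               // prints "null"
        System.out.println(convert(42L, new byte[0]).length); // prints "8"
    }
}
```

This is why a key whose latest record carries an empty (but non-null) value reappears in `store.all()` after a restore, even though the live processor had deleted it.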
Some more information, the topic is configured like this

```
cleanup.policy: compact
compression.type: producer
delete.retention.ms: 8640
max.compaction.lag.ms: 9223372036854776000
min.compaction.lag.ms: 0
retention.bytes: -1
retention.ms: 8640
```

I am adding the global store like so

```java
streamsBuilder.addGlobalStore(
    Stores.timestampedKeyValueStoreBuilder(
        Stores.persistentTimestampedKeyValueStore("foobar"),
        Serdes.String(),
        Serdes.String()),
    "foo.bar.globaltopic",
    Consumed.with(Serdes.String(), Serdes.String()),
    () -> new FooBarUpdateHandler(timeService)
);
```

and here is the definition of 'FooBarUpdateHandler'

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Internal class handling partFamily updates.
 */
public class FooBarUpdateHandler implements Processor<String, String, Void, Void> {

    private static final Logger logger = LoggerFactory.getLogger(FooBarUpdateHandler.class);
    private TimestampedKeyValueStore<String, String> store;

    @Override
    public void init(final org.apache.kafka.streams.processor.api.ProcessorContext<Void, Void> context) {
        store = context.getStateStore("foobar");
    }

    @Override
    public void process(final Record<String, String> record) {

        // handle tombstones from input topic
        if (record.value() == null || record.value().equals("deleteme")) {
            store.delete(record.key());
        } else {
            store.put(
                record.key(),
                ValueAndTimestamp.make(
                    record.key(),
                    Instant.now().toEpochMilli()
                )
            );
        }

        // this is not relevant
        // it's only to show the issue when restarting and restoring the
        final List<String> existingKeys = new ArrayList<>();
        try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
            all.forEachRemaining((r) -> {
                existingKeys.add(r.key);
            });
        }
        logger.info("Got {} records in the store, with keys {}", existingKeys.size(), String.join(",", existingKeys));
    }
}
```

My workaround is to add this to the 'init' method of the 'FooBarUpdateHandler'

```java
try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
    if (all == null) {
        return;
    }
    logger.info("Removing already deleted records from rocksdb representing the global store {}", storeName);
    all.forEachRemaining(r -> {
        if (r != null && r.key != null && store.get(r.key) == null) {
            store.delete(r.key);
        }
    });
}
```

Now it is again consistent across restarts.

Kind Regards,
Patrick

Patrick D’Addona
Senior Lead IT Architect
Mobile: +49 151 544 22 161
patrick.dadd...@maibornwolff.de
Theresienhöhe 13, 80339 München

MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
www.maibornwolff.de, Phone +49 89 544 253 000
USt-ID DE 129 299 525, Munich District Court HRB 98058
Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann, Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.
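As the later messages in this thread establish, the root cause was a serializer returning `new byte[0]` instead of null for null values, breaking Kafka's null <-> null serde contract, so rather than compensating in `init()`, the serde itself can be fixed. A minimal null-safe serializer sketch (plain String payload here for illustration; the thread's actual serializer uses Jackson, and the class name is made up):

```java
import java.nio.charset.StandardCharsets;

// Minimal sketch of a serializer honoring the null <-> null serde contract:
// returning null (NOT new byte[0]) produces a real tombstone on the topic,
// so compaction, restore, and store.all() all agree the key is deleted.
public class NullSafeStringSerializer {

    static byte[] serialize(final String data) {
        if (data == null) {
            return null; // null must map to null, never to an empty array
        }
        return data.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(final String[] args) {
        System.out.println(serialize(null));         // prints "null"
        System.out.println(serialize("foo").length); // prints "3"
    }
}
```

With a serde shaped like this, deletions written by the application are genuine tombstones, and the restore path produces the same RocksDB state as live processing, making the `init()` workaround unnecessary.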