[
https://issues.apache.org/jira/browse/KAFKA-12475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302200#comment-17302200
]
A. Sophie Blee-Goldman edited comment on KAFKA-12475 at 3/16/21, 3:37 AM:
--------------------------------------------------------------------------
One option to fix this would be to add a deleteAll() method to the StateStore
interface(s). For compatibility, and because not all stores support a deleteAll
operation, this should have a default implementation that invokes the existing
delete(key) method on everything in the store. This will obviously suffer
performance-wise, but additional latency in state store restoration is
preferred over silently producing incorrect results with EOS. Also, we expect
that the vast majority of custom state stores will either be local & able to
just wipe out the state directory as usual, or else are backed by a storage
engine that supports some form of a deleteAll() operation.
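A minimal sketch of the default-method idea follows. The SketchKeyValueStore
interface, its keys() helper, and the two store classes are hypothetical
stand-ins invented for illustration; they are not the real Kafka Streams
StateStore API, and the actual signature would be settled in a KIP. The
default deleteAll() falls back to per-key delete(key), while a store with a
native bulk operation overrides it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a key-value state store; NOT the real Streams interface.
interface SketchKeyValueStore<K, V> {
    void put(K key, V value);
    V get(K key);
    V delete(K key);

    // Assumed helper: returns a snapshot of the keys currently in the store.
    List<K> keys();

    // Default fallback: slow but correct, one delete(key) call per entry.
    // Iterating over a snapshot avoids modifying the store while traversing it.
    default void deleteAll() {
        for (K key : keys()) {
            delete(key);
        }
    }
}

// A store that only implements the basic operations and inherits the default.
class MapBackedStore<K, V> implements SketchKeyValueStore<K, V> {
    protected final Map<K, V> data = new HashMap<>();

    @Override public void put(K key, V value) { data.put(key, value); }
    @Override public V get(K key) { return data.get(key); }
    @Override public V delete(K key) { return data.remove(key); }
    @Override public List<K> keys() { return new ArrayList<>(data.keySet()); }
}

// A store whose backing engine has a cheap bulk-erase operation overrides the default.
class BulkClearStore<K, V> extends MapBackedStore<K, V> {
    @Override public void deleteAll() { data.clear(); }
}

class DeleteAllSketch {
    public static void main(String[] args) {
        SketchKeyValueStore<String, Integer> store = new MapBackedStore<>();
        store.put("a", 1);
        store.put("b", 2);
        store.deleteAll(); // uses the per-key default
        System.out.println("entries left: " + store.keys().size());
    }
}
```

Existing custom stores would compile unchanged under this shape, since they
inherit the default; only stores that want a faster erase need to override.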
For the RocksDB-based state stores that Streams provides, we can just override
deleteAll() to wipe out the state directory as we do today. Note that this
approach would require a KIP, and there may be some compatibility concerns for
users of local custom state stores today: they should not be required to change
their code to implement deleteAll() and wipe out the state store. So we may
need to actually wipe out the state directory in the default implementation,
and leave it up to EOS users with remote storage to implement things for
semantic correctness. We could even make things easier for users by letting
them specify whether the state store is "remote" or "local", and then decide
which approach to choose when erasing the state.
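To make the "remote" vs "local" idea concrete, here is a rough sketch of how
the erase path might branch. Everything in it is an assumption for
illustration: StoreLocation, ErasableStore, StateEraser, and FakeStore are
hypothetical names, not existing Streams classes, and the real mechanism for
declaring a store remote would be decided in the KIP.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical flag a store could expose to say where its data lives.
enum StoreLocation { LOCAL, REMOTE }

// Hypothetical minimal view of a store for the purpose of erasing it.
interface ErasableStore {
    StoreLocation location();
    void deleteAll();
}

final class StateEraser {
    // Erases the store's state and returns the name of the strategy used.
    static String erase(ErasableStore store, Path stateDir) {
        if (store.location() == StoreLocation.REMOTE) {
            // Wiping the local directory would not touch remote data,
            // so ask the store itself to remove everything.
            store.deleteAll();
            return "deleteAll";
        }
        // Local stores keep the existing behavior: wipe the state directory.
        wipeDirectory(stateDir);
        return "wipeDirectory";
    }

    // Recursively deletes dir and its contents; does nothing if dir is absent.
    static void wipeDirectory(Path dir) {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order puts children before their parent directories.
            List<Path> deepestFirst =
                paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            for (Path p : deepestFirst) {
                Files.delete(p);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

// Test double that records how often deleteAll() was invoked.
final class FakeStore implements ErasableStore {
    private final StoreLocation location;
    int deleteAllCalls = 0;

    FakeStore(StoreLocation location) { this.location = location; }

    @Override public StoreLocation location() { return location; }
    @Override public void deleteAll() { deleteAllCalls++; }
}

class EraseStrategySketch {
    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("state-sketch");
        Files.createFile(dir.resolve("segment.dat"));

        System.out.println(StateEraser.erase(new FakeStore(StoreLocation.LOCAL), dir));
        System.out.println("directory still exists: " + Files.exists(dir));
        System.out.println(StateEraser.erase(new FakeStore(StoreLocation.REMOTE), dir));
    }
}
```

With this split, users of local custom stores would not need to change
anything, and only stores that declare themselves remote would have to
provide a meaningful deleteAll().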
> Kafka Streams breaks EOS with remote state stores
> -------------------------------------------------
>
> Key: KAFKA-12475
> URL: https://issues.apache.org/jira/browse/KAFKA-12475
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: A. Sophie Blee-Goldman
> Priority: Major
> Labels: needs-kip
>
> Currently in Kafka Streams, exactly-once semantics (EOS) require that the
> state stores be completely erased and restored from the changelog from
> scratch in case of an error. This erasure is implemented by closing the state
> store and then simply wiping out the local state directory. This works fine
> for the two store implementations provided OOTB, in-memory and RocksDB, but
> fails when the application includes a custom StateStore based on remote
> storage, such as Redis. In this case Streams will fail to erase any of the
> data before reinserting data from the changelog, resulting in possible
> duplicates and breaking the guarantee of EOS.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)