aliehsaeedii commented on code in PR #21840:
URL: https://github.com/apache/kafka/pull/21840#discussion_r3219687112
##########
docs/streams/developer-guide/interactive-queries.md:
##########
@@ -135,12 +135,20 @@ Every application instance can directly query any of its
local state stores.
The _name_ of a state store is defined when you create the store. You can
create the store explicitly by using the Processor API or implicitly by using
stateful operations in the DSL.
-The _type_ of a state store is defined by `QueryableStoreType`. You can access
the built-in types via the class `QueryableStoreTypes`. Kafka Streams currently
has two built-in types:
+The _type_ of a state store is defined by `QueryableStoreType`. Pass a
built-in implementation from
[`QueryableStoreTypes`](/{version}/javadoc/org/apache/kafka/streams/state/QueryableStoreTypes.html)
as the second argument to `KafkaStreams#store(...)`. The helpers below are
those implementations:
Review Comment:
```suggestion
The _type_ of a state store is defined by `QueryableStoreType`. Pass a
built-in implementation from
[`QueryableStoreTypes`](/{version}/javadoc/org/apache/kafka/streams/state/QueryableStoreTypes.html)
as the second argument to `KafkaStreams#store(...)`. The available built-in
helpers are:
```
##########
docs/streams/developer-guide/processor-api.md:
##########
@@ -173,11 +173,16 @@ Yes (enabled by default)
* Stores its data on local disk.
* Storage capacity: managed local state can be larger than the memory (heap
space) of an application instance, but must fit into the available local disk
space.
* RocksDB settings can be fine-tuned, see [RocksDB
configuration](config-streams.html#streams-developer-guide-rocksdb-config).
- * Available [store
variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore\(java.lang.String\)):
timestamped key-value store, versioned key-value store, time window key-value
store, session window key-value store.
+ * Available [store
variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html): plain
key-value store (values only — no embedded record timestamp in state),
timestamped key-value store, versioned key-value store, windowed store, session
store. Header-aware variants are also described below.
+ * Use
[persistentKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore\(java.lang.String\))
when you need a persistent plain key-value store as above.
* Use
[persistentTimestampedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore\(java.lang.String\))
when you need a persistent key-(value/timestamp) store that supports
put/get/delete and range queries.
* Use
[persistentVersionedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore\(java.lang.String,java.time.Duration\))
when you need a persistent, versioned key-(value/timestamp) store that
supports put/get/delete and timestamped get operations.
- * Use
[persistentWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\))
or
[persistentTimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\))
when you need a persistent timeWindowedKey-value or
timeWindowedKey-(value/timestamp) store, respectively.
- * Use
[persistentSessionStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStore\(java.lang.String,java.time.Duration\))
when you need a persistent sessionWindowedKey-value store.
+ * Use
[persistentWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\))
or
[persistentTimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\))
when you need a persistent plain windowed store or a persistent timestamped
windowed store, respectively.
+ * Use
[persistentSessionStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStore\(java.lang.String,java.time.Duration\))
when you need a persistent session store.
+ * **Headers:** To persist [record
headers](https://kafka.apache.org/documentation/#recordheaders) in state, use a
`WithHeaders` store supplier with its matching `StoreBuilder` factory (see
[Headers in State Stores](#headers-in-state-stores) below). There are no
`WithHeaders` suppliers for plain persistent key-value or plain persistent
windowed stores — only persistent timestamped key-value, persistent timestamped
windowed, and session `WithHeaders` suppliers exist.
+ * Use
[`persistentTimestampedKeyValueStoreWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStoreWithHeaders\(java.lang.String\))
with
[`timestampedKeyValueStoreBuilderWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html)
when you need a persistent key-(value/timestamp) store that retains headers
and supports put/get/delete and range queries.
+ * Use
[`persistentTimestampedWindowStoreWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStoreWithHeaders\(java.lang.String,java.time.Duration,java.time.Duration,boolean\))
with
[`timestampedWindowStoreWithHeadersBuilder`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html)
when you need a persistent timestamped windowed store that retains headers.
+ * Use
[`persistentSessionStoreWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStoreWithHeaders\(java.lang.String,java.time.Duration\))
with
[`sessionStoreBuilderWithHeaders`](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html)
when you need a persistent session store that retains headers.
Review Comment:
I checked the code: we have `SessionStoreWithHeadersBuilder`.
##########
docs/streams/developer-guide/interactive-queries.md:
##########
@@ -135,12 +135,20 @@ Every application instance can directly query any of its
local state stores.
The _name_ of a state store is defined when you create the store. You can
create the store explicitly by using the Processor API or implicitly by using
stateful operations in the DSL.
-The _type_ of a state store is defined by `QueryableStoreType`. You can access
the built-in types via the class `QueryableStoreTypes`. Kafka Streams currently
has two built-in types:
+The _type_ of a state store is defined by `QueryableStoreType`. Pass a
built-in implementation from
[`QueryableStoreTypes`](/{version}/javadoc/org/apache/kafka/streams/state/QueryableStoreTypes.html)
as the second argument to `KafkaStreams#store(...)`. The helpers below are
those implementations:
- * A key-value store `QueryableStoreTypes#keyValueStore()`, see Querying
local key-value stores.
- * A window store `QueryableStoreTypes#windowStore()`, see Querying local
window stores.
+ * **`QueryableStoreTypes#keyValueStore()`** — see [Querying local key-value
stores](#querying-local-key-value-stores).
+ * **`QueryableStoreTypes#timestampedKeyValueStore()`** — see [Querying local
key-value stores](#querying-local-key-value-stores).
+ * **`QueryableStoreTypes#timestampedKeyValueStoreWithHeaders()`** — see
[#header-aware-stores-interactive-queries](#header-aware-stores-interactive-queries).
+ * **`QueryableStoreTypes#windowStore()`** — see [Querying local window
stores](#querying-local-window-stores).
+ * **`QueryableStoreTypes#timestampedWindowStore()`** — see [Querying local
window stores](#querying-local-window-stores).
+ * **`QueryableStoreTypes#timestampedWindowStoreWithHeaders()`** — see
[#header-aware-stores-interactive-queries](#header-aware-stores-interactive-queries).
+ * **`QueryableStoreTypes#sessionStore()`** — see [Querying local window
stores](#querying-local-window-stores).
+ * **`QueryableStoreTypes#sessionStoreWithHeaders()`** — see
[#header-aware-stores-interactive-queries](#header-aware-stores-interactive-queries).
+### Header-aware stores and interactive queries
{#header-aware-stores-interactive-queries}
+For a [header-aware store](processor-api.html#headers-in-state-stores), use
the matching **`*WithHeaders()`** entry from the list above when interactive
query results must include headers.
Review Comment:
```suggestion
For a [header-aware store](processor-api.html#headers-in-state-stores), use
the **`*WithHeaders()`** entry that corresponds to your store type when query
results must include headers.
```
##########
docs/streams/developer-guide/processor-api.md:
##########
@@ -299,7 +304,17 @@ You can query timestamped state stores both with and
without a timestamp.
* For DSL operators, store data is upgraded lazily in the background.
* No upgrade happens if you provide a custom XxxBytesStoreSupplier, but you
can opt-in by implementing the
[TimestampedBytesStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedBytesStore.html)
interface. In this case, the old format is retained, and Streams uses a proxy
store that removes/adds timestamps on read/write.
+## Headers in State Stores {#headers-in-state-stores}
+You can materialize Kafka [record
headers](https://kafka.apache.org/documentation/#recordheaders) into
RocksDB-backed state together with keys and values. Plain persistent key-value
stores keep values without an embedded record timestamp; suppliers for
timestamped key-value, windowed, or session semantics expose timestamps
according to each store type. Use this when downstream processing needs access
to record headers from prior input — for example, when an aggregation or join
implemented with the Processor API must propagate headers to its output.
+
+Only persistent, RocksDB-backed suppliers exist for header-aware stores (the
`Stores` factory names start with `persistent` and end with `WithHeaders`).
There are no in-memory suppliers whose names end with `WithHeaders`.
Review Comment:
```suggestion
Only persistent, RocksDB-backed suppliers exist for header-aware stores (the
`Stores` factory names start with `persistent` and end with `WithHeaders`).
```
##########
docs/streams/developer-guide/processor-api.md:
##########
@@ -217,9 +222,9 @@ Yes (enabled by default)
* Stores its data in memory.
* Storage capacity: managed local state must fit into memory (heap space) of
an application instance.
* Useful when application instances run in an environment where local disk
space is either not available or local disk space is wiped in-between app
instance restarts.
- * Available [store
variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#inMemoryKeyValueStore-java.lang.String-):
time window key-value store, session window key-value store.
+ * Available [store
variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#inMemoryKeyValueStore-java.lang.String-):
plain key-value store, timestamped key-value store, windowed store, session
store.
* Use
[TimestampedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedKeyValueStore.html)
when you need a key-(value/timestamp) store that supports put/get/delete and
range queries.
- * Use
[TimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html)
when you need to store windowedKey-(value/timestamp) pairs.
+ * Use
[TimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedWindowStore.html)
when you need to store timestamped windowed key-(value/timestamp) pairs.
Review Comment:
"timestamped windowed key" is misleading: the windowed key isn't
timestamped, the value is. The previous wording (windowedKey-(value/timestamp))
was more precise. Please revert that adjective.
##########
docs/streams/developer-guide/interactive-queries.md:
##########
@@ -135,12 +135,20 @@ Every application instance can directly query any of its
local state stores.
The _name_ of a state store is defined when you create the store. You can
create the store explicitly by using the Processor API or implicitly by using
stateful operations in the DSL.
-The _type_ of a state store is defined by `QueryableStoreType`. You can access
the built-in types via the class `QueryableStoreTypes`. Kafka Streams currently
has two built-in types:
+The _type_ of a state store is defined by `QueryableStoreType`. Pass a
built-in implementation from
[`QueryableStoreTypes`](/{version}/javadoc/org/apache/kafka/streams/state/QueryableStoreTypes.html)
as the second argument to `KafkaStreams#store(...)`. The helpers below are
those implementations:
- * A key-value store `QueryableStoreTypes#keyValueStore()`, see Querying
local key-value stores.
- * A window store `QueryableStoreTypes#windowStore()`, see Querying local
window stores.
+ * **`QueryableStoreTypes#keyValueStore()`** — see [Querying local key-value
stores](#querying-local-key-value-stores).
+ * **`QueryableStoreTypes#timestampedKeyValueStore()`** — see [Querying local
key-value stores](#querying-local-key-value-stores).
+ * **`QueryableStoreTypes#timestampedKeyValueStoreWithHeaders()`** — see
[#header-aware-stores-interactive-queries](#header-aware-stores-interactive-queries).
+ * **`QueryableStoreTypes#windowStore()`** — see [Querying local window
stores](#querying-local-window-stores).
+ * **`QueryableStoreTypes#timestampedWindowStore()`** — see [Querying local
window stores](#querying-local-window-stores).
+ * **`QueryableStoreTypes#timestampedWindowStoreWithHeaders()`** — see
[#header-aware-stores-interactive-queries](#header-aware-stores-interactive-queries).
+ * **`QueryableStoreTypes#sessionStore()`** — see [Querying local window
stores](#querying-local-window-stores).
+ * **`QueryableStoreTypes#sessionStoreWithHeaders()`** — see
[#header-aware-stores-interactive-queries](#header-aware-stores-interactive-queries).
Review Comment:
On lines 142, 145, and 147, the link text is the raw anchor. Replace it with
`see [Header-aware stores and interactive queries](#header-aware-stores-interactive-queries)`
to match the target heading defined at line 149.
##########
docs/streams/developer-guide/interactive-queries.md:
##########
@@ -150,8 +158,10 @@ Kafka Streams materializes one state store per stream
partition. This means your
## Querying local key-value stores
Review Comment:
The document goes h1 (line 126) → h3 (line 149) → h2 (line 161). Skipping a
level (no h2 between the h1 and the h3) breaks the heading hierarchy. Either:
- Promote it to h2 so it sits as a sibling of `## Querying local key-value
stores`, `## Querying local window stores`, and `## Querying local custom state
stores`. The whole "Querying local …" family then lines up under the h1.
- Demote it to a **Note:** block (no heading at all) appended to the bullet
list, since the body is just one sentence. This is what I'd actually
recommend: a heading for one sentence is overkill.
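A skipped level like this can also be caught mechanically. The following is only a minimal sketch and not part of this PR or of the Kafka build: `HeadingLevelCheck` and its method names are hypothetical, and the h1 text in the sample is a placeholder. It looks only at ATX (`#`) headings and does not account for `#` lines inside fenced code blocks.

```java
import java.util.ArrayList;
import java.util.List;

class HeadingLevelCheck {

    /** Returns the ATX heading level (1-6) of a Markdown line, or 0 if it is not a heading. */
    static int headingLevel(String line) {
        int level = 0;
        while (level < line.length() && line.charAt(level) == '#') {
            level++;
        }
        // A heading needs 1-6 '#' characters followed by a space or the end of the line.
        boolean terminated = level == line.length() || line.charAt(level) == ' ';
        return (level >= 1 && level <= 6 && terminated) ? level : 0;
    }

    /** Reports every heading that is more than one level deeper than the heading before it. */
    static List<String> findSkippedLevels(List<String> lines) {
        List<String> problems = new ArrayList<>();
        int previous = 0; // level of the most recent heading, 0 if none seen yet
        for (int i = 0; i < lines.size(); i++) {
            int level = headingLevel(lines.get(i));
            if (level == 0) {
                continue; // not a heading
            }
            if (previous > 0 && level > previous + 1) {
                problems.add("line " + (i + 1) + ": h" + previous + " -> h" + level);
            }
            previous = level;
        }
        return problems;
    }

    public static void main(String[] args) {
        // Same shape as the hierarchy described above: h1, then h3, then h2.
        List<String> doc = List.of(
            "# Page title (placeholder)",
            "### Header-aware stores and interactive queries",
            "## Querying local key-value stores");
        findSkippedLevels(doc).forEach(System.out::println); // prints: line 2: h1 -> h3
    }
}
```

Only the h1 → h3 step is reported. Moving back up from h3 to h2 is a legal transition, so either fix proposed above (promote to h2 or drop the heading) would make the check pass.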
##########
docs/streams/developer-guide/interactive-queries.md:
##########
@@ -135,12 +135,20 @@ Every application instance can directly query any of its
local state stores.
The _name_ of a state store is defined when you create the store. You can
create the store explicitly by using the Processor API or implicitly by using
stateful operations in the DSL.
-The _type_ of a state store is defined by `QueryableStoreType`. You can access
the built-in types via the class `QueryableStoreTypes`. Kafka Streams currently
has two built-in types:
+The _type_ of a state store is defined by `QueryableStoreType`. Pass a
built-in implementation from
[`QueryableStoreTypes`](/{version}/javadoc/org/apache/kafka/streams/state/QueryableStoreTypes.html)
as the second argument to `KafkaStreams#store(...)`. The helpers below are
those implementations:
- * A key-value store `QueryableStoreTypes#keyValueStore()`, see Querying
local key-value stores.
- * A window store `QueryableStoreTypes#windowStore()`, see Querying local
window stores.
+ * **`QueryableStoreTypes#keyValueStore()`** — see [Querying local key-value
stores](#querying-local-key-value-stores).
+ * **`QueryableStoreTypes#timestampedKeyValueStore()`** — see [Querying local
key-value stores](#querying-local-key-value-stores).
+ * **`QueryableStoreTypes#timestampedKeyValueStoreWithHeaders()`** — see
[#header-aware-stores-interactive-queries](#header-aware-stores-interactive-queries).
+ * **`QueryableStoreTypes#windowStore()`** — see [Querying local window
stores](#querying-local-window-stores).
+ * **`QueryableStoreTypes#timestampedWindowStore()`** — see [Querying local
window stores](#querying-local-window-stores).
+ * **`QueryableStoreTypes#timestampedWindowStoreWithHeaders()`** — see
[#header-aware-stores-interactive-queries](#header-aware-stores-interactive-queries).
+ * **`QueryableStoreTypes#sessionStore()`** — see [Querying local window
stores](#querying-local-window-stores).
+ * **`QueryableStoreTypes#sessionStoreWithHeaders()`** — see
[#header-aware-stores-interactive-queries](#header-aware-stores-interactive-queries).
+### Header-aware stores and interactive queries
{#header-aware-stores-interactive-queries}
+For a [header-aware store](processor-api.html#headers-in-state-stores), use
the matching **`*WithHeaders()`** entry from the list above when interactive
query results must include headers.
Review Comment:
`matching` is a bit vague.
##########
docs/streams/developer-guide/processor-api.md:
##########
@@ -217,9 +222,9 @@ Yes (enabled by default)
* Stores its data in memory.
* Storage capacity: managed local state must fit into memory (heap space) of
an application instance.
* Useful when application instances run in an environment where local disk
space is either not available or local disk space is wiped in-between app
instance restarts.
- * Available [store
variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#inMemoryKeyValueStore-java.lang.String-):
time window key-value store, session window key-value store.
+ * Available [store
variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#inMemoryKeyValueStore-java.lang.String-):
plain key-value store, timestamped key-value store, windowed store, session
store.
Review Comment:
We don't have an in-memory `timestamped key-value store`.
##########
docs/streams/developer-guide/processor-api.md:
##########
@@ -173,11 +173,16 @@ Yes (enabled by default)
* Stores its data on local disk.
* Storage capacity: managed local state can be larger than the memory (heap
space) of an application instance, but must fit into the available local disk
space.
* RocksDB settings can be fine-tuned, see [RocksDB
configuration](config-streams.html#streams-developer-guide-rocksdb-config).
- * Available [store
variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore\(java.lang.String\)):
timestamped key-value store, versioned key-value store, time window key-value
store, session window key-value store.
+ * Available [store
variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html): plain
key-value store (values only — no embedded record timestamp in state),
timestamped key-value store, versioned key-value store, windowed store, session
store. Header-aware variants are also described below.
+ * Use
[persistentKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore\(java.lang.String\))
when you need a persistent plain key-value store as above.
* Use
[persistentTimestampedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedKeyValueStore\(java.lang.String\))
when you need a persistent key-(value/timestamp) store that supports
put/get/delete and range queries.
* Use
[persistentVersionedKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentVersionedKeyValueStore\(java.lang.String,java.time.Duration\))
when you need a persistent, versioned key-(value/timestamp) store that
supports put/get/delete and timestamped get operations.
- * Use
[persistentWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\))
or
[persistentTimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\))
when you need a persistent timeWindowedKey-value or
timeWindowedKey-(value/timestamp) store, respectively.
- * Use
[persistentSessionStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStore\(java.lang.String,java.time.Duration\))
when you need a persistent sessionWindowedKey-value store.
+ * Use
[persistentWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\))
or
[persistentTimestampedWindowStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentTimestampedWindowStore\(java.lang.String,java.time.Duration,java.time.Duration,boolean\))
when you need a persistent plain windowed store or a persistent timestamped
windowed store, respectively.
+ * Use
[persistentSessionStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentSessionStore\(java.lang.String,java.time.Duration\))
when you need a persistent session store.
+ * **Headers:** To persist [record
headers](https://kafka.apache.org/documentation/#recordheaders) in state, use a
`WithHeaders` store supplier with its matching `StoreBuilder` factory (see
[Headers in State Stores](#headers-in-state-stores) below). There are no
`WithHeaders` suppliers for plain persistent key-value or plain persistent
windowed stores — only persistent timestamped key-value, persistent timestamped
windowed, and session `WithHeaders` suppliers exist.
Review Comment:
`There are no WithHeaders suppliers for ...`
Please split this sentence into two short sentences.
##########
docs/streams/developer-guide/processor-api.md:
##########
@@ -173,11 +173,16 @@ Yes (enabled by default)
* Stores its data on local disk.
* Storage capacity: managed local state can be larger than the memory (heap
space) of an application instance, but must fit into the available local disk
space.
* RocksDB settings can be fine-tuned, see [RocksDB
configuration](config-streams.html#streams-developer-guide-rocksdb-config).
- * Available [store
variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore\(java.lang.String\)):
timestamped key-value store, versioned key-value store, time window key-value
store, session window key-value store.
+ * Available [store
variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html): plain
key-value store (values only — no embedded record timestamp in state),
timestamped key-value store, versioned key-value store, windowed store, session
store. Header-aware variants are also described below.
+ * Use
[persistentKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore\(java.lang.String\))
when you need a persistent plain key-value store as above.
Review Comment:
```suggestion
* Use
[persistentKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore\(java.lang.String\))
when you need a persistent plain key-value store (no embedded record
timestamp).
```
##########
docs/streams/developer-guide/processor-api.md:
##########
@@ -299,7 +304,17 @@ You can query timestamped state stores both with and
without a timestamp.
* For DSL operators, store data is upgraded lazily in the background.
* No upgrade happens if you provide a custom XxxBytesStoreSupplier, but you
can opt-in by implementing the
[TimestampedBytesStore](/{version}/javadoc/org/apache/kafka/streams/state/TimestampedBytesStore.html)
interface. In this case, the old format is retained, and Streams uses a proxy
store that removes/adds timestamps on read/write.
+## Headers in State Stores {#headers-in-state-stores}
+You can materialize Kafka [record
headers](https://kafka.apache.org/documentation/#recordheaders) into
RocksDB-backed state together with keys and values. Plain persistent key-value
stores keep values without an embedded record timestamp; suppliers for
timestamped key-value, windowed, or session semantics expose timestamps
according to each store type. Use this when downstream processing needs access
to record headers from prior input — for example, when an aggregation or join
implemented with the Processor API must propagate headers to its output.
+
+Only persistent, RocksDB-backed suppliers exist for header-aware stores (the
`Stores` factory names start with `persistent` and end with `WithHeaders`).
There are no in-memory suppliers whose names end with `WithHeaders`.
Review Comment:
The second sentence repeats the first. Drop it!
##########
docs/streams/upgrade-guide.md:
##########
@@ -75,6 +75,18 @@ Kafka Streams now allows to purge local state directories
and checkpoint files d
Kafka Streams now persists state store changelog offsets inside each state
store rather than in a single per-task `.checkpoint` file
([KIP-1035](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets)).
This is an internal infrastructure change and is transparent to most users —
existing per-task `.checkpoint` files are migrated automatically on first
startup, and no application or operator action is required. EOS crash behavior
is unchanged in 4.3: state stores are still wiped and fully restored from the
changelog. KIP-1035 is a prerequisite for [KIP-892: Transactional Semantics for
StateStores](https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores),
which will use these per-store offsets to make EOS state writes transactional
and skip the full restore. Authors of custom `StateStore` implementations may
opt-in to managing their own offsets via `managesOffsets()`,
`commit(Map<TopicPartition,
Long>)`, and `committedOffset(TopicPartition)`; see KIP-1035 for the API. For
downgrade implications, see [Notable compatibility changes in past
releases](#notable-compatibility-changes-in-past-releases).
+### Header-aware state stores for the Processor API (KIP-1271)
{#kip-1271-headers-aware-stores}
+
+Kafka Streams adds **header-aware** state stores. Opt in with the new `Stores`
suppliers whose names end with `WithHeaders` and the matching `StoreBuilder`
factories—for example `persistentTimestampedKeyValueStoreWithHeaders` with
`timestampedKeyValueStoreBuilderWithHeaders`,
`persistentTimestampedWindowStoreWithHeaders` with
`timestampedWindowStoreWithHeadersBuilder`, and
`persistentSessionStoreWithHeaders` with `sessionStoreBuilderWithHeaders`. See
the [Processor API state store
documentation](developer-guide/processor-api.html#headers-in-state-stores).
Review Comment:
This is a long sentence with an em dash followed by three name pairs. Either
bullet the pairs or break it into two sentences.
##########
docs/streams/developer-guide/interactive-queries.md:
##########
@@ -150,8 +158,10 @@ Kafka Streams materializes one state store per stream
partition. This means your
## Querying local key-value stores
Review Comment:
Why is this an h2? I see a heading-level jump: h1 (line 126) → h3 (line 149) →
h2 (this line).
##########
docs/streams/developer-guide/processor-api.md:
##########
@@ -173,11 +173,16 @@ Yes (enabled by default)
* Stores its data on local disk.
* Storage capacity: managed local state can be larger than the memory (heap
space) of an application instance, but must fit into the available local disk
space.
* RocksDB settings can be fine-tuned, see [RocksDB
configuration](config-streams.html#streams-developer-guide-rocksdb-config).
- * Available [store
variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore\(java.lang.String\)):
timestamped key-value store, versioned key-value store, time window key-value
store, session window key-value store.
+ * Available [store
variants](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html): plain
key-value store (values only — no embedded record timestamp in state),
timestamped key-value store, versioned key-value store, windowed store, session
store. Header-aware variants are also described below.
+ * Use
[persistentKeyValueStore](/{version}/javadoc/org/apache/kafka/streams/state/Stores.html#persistentKeyValueStore\(java.lang.String\))
when you need a persistent plain key-value store as above.
Review Comment:
`as above` is vague.
##########
docs/streams/developer-guide/interactive-queries.md:
##########
@@ -150,8 +158,10 @@ Kafka Streams materializes one state store per stream
partition. This means your
## Querying local key-value stores
Review Comment:
Another issue: The new ### Header-aware stores… heading at line 149 splits
the intro in half. The two general lines that follow it — "You can also
implement your own QueryableStoreType…" and the Note about per-partition stores
— apply to all QueryableStoreTypes, but the new heading makes them visually
belong to the header-aware subsection only. Move those two lines above the new
heading (or drop the heading entirely) so they stay part of the general intro.