mjsax commented on code in PR #21680: URL: https://github.com/apache/kafka/pull/21680#discussion_r2902669429
########## streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersIteratorAdapter.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.KeyValueIterator; + +/** + * This class is used to ensure backward compatibility at DSL level between + * {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} and + * {@link org.apache.kafka.streams.state.SessionStore}. + * <p> + * When iterating over session entries from a store that stores values with headers, + * this adapter strips the headers prefix so the caller receives raw aggregation bytes + * without headers. + * + * @see SessionToHeadersStoreAdapter + */ +class SessionToHeadersIteratorAdapter implements KeyValueIterator<Windowed<Bytes>, byte[]> { + private final KeyValueIterator<Windowed<Bytes>, byte[]> innerIterator; + + SessionToHeadersIteratorAdapter(final KeyValueIterator<Windowed<Bytes>, byte[]> innerIterator) { + this.innerIterator = innerIterator; + } + + @Override + public void close() { + innerIterator.close(); + } + + @Override + public Windowed<Bytes> peekNextKey() { + return innerIterator.peekNextKey(); + } + + @Override + public boolean hasNext() { + return innerIterator.hasNext(); + } + + @Override + public KeyValue<Windowed<Bytes>, byte[]> next() { + final KeyValue<Windowed<Bytes>, byte[]> keyValue = innerIterator.next(); Review Comment: Missing `null` check for `keyValue` ########## streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.SessionStore; + +import java.util.Map; + +import static org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat; + +/** + * This class is used to ensure backward compatibility at DSL level between + * {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} and + * {@link org.apache.kafka.streams.state.SessionStore}. + * <p> + * If a user provides a supplier for {@code SessionStore} (without headers) via + * {@link org.apache.kafka.streams.kstream.Materialized} when building + * a {@code SessionStoreWithHeaders}, this adapter is used to translate between + * the raw aggregation {@code byte[]} format and the aggregation-with-headers {@code byte[]} format. + * <p> + * On writes (put), empty headers are prepended to the raw aggregation value before + * delegating to the inner store. + * On reads (get, fetch, findSessions), the headers prefix is stripped from the stored value + * so the caller receives raw aggregation bytes without headers. + * + * @see SessionToHeadersIteratorAdapter + */ +@SuppressWarnings("unchecked") +public class SessionToHeadersStoreAdapter implements SessionStore<Bytes, byte[]> { + final SessionStore<Bytes, byte[]> store; + + SessionToHeadersStoreAdapter(final SessionStore<Bytes, byte[]> store) { + if (!store.persistent()) { + throw new IllegalArgumentException("Provided store must be a persistent store, but it is not."); + } + this.store = store; + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionToHeadersIteratorAdapter( + store.findSessions(key, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final long earliestSessionEndTime, + final long latestSessionEndTime) { + return new SessionToHeadersIteratorAdapter( + store.findSessions(earliestSessionEndTime, latestSessionEndTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionToHeadersIteratorAdapter( + store.backwardFindSessions(key, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionToHeadersIteratorAdapter( + store.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionToHeadersIteratorAdapter( + store.backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public byte[] fetchSession(final Bytes key, + final long sessionStartTime, + final long sessionEndTime) { + return AggregationWithHeadersDeserializer.rawAggregation(store.fetchSession(key, sessionStartTime, sessionEndTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) { + return new SessionToHeadersIteratorAdapter(store.fetch(key)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes key) { + return new SessionToHeadersIteratorAdapter(store.backwardFetch(key)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { + return new SessionToHeadersIteratorAdapter(store.fetch(keyFrom, keyTo)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo) { + return new SessionToHeadersIteratorAdapter(store.backwardFetch(keyFrom, keyTo)); + } + + @Override + public void remove(final Windowed<Bytes> sessionKey) { + store.remove(sessionKey); + } + + @Override + public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) { + store.put(sessionKey, convertToHeaderFormat(aggregate)); Review Comment: I think we need to do the opposite here, and convert header to plain aggregations ########## streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.SessionStore; + +import java.util.Map; + +import static org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat; + +/** + * This class is used to ensure backward compatibility at DSL level between + * {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} and + * {@link org.apache.kafka.streams.state.SessionStore}. + * <p> + * If a user provides a supplier for {@code SessionStore} (without headers) via + * {@link org.apache.kafka.streams.kstream.Materialized} when building + * a {@code SessionStoreWithHeaders}, this adapter is used to translate between + * the raw aggregation {@code byte[]} format and the aggregation-with-headers {@code byte[]} format. + * <p> + * On writes (put), empty headers are prepended to the raw aggregation value before + * delegating to the inner store. + * On reads (get, fetch, findSessions), the headers prefix is stripped from the stored value + * so the caller receives raw aggregation bytes without headers. + * + * @see SessionToHeadersIteratorAdapter + */ +@SuppressWarnings("unchecked") +public class SessionToHeadersStoreAdapter implements SessionStore<Bytes, byte[]> { + final SessionStore<Bytes, byte[]> store; + + SessionToHeadersStoreAdapter(final SessionStore<Bytes, byte[]> store) { + if (!store.persistent()) { + throw new IllegalArgumentException("Provided store must be a persistent store, but it is not."); + } + this.store = store; + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionToHeadersIteratorAdapter( + store.findSessions(key, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final long earliestSessionEndTime, + final long latestSessionEndTime) { + return new SessionToHeadersIteratorAdapter( + store.findSessions(earliestSessionEndTime, latestSessionEndTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionToHeadersIteratorAdapter( + store.backwardFindSessions(key, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionToHeadersIteratorAdapter( + store.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionToHeadersIteratorAdapter( + store.backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public byte[] fetchSession(final Bytes key, + final long sessionStartTime, + final long sessionEndTime) { + return AggregationWithHeadersDeserializer.rawAggregation(store.fetchSession(key, sessionStartTime, sessionEndTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) { + return new SessionToHeadersIteratorAdapter(store.fetch(key)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes key) { + return new SessionToHeadersIteratorAdapter(store.backwardFetch(key)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { + return new SessionToHeadersIteratorAdapter(store.fetch(keyFrom, keyTo)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo) { + return new SessionToHeadersIteratorAdapter(store.backwardFetch(keyFrom, keyTo)); + } + + @Override + public void remove(final Windowed<Bytes> sessionKey) { + store.remove(sessionKey); + } + + @Override + public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) { + store.put(sessionKey, convertToHeaderFormat(aggregate)); + } + + @Override + public String name() { + return store.name(); + } + + @Override + public void init(final StateStoreContext stateStoreContext, final StateStore root) { + store.init(stateStoreContext, root); + } + + @Override + public void commit(final Map<TopicPartition, Long> changelogOffsets) { + store.commit(changelogOffsets); + } + + @Override + public void close() { + store.close(); + } + + @Override + public boolean persistent() { + return store.persistent(); + } + + @Override + public boolean isOpen() { + return store.isOpen(); + } + + @Override + public <R> QueryResult<R> query(final Query<R> query, + final PositionBound positionBound, + final QueryConfig config) { + final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L; + final QueryResult<R> result = store.query(query, positionBound, config); Review Comment: This is an adaptor store, we need to make a byte-array translation for `result`, right? Cf https://github.com/apache/kafka/pull/21643 and https://github.com/apache/kafka/pull/21650 ########## streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersIteratorAdapter.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.KeyValueIterator; + +/** + * This class is used to ensure backward compatibility at DSL level between + * {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} and + * {@link org.apache.kafka.streams.state.SessionStore}. + * <p> + * When iterating over session entries from a store that stores values with headers, + * this adapter strips the headers prefix so the caller receives raw aggregation bytes + * without headers. + * + * @see SessionToHeadersStoreAdapter + */ +class SessionToHeadersIteratorAdapter implements KeyValueIterator<Windowed<Bytes>, byte[]> { + private final KeyValueIterator<Windowed<Bytes>, byte[]> innerIterator; + + SessionToHeadersIteratorAdapter(final KeyValueIterator<Windowed<Bytes>, byte[]> innerIterator) { + this.innerIterator = innerIterator; + } + + @Override + public void close() { + innerIterator.close(); + } + + @Override + public Windowed<Bytes> peekNextKey() { + return innerIterator.peekNextKey(); + } + + @Override + public boolean hasNext() { + return innerIterator.hasNext(); + } + + @Override + public KeyValue<Windowed<Bytes>, byte[]> next() { + final KeyValue<Windowed<Bytes>, byte[]> keyValue = innerIterator.next(); + return KeyValue.pair(keyValue.key, AggregationWithHeadersDeserializer.rawAggregation(keyValue.value)); Review Comment: This is an adaptor, not a facade, so the inner store is a "plain store" and we need to convert from plain to header format. -- The code does the opposite right now what does not seems to be right? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.SessionStore; + +import java.util.Map; + +import static org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat; + +/** + * This class is used to ensure backward compatibility at DSL level between + * {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} and + * {@link org.apache.kafka.streams.state.SessionStore}. + * <p> + * If a user provides a supplier for {@code SessionStore} (without headers) via + * {@link org.apache.kafka.streams.kstream.Materialized} when building + * a {@code SessionStoreWithHeaders}, this adapter is used to translate between + * the raw aggregation {@code byte[]} format and the aggregation-with-headers {@code byte[]} format. + * <p> + * On writes (put), empty headers are prepended to the raw aggregation value before + * delegating to the inner store. + * On reads (get, fetch, findSessions), the headers prefix is stripped from the stored value + * so the caller receives raw aggregation bytes without headers. + * + * @see SessionToHeadersIteratorAdapter + */ +@SuppressWarnings("unchecked") +public class SessionToHeadersStoreAdapter implements SessionStore<Bytes, byte[]> { + final SessionStore<Bytes, byte[]> store; + + SessionToHeadersStoreAdapter(final SessionStore<Bytes, byte[]> store) { + if (!store.persistent()) { + throw new IllegalArgumentException("Provided store must be a persistent store, but it is not."); + } + this.store = store; + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionToHeadersIteratorAdapter( + store.findSessions(key, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final long earliestSessionEndTime, + final long latestSessionEndTime) { + return new SessionToHeadersIteratorAdapter( + store.findSessions(earliestSessionEndTime, latestSessionEndTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionToHeadersIteratorAdapter( + store.backwardFindSessions(key, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionToHeadersIteratorAdapter( + store.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionToHeadersIteratorAdapter( + store.backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public byte[] fetchSession(final Bytes key, + final long sessionStartTime, + final long sessionEndTime) { + return AggregationWithHeadersDeserializer.rawAggregation(store.fetchSession(key, sessionStartTime, sessionEndTime)); Review Comment: Same -- we need to convert from plain to header format here, right? ########## streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.SessionStore; + +import java.util.Map; + +import static org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat; + +/** + * This class is used to ensure backward compatibility at DSL level between + * {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} and + * {@link org.apache.kafka.streams.state.SessionStore}. + * <p> + * If a user provides a supplier for {@code SessionStore} (without headers) via + * {@link org.apache.kafka.streams.kstream.Materialized} when building + * a {@code SessionStoreWithHeaders}, this adapter is used to translate between + * the raw aggregation {@code byte[]} format and the aggregation-with-headers {@code byte[]} format. + * <p> + * On writes (put), empty headers are prepended to the raw aggregation value before + * delegating to the inner store. + * On reads (get, fetch, findSessions), the headers prefix is stripped from the stored value + * so the caller receives raw aggregation bytes without headers. + * + * @see SessionToHeadersIteratorAdapter + */ +@SuppressWarnings("unchecked") +public class SessionToHeadersStoreAdapter implements SessionStore<Bytes, byte[]> { + final SessionStore<Bytes, byte[]> store; + + SessionToHeadersStoreAdapter(final SessionStore<Bytes, byte[]> store) { + if (!store.persistent()) { + throw new IllegalArgumentException("Provided store must be a persistent store, but it is not."); + } + this.store = store; + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionToHeadersIteratorAdapter( + store.findSessions(key, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final long earliestSessionEndTime, + final long latestSessionEndTime) { + return new SessionToHeadersIteratorAdapter( + store.findSessions(earliestSessionEndTime, latestSessionEndTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionToHeadersIteratorAdapter( + store.backwardFindSessions(key, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionToHeadersIteratorAdapter( + store.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFindSessions(final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return new SessionToHeadersIteratorAdapter( + store.backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime)); + } + + @Override + public byte[] fetchSession(final Bytes key, + final long sessionStartTime, + final long sessionEndTime) { + return AggregationWithHeadersDeserializer.rawAggregation(store.fetchSession(key, sessionStartTime, sessionEndTime)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) { + return new SessionToHeadersIteratorAdapter(store.fetch(key)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes key) { + return new SessionToHeadersIteratorAdapter(store.backwardFetch(key)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { + return new SessionToHeadersIteratorAdapter(store.fetch(keyFrom, keyTo)); + } + + @Override + public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo) { + return new SessionToHeadersIteratorAdapter(store.backwardFetch(keyFrom, keyTo)); + } + + @Override + public void remove(final Windowed<Bytes> sessionKey) { + store.remove(sessionKey); + } + + @Override + public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) { Review Comment: ```suggestion public void put(final Windowed<Bytes> sessionKey, final byte[] aggregateWithHeader) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
