bbejeck commented on code in PR #22313:
URL: https://github.com/apache/kafka/pull/22313#discussion_r3318477640
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java:
##########
@@ -71,6 +73,71 @@ public class CachingSessionStore
this.maxObservedTimestamp = RecordQueue.UNKNOWN;
}
+ @Override
+ public ReadOnlySessionStore<Bytes, byte[]> readOnly(final IsolationLevel
isolationLevel) {
+ Objects.requireNonNull(isolationLevel, "isolationLevel cannot be
null");
+ if (isolationLevel == IsolationLevel.READ_COMMITTED) {
+ return wrapped().readOnly(isolationLevel);
+ }
+ return new ReadOnlyView(wrapped().readOnly(isolationLevel));
+ }
+
+ private final class ReadOnlyView implements ReadOnlySessionStore<Bytes,
byte[]> {
+
+ private final ReadOnlySessionStore<Bytes, byte[]> underlying;
+
+ ReadOnlyView(final ReadOnlySessionStore<Bytes, byte[]> underlying) {
+ this.underlying = underlying;
+ }
+
+ @Override
+ public byte[] fetchSession(final Bytes key, final long
earliestSessionEndTime, final long latestSessionStartTime) {
Review Comment:
nit: can we break the parameters into separate lines - here and elsewhere in
`ReadOnlyView`
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java:
##########
@@ -249,6 +333,14 @@ public KeyValueIterator<Windowed<Bytes>, byte[]>
backwardFindSessions(final Byte
final Bytes keyTo,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
+ return backwardFindSessionsInternal(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime, wrapped());
+ }
+
+ private KeyValueIterator<Windowed<Bytes>, byte[]>
backwardFindSessionsInternal(final Bytes keyFrom,
+
final Bytes keyTo,
Review Comment:
nit parameter alignment
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java:
##########
@@ -220,6 +297,14 @@ public KeyValueIterator<Windowed<Bytes>, byte[]>
findSessions(final Bytes keyFro
final Bytes
keyTo,
final long
earliestSessionEndTime,
final long
latestSessionStartTime) {
+ return findSessionsInternal(keyFrom, keyTo, earliestSessionEndTime,
latestSessionStartTime, wrapped());
+ }
+
+ private KeyValueIterator<Windowed<Bytes>, byte[]>
findSessionsInternal(final Bytes keyFrom,
+
final Bytes keyTo,
Review Comment:
nit: parameter alignment
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]