Re: [PR] Implement ordered list state for FnApi. [beam]

2024-05-15 Thread via GitHub


github-actions[bot] commented on PR #30317:
URL: https://github.com/apache/beam/pull/30317#issuecomment-2112557161

   Stopping reviewer notifications for this pull request: requested by reviewer


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Implement ordered list state for FnApi. [beam]

2024-05-15 Thread via GitHub


shunping commented on PR #30317:
URL: https://github.com/apache/beam/pull/30317#issuecomment-2112554061

   stop reviewer notifications





Re: [PR] Implement ordered list state for FnApi. [beam]

2024-05-15 Thread via GitHub


github-actions[bot] commented on PR #30317:
URL: https://github.com/apache/beam/pull/30317#issuecomment-2112367951

   Assigning a new set of reviewers because the PR has gone too long without review. 
If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kennknowles for label java.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any 
comment or push by the author will return the attention set to the reviewers)





Re: [PR] Implement ordered list state for FnApi. [beam]

2024-05-11 Thread via GitHub


github-actions[bot] commented on PR #30317:
URL: https://github.com/apache/beam/pull/30317#issuecomment-2105726584

   Reminder, please take a look at this PR: @Abacn 





Re: [PR] Implement ordered list state for FnApi. [beam]

2024-05-02 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1588300082


##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.Cache;
+import org.apache.beam.fn.harness.Caches;
+import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear and
+ * persist values.
+ *
+ * Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ */
+public class OrderedListUserState<T> {
+  private final BeamFnStateClient beamFnStateClient;
+  private final StateRequest request;
+  private final Coder<T> valueCoder;
+  private final TimestampedValueCoder<T> timestampedValueCoder;
+  // Pending updates to persistent storage
+  private NavigableMap<Instant, Collection<T>> pendingAdds = Maps.newTreeMap();
+  private TreeRangeSet<Instant> pendingRemoves = TreeRangeSet.create();
+
+  private boolean isCleared = false;
+  private boolean isClosed = false;
+
+  public OrderedListUserState(
+      Cache<?, ?> cache,
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      StateKey stateKey,
+      Coder<T> valueCoder) {
+    checkArgument(
+        stateKey.hasOrderedListUserState(),
+        "Expected OrderedListUserState StateKey but received %s.",
+        stateKey);
+    this.beamFnStateClient = beamFnStateClient;
+    this.valueCoder = valueCoder;
+    this.timestampedValueCoder = TimestampedValueCoder.of(this.valueCoder);
+    this.request =
+        StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();
+  }
+
+  public void add(TimestampedValue<T> value) {
+    checkState(
+        !isClosed,
+        "OrderedList user state is no longer usable because it is closed for %s",
+        request.getStateKey());
+    Instant timestamp = value.getTimestamp();
+    pendingAdds.putIfAbsent(timestamp, new ArrayList<>());
+    pendingAdds.get(timestamp).add(value.getValue());
+  }
+
+  public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, Instant limitTimestamp) {
+    checkState(
+        !isClosed,
+        "OrderedList user state is no longer 

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-05-02 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1588140845


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/OrderedListUserStateTest.java:
##
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static java.util.Arrays.asList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.beam.fn.harness.Caches;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListRange;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class OrderedListUserStateTest {
+  private static final TimestampedValue<String> A1 =
+      TimestampedValue.of("A1", Instant.ofEpochMilli(1));
+  private static final TimestampedValue<String> B1 =
+      TimestampedValue.of("B1", Instant.ofEpochMilli(1));
+  private static final TimestampedValue<String> A2 =
+      TimestampedValue.of("A2", Instant.ofEpochMilli(2));
+  private static final TimestampedValue<String> B2 =
+      TimestampedValue.of("B2", Instant.ofEpochMilli(2));
+  private static final TimestampedValue<String> A3 =
+      TimestampedValue.of("A3", Instant.ofEpochMilli(3));
+  private static final TimestampedValue<String> A4 =
+      TimestampedValue.of("A4", Instant.ofEpochMilli(4));
+
+  private final String pTransformId = "pTransformId";
+  private final String stateId = "stateId";
+  private final String encodedWindow = "encodedWindow";
+
+  @Test
+  public void testNoPersistedValues() throws Exception {
+    FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(Collections.emptyMap());
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
+            StringUtf8Coder.of());
+    assertThat(userState.read(), is(emptyIterable()));
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    FakeBeamFnStateClient fakeClient =
+        new FakeBeamFnStateClient(
+            TimestampedValueCoder.of(StringUtf8Coder.of()),
+            ImmutableMap.of(createOrderedListStateKey("A", 1), asList(A1, B1)));
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
+            StringUtf8Coder.of());
+
+    assertArrayEquals(
+        asList(A1, B1).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class));
+    userState.asyncClose();
+    assertThrows(IllegalStateException.class, () -> userState.read());
+  }
+
+  @Test
+  public void testReadRange() throws Exception {
+    FakeBeamFnStateClient fakeClient =
+        new FakeBeamFnStateClient(
+            TimestampedValueCoder.of(StringUtf8Coder.of()),
+            ImmutableMap.of(
+                createOrderedListStateKey("A", 1), asList(A1, B1),
+                createOrderedListStateKey("A", 4), Collections.singletonList(A4),
+                createOrderedListStateKey("A", 2), Collections.singletonList(A2)));
+
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
+   

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-05-02 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1588027273


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##


Review Comment:
   Yes. I think we will need a separate one for orderedlist because although 
orderedlist and multimap have similar semantics, the code paths are totally 
different.  
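
For readers following this thread, the ordered-list access pattern can be sketched as below. This is an illustration only, not code from the PR: the class `OrderedListSketch`, its `long` timestamps and `String` values, and the half-open `[minTimestamp, limitTimestamp)` range are all assumptions made for the example. It shows values grouped under a sorted timestamp key and read back by range, which is the part that seems specific to ordered lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration only; this is not the PR's OrderedListUserState.
class OrderedListSketch {
  // Values are grouped by timestamp (epoch millis) and kept in sorted key order.
  private final NavigableMap<Long, List<String>> pendingAdds = new TreeMap<>();

  // Buffers a value under its timestamp; several values may share one timestamp.
  void add(long timestampMillis, String value) {
    pendingAdds.computeIfAbsent(timestampMillis, k -> new ArrayList<>()).add(value);
  }

  // Assumption: the range is half-open, minTimestamp <= t < limitTimestamp.
  List<String> readRange(long minTimestamp, long limitTimestamp) {
    List<String> result = new ArrayList<>();
    for (Map.Entry<Long, List<String>> entry :
        pendingAdds.subMap(minTimestamp, true, limitTimestamp, false).entrySet()) {
      result.addAll(entry.getValue());
    }
    return result;
  }

  public static void main(String[] args) {
    OrderedListSketch state = new OrderedListSketch();
    state.add(2, "A2");
    state.add(1, "A1");
    state.add(1, "B1");
    state.add(4, "A4");
    System.out.println(state.readRange(1, 4)); // prints [A1, B1, A2]
  }
}
```

Values that share a timestamp come back in insertion order, and the entry at timestamp 4 is excluded because the upper bound is treated as exclusive in this sketch.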






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-05-02 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1588027273


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##


Review Comment:
   Yes. I think we will need a separate one for orderedlist because although 
orderedlist and multimap have similar semantics, the implementation is 
different.  






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-05-02 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1588084688


##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.Cache;
+import org.apache.beam.fn.harness.Caches;
+import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear and

Review Comment:
   ack






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-05-02 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1588080541


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/OrderedListUserStateTest.java:
##
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static java.util.Arrays.asList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.beam.fn.harness.Caches;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListRange;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class OrderedListUserStateTest {
+  private static final TimestampedValue<String> A1 =
+      TimestampedValue.of("A1", Instant.ofEpochMilli(1));
+  private static final TimestampedValue<String> B1 =
+      TimestampedValue.of("B1", Instant.ofEpochMilli(1));
+  private static final TimestampedValue<String> A2 =
+      TimestampedValue.of("A2", Instant.ofEpochMilli(2));
+  private static final TimestampedValue<String> B2 =
+      TimestampedValue.of("B2", Instant.ofEpochMilli(2));
+  private static final TimestampedValue<String> A3 =
+      TimestampedValue.of("A3", Instant.ofEpochMilli(3));
+  private static final TimestampedValue<String> A4 =
+      TimestampedValue.of("A4", Instant.ofEpochMilli(4));
+
+  private final String pTransformId = "pTransformId";
+  private final String stateId = "stateId";
+  private final String encodedWindow = "encodedWindow";
+
+  @Test
+  public void testNoPersistedValues() throws Exception {
+    FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(Collections.emptyMap());
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
+            StringUtf8Coder.of());
+    assertThat(userState.read(), is(emptyIterable()));
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    FakeBeamFnStateClient fakeClient =
+        new FakeBeamFnStateClient(
+            TimestampedValueCoder.of(StringUtf8Coder.of()),
+            ImmutableMap.of(createOrderedListStateKey("A", 1), asList(A1, B1)));
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
+            StringUtf8Coder.of());
+
+    assertArrayEquals(
+        asList(A1, B1).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class));
+    userState.asyncClose();
+    assertThrows(IllegalStateException.class, () -> userState.read());
+  }
+
+  @Test
+  public void testReadRange() throws Exception {
+    FakeBeamFnStateClient fakeClient =
+        new FakeBeamFnStateClient(
+            TimestampedValueCoder.of(StringUtf8Coder.of()),
+            ImmutableMap.of(
+                createOrderedListStateKey("A", 1), asList(A1, B1),
+                createOrderedListStateKey("A", 4), Collections.singletonList(A4),
+                createOrderedListStateKey("A", 2), Collections.singletonList(A2)));
+
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
+   

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-05-02 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1588027273


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##


Review Comment:
   Yes. I think we will need a separate one for orderedlist because although 
orderedlist and multimap have similar semantics, the implementation is 
different.  






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-05-02 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1587995856


##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.Cache;
+import org.apache.beam.fn.harness.Caches;
+import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear and
+ * persist values.
+ *
+ * Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ */
+public class OrderedListUserState<T> {
+  private final BeamFnStateClient beamFnStateClient;
+  private final StateRequest request;
+  private final Coder<T> valueCoder;
+  private final TimestampedValueCoder<T> timestampedValueCoder;
+  // Pending updates to persistent storage
+  private NavigableMap<Instant, Collection<T>> pendingAdds = Maps.newTreeMap();
+  private TreeRangeSet<Instant> pendingRemoves = TreeRangeSet.create();
+
+  private boolean isCleared = false;
+  private boolean isClosed = false;
+
+  public OrderedListUserState(
+      Cache<?, ?> cache,
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      StateKey stateKey,
+      Coder<T> valueCoder) {
+    checkArgument(
+        stateKey.hasOrderedListUserState(),
+        "Expected OrderedListUserState StateKey but received %s.",
+        stateKey);
+    this.beamFnStateClient = beamFnStateClient;
+    this.valueCoder = valueCoder;
+    this.timestampedValueCoder = TimestampedValueCoder.of(this.valueCoder);
+    this.request =
+        StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();
+  }
+
+  public void add(TimestampedValue<T> value) {
+    checkState(
+        !isClosed,
+        "OrderedList user state is no longer usable because it is closed for %s",
+        request.getStateKey());
+    Instant timestamp = value.getTimestamp();
+    pendingAdds.putIfAbsent(timestamp, new ArrayList<>());
+    pendingAdds.get(timestamp).add(value.getValue());
+  }
+
+  public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, Instant limitTimestamp) {
+    checkState(
+        !isClosed,
+        "OrderedList user state is no longer 

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-05-02 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1587995856


##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.Cache;
+import org.apache.beam.fn.harness.Caches;
+import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear and
+ * persist values.
+ *
+ * Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ */
+public class OrderedListUserState<T> {
+  private final BeamFnStateClient beamFnStateClient;
+  private final StateRequest request;
+  private final Coder<T> valueCoder;
+  private final TimestampedValueCoder<T> timestampedValueCoder;
+  // Pending updates to persistent storage
+  private NavigableMap<Instant, Collection<T>> pendingAdds = Maps.newTreeMap();
+  private TreeRangeSet<Instant> pendingRemoves = TreeRangeSet.create();
+
+  private boolean isCleared = false;
+  private boolean isClosed = false;
+
+  public OrderedListUserState(
+      Cache<?, ?> cache,
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      StateKey stateKey,
+      Coder<T> valueCoder) {
+    checkArgument(
+        stateKey.hasOrderedListUserState(),
+        "Expected OrderedListUserState StateKey but received %s.",
+        stateKey);
+    this.beamFnStateClient = beamFnStateClient;
+    this.valueCoder = valueCoder;
+    this.timestampedValueCoder = TimestampedValueCoder.of(this.valueCoder);
+    this.request =
+        StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();
+  }
+
+  public void add(TimestampedValue<T> value) {
+    checkState(
+        !isClosed,
+        "OrderedList user state is no longer usable because it is closed for %s",
+        request.getStateKey());
+    Instant timestamp = value.getTimestamp();
+    pendingAdds.putIfAbsent(timestamp, new ArrayList<>());
+    pendingAdds.get(timestamp).add(value.getValue());
+  }
+
+  public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, Instant limitTimestamp) {
+    checkState(
+        !isClosed,
+        "OrderedList user state is no longer

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-05-02 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1587899863


##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java:
##

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-05-02 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1587892941


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +1103,12 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).

Review Comment:
   ack
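
   For reference, the half-open convention in the quoted proto comment, [start, end), can be illustrated with a plain sorted map. This is only a sketch under stated assumptions: `HalfOpenRangeSketch` and `readRange` are hypothetical names, sort keys are modeled as `long` values, and nothing below is taken from the PR's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration of a half-open sort key range [start, end).
final class HalfOpenRangeSketch {
  /** Returns all values whose sort key k satisfies start <= k < end, in key order. */
  static List<String> readRange(NavigableMap<Long, List<String>> entries, long start, long end) {
    List<String> out = new ArrayList<>();
    // subMap(start, true, end, false) is exactly the half-open view [start, end).
    for (List<String> values : entries.subMap(start, true, end, false).values()) {
      out.addAll(values);
    }
    return out;
  }

  public static void main(String[] args) {
    NavigableMap<Long, List<String>> entries = new TreeMap<>();
    entries.put(1L, Arrays.asList("A1", "B1"));
    entries.put(2L, Arrays.asList("A2"));
    entries.put(4L, Arrays.asList("A4"));
    // The key 4 is excluded because the end of the range is exclusive.
    System.out.println(readRange(entries, 1L, 4L)); // prints [A1, B1, A2]
  }
}
```

   With an exclusive end, adjacent ranges such as [1, 4) and [4, 7) never overlap, which is presumably why the proto comment spells the convention out.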






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-05-02 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1587889713


##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java:
##
@@ -600,8 +603,74 @@ public <KeyT, ValueT> MultimapState<KeyT, ValueT> bindMultimap(
   @Override
   public <T> OrderedListState<T> bindOrderedList(
       String id, StateSpec<OrderedListState<T>> spec, Coder<T> elemCoder) {
-    throw new UnsupportedOperationException(
-        "TODO: Add support for a sorted-list state to the Fn API.");
+    return (OrderedListState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createOrderedListUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new OrderedListState<T>() {
+                  private final OrderedListUserState<T> impl =
+                      createOrderedListUserState(key, elemCoder);
+
+                  @Override
+                  public void clear() {
Review Comment:
   The clear() function in OrderedListState is quite similar to its counterpart 
in MultiMap, and I don't think we have ever had the notion of deleting all state 
associated with a state object. There is a function called "asyncClose()" 
that syncs the local copy with the remote one and then *invalidates* any 
state associated with the state object.
   
   On a different topic, I noticed that the implementation already has a clear() 
function, which handles the special case of deleting all elements from an 
OrderedList more efficiently than clearRange(min, max). I should probably use 
that instead.
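
   To make the efficiency point concrete, here is a minimal sketch of why a dedicated clear() can be cheaper than clearRange(min, max). It is an assumption-laden illustration, not the PR's code: `PendingClearSketch` and its methods are hypothetical, timestamps are modeled as `long` values, and the "remote" side is represented only by a list of pending range removals that would be sent on close.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of locally buffered updates to an ordered list state.
final class PendingClearSketch {
  private final NavigableMap<Long, List<String>> pendingAdds = new TreeMap<>();
  // Each entry is a half-open range {min, limit} still to be removed remotely.
  private final List<long[]> pendingRemoves = new ArrayList<>();
  private boolean isCleared = false;

  void add(long timestamp, String value) {
    pendingAdds.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(value);
  }

  void clearRange(long min, long limit) {
    // Drop buffered adds inside [min, limit).
    pendingAdds.subMap(min, true, limit, false).clear();
    // Once everything is marked cleared, no per-range bookkeeping is needed.
    if (!isCleared) {
      pendingRemoves.add(new long[] {min, limit});
    }
  }

  void clear() {
    // A full clear discards all buffered work and collapses it into one flag.
    pendingAdds.clear();
    pendingRemoves.clear();
    isCleared = true;
  }

  int pendingRemoveCount() {
    return pendingRemoves.size();
  }

  boolean cleared() {
    return isCleared;
  }

  List<String> pendingValues() {
    List<String> out = new ArrayList<>();
    for (List<String> values : pendingAdds.values()) {
      out.addAll(values);
    }
    return out;
  }

  public static void main(String[] args) {
    PendingClearSketch state = new PendingClearSketch();
    state.add(1, "A1");
    state.add(2, "A2");
    state.clearRange(1, 2);
    System.out.println(state.pendingValues() + " " + state.pendingRemoveCount()); // prints [A2] 1
    state.clear();
    System.out.println(state.pendingValues() + " " + state.pendingRemoveCount()); // prints [] 0
  }
}
```

   In this model a full clear needs a single flag, whereas clearRange(min, max) has to carry a range that the other side must then interpret.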






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-05-02 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1587688898


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1063,6 +1087,10 @@ message StateAppendRequest {
   // Represents a part of a logical byte stream. Elements within
   // the logical byte stream are encoded in the nested context and
   // multiple append requests are concatenated together.
+  // For OrderedListState, elements of TimeStampedValue should be encoded
+  // with TimestampedValueCoder.of(LengthPrefixCoder.of(Coder)), so that
+  // the request handler knows how to decode timestamps from the data without
+  // decoding the value bytes.

Review Comment:
   This comment has been updated in the proto change only PR (#31092)
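
   The motivation in the quoted proto comment (a handler reading timestamps without decoding the value bytes) can be sketched without Beam on the classpath. The byte layout below is an assumption chosen for illustration: a varint length, the opaque value bytes, then an 8-byte big-endian timestamp. It is not claimed to match the exact output of TimestampedValueCoder or LengthPrefixCoder, and `LengthPrefixedTimestampSketch` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical layout per element: varint(length) | value bytes | 8-byte big-endian timestamp.
final class LengthPrefixedTimestampSketch {
  /** Appends one length-prefixed value followed by its timestamp. */
  static void encode(ByteArrayOutputStream out, byte[] value, long timestampMillis) {
    int len = value.length;
    // Unsigned base-128 varint, least significant group first.
    while ((len & ~0x7F) != 0) {
      out.write((len & 0x7F) | 0x80);
      len >>>= 7;
    }
    out.write(len);
    out.write(value, 0, value.length);
    out.write(ByteBuffer.allocate(8).putLong(timestampMillis).array(), 0, 8);
  }

  /** Walks a concatenated stream and returns only the timestamps; values are skipped, never decoded. */
  static List<Long> timestamps(byte[] stream) {
    ByteBuffer buf = ByteBuffer.wrap(stream);
    List<Long> result = new ArrayList<>();
    while (buf.hasRemaining()) {
      int len = 0;
      int shift = 0;
      byte b;
      do {
        b = buf.get();
        len |= (b & 0x7F) << shift;
        shift += 7;
      } while ((b & 0x80) != 0);
      // The length prefix is what allows jumping over bytes the handler cannot interpret.
      buf.position(buf.position() + len);
      result.add(buf.getLong());
    }
    return result;
  }

  public static void main(String[] args) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    encode(out, "A1".getBytes(StandardCharsets.UTF_8), 1L);
    encode(out, "A4".getBytes(StandardCharsets.UTF_8), 4L);
    System.out.println(timestamps(out.toByteArray())); // prints [1, 4]
  }
}
```

   Without the length prefix, the reader would need the user's value coder just to find where each value ends, which is the situation the comment is trying to avoid.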






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-04-03 Thread via GitHub


Abacn commented on PR #30317:
URL: https://github.com/apache/beam/pull/30317#issuecomment-2034822148

   waiting on author





Re: [PR] Implement ordered list state for FnApi. [beam]

2024-04-03 Thread via GitHub


github-actions[bot] commented on PR #30317:
URL: https://github.com/apache/beam/pull/30317#issuecomment-2034415747

   Assigning a new set of reviewers because the PR has gone too long without review. 
If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @Abacn for label java.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any 
comment or push by the author will return the attention set to the reviewers)





Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-29 Thread via GitHub


github-actions[bot] commented on PR #30317:
URL: https://github.com/apache/beam/pull/30317#issuecomment-2027168571

   Reminder, please take a look at this pr: @kennknowles 





Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-22 Thread via GitHub


github-actions[bot] commented on PR #30317:
URL: https://github.com/apache/beam/pull/30317#issuecomment-2014951449

   Assigning a new set of reviewers because the PR has gone too long without review. 
If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kennknowles for label java.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any 
comment or push by the author will return the attention set to the reviewers)





Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-19 Thread via GitHub


acrites commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1531296538


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##


Review Comment:
   Or are we going to piggy-back on multimap for this? (If so we should delete 
the TODO.)






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-19 Thread via GitHub


acrites commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1531294554


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##


Review Comment:
   Do we also need to add something here: 
https://github.com/apache/beam/blob/fb7ba65e2236f3dd871b6e492afc07249a4a5c49/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L478






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-18 Thread via GitHub


acrites commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1529368187


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/OrderedListUserStateTest.java:
##
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static java.util.Arrays.asList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.beam.fn.harness.Caches;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListRange;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class OrderedListUserStateTest {
+  private static final TimestampedValue<String> A1 =
+      TimestampedValue.of("A1", Instant.ofEpochMilli(1));
+  private static final TimestampedValue<String> B1 =
+      TimestampedValue.of("B1", Instant.ofEpochMilli(1));
+  private static final TimestampedValue<String> A2 =
+      TimestampedValue.of("A2", Instant.ofEpochMilli(2));
+  private static final TimestampedValue<String> B2 =
+      TimestampedValue.of("B2", Instant.ofEpochMilli(2));
+  private static final TimestampedValue<String> A3 =
+      TimestampedValue.of("A3", Instant.ofEpochMilli(3));
+  private static final TimestampedValue<String> A4 =
+      TimestampedValue.of("A4", Instant.ofEpochMilli(4));
+
+  private final String pTransformId = "pTransformId";
+  private final String stateId = "stateId";
+  private final String encodedWindow = "encodedWindow";
+
+  @Test
+  public void testNoPersistedValues() throws Exception {
+    FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(Collections.emptyMap());
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
+            StringUtf8Coder.of());
+    assertThat(userState.read(), is(emptyIterable()));
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    FakeBeamFnStateClient fakeClient =
+        new FakeBeamFnStateClient(
+            TimestampedValueCoder.of(StringUtf8Coder.of()),
+            ImmutableMap.of(createOrderedListStateKey("A", 1), asList(A1, B1)));
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
+            StringUtf8Coder.of());
+
+    assertArrayEquals(
+        asList(A1, B1).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class));
+    userState.asyncClose();
+    assertThrows(IllegalStateException.class, () -> userState.read());
+  }
+
+  @Test
+  public void testReadRange() throws Exception {
+    FakeBeamFnStateClient fakeClient =
+        new FakeBeamFnStateClient(
+            TimestampedValueCoder.of(StringUtf8Coder.of()),
+            ImmutableMap.of(
+                createOrderedListStateKey("A", 1), asList(A1, B1),
+                createOrderedListStateKey("A", 4), Collections.singletonList(A4),
+                createOrderedListStateKey("A", 2), Collections.singletonList(A2)));
+
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
+

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-18 Thread via GitHub


acrites commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1529360968


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/OrderedListUserStateTest.java:
##

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-18 Thread via GitHub


acrites commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1529334529


##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java:
##

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-18 Thread via GitHub


acrites commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1529329576


##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.Cache;
+import org.apache.beam.fn.harness.Caches;
+import 
org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of a bag user state that utilizes the Beam Fn State API 
to fetch, clear and
+ * persist values.
+ *
+ * Calling {@link #asyncClose()} schedules any required persistence 
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * TODO: Move to an async persist model where persistence is signalled 
based upon cache memory
+ * pressure and its need to flush.
+ */
+public class OrderedListUserState {
+  private final BeamFnStateClient beamFnStateClient;
+  private final StateRequest request;
+  private final Coder valueCoder;
+  private final TimestampedValueCoder timestampedValueCoder;
+  // Pending updates to persistent storage
+  private NavigableMap> pendingAdds = Maps.newTreeMap();
+  private TreeRangeSet pendingRemoves = TreeRangeSet.create();
+
+  private boolean isCleared = false;
+  private boolean isClosed = false;
+
+  public OrderedListUserState(
+  Cache cache,
+  BeamFnStateClient beamFnStateClient,
+  String instructionId,
+  StateKey stateKey,
+  Coder valueCoder) {
+checkArgument(
+stateKey.hasOrderedListUserState(),
+"Expected OrderedListUserState StateKey but received %s.",
+stateKey);
+this.beamFnStateClient = beamFnStateClient;
+this.valueCoder = valueCoder;
+this.timestampedValueCoder = TimestampedValueCoder.of(this.valueCoder);
+this.request =
+
StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();
+  }
+
+  public void add(TimestampedValue value) {
+checkState(
+!isClosed,
+"OrderedList user state is no longer usable because it is closed for 
%s",
+request.getStateKey());
+Instant timestamp = value.getTimestamp();
+pendingAdds.putIfAbsent(timestamp, new ArrayList<>());
+pendingAdds.get(timestamp).add(value.getValue());
+  }
+
+  public Iterable> readRange(Instant minTimestamp, Instant 
limitTimestamp) {
+checkState(
+!isClosed,
+"OrderedList user state is no longer usable 

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-18 Thread via GitHub


acrites commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1529328943


##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.Cache;
+import org.apache.beam.fn.harness.Caches;
+import 
org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of a bag user state that utilizes the Beam Fn State API 
to fetch, clear and
+ * persist values.
+ *
+ * Calling {@link #asyncClose()} schedules any required persistence 
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * TODO: Move to an async persist model where persistence is signalled 
based upon cache memory
+ * pressure and its need to flush.
+ */
+public class OrderedListUserState {
+  private final BeamFnStateClient beamFnStateClient;
+  private final StateRequest request;
+  private final Coder valueCoder;
+  private final TimestampedValueCoder timestampedValueCoder;
+  // Pending updates to persistent storage
+  private NavigableMap> pendingAdds = Maps.newTreeMap();
+  private TreeRangeSet pendingRemoves = TreeRangeSet.create();
+
+  private boolean isCleared = false;
+  private boolean isClosed = false;
+
+  public OrderedListUserState(
+  Cache cache,
+  BeamFnStateClient beamFnStateClient,
+  String instructionId,
+  StateKey stateKey,
+  Coder valueCoder) {
+checkArgument(
+stateKey.hasOrderedListUserState(),
+"Expected OrderedListUserState StateKey but received %s.",
+stateKey);
+this.beamFnStateClient = beamFnStateClient;
+this.valueCoder = valueCoder;
+this.timestampedValueCoder = TimestampedValueCoder.of(this.valueCoder);
+this.request =
+
StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();
+  }
+
+  public void add(TimestampedValue value) {
+checkState(
+!isClosed,
+"OrderedList user state is no longer usable because it is closed for 
%s",
+request.getStateKey());
+Instant timestamp = value.getTimestamp();
+pendingAdds.putIfAbsent(timestamp, new ArrayList<>());
+pendingAdds.get(timestamp).add(value.getValue());
+  }
+
+  public Iterable> readRange(Instant minTimestamp, Instant 
limitTimestamp) {
+checkState(
+!isClosed,
+"OrderedList user state is no longer usable 

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-18 Thread via GitHub


acrites commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1529318766


##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.Cache;
+import org.apache.beam.fn.harness.Caches;
+import 
org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of a bag user state that utilizes the Beam Fn State API 
to fetch, clear and
+ * persist values.
+ *
+ * Calling {@link #asyncClose()} schedules any required persistence 
changes. This object should
+ * no longer be used after it is closed.
+ *
+ * TODO: Move to an async persist model where persistence is signalled 
based upon cache memory
+ * pressure and its need to flush.
+ */
+public class OrderedListUserState {
+  private final BeamFnStateClient beamFnStateClient;
+  private final StateRequest request;
+  private final Coder valueCoder;
+  private final TimestampedValueCoder timestampedValueCoder;
+  // Pending updates to persistent storage
+  private NavigableMap> pendingAdds = Maps.newTreeMap();
+  private TreeRangeSet pendingRemoves = TreeRangeSet.create();
+
+  private boolean isCleared = false;
+  private boolean isClosed = false;
+
+  public OrderedListUserState(
+  Cache cache,
+  BeamFnStateClient beamFnStateClient,
+  String instructionId,
+  StateKey stateKey,
+  Coder valueCoder) {
+checkArgument(
+stateKey.hasOrderedListUserState(),
+"Expected OrderedListUserState StateKey but received %s.",
+stateKey);
+this.beamFnStateClient = beamFnStateClient;
+this.valueCoder = valueCoder;
+this.timestampedValueCoder = TimestampedValueCoder.of(this.valueCoder);
+this.request =
+
StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();
+  }
+
+  public void add(TimestampedValue value) {
+checkState(
+!isClosed,
+"OrderedList user state is no longer usable because it is closed for 
%s",
+request.getStateKey());
+Instant timestamp = value.getTimestamp();
+pendingAdds.putIfAbsent(timestamp, new ArrayList<>());
+pendingAdds.get(timestamp).add(value.getValue());
+  }
+
+  public Iterable> readRange(Instant minTimestamp, Instant 
limitTimestamp) {
+checkState(
+!isClosed,
+"OrderedList user state is no longer usable 

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-18 Thread via GitHub


acrites commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1529260758


##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java:
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.Cache;
+import org.apache.beam.fn.harness.Caches;
+import 
org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of a bag user state that utilizes the Beam Fn State API 
to fetch, clear and

Review Comment:
   ordered list



Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-18 Thread via GitHub


acrites commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1529260242


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +1103,12 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).

Review Comment:
   Maybe add a comment about these values needing to be within
   `[BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE)`?
   Should these be the default values for these fields?



Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-18 Thread via GitHub


acrites commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1529258289


##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java:
##
@@ -600,8 +603,74 @@ public  MultimapState 
bindMultimap(
   @Override
   public  OrderedListState bindOrderedList(
   String id, StateSpec> spec, Coder elemCoder) {
-throw new UnsupportedOperationException(
-"TODO: Add support for a sorted-list state to the Fn API.");
+return (OrderedListState)
+stateKeyObjectCache.computeIfAbsent(
+createOrderedListUserStateKey(id),
+new Function() {
+  @Override
+  public Object apply(StateKey key) {
+return new OrderedListState() {
+  private final OrderedListUserState impl =
+  createOrderedListUserState(key, elemCoder);
+
+  @Override
+  public void clear() {

Review Comment:
   Do we want a separate notion of "clear"? i.e. one that deletes *all* state
   associated with this OrderedList and not just `clearRange(min, max)`, which
   would only delete all the elements from the OrderedList?
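
   A rough sketch of the distinction being asked about, using only `java.util`. `ClearSketch` and its `cleared` flag are hypothetical names for illustration and are not taken from the PR; the flag stands in for "everything associated with this state cell is gone", as opposed to a range delete that removes only the entries it covers.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical illustration only; not the harness implementation. */
final class ClearSketch {
  // Entries keyed by sort key (for example, a timestamp in millis).
  final NavigableMap<Long, String> entries = new TreeMap<>();
  // Assumed flag: records that the whole cell was cleared, not just a range.
  boolean cleared = false;

  /** Removes entries with min <= sortKey < limit; other state is untouched. */
  void clearRange(long min, long limit) {
    entries.subMap(min, true, limit, false).clear();
  }

  /** Removes every entry and marks the whole cell as cleared. */
  void clear() {
    entries.clear();
    cleared = true;
  }
}
```

   Because the range is half-open, an entry sitting exactly at `limit` survives `clearRange`, which is one reason a separate `clear()` may be worth keeping.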



Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-15 Thread via GitHub


robertwb commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1526756757


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1063,6 +1087,10 @@ message StateAppendRequest {
   // Represents a part of a logical byte stream. Elements within
   // the logical byte stream are encoded in the nested context and
   // multiple append requests are concatenated together.
+  // For OrderedListState, elements of TimeStampedValue should be encoded
+  // with TimestampedValueCoder.of(LengthPrefixCoder.of(Coder)), so that
+  // the request handler knows how to decode timestamps from the data without
+  // decoding the value bytes.

Review Comment:
   Perhaps note that data will be returned in the same format?
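
   A minimal sketch of why the length prefix matters, assuming the value bytes come first and the timestamp follows (the order mentioned elsewhere in this thread for the test client). It uses a fixed 4-byte length and a plain 8-byte millisecond timestamp as stand-ins; the real `LengthPrefixCoder` and `InstantCoder` wire formats differ, and `LengthPrefixedEntrySketch` is a hypothetical name.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration only; not Beam's actual wire format. */
final class LengthPrefixedEntrySketch {
  /** Lays out one entry as [int length][value bytes][long timestampMillis]. */
  static byte[] encode(byte[] valueBytes, long timestampMillis) {
    return ByteBuffer.allocate(4 + valueBytes.length + 8)
        .putInt(valueBytes.length)
        .put(valueBytes)
        .putLong(timestampMillis)
        .array();
  }

  /** Reads the timestamp by skipping over the value bytes without decoding them. */
  static long readTimestamp(byte[] encoded) {
    ByteBuffer buf = ByteBuffer.wrap(encoded);
    int length = buf.getInt();
    // The prefix tells the reader how far to jump; the value stays opaque.
    buf.position(buf.position() + length);
    return buf.getLong();
  }
}
```

   If responses use the same layout, a reader can apply the same skip-then-read step to returned data.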



Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-14 Thread via GitHub


github-actions[bot] commented on PR #30317:
URL: https://github.com/apache/beam/pull/30317#issuecomment-1997307049

   Reminder, please take a look at this pr: @robertwb 


Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-06 Thread via GitHub


shunping commented on PR #30317:
URL: https://github.com/apache/beam/pull/30317#issuecomment-1981758955

   Run Java PreCommit


Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-06 Thread via GitHub


github-actions[bot] commented on PR #30317:
URL: https://github.com/apache/beam/pull/30317#issuecomment-1981431117

   Assigning reviewers. If you would like to opt out of this review, comment
   `assign to next reviewer`:
   
   R: @robertwb for label java.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any
     comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review
   comments).


Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-04 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1511829304


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1021,6 +1033,29 @@ message StateKey {
 bytes map_key = 5;
   }
 
+  // Represents a request for an ordered list of values associated with a
+  // specified user key and window for a PTransform. See
+  // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
+  // details.
+  //
+  // The response data stream will be a concatenation of all entries of sort 
key
+  // and V's associated with the specified user key and window.
+  // See https://s.apache.org/beam-fn-api-send-and-receive-data for further
+  // details.
+  message OrderedListUserState {
+// (Required) The id of the PTransform containing user state.
+string transform_id = 1;
+// (Required) The id of the user state.
+string user_state_id = 2;
+// (Required) The window encoded in a nested context.
+bytes window = 3;
+// (Required) The key of the currently executing element encoded in a
+// nested context.
+bytes key = 4;
+// (optional) The sort key encoded in a nested context.
+int64 sort_key = 5;

Review Comment:
   Done. The sort_key in the state key is replaced with a sort range.



Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-04 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1511828608


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
   I pushed the new code to reuse Get/Append/Clear.



Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-04 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1511828177


##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java:
##
@@ -169,6 +172,7 @@ public ResultT get() {
   }
 
   @Override
+  @SuppressWarnings("unchecked")

Review Comment:
   I reverted the changes on those annotations.



Re: [PR] Implement ordered list state for FnApi. [beam]

2024-03-04 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1511827360


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##
@@ -174,6 +215,124 @@ public CompletableFuture 
handle(StateRequest.Builder requestBuild
 response = 
StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
 break;
 
+  case ORDERED_LIST_GET:
+{
+  long start = request.getOrderedListGet().getRange().getStart();
+  long end = request.getOrderedListGet().getRange().getEnd();
+
+  KvCoder coder = KvCoder.of(VarLongCoder.of(), 
VarIntCoder.of());
+  long sortKey = start;
+  int index = 0;
+  if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+try {
+  // The continuation format here is the sort key (long) followed 
by an index (int)
+  KV cursor =
+  
coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+              sortKey = cursor.getKey();
+              index = cursor.getValue();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+
+          ByteString continuationToken;
+          ByteString returnBlock = ByteString.EMPTY;
+          ;
+          try {
+            if (sortKey < start || sortKey >= end) {
+              throw new IndexOutOfBoundsException("sort key out of range");
+            }
+
+            NavigableSet<Long> subset =
+                orderedListKeys
+                    .getOrDefault(request.getStateKey(), new TreeSet<>())
+                    .subSet(sortKey, true, end, false);
+
+            // get the effective sort key currently, can throw NoSuchElementException
+            Long nextSortKey = subset.first();
+
+            StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+            keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey);
+            List<ByteString> byteStrings =
+                data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY));
+
+            // get the block specified in continuation token, can throw IndexOutOfBoundsException
+            returnBlock = byteStrings.get(index);
+
+            if (byteStrings.size() > index + 1) {
+              // more blocks from this sort key
+              index += 1;
+            } else {
+              // finish navigating the current sort key and need to find the next one,
+              // can throw NoSuchElementException
+              nextSortKey = subset.tailSet(nextSortKey, false).first();
+              index = 0;
+            }
+
+            ByteStringOutputStream outputStream = new ByteStringOutputStream();
+            try {
+              KV<Long, Integer> cursor = KV.of(nextSortKey, index);
+              coder.encode(cursor, outputStream);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+            continuationToken = outputStream.toByteString();
+          } catch (NoSuchElementException | IndexOutOfBoundsException e) {
+            continuationToken = ByteString.EMPTY;
+          }
+          response =
+              StateResponse.newBuilder()
+                  .setOrderedListGet(
+                      OrderedListStateGetResponse.newBuilder()
+                          .setData(returnBlock)
+                          .setContinuationToken(continuationToken));
+        }
+        break;
+
+      case ORDERED_LIST_UPDATE:
+        for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) {
+          List<Long> keysToRemove =
+              new ArrayList<>(
+                  orderedListKeys
+                      .getOrDefault(request.getStateKey(), new TreeSet<>())
+                      .subSet(r.getStart(), true, r.getEnd(), false));
+
+          for (Long l : keysToRemove) {
+            StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+            keyBuilder.getOrderedListUserStateBuilder().setSortKey(l);
+            data.remove(keyBuilder.build());
+            orderedListKeys.get(request.getStateKey()).remove(l);
+          }
+        }
+
+        for (OrderedListEntry e : request.getOrderedListUpdate().getInsertsList()) {
+          StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+          keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey());
+
+          ByteStringOutputStream outStream = new ByteStringOutputStream();
+
+          try {
+            InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), outStream);
+          } catch (IOException ex) {
+            throw new RuntimeException(ex);
+          }
+          // In the response, the value encoded bytes are placed before the timestamp encoded bytes.

Review Comment:
   Done.
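
For readers following the quoted hunk, here is a minimal sketch of its paging scheme in plain Java, with no Beam types. `PagedOrderedListReader`, `Cursor`, and `Page` are hypothetical names, blocks are strings rather than `ByteString`s, and the sketch checks bounds explicitly where the fake client relies on catching `NoSuchElementException` and `IndexOutOfBoundsException`. Treat it as an illustration of the (sort key, index) cursor, not as the code under review.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for the fake client's ordered-list paging; not Beam API. */
class PagedOrderedListReader {
  /** Resume point: the sort key to continue from and the block index within it. */
  static final class Cursor {
    final long sortKey;
    final int index;

    Cursor(long sortKey, int index) {
      this.sortKey = sortKey;
      this.index = index;
    }
  }

  /** One response: a single block plus the cursor for the next call (null when done). */
  static final class Page {
    final String block;
    final Cursor next;

    Page(String block, Cursor next) {
      this.block = block;
      this.next = next;
    }
  }

  private final NavigableMap<Long, List<String>> blocks = new TreeMap<>();

  /** Stores the blocks for one sort key; assumes the list is non-empty. */
  void put(long sortKey, List<String> blocksForKey) {
    blocks.put(sortKey, new ArrayList<>(blocksForKey));
  }

  /** Returns the next block in [start, end), resuming at cursor, or at start if cursor is null. */
  Page get(long start, long end, Cursor cursor) {
    long sortKey = (cursor == null) ? start : cursor.sortKey;
    int index = (cursor == null) ? 0 : cursor.index;
    if (sortKey < start || sortKey >= end) {
      return new Page("", null); // cursor points outside the requested range
    }
    // First stored key at or after the resume point.
    Map.Entry<Long, List<String>> entry = blocks.ceilingEntry(sortKey);
    if (entry == null || entry.getKey() >= end || index >= entry.getValue().size()) {
      return new Page("", null); // nothing left to read
    }
    String block = entry.getValue().get(index);
    if (index + 1 < entry.getValue().size()) {
      // More blocks remain under the same sort key.
      return new Page(block, new Cursor(entry.getKey(), index + 1));
    }
    // Current key is exhausted: move to the next key if it is still inside the range.
    Long nextKey = blocks.higherKey(entry.getKey());
    Cursor next = (nextKey != null && nextKey < end) ? new Cursor(nextKey, 0) : null;
    return new Page(block, next);
  }
}
```

A caller keeps passing the returned cursor back in until it comes back `null`, which plays the role of the empty continuation token in the hunk.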




Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


robertwb commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1501142124


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
   Thanks. 
   
   IIUC, append is still correct, as there's a bag (with possibly multiple 
items) assigned to every point in the ordered space. (It's a lot like MultiMap 
with the ability to read ranges in order rather than just do point lookups, 
though +1 to not mixing the two.) 
   
   Agree on caching--there are more clever things we can do here in the future, 
but we can punt that to future work.
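
The "bag at every point in the ordered space" reading can be sketched in a few lines of plain Java. This is only an in-memory model under assumed names (`OrderedListModel`, `append`, `readRange` are hypothetical, and values are strings for brevity); it is not how any runner stores state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory model: each sort key owns a bag of values. */
class OrderedListModel {
  private final NavigableMap<Long, List<String>> bags = new TreeMap<>();

  /** "Append" adds to the bag at sortKey; entries under other keys are untouched. */
  void append(long sortKey, String value) {
    bags.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
  }

  /** Reads every value whose sort key lies in [start, end), in sort-key order. */
  List<String> readRange(long start, long end) {
    List<String> out = new ArrayList<>();
    for (List<String> bag : bags.subMap(start, true, end, false).values()) {
      out.addAll(bag);
    }
    return out;
  }
}
```

A point lookup is then just the range `[k, k + 1)`, which is the sense in which this resembles a multimap with ordered range reads added.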



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500882918


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##
@@ -174,6 +215,124 @@ public CompletableFuture<StateResponse> handle(StateRequest.Builder requestBuild
         response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
         break;
 
+      case ORDERED_LIST_GET:
+        {
+          long start = request.getOrderedListGet().getRange().getStart();
+          long end = request.getOrderedListGet().getRange().getEnd();
+
+          KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of());
+          long sortKey = start;
+          int index = 0;
+          if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+            try {
+              // The continuation format here is the sort key (long) followed by an index (int)
+              KV<Long, Integer> cursor =
+                  coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+              sortKey = cursor.getKey();
+              index = cursor.getValue();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+
+          ByteString continuationToken;
+          ByteString returnBlock = ByteString.EMPTY;
+          ;
+          try {
+            if (sortKey < start || sortKey >= end) {
+              throw new IndexOutOfBoundsException("sort key out of range");
+            }
+
+            NavigableSet<Long> subset =
+                orderedListKeys
+                    .getOrDefault(request.getStateKey(), new TreeSet<>())
+                    .subSet(sortKey, true, end, false);
+
+            // get the effective sort key currently, can throw NoSuchElementException
+            Long nextSortKey = subset.first();
+
+            StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+            keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey);
+            List<ByteString> byteStrings =
+                data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY));
+
+            // get the block specified in continuation token, can throw IndexOutOfBoundsException
+            returnBlock = byteStrings.get(index);
+
+            if (byteStrings.size() > index + 1) {
+              // more blocks from this sort key
+              index += 1;
+            } else {
+              // finish navigating the current sort key and need to find the next one,
+              // can throw NoSuchElementException
+              nextSortKey = subset.tailSet(nextSortKey, false).first();
+              index = 0;
+            }
+
+            ByteStringOutputStream outputStream = new ByteStringOutputStream();
+            try {
+              KV<Long, Integer> cursor = KV.of(nextSortKey, index);
+              coder.encode(cursor, outputStream);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+            continuationToken = outputStream.toByteString();
+          } catch (NoSuchElementException | IndexOutOfBoundsException e) {
+            continuationToken = ByteString.EMPTY;
+          }
+          response =
+              StateResponse.newBuilder()
+                  .setOrderedListGet(
+                      OrderedListStateGetResponse.newBuilder()
+                          .setData(returnBlock)
+                          .setContinuationToken(continuationToken));
+        }
+        break;
+
+      case ORDERED_LIST_UPDATE:
+        for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) {
+          List<Long> keysToRemove =
+              new ArrayList<>(
+                  orderedListKeys
+                      .getOrDefault(request.getStateKey(), new TreeSet<>())
+                      .subSet(r.getStart(), true, r.getEnd(), false));
+
+          for (Long l : keysToRemove) {
+            StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+            keyBuilder.getOrderedListUserStateBuilder().setSortKey(l);
+            data.remove(keyBuilder.build());
+            orderedListKeys.get(request.getStateKey()).remove(l);
+          }
+        }
+
+        for (OrderedListEntry e : request.getOrderedListUpdate().getInsertsList()) {
+          StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+          keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey());
+
+          ByteStringOutputStream outStream = new ByteStringOutputStream();
+
+          try {
+            InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), outStream);
+          } catch (IOException ex) {
+            throw new RuntimeException(ex);
+          }
+          // In the response, the value encoded bytes are placed before the timestamp encoded bytes.

Review Comment:
   ACK. 
   
   I found a better way to do this. Specifically, I can reuse 

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500882918


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##
@@ -174,6 +215,124 @@ public CompletableFuture<StateResponse> handle(StateRequest.Builder requestBuild
         response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
         break;
 
+      case ORDERED_LIST_GET:
+        {
+          long start = request.getOrderedListGet().getRange().getStart();
+          long end = request.getOrderedListGet().getRange().getEnd();
+
+          KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of());
+          long sortKey = start;
+          int index = 0;
+          if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+            try {
+              // The continuation format here is the sort key (long) followed by an index (int)
+              KV<Long, Integer> cursor =
+                  coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+              sortKey = cursor.getKey();
+              index = cursor.getValue();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+
+          ByteString continuationToken;
+          ByteString returnBlock = ByteString.EMPTY;
+          ;
+          try {
+            if (sortKey < start || sortKey >= end) {
+              throw new IndexOutOfBoundsException("sort key out of range");
+            }
+
+            NavigableSet<Long> subset =
+                orderedListKeys
+                    .getOrDefault(request.getStateKey(), new TreeSet<>())
+                    .subSet(sortKey, true, end, false);
+
+            // get the effective sort key currently, can throw NoSuchElementException
+            Long nextSortKey = subset.first();
+
+            StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+            keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey);
+            List<ByteString> byteStrings =
+                data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY));
+
+            // get the block specified in continuation token, can throw IndexOutOfBoundsException
+            returnBlock = byteStrings.get(index);
+
+            if (byteStrings.size() > index + 1) {
+              // more blocks from this sort key
+              index += 1;
+            } else {
+              // finish navigating the current sort key and need to find the next one,
+              // can throw NoSuchElementException
+              nextSortKey = subset.tailSet(nextSortKey, false).first();
+              index = 0;
+            }
+
+            ByteStringOutputStream outputStream = new ByteStringOutputStream();
+            try {
+              KV<Long, Integer> cursor = KV.of(nextSortKey, index);
+              coder.encode(cursor, outputStream);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+            continuationToken = outputStream.toByteString();
+          } catch (NoSuchElementException | IndexOutOfBoundsException e) {
+            continuationToken = ByteString.EMPTY;
+          }
+          response =
+              StateResponse.newBuilder()
+                  .setOrderedListGet(
+                      OrderedListStateGetResponse.newBuilder()
+                          .setData(returnBlock)
+                          .setContinuationToken(continuationToken));
+        }
+        break;
+
+      case ORDERED_LIST_UPDATE:
+        for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) {
+          List<Long> keysToRemove =
+              new ArrayList<>(
+                  orderedListKeys
+                      .getOrDefault(request.getStateKey(), new TreeSet<>())
+                      .subSet(r.getStart(), true, r.getEnd(), false));
+
+          for (Long l : keysToRemove) {
+            StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+            keyBuilder.getOrderedListUserStateBuilder().setSortKey(l);
+            data.remove(keyBuilder.build());
+            orderedListKeys.get(request.getStateKey()).remove(l);
+          }
+        }
+
+        for (OrderedListEntry e : request.getOrderedListUpdate().getInsertsList()) {
+          StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+          keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey());
+
+          ByteStringOutputStream outStream = new ByteStringOutputStream();
+
+          try {
+            InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), outStream);
+          } catch (IOException ex) {
+            throw new RuntimeException(ex);
+          }
+          // In the response, the value encoded bytes are placed before the timestamp encoded bytes.

Review Comment:
   ACK. I found a better way to avoid the decoding overhead in the fake client 

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500878413


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
   Sounds good! Thanks a lot for the input, Kenn.
   
   I will go ahead with the changes to re-use Get/Append/Clear for ordered list 
then.






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500872501


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;
+}
+
+// A request to update an ordered list
+message OrderedListStateUpdateRequest {
+  // when the request is processed, deletes should always happen before inserts.

Review Comment:
   Yep. I am planning to add an addendum to the original design doc to 
summarize the decisions we make here. We should have that after this round of 
review completes.






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500872501


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;
+}
+
+// A request to update an ordered list
+message OrderedListStateUpdateRequest {
+  // when the request is processed, deletes should always happen before inserts.

Review Comment:
   Yep. I am planning to add an addendum to the original design doc to 
summarize the decisions we make here after this round of review completes.






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500872501


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;
+}
+
+// A request to update an ordered list
+message OrderedListStateUpdateRequest {
+  // when the request is processed, deletes should always happen before inserts.

Review Comment:
   Yep. I am planning to add an addendum to the original design to summarize 
the decision we make here after this round of review completes.






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


kennknowles commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500851162


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
   OK my responses are evolving now that I've read the whole code change and 
re-read the doc.
   
- I see specialized requests are broken out anyhow, so that's fine.
- Including everything needed for caching in the state key is good for raw 
request caching, so re-using Get is good, though there are perhaps smarter ways 
to cache that won't benefit from this.
- "append" is still a fine method for adding things to ordered list state, 
but it isn't important and the name is misleading (as it is for bags and 
multimaps, since they are not ordered, so anyhow it is the same here and might 
as well keep the incorrect naming)
- Obviously `clear` is fine



##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;
+}
+
+// A request to update an ordered list
+message OrderedListStateUpdateRequest {
+  // when the request is processed, deletes should always happen before inserts.

Review Comment:
   It would be helpful to outline the pros and cons of little decisions like
   this in the design doc, and to note which option was chosen and why.
   
   For example, one benefit of splitting the requests is avoiding ordering
   issues. We would have to spec that either the inserts or the deletes happen
   first, even though they are in one request together, which is a bit
   confusing. And if you want them in the other order, you still have to make
   two requests, each with one empty field.
   
   Also note whether there is an efficiency consideration.
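   
   To make the ordering concern concrete, here is a minimal sketch of why the
   spec has to pin the order down. It is not the PR's code: the class
   `UpdateOrderSketch`, its methods, and the use of a `TreeMap` with `String`
   values are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory model of an ordered list; not the Beam runner code.
final class UpdateOrderSketch {
  /** Removes every entry whose sort key falls in [start, end). */
  static void deleteRange(TreeMap<Long, List<String>> state, long start, long end) {
    state.subMap(start, true, end, false).clear();
  }

  /** Adds a value under the given sort key, keeping earlier values for that key. */
  static void insert(TreeMap<Long, List<String>> state, long sortKey, String value) {
    state.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
  }

  /** Applies one delete range and one insert, in the requested order, to a copy of the state. */
  static TreeMap<Long, List<String>> apply(
      TreeMap<Long, List<String>> state,
      long deleteStart,
      long deleteEnd,
      long insertKey,
      String insertValue,
      boolean deletesFirst) {
    TreeMap<Long, List<String>> copy = new TreeMap<>();
    state.forEach((k, v) -> copy.put(k, new ArrayList<>(v)));
    if (deletesFirst) {
      deleteRange(copy, deleteStart, deleteEnd);
      insert(copy, insertKey, insertValue);
    } else {
      insert(copy, insertKey, insertValue);
      deleteRange(copy, deleteStart, deleteEnd);
    }
    return copy;
  }
}
```

   With an existing entry at sort key 5, a delete of [0, 10), and an insert at
   5, deletes-first leaves only the new value while inserts-first leaves
   nothing. A single combined request has to document which of the two it
   means.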






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


kennknowles commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500844153


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
   The one counterpoint is that a state key is used for caching, as long as the 
spec is that a state key is deterministic in what it returns. So it has value 
for `get`.
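   
   As a rough illustration of that caching argument, the sketch below caches
   raw `get` responses by key. It assumes the key fully determines the
   response; the class `StateGetCacheSketch`, the `String` keys and values, and
   the `backend` function are hypothetical stand-ins, not Beam types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical raw-response cache; real state keys are protos, not strings.
final class StateGetCacheSketch {
  private final Map<String, String> cache = new HashMap<>();
  private final Function<String, String> backend;
  private int backendCalls = 0;

  StateGetCacheSketch(Function<String, String> backend) {
    this.backend = backend;
  }

  /** Returns the cached response for the key, asking the backend only on a miss. */
  String get(String stateKey) {
    return cache.computeIfAbsent(
        stateKey,
        k -> {
          backendCalls++;
          return backend.apply(k);
        });
  }

  /** Drops a cached response, for example after an append or clear on that key. */
  void invalidate(String stateKey) {
    cache.remove(stateKey);
  }

  int backendCalls() {
    return backendCalls;
  }
}
```

   If the range were carried outside the key, two requests with the same key
   but different ranges would collide in a cache like this one, which is the
   value the key has for `get`.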






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-23 Thread via GitHub


kennknowles commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1500839373


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
   I still agree with my comment there: "it looks like a 'do everything' 
request/response protocol and the key holds all the information." That comment 
may seem to be about style, but it is really about transparency, debuggability, 
readability.
   
   I view whatever "consistency" we have amongst the state requests as 
coincidence at best and a mistake at worst. _The_ reason that different kinds 
of state exist is because they support different methods. There is a reason 
that the only method they have in common in Java (where it originated) is 
`clear()`. Apparently the design decision was to model `methodA, methodB, 
methodC` as `oneBigMethod("actually do A"), oneBigMethod("actually do B"), 
oneBigMethod("actually do C")`. I just think that is not as good as being 
straightforward.
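   
   A toy contrast of the two shapes, purely for illustration. Nothing here is
   the Fn API: `ApiStyleSketch`, the string "action" argument, and the
   list-of-strings state are made-up stand-ins.

```java
import java.util.List;

// Hypothetical contrast between a "do everything" entry point and explicit methods.
final class ApiStyleSketch {
  /** Style 1: one entry point; an argument says which operation is actually meant. */
  static String oneBigMethod(List<String> state, String action, String arg) {
    switch (action) {
      case "append":
        state.add(arg);
        return "";
      case "get":
        return String.join(",", state);
      case "clear":
        state.clear();
        return "";
      default:
        throw new IllegalArgumentException("unknown action: " + action);
    }
  }

  /** Style 2: each operation is its own method with only the arguments it needs. */
  static void append(List<String> state, String value) {
    state.add(value);
  }

  static String get(List<String> state) {
    return String.join(",", state);
  }

  static void clear(List<String> state) {
    state.clear();
  }
}
```

   Both styles do the same work. In the first, a typo in the action only shows
   up at runtime and unused arguments still have to be passed; in the second,
   the signature says what each call does.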






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-21 Thread via GitHub


robertwb commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1498139008


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
   I prefer being consistent. If we want to restructure things to break 
everything out, we can have that discussion, but it doesn't seem a clear enough 
win to break consistency. (If the range is in the ordered list state state key, 
the existing Get/Append/Clear will work just fine, no need for optional fields.)
   
   @kennknowles for your thoughts too. 






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-21 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1498124213


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##
@@ -174,6 +215,124 @@ public CompletableFuture 
handle(StateRequest.Builder requestBuild
 response = 
StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
 break;
 
+  case ORDERED_LIST_GET:
+{
+  long start = request.getOrderedListGet().getRange().getStart();
+  long end = request.getOrderedListGet().getRange().getEnd();
+
+  KvCoder coder = KvCoder.of(VarLongCoder.of(), 
VarIntCoder.of());
+  long sortKey = start;
+  int index = 0;
+  if (request.getOrderedListGet().getContinuationToken().size() > 0) {

Review Comment:
   Ack!






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-21 Thread via GitHub


robertwb commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1498123563


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##
@@ -174,6 +215,124 @@ public CompletableFuture 
handle(StateRequest.Builder requestBuild
 response = 
StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
 break;
 
+  case ORDERED_LIST_GET:
+{
+  long start = request.getOrderedListGet().getRange().getStart();
+  long end = request.getOrderedListGet().getRange().getEnd();
+
+  KvCoder coder = KvCoder.of(VarLongCoder.of(), 
VarIntCoder.of());
+  long sortKey = start;
+  int index = 0;
+  if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+try {
+  // The continuation format here is the sort key (long) followed 
by an index (int)
+  KV cursor =
+  
coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+  sortKey = cursor.getKey();
+  index = cursor.getValue();
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  ByteString continuationToken;
+  ByteString returnBlock = ByteString.EMPTY;
+  ;
+  try {
+if (sortKey < start || sortKey >= end) {
+  throw new IndexOutOfBoundsException("sort key out of range");
+}
+
+NavigableSet subset =
+orderedListKeys
+.getOrDefault(request.getStateKey(), new TreeSet<>())
+.subSet(sortKey, true, end, false);
+
+// get the effective sort key currently, can throw 
NoSuchElementException
+Long nextSortKey = subset.first();
+
+StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+
keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey);
+List byteStrings =
+data.getOrDefault(keyBuilder.build(), 
Collections.singletonList(ByteString.EMPTY));
+
+// get the block specified in continuation token, can throw 
IndexOutOfBoundsException
+returnBlock = byteStrings.get(index);
+
+if (byteStrings.size() > index + 1) {
+  // more blocks from this sort key
+  index += 1;
+} else {
+  // finish navigating the current sort key and need to find the 
next one,
+  // can throw NoSuchElementException
+  nextSortKey = subset.tailSet(nextSortKey, false).first();
+  index = 0;
+}
+
+ByteStringOutputStream outputStream = new ByteStringOutputStream();
+try {
+  KV cursor = KV.of(nextSortKey, index);
+  coder.encode(cursor, outputStream);
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+continuationToken = outputStream.toByteString();
+  } catch (NoSuchElementException | IndexOutOfBoundsException e) {
+continuationToken = ByteString.EMPTY;
+  }
+  response =
+  StateResponse.newBuilder()
+  .setOrderedListGet(
+  OrderedListStateGetResponse.newBuilder()
+  .setData(returnBlock)
+  .setContinuationToken(continuationToken));
+}
+break;
+
+  case ORDERED_LIST_UPDATE:
+for (OrderedListRange r : 
request.getOrderedListUpdate().getDeletesList()) {
+  List keysToRemove =
+  new ArrayList<>(
+  orderedListKeys
+  .getOrDefault(request.getStateKey(), new TreeSet<>())
+  .subSet(r.getStart(), true, r.getEnd(), false));
+
+  for (Long l : keysToRemove) {
+StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+keyBuilder.getOrderedListUserStateBuilder().setSortKey(l);
+data.remove(keyBuilder.build());
+orderedListKeys.get(request.getStateKey()).remove(l);
+  }
+}
+
+for (OrderedListEntry e : 
request.getOrderedListUpdate().getInsertsList()) {
+  StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+  
keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey());
+
+  ByteStringOutputStream outStream = new ByteStringOutputStream();
+
+  try {
+InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), 
outStream);
+  } catch (IOException ex) {
+throw new RuntimeException(ex);
+  }
+  // In the response, the value encoded bytes are placed before the 
timestamp encoded bytes.

Review Comment:
   I am not opposed to encode/decode overhead to make things totally clear 

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-21 Thread via GitHub


robertwb commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1498119280


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##
@@ -174,6 +215,124 @@ public CompletableFuture 
handle(StateRequest.Builder requestBuild
 response = 
StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
 break;
 
+  case ORDERED_LIST_GET:
+{
+  long start = request.getOrderedListGet().getRange().getStart();
+  long end = request.getOrderedListGet().getRange().getEnd();
+
+  KvCoder coder = KvCoder.of(VarLongCoder.of(), 
VarIntCoder.of());
+  long sortKey = start;
+  int index = 0;
+  if (request.getOrderedListGet().getContinuationToken().size() > 0) {

Review Comment:
   I'm just suggesting `String::isEmpty()` over `String::size() > 0`.






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-21 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1498118154


##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java:
##
@@ -169,6 +172,7 @@ public ResultT get() {
   }
 
   @Override
+  @SuppressWarnings("unchecked")

Review Comment:
   Right. Nothing changed.






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-21 Thread via GitHub


robertwb commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1498116158


##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java:
##
@@ -169,6 +172,7 @@ public ResultT get() {
   }
 
   @Override
+  @SuppressWarnings("unchecked")

Review Comment:
   But nothing changed in this code, right?






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-21 Thread via GitHub


robertwb commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1498107366


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1021,6 +1033,29 @@ message StateKey {
 bytes map_key = 5;
   }
 
+  // Represents a request for an ordered list of values associated with a
+  // specified user key and window for a PTransform. See
+  // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
+  // details.
+  //
+  // The response data stream will be a concatenation of all entries of sort key
+  // and V's associated with the specified user key and window.
+  // See https://s.apache.org/beam-fn-api-send-and-receive-data for further
+  // details.
+  message OrderedListUserState {
+// (Required) The id of the PTransform containing user state.
+string transform_id = 1;
+// (Required) The id of the user state.
+string user_state_id = 2;
+// (Required) The window encoded in a nested context.
+bytes window = 3;
+// (Required) The key of the currently executing element encoded in a
+// nested context.
+bytes key = 4;
+// (optional) The sort key encoded in a nested context.
+int64 sort_key = 5;

Review Comment:
   We shouldn't put something in the protos that's only used for the fake. 






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-21 Thread via GitHub


robertwb commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1498106140


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;

Review Comment:
   Yes, that's what I meant by mutually exclusive. If we allow providing both, 
we have to figure out what to do when they disagree, and there's no good 
use case for that. 






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-16 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492990264


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1021,6 +1033,29 @@ message StateKey {
 bytes map_key = 5;
   }
 
+  // Represents a request for an ordered list of values associated with a
+  // specified user key and window for a PTransform. See
+  // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
+  // details.
+  //
+  // The response data stream will be a concatenation of all entries of sort key
+  // and V's associated with the specified user key and window.
+  // See https://s.apache.org/beam-fn-api-send-and-receive-data for further
+  // details.
+  message OrderedListUserState {
+// (Required) The id of the PTransform containing user state.
+string transform_id = 1;
+// (Required) The id of the user state.
+string user_state_id = 2;
+// (Required) The window encoded in a nested context.
+bytes window = 3;
+// (Required) The key of the currently executing element encoded in a
+// nested context.
+bytes key = 4;
+// (optional) The sort key encoded in a nested context.
+int64 sort_key = 5;

Review Comment:
   The sort_key here is only used by FakeBeamFnStateClient. The purpose is to 
store the internal data the same way as MultiMap.
   
   We don't need it for any Insert/Update as the range/key is included in the 
corresponding messages.
   
https://github.com/apache/beam/blob/5d7cd5e2e2e6b966ddb47cd78135b58013622d23/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L1122
   
https://github.com/apache/beam/blob/5d7cd5e2e2e6b966ddb47cd78135b58013622d23/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L1131
   






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-16 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492924566


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##
@@ -174,6 +215,124 @@ public CompletableFuture 
handle(StateRequest.Builder requestBuild
 response = 
StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
 break;
 
+  case ORDERED_LIST_GET:
+{
+  long start = request.getOrderedListGet().getRange().getStart();
+  long end = request.getOrderedListGet().getRange().getEnd();
+
+  KvCoder coder = KvCoder.of(VarLongCoder.of(), 
VarIntCoder.of());
+  long sortKey = start;
+  int index = 0;
+  if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+try {
+  // The continuation format here is the sort key (long) followed 
by an index (int)
+  KV cursor =
+  
coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+  sortKey = cursor.getKey();
+  index = cursor.getValue();
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  ByteString continuationToken;
+  ByteString returnBlock = ByteString.EMPTY;
+  ;
+  try {
+if (sortKey < start || sortKey >= end) {
+  throw new IndexOutOfBoundsException("sort key out of range");
+}
+
+NavigableSet subset =
+orderedListKeys
+.getOrDefault(request.getStateKey(), new TreeSet<>())
+.subSet(sortKey, true, end, false);
+
+// get the effective sort key currently, can throw 
NoSuchElementException
+Long nextSortKey = subset.first();
+
+StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+
keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey);
+List byteStrings =
+data.getOrDefault(keyBuilder.build(), 
Collections.singletonList(ByteString.EMPTY));
+
+// get the block specified in continuation token, can throw 
IndexOutOfBoundsException
+returnBlock = byteStrings.get(index);
+
+if (byteStrings.size() > index + 1) {
+  // more blocks from this sort key
+  index += 1;
+} else {
+  // finish navigating the current sort key and need to find the 
next one,
+  // can throw NoSuchElementException
+  nextSortKey = subset.tailSet(nextSortKey, false).first();
+  index = 0;
+}
+
+ByteStringOutputStream outputStream = new ByteStringOutputStream();
+try {
+  KV cursor = KV.of(nextSortKey, index);
+  coder.encode(cursor, outputStream);
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+continuationToken = outputStream.toByteString();
+  } catch (NoSuchElementException | IndexOutOfBoundsException e) {
+continuationToken = ByteString.EMPTY;
+  }
+  response =
+  StateResponse.newBuilder()
+  .setOrderedListGet(
+  OrderedListStateGetResponse.newBuilder()
+  .setData(returnBlock)
+  .setContinuationToken(continuationToken));
+}
+break;
+
+  case ORDERED_LIST_UPDATE:
+for (OrderedListRange r : 
request.getOrderedListUpdate().getDeletesList()) {
+  List keysToRemove =
+  new ArrayList<>(
+  orderedListKeys
+  .getOrDefault(request.getStateKey(), new TreeSet<>())
+  .subSet(r.getStart(), true, r.getEnd(), false));
+
+  for (Long l : keysToRemove) {
+StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+keyBuilder.getOrderedListUserStateBuilder().setSortKey(l);
+data.remove(keyBuilder.build());
+orderedListKeys.get(request.getStateKey()).remove(l);
+  }
+}
+
+for (OrderedListEntry e : 
request.getOrderedListUpdate().getInsertsList()) {
+  StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+  
keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey());
+
+  ByteStringOutputStream outStream = new ByteStringOutputStream();
+
+  try {
+InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), 
outStream);
+  } catch (IOException ex) {
+throw new RuntimeException(ex);
+  }
+  // In the response, the value encoded bytes are placed before the 
timestamp encoded bytes.

Review Comment:
   Timestamp is the sort key here. As our OrderedListState interface is based 

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-16 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492822517


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##
@@ -174,6 +215,124 @@ public CompletableFuture 
handle(StateRequest.Builder requestBuild
 response = 
StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
 break;
 
+  case ORDERED_LIST_GET:
+{
+  long start = request.getOrderedListGet().getRange().getStart();
+  long end = request.getOrderedListGet().getRange().getEnd();
+
+  KvCoder coder = KvCoder.of(VarLongCoder.of(), 
VarIntCoder.of());
+  long sortKey = start;
+  int index = 0;
+  if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+try {
+  // The continuation format here is the sort key (long) followed 
by an index (int)
+  KV cursor =
+  
coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+  sortKey = cursor.getKey();
+  index = cursor.getValue();
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  ByteString continuationToken;
+  ByteString returnBlock = ByteString.EMPTY;
+  ;
+  try {
+if (sortKey < start || sortKey >= end) {
+  throw new IndexOutOfBoundsException("sort key out of range");

Review Comment:
   See my comment in 
https://github.com/apache/beam/pull/30317#discussion_r1492631642






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-16 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492821907


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##
@@ -174,6 +215,124 @@ public CompletableFuture 
handle(StateRequest.Builder requestBuild
 response = 
StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
 break;
 
+  case ORDERED_LIST_GET:
+{
+  long start = request.getOrderedListGet().getRange().getStart();
+  long end = request.getOrderedListGet().getRange().getEnd();
+
+  KvCoder coder = KvCoder.of(VarLongCoder.of(), 
VarIntCoder.of());
+  long sortKey = start;
+  int index = 0;
+  if (request.getOrderedListGet().getContinuationToken().size() > 0) {

Review Comment:
   The code here checks whether the continuation token is non-empty. An empty
   continuation token in the get request means the first block in the query
   range should be returned. A non-empty continuation token identifies which
   block is being requested.
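   
   A simplified sketch of that paging logic, assuming the state is not modified
   between calls. It is not the fake client's code: `ContinuationSketch`,
   `Page`, the `"sortKey:index"` string token, and the `String` blocks are
   illustrative assumptions (the fake encodes the cursor with a `KvCoder`).

```java
import java.util.List;
import java.util.NavigableMap;

// Hypothetical paging model: one block per call, cursor = (sort key, block index).
final class ContinuationSketch {
  /** One block plus the token for the next block; an empty token means "done". */
  static final class Page {
    final String block;
    final String nextToken;

    Page(String block, String nextToken) {
      this.block = block;
      this.nextToken = nextToken;
    }
  }

  /** An empty token starts at the first block in [start, end); otherwise it names the block. */
  static Page get(NavigableMap<Long, List<String>> state, long start, long end, String token) {
    long sortKey = start;
    int index = 0;
    if (!token.isEmpty()) {
      String[] parts = token.split(":");
      sortKey = Long.parseLong(parts[0]);
      index = Integer.parseInt(parts[1]);
    }
    if (sortKey < start || sortKey >= end) {
      return new Page("", "");
    }
    NavigableMap<Long, List<String>> subset = state.subMap(sortKey, true, end, false);
    if (subset.isEmpty()) {
      return new Page("", "");
    }
    Long current = subset.firstKey();
    List<String> blocks = subset.get(current);
    if (index >= blocks.size()) {
      return new Page("", "");
    }
    String block = blocks.get(index);
    if (index + 1 < blocks.size()) {
      // More blocks remain under the same sort key.
      return new Page(block, current + ":" + (index + 1));
    }
    // Move on to the next sort key in range, if there is one.
    Long next = subset.higherKey(current);
    return new Page(block, next == null ? "" : next + ":0");
  }
}
```

   A caller keeps passing the returned token back in until it comes back
   empty, so the first request and every follow-up go through the same code
   path.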






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-16 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492762927


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##
@@ -174,6 +215,124 @@ public CompletableFuture 
handle(StateRequest.Builder requestBuild
 response = 
StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
 break;
 
+  case ORDERED_LIST_GET:
+{
+  long start = request.getOrderedListGet().getRange().getStart();
+  long end = request.getOrderedListGet().getRange().getEnd();
+
+  KvCoder coder = KvCoder.of(VarLongCoder.of(), 
VarIntCoder.of());
+  long sortKey = start;
+  int index = 0;
+  if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+try {
+  // The continuation format here is the sort key (long) followed 
by an index (int)
+  KV cursor =
+  
coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+  sortKey = cursor.getKey();
+  index = cursor.getValue();
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  ByteString continuationToken;
+  ByteString returnBlock = ByteString.EMPTY;
+  ;
+  try {
+if (sortKey < start || sortKey >= end) {
+  throw new IndexOutOfBoundsException("sort key out of range");

Review Comment:
   See my comment in a previous thread: 
https://github.com/apache/beam/pull/30317#discussion_r1492631642



Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-16 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492761288


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;
+}
+
+// A request to update an ordered list
+message OrderedListStateUpdateRequest {
+  // when the request is processed, deletes should always happen before 
inserts.

Review Comment:
   Discussion 2 in the design doc covers this, but it reached no conclusion: 
https://docs.google.com/document/d/1U77sAvE6Iy9XsVruRYHxPdFji7nqS6HPi1XU8fhyrxs/
   
   I am fine with splitting them though.
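   
   For reference, the ordering stated in the proto comment (deletes before inserts) can be sketched as follows. This is only an illustration under assumptions: `OrderedListStore`, `Entry`, `update`, and `read` are hypothetical names, string payloads stand in for encoded bytes, and a real runner may store state quite differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory store for illustration; not a Beam runner implementation.
final class OrderedListStore {
  static final class Entry {
    final long sortKey;
    final String data;

    Entry(long sortKey, String data) {
      this.sortKey = sortKey;
      this.data = data;
    }
  }

  private final TreeMap<Long, List<String>> entries = new TreeMap<>();

  // Applies one update: all delete ranges [start, end) are cleared first,
  // and only then are the inserts added.
  void update(List<long[]> deleteRanges, List<Entry> inserts) {
    for (long[] range : deleteRanges) {
      // subMap returns a live view, so clearing it removes the keys in [start, end).
      entries.subMap(range[0], true, range[1], false).clear();
    }
    for (Entry e : inserts) {
      entries.computeIfAbsent(e.sortKey, k -> new ArrayList<>()).add(e.data);
    }
  }

  // Returns the payloads with sort keys in [start, end), in sort-key order.
  List<String> read(long start, long end) {
    List<String> out = new ArrayList<>();
    for (List<String> values : entries.subMap(start, true, end, false).values()) {
      out.addAll(values);
    }
    return out;
  }
}
```

   With this ordering, a single update that deletes [0, 10) and inserts at sort key 5 leaves the newly inserted entry in place.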



Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-16 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492753170


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
   There is a discussion of this in the original design doc: 
https://docs.google.com/document/d/1U77sAvE6Iy9XsVruRYHxPdFji7nqS6HPi1XU8fhyrxs/edit?disco=H0MsNik
   
   I think @kennknowles mentioned that he preferred separate messages.
   
   Personally, I prefer your proposal to reuse the Get/Append/Clear 
messages by adding optional fields for ordered list state. 
   
   However, I hesitate to do that because (1) it would introduce new optional 
fields to an existing proto message and (2) for extensibility and 
flexibility, it may be easier to have separate messages for ordered list and 
any future states that don't fit the current Get/Append/Clear paradigm.
   
   WDYT?



Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-16 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492734368


##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##
@@ -174,6 +215,124 @@ public CompletableFuture 
handle(StateRequest.Builder requestBuild
 response = 
StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
 break;
 
+  case ORDERED_LIST_GET:
+{
+  long start = request.getOrderedListGet().getRange().getStart();
+  long end = request.getOrderedListGet().getRange().getEnd();
+
+  KvCoder coder = KvCoder.of(VarLongCoder.of(), 
VarIntCoder.of());
+  long sortKey = start;
+  int index = 0;
+  if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+try {
+  // The continuation format here is the sort key (long) followed 
by an index (int)
+  KV cursor =
+  
coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+  sortKey = cursor.getKey();
+  index = cursor.getValue();
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  ByteString continuationToken;
+  ByteString returnBlock = ByteString.EMPTY;
+  ;

Review Comment:
   Nice catch. Will delete it.



Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-16 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492733026


##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java:
##
@@ -169,6 +172,7 @@ public ResultT get() {
   }
 
   @Override
+  @SuppressWarnings("unchecked")

Review Comment:
   I get a warning on each of these binding functions. For example, for the 
code I wrote at
   
https://github.com/apache/beam/blob/5d7cd5e2e2e6b966ddb47cd78135b58013622d23/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L612
   
   the warning message is as follows:
   ```
   Unchecked cast: 'java.lang.Object' to 
'org.apache.beam.sdk.state.OrderedListState' 
Inspection info: Reports code on which an unchecked warning will be issued 
by the javac compiler. Every unchecked warning may potentially trigger 
ClassCastException at runtime
   ```
   Basically, the compiler complains that we cast an object (from the state 
key cache) to the state class we bind.
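   
   The pattern that triggers the warning can be reproduced in isolation. This is a minimal sketch, assuming a cache keyed by a plain string instead of `StateKey`; `ObjectCache` and `getOrCreate` are hypothetical names, not part of the harness.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical cache for illustration; the real accessor keys its cache by StateKey.
final class ObjectCache {
  private final Map<String, Object> cache = new HashMap<>();

  // The map stores plain Objects, so the cast back to T cannot be verified at
  // runtime. javac reports an unchecked warning here unless it is suppressed.
  @SuppressWarnings("unchecked")
  <T> T getOrCreate(String key, Supplier<T> factory) {
    return (T) cache.computeIfAbsent(key, k -> factory.get());
  }
}
```

   The suppression is only safe as long as each key is always bound to the same type; a second caller asking for a different type under the same key would fail later with a ClassCastException.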



Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-16 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492637135


##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java:
##
@@ -598,13 +606,81 @@ public  MultimapState 
bindMultimap(
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public  OrderedListState bindOrderedList(
   String id, StateSpec> spec, Coder elemCoder) {
-throw new UnsupportedOperationException(
-"TODO: Add support for a sorted-list state to the Fn API.");
+return (OrderedListState)
+stateKeyObjectCache.computeIfAbsent(
+createOrderedListUserStateKey(id),
+new Function() {
+  @Override
+  public Object apply(StateKey key) {
+return new OrderedListState() {
+  private final OrderedListUserState impl =
+  createOrderedListUserState(key, elemCoder);
+
+  @Override
+  public void clear() {
+clearRange(
+BoundedWindow.TIMESTAMP_MIN_VALUE, 
BoundedWindow.TIMESTAMP_MAX_VALUE);
+  }
+
+  @Override
+  public void add(TimestampedValue value) {
+impl.add(value);
+  }
+
+  @Override
+  public ReadableState isEmpty() {
+return new ReadableState() {
+  @Override
+  public @Nullable Boolean read() {
+return !impl.read().iterator().hasNext();
+  }
+
+  @Override
+  public ReadableState readLater() {
+return this;
+  }
+};
+  }
+
+  @Nullable
+  @Override
+  public Iterable> read() {
+return readRange(
+BoundedWindow.TIMESTAMP_MIN_VALUE, 
BoundedWindow.TIMESTAMP_MAX_VALUE);
+  }
+
+  @Override
+  public GroupingState, 
Iterable>>
+  readLater() {
+throw new UnsupportedOperationException();

Review Comment:
   Ok. Will revise it accordingly.



Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-16 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492631642


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;

Review Comment:
   >  comment on continuation token ...
   
   Good idea! I will add that when I revise the code.
   
   > Do we want to require returning the range if there's a continuation token 
involved? Or should they be mutually exclusive?
   
   I don't think the range is a hard requirement. 
   In fact, in my simple implementation of the fake client, I put the current 
sort key and the current block index into the continuation token: 
https://github.com/apache/beam/blob/5d7cd5e2e2e6b966ddb47cd78135b58013622d23/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java#L228.
 
   Other implementations may need the range, but I think that is 
implementation-dependent. That's why I hesitate to put a range/sort key in a 
separate field when a continuation token is present.
   
   I thought the continuation token should allow the runner to uniquely 
identify where to find the next block of data. Is that what you mean by 
"mutually exclusive"?

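   To make the idea of an opaque token concrete, here is a rough sketch of how a caller could page through blocks by echoing the token back without looking inside it. `Paging`, `Page`, and `readAll` are hypothetical names, and the `fetch` function stands in for whatever issues the get request; none of this is the actual Fn API client.

```java
import java.io.ByteArrayOutputStream;
import java.util.function.Function;

// Hypothetical types for illustration; not the Fn API classes.
final class Paging {
  static final class Page {
    final byte[] data;
    final byte[] continuationToken; // empty when there are no more blocks

    Page(byte[] data, byte[] continuationToken) {
      this.data = data;
      this.continuationToken = continuationToken;
    }
  }

  // Fetches every block, starting with an empty token and passing each returned
  // token back unchanged. The caller never interprets the token bytes.
  static byte[] readAll(Function<byte[], Page> fetch) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] token = new byte[0];
    do {
      Page page = fetch.apply(token);
      out.write(page.data, 0, page.data.length);
      token = page.continuationToken;
    } while (token.length > 0);
    return out.toByteArray();
  }
}
```

   Because only the responder decodes the token, it is free to put a sort key, a block index, or a range into it, which is the implementation-dependent part described above.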


Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-16 Thread via GitHub


shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492601187


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;

Review Comment:
   > how do we return multiple elements if the request was a range?
   
   We concatenate the encoded entries into a byte array and send them back in 
chunks, each with a corresponding continuation token. 
   
   > should this be repeated OrderedListEntry instead of data?
   
   I have considered this option. Besides the efficiency reason @robertwb 
mentioned, I also find that representing the response as bytes has the advantage 
of reusing the existing code in 
https://github.com/apache/beam/blob/3693174c0421d0ff049042ca283db633431892ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java.
 This iterator is used to parse the returned data block (not only for 
OrderedListState but also for Multimap, Bag, etc.), and it supports blocks even if 
their boundaries are not aligned with entries/elements. I don't think this is 
achievable with OrderedListEntry. 
   
   > But the coding should be specified (e.g. is this a concatenation of 
several encoded KVs
   
   That's right. It is basically the concatenation of encoded 
TimestampedValue, since TimestampedValue is already used in the SDK 
interface of OrderedListState 
https://github.com/apache/beam/blob/3693174c0421d0ff049042ca283db633431892ef/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java#L29
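   
   As a rough illustration of why a byte stream tolerates unaligned block boundaries, the sketch below decodes concatenated entries from chunks that split an entry in the middle. The wire layout (8-byte sort key, 4-byte length, UTF-8 payload) and the name `ChunkedEntryDecoder` are assumptions made for the example; the PR itself encodes TimestampedValue with Beam coders and parses blocks with StateFetchingIterators.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical decoder for illustration; the entry layout is an assumption.
final class ChunkedEntryDecoder {
  // Decodes "sortKey:payload" strings from chunks whose boundaries may fall in
  // the middle of an entry. Assumes the last chunk ends on an entry boundary.
  static List<String> decode(List<byte[]> chunks) {
    List<InputStream> streams = new ArrayList<>();
    for (byte[] chunk : chunks) {
      streams.add(new ByteArrayInputStream(chunk));
    }
    // Chaining the chunks into one stream hides the block boundaries from the parser.
    DataInputStream in =
        new DataInputStream(new SequenceInputStream(Collections.enumeration(streams)));
    List<String> decoded = new ArrayList<>();
    try {
      while (true) {
        long sortKey;
        try {
          sortKey = in.readLong();
        } catch (EOFException e) {
          break; // no more entries
        }
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        decoded.add(sortKey + ":" + new String(payload, StandardCharsets.UTF_8));
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return decoded;
  }
}
```

   A `repeated OrderedListEntry` field would instead require every response to contain whole entries, so a block could never end partway through one.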
   
   



Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-15 Thread via GitHub


robertwb commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1491730995


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1021,6 +1033,29 @@ message StateKey {
 bytes map_key = 5;
   }
 
+  // Represents a request for an ordered list of values associated with a
+  // specified user key and window for a PTransform. See
+  // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
+  // details.
+  //
+  // The response data stream will be a concatenation of all entries of sort 
key
+  // and V's associated with the specified user key and window.
+  // See https://s.apache.org/beam-fn-api-send-and-receive-data for further
+  // details.
+  message OrderedListUserState {
+// (Required) The id of the PTransform containing user state.
+string transform_id = 1;
+// (Required) The id of the user state.
+string user_state_id = 2;
+// (Required) The window encoded in a nested context.
+bytes window = 3;
+// (Required) The key of the currently executing element encoded in a
+// nested context.
+bytes key = 4;
+// (optional) The sort key encoded in a nested context.
+int64 sort_key = 5;

Review Comment:
   What are the semantics of providing a sort_key? Should we allow a range 
instead?
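
   For illustration, a small sketch of the difference between a single-key 
lookup and a half-open range lookup. The class `SortKeyRangeLookup` and its 
methods are hypothetical and use a plain `TreeMap`; nothing here is taken from 
the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory store, for illustration only.
final class SortKeyRangeLookup {
  private final NavigableMap<Long, List<String>> entries = new TreeMap<>();

  void add(long sortKey, String value) {
    entries.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
  }

  // Returns all values whose sort key lies in the half-open range [start, end),
  // ordered by sort key.
  List<String> readRange(long start, long end) {
    List<String> result = new ArrayList<>();
    if (start >= end) {
      return result; // empty range
    }
    for (List<String> values : entries.subMap(start, true, end, false).values()) {
      result.addAll(values);
    }
    return result;
  }

  // Returns the values stored under exactly one sort key.
  List<String> readKey(long sortKey) {
    return new ArrayList<>(entries.getOrDefault(sortKey, List.of()));
  }
}
```

   With a range, a single key `k` can still be expressed as `[k, k + 1)` for 
any `k` below `Long.MAX_VALUE`, so a range is the more general of the two.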



##
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java:
##
@@ -174,6 +215,124 @@ public CompletableFuture<StateResponse> handle(StateRequest.Builder requestBuild
 response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
 break;
 
+  case ORDERED_LIST_GET:
+{
+  long start = request.getOrderedListGet().getRange().getStart();
+  long end = request.getOrderedListGet().getRange().getEnd();
+
+  KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of());
+  long sortKey = start;
+  int index = 0;
+  if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+try {
+  // The continuation format here is the sort key (long) followed by an index (int)
+  KV<Long, Integer> cursor =
+  coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+  sortKey = cursor.getKey();
+  index = cursor.getValue();
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  ByteString continuationToken;
+  ByteString returnBlock = ByteString.EMPTY;
+  ;
+  try {
+if (sortKey < start || sortKey >= end) {
+  throw new IndexOutOfBoundsException("sort key out of range");
+}
+
+NavigableSet<Long> subset =
+orderedListKeys
+.getOrDefault(request.getStateKey(), new TreeSet<>())
+.subSet(sortKey, true, end, false);
+
+// get the effective sort key currently, can throw NoSuchElementException
+Long nextSortKey = subset.first();
+
+StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey);
+List<ByteString> byteStrings =
+data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY));
+
+// get the block specified in continuation token, can throw IndexOutOfBoundsException
+returnBlock = byteStrings.get(index);
+
+if (byteStrings.size() > index + 1) {
+  // more blocks from this sort key
+  index += 1;
+} else {
+  // finish navigating the current sort key and need to find the next one,
+  // can throw NoSuchElementException
+  nextSortKey = subset.tailSet(nextSortKey, false).first();
+  index = 0;
+}
+
+ByteStringOutputStream outputStream = new ByteStringOutputStream();
+try {
+  KV<Long, Integer> cursor = KV.of(nextSortKey, index);
+  coder.encode(cursor, outputStream);
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+continuationToken = outputStream.toByteString();
+  } catch (NoSuchElementException | IndexOutOfBoundsException e) {
+continuationToken = ByteString.EMPTY;
+  }
+  response =
+  StateResponse.newBuilder()
+  .setOrderedListGet(
+  OrderedListStateGetResponse.newBuilder()
+  .setData(returnBlock)
+  .setContinuationToken(continuationToken));
+}
+break;
+
+  case ORDERED_LIST_UPDATE:
+for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) {
+  List keysToRemove =
+  

Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-15 Thread via GitHub


scwhittle commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1491549772


##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;

Review Comment:
   Please add a comment on the continuation token, i.e. it should be the token 
returned by the previous response, and the key/range should match the previous 
request that generated the token.
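
   As a sketch of that contract: the caller starts with an empty token, 
passes back whatever token the previous response returned, keeps every other 
request parameter unchanged, and stops when the token comes back empty. The 
class `ContinuationTokenPaging`, the `Page` type, and the string token format 
are all hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical paging example, for illustration only.
final class ContinuationTokenPaging {

  // One response: a block of data plus the token for the next request.
  static final class Page {
    final List<String> data;
    final String continuationToken; // empty means there is no more data

    Page(List<String> data, String continuationToken) {
      this.data = data;
      this.continuationToken = continuationToken;
    }
  }

  // Stand-in for the server side. The token is opaque to the caller; here it
  // happens to encode the index of the next element. pageSize must be positive.
  static Page fetch(List<String> all, int pageSize, String token) {
    int from = token.isEmpty() ? 0 : Integer.parseInt(token);
    int to = Math.min(from + pageSize, all.size());
    String next = to < all.size() ? Integer.toString(to) : "";
    return new Page(new ArrayList<>(all.subList(from, to)), next);
  }

  // Client loop: the same request parameters are reused on every call and
  // only the token changes.
  static List<String> readAll(List<String> all, int pageSize) {
    List<String> result = new ArrayList<>();
    String token = "";
    do {
      Page page = fetch(all, pageSize, token);
      result.addAll(page.data);
      token = page.continuationToken;
    } while (!token.isEmpty());
    return result;
  }
}
```

   A token is only meaningful for the request that produced it, which is why 
the loop never changes the other arguments between calls.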



##
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java:
##
@@ -598,13 +606,81 @@ public  MultimapState bindMultimap(
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public <T> OrderedListState<T> bindOrderedList(
   String id, StateSpec<OrderedListState<T>> spec, Coder<T> elemCoder) {
-throw new UnsupportedOperationException(
-"TODO: Add support for a sorted-list state to the Fn API.");
+return (OrderedListState)
+stateKeyObjectCache.computeIfAbsent(
+createOrderedListUserStateKey(id),
+new Function() {
+  @Override
+  public Object apply(StateKey key) {
+return new OrderedListState() {
+  private final OrderedListUserState impl =
+  createOrderedListUserState(key, elemCoder);
+
+  @Override
+  public void clear() {
+clearRange(
+BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+  }
+
+  @Override
+  public void add(TimestampedValue value) {
+impl.add(value);
+  }
+
+  @Override
+  public ReadableState isEmpty() {
+return new ReadableState() {
+  @Override
+  public @Nullable Boolean read() {
+return !impl.read().iterator().hasNext();
+  }
+
+  @Override
+  public ReadableState readLater() {
+return this;
+  }
+};
+  }
+
+  @Nullable
+  @Override
+  public Iterable<TimestampedValue<T>> read() {
+return readRange(
+BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+  }
+
+  @Override
+  public GroupingState<TimestampedValue<T>, Iterable<TimestampedValue<T>>>
+  readLater() {
+throw new UnsupportedOperationException();

Review Comment:
   Just doing nothing is valid here and seems better than throwing an 
exception; ditto below.
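
   A minimal sketch of what a do-nothing `readLater()` could look like. 
`LazyReadable` is a hypothetical stand-in written for this example, not Beam's 
`ReadableState`; the point is only that a prefetch hint can be ignored safely 
as long as the object itself is returned for chaining.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a readable state, for illustration only.
final class LazyReadable<T> {
  private final Supplier<T> source;

  LazyReadable(Supplier<T> source) {
    this.source = source;
  }

  // Performs the actual read.
  T read() {
    return source.get();
  }

  // Prefetch hint. An implementation that cannot prefetch may ignore it and
  // return itself, so callers can still chain readLater().read().
  LazyReadable<T> readLater() {
    return this;
  }
}
```

   Callers that write `state.readLater()` followed later by `state.read()` 
then keep working, just without any prefetching.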



##
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
 
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;

Review Comment:
   How do we return multiple elements if the request was a range?
   
   Should we return the sort key for elements? That seems like part of the 
user data; for example, if it's some id/timestamp, the user might want it back 
instead of having to duplicate it in the payload as well.
   
   Should this be repeated OrderedListEntry instead of data?






Re: [PR] Implement ordered list state for FnApi. [beam]

2024-02-15 Thread via GitHub


shunping commented on PR #30317:
URL: https://github.com/apache/beam/pull/30317#issuecomment-1946576345

   Run Python PreCommit 3.9

