Re: [PR] Implement ordered list state for FnApi. [beam]
github-actions[bot] commented on PR #30317: URL: https://github.com/apache/beam/pull/30317#issuecomment-2112557161 Stopping reviewer notifications for this pull request: requested by reviewer -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on PR #30317: URL: https://github.com/apache/beam/pull/30317#issuecomment-2112554061 stop reviewer notifications
Re: [PR] Implement ordered list state for FnApi. [beam]
github-actions[bot] commented on PR #30317: URL: https://github.com/apache/beam/pull/30317#issuecomment-2112367951 Assigning new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`: R: @kennknowles for label java. Available commands: - `stop reviewer notifications` - opt out of the automated review tooling - `remind me after tests pass` - tag the comment author after tests pass - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
Re: [PR] Implement ordered list state for FnApi. [beam]
github-actions[bot] commented on PR #30317: URL: https://github.com/apache/beam/pull/30317#issuecomment-2105726584 Reminder, please take a look at this PR: @Abacn
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1588300082 ## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java: ## @@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.Cache;
+import org.apache.beam.fn.harness.Caches;
+import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear and
+ * persist values.
+ *
+ * Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ */
+public class OrderedListUserState<T> {
+  private final BeamFnStateClient beamFnStateClient;
+  private final StateRequest request;
+  private final Coder<T> valueCoder;
+  private final TimestampedValueCoder<T> timestampedValueCoder;
+  // Pending updates to persistent storage
+  private NavigableMap<Instant, Collection<T>> pendingAdds = Maps.newTreeMap();
+  private TreeRangeSet<Instant> pendingRemoves = TreeRangeSet.create();
+
+  private boolean isCleared = false;
+  private boolean isClosed = false;
+
+  public OrderedListUserState(
+      Cache<?, ?> cache,
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      StateKey stateKey,
+      Coder<T> valueCoder) {
+    checkArgument(
+        stateKey.hasOrderedListUserState(),
+        "Expected OrderedListUserState StateKey but received %s.",
+        stateKey);
+    this.beamFnStateClient = beamFnStateClient;
+    this.valueCoder = valueCoder;
+    this.timestampedValueCoder = TimestampedValueCoder.of(this.valueCoder);
+    this.request =
+        StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();
+  }
+
+  public void add(TimestampedValue<T> value) {
+    checkState(
+        !isClosed,
+        "OrderedList user state is no longer usable because it is closed for %s",
+        request.getStateKey());
+    Instant timestamp = value.getTimestamp();
+    pendingAdds.putIfAbsent(timestamp, new ArrayList<>());
+    pendingAdds.get(timestamp).add(value.getValue());
+  }
+
+  public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, Instant limitTimestamp) {
+    checkState(
+        !isClosed,
+        "OrderedList user state is no longer
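For readers following the pending-update bookkeeping in the quoted diff, here is a minimal sketch of how buffering values per timestamp in a sorted map can work. It is not the PR's class: `PendingOrderedList`, `clearRange`, and the use of plain `long` timestamps instead of Joda `Instant` are assumptions made for illustration, and it uses only `java.util` rather than Guava's `TreeRangeSet`. The half-open `[minTimestamp, limitTimestamp)` range is likewise an assumption based on the `readRange(minTimestamp, limitTimestamp)` parameter names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical, simplified stand-in for the pending-add bookkeeping; not a Beam class. */
class PendingOrderedList<T> {
  // Values buffered per timestamp (epoch millis), kept sorted by timestamp.
  private final NavigableMap<Long, Collection<T>> pendingAdds = new TreeMap<>();

  /** Buffers a value under its timestamp; equal timestamps keep insertion order. */
  void add(long timestampMillis, T value) {
    // computeIfAbsent is equivalent to the putIfAbsent + get pair in the quoted diff.
    pendingAdds.computeIfAbsent(timestampMillis, k -> new ArrayList<>()).add(value);
  }

  /** Drops buffered values with minTimestamp <= timestamp < limitTimestamp. */
  void clearRange(long minTimestamp, long limitTimestamp) {
    pendingAdds.subMap(minTimestamp, true, limitTimestamp, false).clear();
  }

  /** Returns buffered values with minTimestamp <= timestamp < limitTimestamp, in order. */
  List<T> readRange(long minTimestamp, long limitTimestamp) {
    List<T> out = new ArrayList<>();
    for (Collection<T> values :
        pendingAdds.subMap(minTimestamp, true, limitTimestamp, false).values()) {
      out.addAll(values);
    }
    return out;
  }
}
```

The sketch only covers locally buffered values; merging them with values fetched from the runner, and tracking removed ranges for later persistence, is left out.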
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1588140845 ## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/OrderedListUserStateTest.java: ## @@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static java.util.Arrays.asList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.beam.fn.harness.Caches;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListRange;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class OrderedListUserStateTest {
+  private static final TimestampedValue<String> A1 =
+      TimestampedValue.of("A1", Instant.ofEpochMilli(1));
+  private static final TimestampedValue<String> B1 =
+      TimestampedValue.of("B1", Instant.ofEpochMilli(1));
+  private static final TimestampedValue<String> A2 =
+      TimestampedValue.of("A2", Instant.ofEpochMilli(2));
+  private static final TimestampedValue<String> B2 =
+      TimestampedValue.of("B2", Instant.ofEpochMilli(2));
+  private static final TimestampedValue<String> A3 =
+      TimestampedValue.of("A3", Instant.ofEpochMilli(3));
+  private static final TimestampedValue<String> A4 =
+      TimestampedValue.of("A4", Instant.ofEpochMilli(4));
+
+  private final String pTransformId = "pTransformId";
+  private final String stateId = "stateId";
+  private final String encodedWindow = "encodedWindow";
+
+  @Test
+  public void testNoPersistedValues() throws Exception {
+    FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(Collections.emptyMap());
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
+            StringUtf8Coder.of());
+    assertThat(userState.read(), is(emptyIterable()));
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    FakeBeamFnStateClient fakeClient =
+        new FakeBeamFnStateClient(
+            TimestampedValueCoder.of(StringUtf8Coder.of()),
+            ImmutableMap.of(createOrderedListStateKey("A", 1), asList(A1, B1)));
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
+            StringUtf8Coder.of());
+
+    assertArrayEquals(
+        asList(A1, B1).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class));
+    userState.asyncClose();
+    assertThrows(IllegalStateException.class, () -> userState.read());
+  }
+
+  @Test
+  public void testReadRange() throws Exception {
+    FakeBeamFnStateClient fakeClient =
+        new FakeBeamFnStateClient(
+            TimestampedValueCoder.of(StringUtf8Coder.of()),
+            ImmutableMap.of(
+                createOrderedListStateKey("A", 1), asList(A1, B1),
+                createOrderedListStateKey("A", 4), Collections.singletonList(A4),
+                createOrderedListStateKey("A", 2), Collections.singletonList(A2)));
+
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1588027273 ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## Review Comment: Yes. I think we will need a separate one for orderedlist because although orderedlist and multimap have similar semantics, the code paths are totally different.
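One way to picture the point about similar semantics but different code paths is the sketch below. It is illustrative only: `StateShapes`, `multimapGet`, and `orderedListRange` are hypothetical names rather than Beam APIs, and the half-open range is an assumption. Both shapes map a key to several values, but a multimap is read by exact key while an ordered list is read over a range of sort keys, which calls for a sorted structure and a different read path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration only; these are not Beam classes. */
class StateShapes {
  /** Multimap-style read: values are looked up by exact key; keys need no ordering. */
  static List<String> multimapGet(Map<String, List<String>> state, String key) {
    return state.getOrDefault(key, new ArrayList<>());
  }

  /** Ordered-list-style read: values are collected over sort keys in [min, limit). */
  static List<String> orderedListRange(TreeMap<Long, List<String>> state, long min, long limit) {
    List<String> out = new ArrayList<>();
    for (List<String> values : state.subMap(min, true, limit, false).values()) {
      out.addAll(values);
    }
    return out;
  }
}
```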
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1588027273 ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## Review Comment: Yes. I think we will need a separate one for orderedlist because although orderedlist and multimap have similar semantics, the implementation is different.
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1588084688 ## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java: ## @@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.Cache;
+import org.apache.beam.fn.harness.Caches;
+import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear and
Review Comment: ack
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1588080541 ## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/OrderedListUserStateTest.java: ## @@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static java.util.Arrays.asList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.beam.fn.harness.Caches;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListRange;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class OrderedListUserStateTest {
+  private static final TimestampedValue<String> A1 =
+      TimestampedValue.of("A1", Instant.ofEpochMilli(1));
+  private static final TimestampedValue<String> B1 =
+      TimestampedValue.of("B1", Instant.ofEpochMilli(1));
+  private static final TimestampedValue<String> A2 =
+      TimestampedValue.of("A2", Instant.ofEpochMilli(2));
+  private static final TimestampedValue<String> B2 =
+      TimestampedValue.of("B2", Instant.ofEpochMilli(2));
+  private static final TimestampedValue<String> A3 =
+      TimestampedValue.of("A3", Instant.ofEpochMilli(3));
+  private static final TimestampedValue<String> A4 =
+      TimestampedValue.of("A4", Instant.ofEpochMilli(4));
+
+  private final String pTransformId = "pTransformId";
+  private final String stateId = "stateId";
+  private final String encodedWindow = "encodedWindow";
+
+  @Test
+  public void testNoPersistedValues() throws Exception {
+    FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(Collections.emptyMap());
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
+            StringUtf8Coder.of());
+    assertThat(userState.read(), is(emptyIterable()));
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    FakeBeamFnStateClient fakeClient =
+        new FakeBeamFnStateClient(
+            TimestampedValueCoder.of(StringUtf8Coder.of()),
+            ImmutableMap.of(createOrderedListStateKey("A", 1), asList(A1, B1)));
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
+            StringUtf8Coder.of());
+
+    assertArrayEquals(
+        asList(A1, B1).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class));
+    userState.asyncClose();
+    assertThrows(IllegalStateException.class, () -> userState.read());
+  }
+
+  @Test
+  public void testReadRange() throws Exception {
+    FakeBeamFnStateClient fakeClient =
+        new FakeBeamFnStateClient(
+            TimestampedValueCoder.of(StringUtf8Coder.of()),
+            ImmutableMap.of(
+                createOrderedListStateKey("A", 1), asList(A1, B1),
+                createOrderedListStateKey("A", 4), Collections.singletonList(A4),
+                createOrderedListStateKey("A", 2), Collections.singletonList(A2)));
+
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1588027273 ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## Review Comment: Yes. I think we will need a separate one for orderedlist because although orderedlist and multimap have similar semantics, the implementation is different.
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1587995856 ## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java: ## @@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.fn.harness.Cache;
+import org.apache.beam.fn.harness.Caches;
+import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
+import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear and
+ * persist values.
+ *
+ * Calling {@link #asyncClose()} schedules any required persistence changes. This object should
+ * no longer be used after it is closed.
+ *
+ * TODO: Move to an async persist model where persistence is signalled based upon cache memory
+ * pressure and its need to flush.
+ */
+public class OrderedListUserState<T> {
+  private final BeamFnStateClient beamFnStateClient;
+  private final StateRequest request;
+  private final Coder<T> valueCoder;
+  private final TimestampedValueCoder<T> timestampedValueCoder;
+  // Pending updates to persistent storage
+  private NavigableMap<Instant, Collection<T>> pendingAdds = Maps.newTreeMap();
+  private TreeRangeSet<Instant> pendingRemoves = TreeRangeSet.create();
+
+  private boolean isCleared = false;
+  private boolean isClosed = false;
+
+  public OrderedListUserState(
+      Cache<?, ?> cache,
+      BeamFnStateClient beamFnStateClient,
+      String instructionId,
+      StateKey stateKey,
+      Coder<T> valueCoder) {
+    checkArgument(
+        stateKey.hasOrderedListUserState(),
+        "Expected OrderedListUserState StateKey but received %s.",
+        stateKey);
+    this.beamFnStateClient = beamFnStateClient;
+    this.valueCoder = valueCoder;
+    this.timestampedValueCoder = TimestampedValueCoder.of(this.valueCoder);
+    this.request =
+        StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();
+  }
+
+  public void add(TimestampedValue<T> value) {
+    checkState(
+        !isClosed,
+        "OrderedList user state is no longer usable because it is closed for %s",
+        request.getStateKey());
+    Instant timestamp = value.getTimestamp();
+    pendingAdds.putIfAbsent(timestamp, new ArrayList<>());
+    pendingAdds.get(timestamp).add(value.getValue());
+  }
+
+  public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, Instant limitTimestamp) {
+    checkState(
+        !isClosed,
+        "OrderedList user state is no longer
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1587995856 ## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java: ## @@ -0,0 +1,240 @@
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1587899863 ## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java: ## @@ -0,0 +1,240 @@
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1587892941

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1075,6 +1103,12 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

+// A message describes a sort key range [start, end).

Review Comment:
ack
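The half-open convention in the proto comment above means `start` is included and `end` is excluded. A minimal sketch of that convention using `java.util.TreeMap`, with plain `long` sort keys; the class and method names are hypothetical and are not part of Beam or of this PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration of a half-open sort key range [start, end).
class HalfOpenRangeSketch {
  // Returns the values whose sort key k satisfies start <= k < end, in key order.
  static List<String> valuesInRange(NavigableMap<Long, String> entries, long start, long end) {
    // subMap(from, fromInclusive, to, toInclusive): include start, exclude end.
    return new ArrayList<>(entries.subMap(start, true, end, false).values());
  }
}
```

With entries at sort keys 1, 2 and 4, a query for [1, 4) would return the values at keys 1 and 2 only; the entry at key 4 needs a range whose end is greater than 4.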
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1587889713

## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java: ##
@@ -600,8 +603,74 @@ public MultimapState bindMultimap(
   @Override
   public <T> OrderedListState<T> bindOrderedList(
       String id, StateSpec<OrderedListState<T>> spec, Coder<T> elemCoder) {
-    throw new UnsupportedOperationException(
-        "TODO: Add support for a sorted-list state to the Fn API.");
+    return (OrderedListState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createOrderedListUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new OrderedListState<T>() {
+                  private final OrderedListUserState<T> impl =
+                      createOrderedListUserState(key, elemCoder);
+
+                  @Override
+                  public void clear() {

Review Comment:
The clear() function in OrderedListState is quite similar to its counterpart in MultiMap, and I don't think we have ever had the notion of deleting all state associated with a state object. There is one function called "asyncClose()", which syncs the local copy with the remote one and then *invalidates* any state associated with the state object.

On a different topic, I notice that we have a clear() function in the implementation, which handles the special case of deleting all elements from an OrderedList more efficiently than clearRange(min, max). I should probably use that instead.
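The difference between a dedicated clear() and clearRange(min, max) mentioned above can be sketched on a purely local buffer. This is a hedged illustration only: `PendingOrderedListSketch` and its members are hypothetical, timestamps are plain `long` milliseconds, and the bookkeeping the real class would need for already-persisted elements is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical local buffer; this is not the OrderedListUserState from the PR.
class PendingOrderedListSketch {
  private final NavigableMap<Long, List<String>> pendingAdds = new TreeMap<>();
  // Set when the whole list is cleared, so a later flush could issue a single
  // clear request instead of a range delete (assumption for illustration).
  private boolean isCleared = false;

  void add(long timestampMillis, String value) {
    pendingAdds.computeIfAbsent(timestampMillis, k -> new ArrayList<>()).add(value);
  }

  // Removes locally buffered entries with minMillis <= timestamp < limitMillis.
  void clearRange(long minMillis, long limitMillis) {
    pendingAdds.subMap(minMillis, true, limitMillis, false).clear();
  }

  // Special case: drop everything without computing or tracking a range.
  void clear() {
    pendingAdds.clear();
    isCleared = true;
  }

  boolean isCleared() {
    return isCleared;
  }

  // Total number of buffered values across all timestamps.
  int size() {
    int total = 0;
    for (List<String> values : pendingAdds.values()) {
      total += values.size();
    }
    return total;
  }
}
```

The design point is that clear() records a single flag, whereas clearRange over the full timestamp domain would have to be represented and processed as a range.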
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1587688898

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1063,6 +1087,10 @@ message StateAppendRequest {
   // Represents a part of a logical byte stream. Elements within
   // the logical byte stream are encoded in the nested context and
   // multiple append requests are concatenated together.
+  // For OrderedListState, elements of TimeStampedValue should be encoded
+  // with TimestampedValueCoder.of(LengthPrefixCoder.of(Coder)), so that
+  // the request handler knows how to decode timestamps from the data without
+  // decoding the value bytes.

Review Comment:
This comment has been updated in the proto-change-only PR (#31092).
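The idea in the proto comment above, that a length prefix lets a handler reach each timestamp without decoding the value, can be sketched with `java.nio.ByteBuffer`. This is an assumption-laden illustration: the fixed-width `int` length and `long` timestamp stand in for whatever Beam's coders actually write, the byte layout is not Beam's wire format, and `LengthPrefixedTimestampSketch` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical layout per element: [int length][value bytes][long timestamp].
class LengthPrefixedTimestampSketch {
  // Encodes one element; the value bytes are treated as opaque.
  static byte[] encode(byte[] value, long timestampMillis) {
    ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + value.length + Long.BYTES);
    buf.putInt(value.length);
    buf.put(value);
    buf.putLong(timestampMillis);
    return buf.array();
  }

  // Concatenates two encoded chunks, as repeated appends to one byte stream would.
  static byte[] concat(byte[] first, byte[] second) {
    byte[] out = new byte[first.length + second.length];
    System.arraycopy(first, 0, out, 0, first.length);
    System.arraycopy(second, 0, out, first.length, second.length);
    return out;
  }

  // Reads every timestamp from the stream while skipping the value bytes undecoded.
  static List<Long> readTimestamps(byte[] stream) {
    List<Long> timestamps = new ArrayList<>();
    ByteBuffer buf = ByteBuffer.wrap(stream);
    while (buf.hasRemaining()) {
      int valueLength = buf.getInt();
      buf.position(buf.position() + valueLength); // skip the opaque value
      timestamps.add(buf.getLong());
    }
    return timestamps;
  }
}
```

Without the length prefix, the reader would need the value coder itself to find where each value ends, which is what the proto comment is trying to avoid on the handler side.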
Re: [PR] Implement ordered list state for FnApi. [beam]
Abacn commented on PR #30317: URL: https://github.com/apache/beam/pull/30317#issuecomment-2034822148 waiting on author
Re: [PR] Implement ordered list state for FnApi. [beam]
github-actions[bot] commented on PR #30317: URL: https://github.com/apache/beam/pull/30317#issuecomment-2034415747 Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`: R: @Abacn for label java. Available commands: - `stop reviewer notifications` - opt out of the automated review tooling - `remind me after tests pass` - tag the comment author after tests pass - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
Re: [PR] Implement ordered list state for FnApi. [beam]
github-actions[bot] commented on PR #30317: URL: https://github.com/apache/beam/pull/30317#issuecomment-2027168571 Reminder, please take a look at this pr: @kennknowles
Re: [PR] Implement ordered list state for FnApi. [beam]
github-actions[bot] commented on PR #30317: URL: https://github.com/apache/beam/pull/30317#issuecomment-2014951449 Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`: R: @kennknowles for label java. Available commands: - `stop reviewer notifications` - opt out of the automated review tooling - `remind me after tests pass` - tag the comment author after tests pass - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
Re: [PR] Implement ordered list state for FnApi. [beam]
acrites commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1531296538 ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## Review Comment: Or are we going to piggy-back on multimap for this? (If so we should delete the TODO.)
Re: [PR] Implement ordered list state for FnApi. [beam]
acrites commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1531294554 ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## Review Comment: Do we also need to add something here: https://github.com/apache/beam/blob/fb7ba65e2236f3dd871b6e492afc07249a4a5c49/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L478
Re: [PR] Implement ordered list state for FnApi. [beam]
acrites commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1529368187

## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/OrderedListUserStateTest.java: ##
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static java.util.Arrays.asList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.beam.fn.harness.Caches;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListRange;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class OrderedListUserStateTest {
+  private static final TimestampedValue<String> A1 =
+      TimestampedValue.of("A1", Instant.ofEpochMilli(1));
+  private static final TimestampedValue<String> B1 =
+      TimestampedValue.of("B1", Instant.ofEpochMilli(1));
+  private static final TimestampedValue<String> A2 =
+      TimestampedValue.of("A2", Instant.ofEpochMilli(2));
+  private static final TimestampedValue<String> B2 =
+      TimestampedValue.of("B2", Instant.ofEpochMilli(2));
+  private static final TimestampedValue<String> A3 =
+      TimestampedValue.of("A3", Instant.ofEpochMilli(3));
+  private static final TimestampedValue<String> A4 =
+      TimestampedValue.of("A4", Instant.ofEpochMilli(4));
+
+  private final String pTransformId = "pTransformId";
+  private final String stateId = "stateId";
+  private final String encodedWindow = "encodedWindow";
+
+  @Test
+  public void testNoPersistedValues() throws Exception {
+    FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(Collections.emptyMap());
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
+            StringUtf8Coder.of());
+    assertThat(userState.read(), is(emptyIterable()));
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    FakeBeamFnStateClient fakeClient =
+        new FakeBeamFnStateClient(
+            TimestampedValueCoder.of(StringUtf8Coder.of()),
+            ImmutableMap.of(createOrderedListStateKey("A", 1), asList(A1, B1)));
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
+            StringUtf8Coder.of());
+
+    assertArrayEquals(
+        asList(A1, B1).toArray(), Iterables.toArray(userState.read(), TimestampedValue.class));
+    userState.asyncClose();
+    assertThrows(IllegalStateException.class, () -> userState.read());
+  }
+
+  @Test
+  public void testReadRange() throws Exception {
+    FakeBeamFnStateClient fakeClient =
+        new FakeBeamFnStateClient(
+            TimestampedValueCoder.of(StringUtf8Coder.of()),
+            ImmutableMap.of(
+                createOrderedListStateKey("A", 1), asList(A1, B1),
+                createOrderedListStateKey("A", 4), Collections.singletonList(A4),
+                createOrderedListStateKey("A", 2), Collections.singletonList(A2)));
+
+    OrderedListUserState<String> userState =
+        new OrderedListUserState<>(
+            Caches.noop(),
+            fakeClient,
+            "instructionId",
+            createOrderedListStateKey("A"),
Re: [PR] Implement ordered list state for FnApi. [beam]
acrites commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1529360968 ## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/OrderedListUserStateTest.java: ## @@ -0,0 +1,597 @@
Re: [PR] Implement ordered list state for FnApi. [beam]
acrites commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1529334529 ## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java: ## @@ -0,0 +1,240 @@
Re: [PR] Implement ordered list state for FnApi. [beam]
acrites commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1529329576 ## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java: ## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.state; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.concurrent.CompletableFuture; +import org.apache.beam.fn.harness.Cache; +import org.apache.beam.fn.harness.Caches; +import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.fn.stream.PrefetchableIterable; +import org.apache.beam.sdk.fn.stream.PrefetchableIterables; +import org.apache.beam.sdk.util.ByteStringOutputStream; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet; +import org.joda.time.Instant; + +/** + * An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear and + * 
persist values. + * + * Calling {@link #asyncClose()} schedules any required persistence changes. This object should + * no longer be used after it is closed. + * + * TODO: Move to an async persist model where persistence is signalled based upon cache memory + * pressure and its need to flush. + */ +public class OrderedListUserState { + private final BeamFnStateClient beamFnStateClient; + private final StateRequest request; + private final Coder valueCoder; + private final TimestampedValueCoder timestampedValueCoder; + // Pending updates to persistent storage + private NavigableMap> pendingAdds = Maps.newTreeMap(); + private TreeRangeSet pendingRemoves = TreeRangeSet.create(); + + private boolean isCleared = false; + private boolean isClosed = false; + + public OrderedListUserState( + Cache cache, + BeamFnStateClient beamFnStateClient, + String instructionId, + StateKey stateKey, + Coder valueCoder) { +checkArgument( +stateKey.hasOrderedListUserState(), +"Expected OrderedListUserState StateKey but received %s.", +stateKey); +this.beamFnStateClient = beamFnStateClient; +this.valueCoder = valueCoder; +this.timestampedValueCoder = TimestampedValueCoder.of(this.valueCoder); +this.request = + StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build(); + } + + public void add(TimestampedValue value) { +checkState( +!isClosed, +"OrderedList user state is no longer usable because it is closed for %s", +request.getStateKey()); +Instant timestamp = value.getTimestamp(); +pendingAdds.putIfAbsent(timestamp, new ArrayList<>()); +pendingAdds.get(timestamp).add(value.getValue()); + } + + public Iterable> readRange(Instant minTimestamp, Instant limitTimestamp) { +checkState( +!isClosed, +"OrderedList user state is no longer usable
Re: [PR] Implement ordered list state for FnApi. [beam]
acrites commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1529328943 ## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java: ## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.state; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.concurrent.CompletableFuture; +import org.apache.beam.fn.harness.Cache; +import org.apache.beam.fn.harness.Caches; +import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.fn.stream.PrefetchableIterable; +import org.apache.beam.sdk.fn.stream.PrefetchableIterables; +import org.apache.beam.sdk.util.ByteStringOutputStream; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet; +import org.joda.time.Instant; + +/** + * An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear and + * 
persist values. + * + * Calling {@link #asyncClose()} schedules any required persistence changes. This object should + * no longer be used after it is closed. + * + * TODO: Move to an async persist model where persistence is signalled based upon cache memory + * pressure and its need to flush. + */ +public class OrderedListUserState { + private final BeamFnStateClient beamFnStateClient; + private final StateRequest request; + private final Coder valueCoder; + private final TimestampedValueCoder timestampedValueCoder; + // Pending updates to persistent storage + private NavigableMap> pendingAdds = Maps.newTreeMap(); + private TreeRangeSet pendingRemoves = TreeRangeSet.create(); + + private boolean isCleared = false; + private boolean isClosed = false; + + public OrderedListUserState( + Cache cache, + BeamFnStateClient beamFnStateClient, + String instructionId, + StateKey stateKey, + Coder valueCoder) { +checkArgument( +stateKey.hasOrderedListUserState(), +"Expected OrderedListUserState StateKey but received %s.", +stateKey); +this.beamFnStateClient = beamFnStateClient; +this.valueCoder = valueCoder; +this.timestampedValueCoder = TimestampedValueCoder.of(this.valueCoder); +this.request = + StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build(); + } + + public void add(TimestampedValue value) { +checkState( +!isClosed, +"OrderedList user state is no longer usable because it is closed for %s", +request.getStateKey()); +Instant timestamp = value.getTimestamp(); +pendingAdds.putIfAbsent(timestamp, new ArrayList<>()); +pendingAdds.get(timestamp).add(value.getValue()); + } + + public Iterable> readRange(Instant minTimestamp, Instant limitTimestamp) { +checkState( +!isClosed, +"OrderedList user state is no longer usable
Re: [PR] Implement ordered list state for FnApi. [beam]
acrites commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1529318766 ## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java: ## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.state; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.concurrent.CompletableFuture; +import org.apache.beam.fn.harness.Cache; +import org.apache.beam.fn.harness.Caches; +import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.fn.stream.PrefetchableIterable; +import org.apache.beam.sdk.fn.stream.PrefetchableIterables; +import org.apache.beam.sdk.util.ByteStringOutputStream; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet; +import org.joda.time.Instant; + +/** + * An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear and + * 
persist values. + * + * Calling {@link #asyncClose()} schedules any required persistence changes. This object should + * no longer be used after it is closed. + * + * TODO: Move to an async persist model where persistence is signalled based upon cache memory + * pressure and its need to flush. + */ +public class OrderedListUserState { + private final BeamFnStateClient beamFnStateClient; + private final StateRequest request; + private final Coder valueCoder; + private final TimestampedValueCoder timestampedValueCoder; + // Pending updates to persistent storage + private NavigableMap> pendingAdds = Maps.newTreeMap(); + private TreeRangeSet pendingRemoves = TreeRangeSet.create(); + + private boolean isCleared = false; + private boolean isClosed = false; + + public OrderedListUserState( + Cache cache, + BeamFnStateClient beamFnStateClient, + String instructionId, + StateKey stateKey, + Coder valueCoder) { +checkArgument( +stateKey.hasOrderedListUserState(), +"Expected OrderedListUserState StateKey but received %s.", +stateKey); +this.beamFnStateClient = beamFnStateClient; +this.valueCoder = valueCoder; +this.timestampedValueCoder = TimestampedValueCoder.of(this.valueCoder); +this.request = + StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build(); + } + + public void add(TimestampedValue value) { +checkState( +!isClosed, +"OrderedList user state is no longer usable because it is closed for %s", +request.getStateKey()); +Instant timestamp = value.getTimestamp(); +pendingAdds.putIfAbsent(timestamp, new ArrayList<>()); +pendingAdds.get(timestamp).add(value.getValue()); + } + + public Iterable> readRange(Instant minTimestamp, Instant limitTimestamp) { +checkState( +!isClosed, +"OrderedList user state is no longer usable
Re: [PR] Implement ordered list state for FnApi. [beam]
acrites commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1529260758 ## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java: ## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.state; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.concurrent.CompletableFuture; +import org.apache.beam.fn.harness.Cache; +import org.apache.beam.fn.harness.Caches; +import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.fn.stream.PrefetchableIterable; +import org.apache.beam.sdk.fn.stream.PrefetchableIterables; +import org.apache.beam.sdk.util.ByteStringOutputStream; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.TreeRangeSet; +import org.joda.time.Instant; + +/** + * An implementation of a bag user state that utilizes the Beam Fn State API to fetch, clear and Review 
Comment: ordered list
Re: [PR] Implement ordered list state for FnApi. [beam]
acrites commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1529260242

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1075,6 +1103,12 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
+// A message describes a sort key range [start, end).

Review Comment:
Maybe add a comment about these values needing to be within `[BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE)`? Should these be the default values for these fields?
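A minimal sketch of the bounds check suggested in this comment, assuming a half-open `[start, end)` range of millisecond sort keys. The class `SortKeyRange` and the constants `MIN_SORT_KEY` and `MAX_SORT_KEY` are hypothetical stand-ins for `BoundedWindow.TIMESTAMP_MIN_VALUE` and `BoundedWindow.TIMESTAMP_MAX_VALUE`; they are not taken from the PR.

```java
// Hypothetical illustration of a validated [start, end) sort key range.
final class SortKeyRange {
  // Placeholder bounds (assumption): stand-ins for the minimum and maximum
  // timestamps mentioned in the review comment, expressed in milliseconds.
  static final long MIN_SORT_KEY = Long.MIN_VALUE / 1000;
  static final long MAX_SORT_KEY = Long.MAX_VALUE / 1000;

  final long start; // inclusive
  final long end; // exclusive

  SortKeyRange(long start, long end) {
    // Reject ranges that are inverted or that leave the allowed bounds.
    if (start < MIN_SORT_KEY || end > MAX_SORT_KEY || start > end) {
      throw new IllegalArgumentException(
          "Range [" + start + ", " + end + ") is outside the allowed bounds");
    }
    this.start = start;
    this.end = end;
  }

  // The widest legal range, usable as a default when no bounds are given.
  static SortKeyRange all() {
    return new SortKeyRange(MIN_SORT_KEY, MAX_SORT_KEY);
  }

  boolean contains(long sortKey) {
    return sortKey >= start && sortKey < end;
  }
}
```

Using `all()` as the default would make an unset range mean "every element", which is one possible answer to the question about default values.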
Re: [PR] Implement ordered list state for FnApi. [beam]
acrites commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1529258289

## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java: ##
@@ -600,8 +603,74 @@ public MultimapState bindMultimap(
   @Override
   public OrderedListState bindOrderedList(
       String id, StateSpec> spec, Coder elemCoder) {
-    throw new UnsupportedOperationException(
-        "TODO: Add support for a sorted-list state to the Fn API.");
+    return (OrderedListState)
+        stateKeyObjectCache.computeIfAbsent(
+            createOrderedListUserStateKey(id),
+            new Function() {
+              @Override
+              public Object apply(StateKey key) {
+                return new OrderedListState() {
+                  private final OrderedListUserState impl =
+                      createOrderedListUserState(key, elemCoder);
+
+                  @Override
+                  public void clear() {

Review Comment:
Do we want a separate notion of "clear"? i.e. one that deletes *all* state associated with this OrderedList and not just `clearRange(min, max)`, which would only delete all the elements from the OrderedList?
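The distinction raised here can be sketched in plain Java, assuming the list is held in an in-memory sorted map. `ClearSketch` and its methods are hypothetical names for illustration only; they do not reflect how the PR implements `clear()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of an ordered list keyed by sort key.
final class ClearSketch {
  private final NavigableMap<Long, List<String>> entries = new TreeMap<>();

  void add(long sortKey, String value) {
    entries.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
  }

  // Removes only the elements whose sort key falls in [min, limit).
  void clearRange(long min, long limit) {
    entries.subMap(min, true, limit, false).clear();
  }

  // Removes everything, regardless of sort key.
  void clear() {
    entries.clear();
  }

  int distinctSortKeys() {
    return entries.size();
  }
}
```

In this model `clear()` and `clearRange(min, max)` differ only for keys outside `[min, max)`; whether a real runner also holds per-list metadata that only a full clear would drop is the open question in the comment.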
Re: [PR] Implement ordered list state for FnApi. [beam]
robertwb commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1526756757

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1063,6 +1087,10 @@ message StateAppendRequest {
   // Represents a part of a logical byte stream. Elements within
   // the logical byte stream are encoded in the nested context and
   // multiple append requests are concatenated together.
+  // For OrderedListState, elements of TimeStampedValue should be encoded
+  // with TimestampedValueCoder.of(LengthPrefixCoder.of(Coder)), so that
+  // the request handler knows how to decode timestamps from the data without
+  // decoding the value bytes.

Review Comment:
Perhaps note that data will be returned in the same format?
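Why a length prefix lets the handler read timestamps without decoding values can be shown with a small stand-alone sketch. It is not Beam's wire format: it assumes a fixed 4-byte length and a plain 8-byte timestamp purely for illustration, and `LengthPrefixSketch` is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a length-prefixed value followed by its timestamp.
final class LengthPrefixSketch {
  // Writes: [int length][value bytes][long timestampMillis].
  static byte[] encode(String value, long timestampMillis) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      byte[] payload = value.getBytes(StandardCharsets.UTF_8);
      out.writeInt(payload.length);
      out.write(payload);
      out.writeLong(timestampMillis);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Reads every timestamp from a concatenated stream, skipping value bytes
  // by their declared length instead of decoding them.
  static List<Long> timestampsOnly(byte[] stream) {
    try {
      List<Long> result = new ArrayList<>();
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(stream));
      while (in.available() > 0) {
        int length = in.readInt();
        if (in.skipBytes(length) != length) {
          throw new EOFException("truncated value");
        }
        result.add(in.readLong());
      }
      return result;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because appended blocks are simply concatenated, the reader needs the prefix to find where each opaque value ends; returning data in the same layout, as the comment suggests, would let the SDK side decode it with the same coder.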
Re: [PR] Implement ordered list state for FnApi. [beam]
github-actions[bot] commented on PR #30317:
URL: https://github.com/apache/beam/pull/30317#issuecomment-1997307049

Reminder, please take a look at this PR: @robertwb
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on PR #30317:
URL: https://github.com/apache/beam/pull/30317#issuecomment-1981758955

Run Java PreCommit
Re: [PR] Implement ordered list state for FnApi. [beam]
github-actions[bot] commented on PR #30317:
URL: https://github.com/apache/beam/pull/30317#issuecomment-1981431117

Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:

R: @robertwb for label java.

Available commands:
- `stop reviewer notifications` - opt out of the automated review tooling
- `remind me after tests pass` - tag the comment author after tests pass
- `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1511829304

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1021,6 +1033,29 @@ message StateKey {
     bytes map_key = 5;
   }
+  // Represents a request for an ordered list of values associated with a
+  // specified user key and window for a PTransform. See
+  // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
+  // details.
+  //
+  // The response data stream will be a concatenation of all entries of sort key
+  // and V's associated with the specified user key and window.
+  // See https://s.apache.org/beam-fn-api-send-and-receive-data for further
+  // details.
+  message OrderedListUserState {
+    // (Required) The id of the PTransform containing user state.
+    string transform_id = 1;
+    // (Required) The id of the user state.
+    string user_state_id = 2;
+    // (Required) The window encoded in a nested context.
+    bytes window = 3;
+    // (Required) The key of the currently executing element encoded in a
+    // nested context.
+    bytes key = 4;
+    // (optional) The sort key encoded in a nested context.
+    int64 sort_key = 5;

Review Comment:
Done. The sort_key in the state key is replaced with a sort range.
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1511828608

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
I pushed the new code to reuse Get/Append/Clear.
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1511828177

## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java: ##
@@ -169,6 +172,7 @@ public ResultT get() {
   }
   @Override
+  @SuppressWarnings("unchecked")

Review Comment:
I reverted the changes on those annotations.
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1511827360 ## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java: ## @@ -174,6 +215,124 @@ public CompletableFuture handle(StateRequest.Builder requestBuild response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance()); break; + case ORDERED_LIST_GET: +{ + long start = request.getOrderedListGet().getRange().getStart(); + long end = request.getOrderedListGet().getRange().getEnd(); + + KvCoder coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of()); + long sortKey = start; + int index = 0; + if (request.getOrderedListGet().getContinuationToken().size() > 0) { +try { + // The continuation format here is the sort key (long) followed by an index (int) + KV cursor = + coder.decode(request.getOrderedListGet().getContinuationToken().newInput()); + sortKey = cursor.getKey(); + index = cursor.getValue(); +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + ByteString continuationToken; + ByteString returnBlock = ByteString.EMPTY; + ; + try { +if (sortKey < start || sortKey >= end) { + throw new IndexOutOfBoundsException("sort key out of range"); +} + +NavigableSet subset = +orderedListKeys +.getOrDefault(request.getStateKey(), new TreeSet<>()) +.subSet(sortKey, true, end, false); + +// get the effective sort key currently, can throw NoSuchElementException +Long nextSortKey = subset.first(); + +StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); + keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey); +List byteStrings = +data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY)); + +// get the block specified in continuation token, can throw IndexOutOfBoundsException +returnBlock = byteStrings.get(index); + +if (byteStrings.size() > index + 1) { + // more blocks from this sort key + index += 1; +} else { + // finish navigating the current 
sort key and need to find the next one, + // can throw NoSuchElementException + nextSortKey = subset.tailSet(nextSortKey, false).first(); + index = 0; +} + +ByteStringOutputStream outputStream = new ByteStringOutputStream(); +try { + KV cursor = KV.of(nextSortKey, index); + coder.encode(cursor, outputStream); +} catch (IOException e) { + throw new RuntimeException(e); +} +continuationToken = outputStream.toByteString(); + } catch (NoSuchElementException | IndexOutOfBoundsException e) { +continuationToken = ByteString.EMPTY; + } + response = + StateResponse.newBuilder() + .setOrderedListGet( + OrderedListStateGetResponse.newBuilder() + .setData(returnBlock) + .setContinuationToken(continuationToken)); +} +break; + + case ORDERED_LIST_UPDATE: +for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) { + List keysToRemove = + new ArrayList<>( + orderedListKeys + .getOrDefault(request.getStateKey(), new TreeSet<>()) + .subSet(r.getStart(), true, r.getEnd(), false)); + + for (Long l : keysToRemove) { +StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); +keyBuilder.getOrderedListUserStateBuilder().setSortKey(l); +data.remove(keyBuilder.build()); +orderedListKeys.get(request.getStateKey()).remove(l); + } +} + +for (OrderedListEntry e : request.getOrderedListUpdate().getInsertsList()) { + StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); + keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey()); + + ByteStringOutputStream outStream = new ByteStringOutputStream(); + + try { +InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), outStream); + } catch (IOException ex) { +throw new RuntimeException(ex); + } + // In the response, the value encoded bytes are placed before the timestamp encoded bytes. Review Comment: Done. -- This is an automated message from the Apache Git Service. To
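The continuation-token handling in the quoted fake client can be sketched without the Fn API types. This is a simplified model under stated assumptions: blocks are plain strings, the cursor is a `long[]` pair of `{sortKey, index}` rather than an encoded `ByteString`, and `PagedReadSketch` and `Page` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of paging through blocks with a (sortKey, index) cursor.
final class PagedReadSketch {
  static final class Page {
    final String block; // payload of this page, "" when nothing was found
    final long[] next; // {sortKey, index} of the next block, or null when done

    Page(String block, long[] next) {
      this.block = block;
      this.next = next;
    }
  }

  private final NavigableMap<Long, List<String>> blocks = new TreeMap<>();

  void put(long sortKey, String... values) {
    blocks.put(sortKey, Arrays.asList(values));
  }

  // Returns one block from [start, end), resuming at the cursor if present.
  Page get(long start, long end, long[] cursor) {
    long sortKey = cursor == null ? start : cursor[0];
    int index = cursor == null ? 0 : (int) cursor[1];
    if (sortKey < start || sortKey >= end) {
      return new Page("", null);
    }
    NavigableMap<Long, List<String>> sub = blocks.subMap(sortKey, true, end, false);
    Map.Entry<Long, List<String>> current = sub.firstEntry();
    if (current == null || index >= current.getValue().size()) {
      return new Page("", null);
    }
    String block = current.getValue().get(index);
    if (index + 1 < current.getValue().size()) {
      // More blocks remain under the same sort key.
      return new Page(block, new long[] {current.getKey(), index + 1});
    }
    // Move to the next sort key inside the range, if there is one.
    Long nextKey = sub.higherKey(current.getKey());
    return new Page(block, nextKey == null ? null : new long[] {nextKey, 0});
  }
}
```

A caller would keep requesting pages until `next` is null, which corresponds to the empty continuation token in the quoted code.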
Re: [PR] Implement ordered list state for FnApi. [beam]
robertwb commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1501142124

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
Thanks. IIUC, append is still correct, as there's a bag (with possibly multiple items) assigned to every point in the ordered space. (It's a lot like MultiMap with the ability to read ranges in order rather than just do point lookups, though +1 to not mixing the two.)

Agree on caching--there are more clever things we can do here in the future, but we can punt that to future work.
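The "bag at every point in the ordered space" model described here can be sketched with a sorted map of lists. This is an illustration only: `OrderedBagSketch` is a hypothetical name, and values are plain strings rather than coder-encoded bytes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model: each sort key owns a bag, and reads walk keys in order.
final class OrderedBagSketch {
  private final NavigableMap<Long, List<String>> bags = new TreeMap<>();

  // Appending never replaces: it adds to the bag stored at this sort key.
  void append(long sortKey, String value) {
    bags.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
  }

  // Returns all values with sort key in [min, limit), ordered by sort key
  // and, within one sort key, by insertion order.
  List<String> readRange(long min, long limit) {
    List<String> out = new ArrayList<>();
    for (List<String> bag : bags.subMap(min, true, limit, false).values()) {
      out.addAll(bag);
    }
    return out;
  }
}
```

A point lookup is then the special case `readRange(k, k + 1)`, which is where the comparison with a multimap comes from.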
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1500882918

## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java: ##
@@ -174,6 +215,124 @@ public CompletableFuture handle(StateRequest.Builder requestBuild
           response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
           break;
+        case ORDERED_LIST_GET:
+          {
+            long start = request.getOrderedListGet().getRange().getStart();
+            long end = request.getOrderedListGet().getRange().getEnd();
+
+            KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of());
+            long sortKey = start;
+            int index = 0;
+            if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+              try {
+                // The continuation format here is the sort key (long) followed by an index (int)
+                KV<Long, Integer> cursor =
+                    coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+                sortKey = cursor.getKey();
+                index = cursor.getValue();
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            }
+
+            ByteString continuationToken;
+            ByteString returnBlock = ByteString.EMPTY;
+            ;
+            try {
+              if (sortKey < start || sortKey >= end) {
+                throw new IndexOutOfBoundsException("sort key out of range");
+              }
+
+              NavigableSet<Long> subset =
+                  orderedListKeys
+                      .getOrDefault(request.getStateKey(), new TreeSet<>())
+                      .subSet(sortKey, true, end, false);
+
+              // get the effective sort key currently, can throw NoSuchElementException
+              Long nextSortKey = subset.first();
+
+              StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+              keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey);
+              List<ByteString> byteStrings =
+                  data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY));
+
+              // get the block specified in continuation token, can throw IndexOutOfBoundsException
+              returnBlock = byteStrings.get(index);
+
+              if (byteStrings.size() > index + 1) {
+                // more blocks from this sort key
+                index += 1;
+              } else {
+                // finish navigating the current sort key and need to find the next one,
+                // can throw NoSuchElementException
+                nextSortKey = subset.tailSet(nextSortKey, false).first();
+                index = 0;
+              }
+
+              ByteStringOutputStream outputStream = new ByteStringOutputStream();
+              try {
+                KV<Long, Integer> cursor = KV.of(nextSortKey, index);
+                coder.encode(cursor, outputStream);
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+              continuationToken = outputStream.toByteString();
+            } catch (NoSuchElementException | IndexOutOfBoundsException e) {
+              continuationToken = ByteString.EMPTY;
+            }
+            response =
+                StateResponse.newBuilder()
+                    .setOrderedListGet(
+                        OrderedListStateGetResponse.newBuilder()
+                            .setData(returnBlock)
+                            .setContinuationToken(continuationToken));
+          }
+          break;
+
+        case ORDERED_LIST_UPDATE:
+          for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) {
+            List<Long> keysToRemove =
+                new ArrayList<>(
+                    orderedListKeys
+                        .getOrDefault(request.getStateKey(), new TreeSet<>())
+                        .subSet(r.getStart(), true, r.getEnd(), false));
+
+            for (Long l : keysToRemove) {
+              StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+              keyBuilder.getOrderedListUserStateBuilder().setSortKey(l);
+              data.remove(keyBuilder.build());
+              orderedListKeys.get(request.getStateKey()).remove(l);
+            }
+          }
+
+          for (OrderedListEntry e : request.getOrderedListUpdate().getInsertsList()) {
+            StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+            keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey());
+
+            ByteStringOutputStream outStream = new ByteStringOutputStream();
+
+            try {
+              InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), outStream);
+            } catch (IOException ex) {
+              throw new RuntimeException(ex);
+            }
+            // In the response, the value encoded bytes are placed before the timestamp encoded bytes.

Review Comment:
ACK. I find a better way to do this. Specifically, I can reuse
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1500882918

## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java: ##
@@ -174,6 +215,124 @@ public CompletableFuture handle(StateRequest.Builder requestBuild
           response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
           break;
+        case ORDERED_LIST_GET:
+          {
+            long start = request.getOrderedListGet().getRange().getStart();
+            long end = request.getOrderedListGet().getRange().getEnd();
+
+            KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of());
+            long sortKey = start;
+            int index = 0;
+            if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+              try {
+                // The continuation format here is the sort key (long) followed by an index (int)
+                KV<Long, Integer> cursor =
+                    coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+                sortKey = cursor.getKey();
+                index = cursor.getValue();
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            }
+
+            ByteString continuationToken;
+            ByteString returnBlock = ByteString.EMPTY;
+            ;
+            try {
+              if (sortKey < start || sortKey >= end) {
+                throw new IndexOutOfBoundsException("sort key out of range");
+              }
+
+              NavigableSet<Long> subset =
+                  orderedListKeys
+                      .getOrDefault(request.getStateKey(), new TreeSet<>())
+                      .subSet(sortKey, true, end, false);
+
+              // get the effective sort key currently, can throw NoSuchElementException
+              Long nextSortKey = subset.first();
+
+              StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+              keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey);
+              List<ByteString> byteStrings =
+                  data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY));
+
+              // get the block specified in continuation token, can throw IndexOutOfBoundsException
+              returnBlock = byteStrings.get(index);
+
+              if (byteStrings.size() > index + 1) {
+                // more blocks from this sort key
+                index += 1;
+              } else {
+                // finish navigating the current sort key and need to find the next one,
+                // can throw NoSuchElementException
+                nextSortKey = subset.tailSet(nextSortKey, false).first();
+                index = 0;
+              }
+
+              ByteStringOutputStream outputStream = new ByteStringOutputStream();
+              try {
+                KV<Long, Integer> cursor = KV.of(nextSortKey, index);
+                coder.encode(cursor, outputStream);
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+              continuationToken = outputStream.toByteString();
+            } catch (NoSuchElementException | IndexOutOfBoundsException e) {
+              continuationToken = ByteString.EMPTY;
+            }
+            response =
+                StateResponse.newBuilder()
+                    .setOrderedListGet(
+                        OrderedListStateGetResponse.newBuilder()
+                            .setData(returnBlock)
+                            .setContinuationToken(continuationToken));
+          }
+          break;
+
+        case ORDERED_LIST_UPDATE:
+          for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) {
+            List<Long> keysToRemove =
+                new ArrayList<>(
+                    orderedListKeys
+                        .getOrDefault(request.getStateKey(), new TreeSet<>())
+                        .subSet(r.getStart(), true, r.getEnd(), false));
+
+            for (Long l : keysToRemove) {
+              StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+              keyBuilder.getOrderedListUserStateBuilder().setSortKey(l);
+              data.remove(keyBuilder.build());
+              orderedListKeys.get(request.getStateKey()).remove(l);
+            }
+          }
+
+          for (OrderedListEntry e : request.getOrderedListUpdate().getInsertsList()) {
+            StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+            keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey());
+
+            ByteStringOutputStream outStream = new ByteStringOutputStream();
+
+            try {
+              InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), outStream);
+            } catch (IOException ex) {
+              throw new RuntimeException(ex);
+            }
+            // In the response, the value encoded bytes are placed before the timestamp encoded bytes.

Review Comment:
ACK. I find a better way to avoid the decoding overhead in the fake client
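For readers following the paging logic in the fake client hunk above, here is a minimal stdlib-only sketch of the same idea: a read over `[start, end)` returns one block per call, and the continuation token carries a cursor of sort key plus block index. Everything in it is an assumption made for illustration: the class `PagedOrderedListSketch`, its `Page` type, and the `DataOutputStream` token encoding stand in for the PR's `KvCoder`-based token and are not Beam's API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for the paging logic discussed above; not Beam's API. */
final class PagedOrderedListSketch {
  private static final byte[] DONE = new byte[0];

  /** One page: a single block plus the token for the next page (empty when finished). */
  static final class Page {
    final String block;
    final byte[] token;

    Page(String block, byte[] token) {
      this.block = block;
      this.token = token;
    }
  }

  // Sort key -> blocks stored under that key, kept in sort-key order.
  private final NavigableMap<Long, List<String>> blocks = new TreeMap<>();

  void add(long sortKey, String block) {
    blocks.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(block);
  }

  /** Returns one block from [start, end), resuming at the cursor in the token if present. */
  Page get(long start, long end, byte[] token) {
    long sortKey = start;
    int index = 0;
    try {
      if (token.length > 0) {
        // Assumed token layout: sort key (long) followed by block index (int).
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(token));
        sortKey = in.readLong();
        index = in.readInt();
      }
      if (sortKey < start || sortKey >= end) {
        return new Page("", DONE);
      }
      NavigableMap<Long, List<String>> range = blocks.subMap(sortKey, true, end, false);
      if (range.isEmpty()) {
        return new Page("", DONE);
      }
      long current = range.firstKey();
      List<String> atKey = range.get(current);
      if (index >= atKey.size()) {
        return new Page("", DONE);
      }
      // Advance the cursor: next block under the same key, else the next key in range.
      boolean moreAtKey = index + 1 < atKey.size();
      Long nextKey = moreAtKey ? Long.valueOf(current) : range.higherKey(current);
      int nextIndex = moreAtKey ? index + 1 : 0;
      if (nextKey == null) {
        return new Page(atKey.get(index), DONE);
      }
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeLong(nextKey);
      out.writeInt(nextIndex);
      return new Page(atKey.get(index), bytes.toByteArray());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Drains the range by following continuation tokens until an empty one comes back. */
  List<String> readAll(long start, long end) {
    List<String> result = new ArrayList<>();
    byte[] token = DONE;
    do {
      Page page = get(start, end, token);
      if (!page.block.isEmpty()) {
        result.add(page.block);
      }
      token = page.token;
    } while (token.length > 0);
    return result;
  }

  public static void main(String[] args) {
    PagedOrderedListSketch state = new PagedOrderedListSketch();
    state.add(5, "a");
    state.add(5, "b");
    state.add(7, "c");
    state.add(9, "d");
    System.out.println(state.readAll(5, 9)); // prints [a, b, c]; 9 is excluded
  }
}
```

Unlike the fake client, this sketch returns an empty token together with the last block instead of requiring one extra empty round trip; either convention works as long as the reader stops on an empty token.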
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1500878413 ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## @@ -1075,6 +,42 @@ message StateClearRequest {} // A response to clear state. message StateClearResponse {} Review Comment: Sounds good! Thanks a lot for the input, Kenn. I will go ahead with the changes to re-use Get/Append/Clear for ordered list then. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1500872501

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;
+}
+
+// A request to update an ordered list
+message OrderedListStateUpdateRequest {
+  // when the request is processed, deletes should always happen before inserts.

Review Comment:
Yep. I am planning to add an addendum to the original design doc to summarize the decisions we make here. We should have that after this round of review completes.
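The "deletes should always happen before inserts" rule in the proto comment above can be illustrated with a small stdlib-only sketch. It is not the PR's implementation: `OrderedListUpdateSketch`, `Range`, and `Entry` are hypothetical names that only mirror the shape of `OrderedListRange` and `OrderedListEntry`, and values are plain strings rather than encoded bytes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical model of an update request that carries both deletes and inserts. */
final class OrderedListUpdateSketch {
  /** Half-open sort key range [start, end). */
  static final class Range {
    final long start;
    final long end;

    Range(long start, long end) {
      this.start = start;
      this.end = end;
    }
  }

  /** A value tagged with its sort key. */
  static final class Entry {
    final long sortKey;
    final String data;

    Entry(long sortKey, String data) {
      this.sortKey = sortKey;
      this.data = data;
    }
  }

  private final NavigableMap<Long, List<String>> entries = new TreeMap<>();

  /** Applies all deletes first, then all inserts, so an insert into a deleted range survives. */
  void update(List<Range> deletes, List<Entry> inserts) {
    for (Range r : deletes) {
      // Clearing the sub-map view removes the keys from the backing map.
      entries.subMap(r.start, true, r.end, false).clear();
    }
    for (Entry e : inserts) {
      entries.computeIfAbsent(e.sortKey, k -> new ArrayList<>()).add(e.data);
    }
  }

  /** Reads values with sort keys in [start, end), in sort-key order. */
  List<String> read(long start, long end) {
    List<String> result = new ArrayList<>();
    for (List<String> values : entries.subMap(start, true, end, false).values()) {
      result.addAll(values);
    }
    return result;
  }

  public static void main(String[] args) {
    OrderedListUpdateSketch state = new OrderedListUpdateSketch();
    state.update(
        Collections.<Range>emptyList(),
        Arrays.asList(new Entry(1, "a"), new Entry(2, "b"), new Entry(3, "c")));
    // One request deletes [1, 3) and inserts at key 2; the insert is kept.
    state.update(Arrays.asList(new Range(1, 3)), Arrays.asList(new Entry(2, "new")));
    System.out.println(state.read(Long.MIN_VALUE, Long.MAX_VALUE)); // prints [new, c]
  }
}
```

If inserts ran first, the entry at key 2 would be removed by the delete of `[1, 3)`, which is the ordering ambiguity the reviewers ask to have written down.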
Re: [PR] Implement ordered list state for FnApi. [beam]
kennknowles commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1500851162

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
OK my responses are evolving now that I've read the whole code change and re-read the doc.
- I see specialized requests are broken out anyhow, so that's fine.
- Including everything needed for caching in the state key is good for raw request caching, so re-using Get is good. Though there are perhaps smarter ways to cache that won't benefit from this
- "append" is still a fine method for adding things to ordered list state, but it isn't important and the name is misleading (as it is for bags and multimaps, since they are not ordered, so anyhow it is the same here and might as well keep the incorrect naming)
- Obviously `clear` is fine

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}
+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;
+}
+
+// A request to update an ordered list
+message OrderedListStateUpdateRequest {
+  // when the request is processed, deletes should always happen before inserts.

Review Comment:
It would be helpful to outline the pro/con in the design doc of little decisions like this, and note which one was chosen and why. For example one benefit to splitting the requests is to avoid ordering issues. We would have to spec that either the inserts or deletes happen first, even though they are in one request together. It is a bit confusing. And then if you want them in the other order, you still have to make two requests but each one has an empty field. And note whether there is an efficiency consideration.
Re: [PR] Implement ordered list state for FnApi. [beam]
kennknowles commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1500844153

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
The one counterpoint is that a state key is used for caching, as long as the spec is that a state key is deterministic in what it returns. So it has value for `get`.
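The caching argument above can be sketched as a read-through cache keyed by a state key that already contains the requested range. This is only an illustration under stated assumptions: `StateKeyCacheSketch`, `RangeKey`, and the `backend` function are hypothetical, and it assumes the premise from the comment that a given state key is deterministic in what it returns until the state is written.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical read-through cache keyed by a state key that embeds the range; not Beam's API. */
final class StateKeyCacheSketch {
  /** Stand-in for a state key: the state id plus the [start, end) range it addresses. */
  static final class RangeKey {
    final String stateId;
    final long start;
    final long end;

    RangeKey(String stateId, long start, long end) {
      this.stateId = stateId;
      this.start = start;
      this.end = end;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof RangeKey)) {
        return false;
      }
      RangeKey other = (RangeKey) o;
      return stateId.equals(other.stateId) && start == other.start && end == other.end;
    }

    @Override
    public int hashCode() {
      return Objects.hash(stateId, start, end);
    }
  }

  private final Map<RangeKey, List<String>> cache = new HashMap<>();
  private final Function<RangeKey, List<String>> backend;
  int backendCalls = 0; // exposed only so the sketch can show cache hits

  StateKeyCacheSketch(Function<RangeKey, List<String>> backend) {
    this.backend = backend;
  }

  /** Serves repeated gets for an equal key from the cache; only a miss reaches the backend. */
  List<String> get(RangeKey key) {
    List<String> cached = cache.get(key);
    if (cached == null) {
      backendCalls++;
      cached = backend.apply(key);
      cache.put(key, cached);
    }
    return cached;
  }

  /** Drops every cached range for a state id, for example after an append or clear. */
  void invalidate(String stateId) {
    cache.keySet().removeIf(k -> k.stateId.equals(stateId));
  }
}
```

Because the range is part of the key, two reads of the same range share one cache entry with no knowledge of ordered-list semantics, while reads of overlapping but different ranges do not, which matches the remark elsewhere in the thread that smarter caches might not benefit from this layout.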
Re: [PR] Implement ordered list state for FnApi. [beam]
kennknowles commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1500839373

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
I still agree with my comment there: "it looks like a 'do everything' request/response protocol and the key holds all the information." That comment may seem to be about style, but it is really about transparency, debuggability, readability. I view whatever "consistency" we have amongst the state requests as coincidence at best and a mistake at worst. _The_ reason that different kinds of state exist is because they support different methods. There is a reason that the only method they have in common in Java (where it originated) is `clear()`. Apparently the design decision was to model `methodA, methodB, methodC` as `oneBigMethod("actually do A"), oneBigMethod("actually do B"), oneBigMethod("actually do C")`. I just think that is not as good as being straightforward.
Re: [PR] Implement ordered list state for FnApi. [beam]
robertwb commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1498139008

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

Review Comment:
I prefer being consistent. If we want to restructure things to break everything out, we can have that discussion, but it doesn't seem a clear enough win to break consistency. (If the range is in the ordered list state state key, the existing Get/Append/Clear will work just fine, no need for optional fields.) @kennknowles for your thoughts too.
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1498124213

## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java: ##
@@ -174,6 +215,124 @@ public CompletableFuture handle(StateRequest.Builder requestBuild
           response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
           break;
+        case ORDERED_LIST_GET:
+          {
+            long start = request.getOrderedListGet().getRange().getStart();
+            long end = request.getOrderedListGet().getRange().getEnd();
+
+            KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of());
+            long sortKey = start;
+            int index = 0;
+            if (request.getOrderedListGet().getContinuationToken().size() > 0) {

Review Comment:
Ack!
Re: [PR] Implement ordered list state for FnApi. [beam]
robertwb commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1498123563 ## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java: ## @@ -174,6 +215,124 @@ public CompletableFuture handle(StateRequest.Builder requestBuild response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance()); break; + case ORDERED_LIST_GET: +{ + long start = request.getOrderedListGet().getRange().getStart(); + long end = request.getOrderedListGet().getRange().getEnd(); + + KvCoder coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of()); + long sortKey = start; + int index = 0; + if (request.getOrderedListGet().getContinuationToken().size() > 0) { +try { + // The continuation format here is the sort key (long) followed by an index (int) + KV cursor = + coder.decode(request.getOrderedListGet().getContinuationToken().newInput()); + sortKey = cursor.getKey(); + index = cursor.getValue(); +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + ByteString continuationToken; + ByteString returnBlock = ByteString.EMPTY; + ; + try { +if (sortKey < start || sortKey >= end) { + throw new IndexOutOfBoundsException("sort key out of range"); +} + +NavigableSet subset = +orderedListKeys +.getOrDefault(request.getStateKey(), new TreeSet<>()) +.subSet(sortKey, true, end, false); + +// get the effective sort key currently, can throw NoSuchElementException +Long nextSortKey = subset.first(); + +StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); + keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey); +List byteStrings = +data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY)); + +// get the block specified in continuation token, can throw IndexOutOfBoundsException +returnBlock = byteStrings.get(index); + +if (byteStrings.size() > index + 1) { + // more blocks from this sort key + index += 1; +} else { + // finish navigating the current 
sort key and need to find the next one, + // can throw NoSuchElementException + nextSortKey = subset.tailSet(nextSortKey, false).first(); + index = 0; +} + +ByteStringOutputStream outputStream = new ByteStringOutputStream(); +try { + KV cursor = KV.of(nextSortKey, index); + coder.encode(cursor, outputStream); +} catch (IOException e) { + throw new RuntimeException(e); +} +continuationToken = outputStream.toByteString(); + } catch (NoSuchElementException | IndexOutOfBoundsException e) { +continuationToken = ByteString.EMPTY; + } + response = + StateResponse.newBuilder() + .setOrderedListGet( + OrderedListStateGetResponse.newBuilder() + .setData(returnBlock) + .setContinuationToken(continuationToken)); +} +break; + + case ORDERED_LIST_UPDATE: +for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) { + List keysToRemove = + new ArrayList<>( + orderedListKeys + .getOrDefault(request.getStateKey(), new TreeSet<>()) + .subSet(r.getStart(), true, r.getEnd(), false)); + + for (Long l : keysToRemove) { +StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); +keyBuilder.getOrderedListUserStateBuilder().setSortKey(l); +data.remove(keyBuilder.build()); +orderedListKeys.get(request.getStateKey()).remove(l); + } +} + +for (OrderedListEntry e : request.getOrderedListUpdate().getInsertsList()) { + StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); + keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey()); + + ByteStringOutputStream outStream = new ByteStringOutputStream(); + + try { +InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), outStream); + } catch (IOException ex) { +throw new RuntimeException(ex); + } + // In the response, the value encoded bytes are placed before the timestamp encoded bytes. Review Comment: I am not opposed to encode/decode overhead to make things totally clear
Re: [PR] Implement ordered list state for FnApi. [beam]
robertwb commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1498119280 ## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java: ## @@ -174,6 +215,124 @@ public CompletableFuture handle(StateRequest.Builder requestBuild response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance()); break; + case ORDERED_LIST_GET: +{ + long start = request.getOrderedListGet().getRange().getStart(); + long end = request.getOrderedListGet().getRange().getEnd(); + + KvCoder coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of()); + long sortKey = start; + int index = 0; + if (request.getOrderedListGet().getContinuationToken().size() > 0) { Review Comment: I'm just suggesting `!ByteString::isEmpty()` over `ByteString::size() > 0`.
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1498118154 ## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java: ## @@ -169,6 +172,7 @@ public ResultT get() { } @Override + @SuppressWarnings("unchecked") Review Comment: Right. Nothing changed.
Re: [PR] Implement ordered list state for FnApi. [beam]
robertwb commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1498116158 ## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java: ## @@ -169,6 +172,7 @@ public ResultT get() { } @Override + @SuppressWarnings("unchecked") Review Comment: But nothing changed in this code, right?
Re: [PR] Implement ordered list state for FnApi. [beam]
robertwb commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1498107366 ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## @@ -1021,6 +1033,29 @@ message StateKey { bytes map_key = 5; } + // Represents a request for an ordered list of values associated with a + // specified user key and window for a PTransform. See + // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further + // details. + // + // The response data stream will be a concatenation of all entries of sort key + // and V's associated with the specified user key and window. + // See https://s.apache.org/beam-fn-api-send-and-receive-data for further + // details. + message OrderedListUserState { +// (Required) The id of the PTransform containing user state. +string transform_id = 1; +// (Required) The id of the user state. +string user_state_id = 2; +// (Required) The window encoded in a nested context. +bytes window = 3; +// (Required) The key of the currently executing element encoded in a +// nested context. +bytes key = 4; +// (optional) The sort key encoded in a nested context. +int64 sort_key = 5; Review Comment: We shouldn't put something in the protos that's only used for the fake. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Implement ordered list state for FnApi. [beam]
robertwb commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1498106140 ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## @@ -1075,6 +,42 @@ message StateClearRequest {} // A response to clear state. message StateClearResponse {} +// A message describes a sort key range [start, end). +message OrderedListRange { + int64 start = 1; + int64 end = 2; +} + +// A data entry which is tagged with a sort key. +message OrderedListEntry { + int64 sort_key = 1; + bytes data = 2; +} + +// This request will fetch an ordered list with a sort key range. If the +// timestamp range is not specified, the runner should use [MIN_INT64, +// MAX_INT64) by default. +message OrderedListStateGetRequest { + bytes continuation_token = 1; Review Comment: Yes, that's what I meant by mutually exclusive. If we allow providing both, we have to figure out what to do when they disagree, and there's no good use case for that.
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1492990264 ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## @@ -1021,6 +1033,29 @@ message StateKey { bytes map_key = 5; } + // Represents a request for an ordered list of values associated with a + // specified user key and window for a PTransform. See + // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further + // details. + // + // The response data stream will be a concatenation of all entries of sort key + // and V's associated with the specified user key and window. + // See https://s.apache.org/beam-fn-api-send-and-receive-data for further + // details. + message OrderedListUserState { +// (Required) The id of the PTransform containing user state. +string transform_id = 1; +// (Required) The id of the user state. +string user_state_id = 2; +// (Required) The window encoded in a nested context. +bytes window = 3; +// (Required) The key of the currently executing element encoded in a +// nested context. +bytes key = 4; +// (optional) The sort key encoded in a nested context. +int64 sort_key = 5; Review Comment: The sort_key here is only used by FakeBeamFnStateClient. The purpose is to store the internal data the same way as MultiMap. We don't need it for any Insert/Update as the range/key is included in the corresponding messages. https://github.com/apache/beam/blob/5d7cd5e2e2e6b966ddb47cd78135b58013622d23/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L1122 https://github.com/apache/beam/blob/5d7cd5e2e2e6b966ddb47cd78135b58013622d23/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L1131 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1492924566 ## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java: ## @@ -174,6 +215,124 @@ public CompletableFuture handle(StateRequest.Builder requestBuild response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance()); break; + case ORDERED_LIST_GET: +{ + long start = request.getOrderedListGet().getRange().getStart(); + long end = request.getOrderedListGet().getRange().getEnd(); + + KvCoder coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of()); + long sortKey = start; + int index = 0; + if (request.getOrderedListGet().getContinuationToken().size() > 0) { +try { + // The continuation format here is the sort key (long) followed by an index (int) + KV cursor = + coder.decode(request.getOrderedListGet().getContinuationToken().newInput()); + sortKey = cursor.getKey(); + index = cursor.getValue(); +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + ByteString continuationToken; + ByteString returnBlock = ByteString.EMPTY; + ; + try { +if (sortKey < start || sortKey >= end) { + throw new IndexOutOfBoundsException("sort key out of range"); +} + +NavigableSet subset = +orderedListKeys +.getOrDefault(request.getStateKey(), new TreeSet<>()) +.subSet(sortKey, true, end, false); + +// get the effective sort key currently, can throw NoSuchElementException +Long nextSortKey = subset.first(); + +StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); + keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey); +List byteStrings = +data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY)); + +// get the block specified in continuation token, can throw IndexOutOfBoundsException +returnBlock = byteStrings.get(index); + +if (byteStrings.size() > index + 1) { + // more blocks from this sort key + index += 1; +} else { + // finish navigating the current 
sort key and need to find the next one, + // can throw NoSuchElementException + nextSortKey = subset.tailSet(nextSortKey, false).first(); + index = 0; +} + +ByteStringOutputStream outputStream = new ByteStringOutputStream(); +try { + KV cursor = KV.of(nextSortKey, index); + coder.encode(cursor, outputStream); +} catch (IOException e) { + throw new RuntimeException(e); +} +continuationToken = outputStream.toByteString(); + } catch (NoSuchElementException | IndexOutOfBoundsException e) { +continuationToken = ByteString.EMPTY; + } + response = + StateResponse.newBuilder() + .setOrderedListGet( + OrderedListStateGetResponse.newBuilder() + .setData(returnBlock) + .setContinuationToken(continuationToken)); +} +break; + + case ORDERED_LIST_UPDATE: +for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) { + List keysToRemove = + new ArrayList<>( + orderedListKeys + .getOrDefault(request.getStateKey(), new TreeSet<>()) + .subSet(r.getStart(), true, r.getEnd(), false)); + + for (Long l : keysToRemove) { +StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); +keyBuilder.getOrderedListUserStateBuilder().setSortKey(l); +data.remove(keyBuilder.build()); +orderedListKeys.get(request.getStateKey()).remove(l); + } +} + +for (OrderedListEntry e : request.getOrderedListUpdate().getInsertsList()) { + StateKey.Builder keyBuilder = request.getStateKey().toBuilder(); + keyBuilder.getOrderedListUserStateBuilder().setSortKey(e.getSortKey()); + + ByteStringOutputStream outStream = new ByteStringOutputStream(); + + try { +InstantCoder.of().encode(Instant.ofEpochMilli(e.getSortKey()), outStream); + } catch (IOException ex) { +throw new RuntimeException(ex); + } + // In the response, the value encoded bytes are placed before the timestamp encoded bytes. Review Comment: Timestamp is the sort key here. As our OrderedListState interface is based
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1492822517 ## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java: ## @@ -174,6 +215,124 @@ public CompletableFuture handle(StateRequest.Builder requestBuild response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance()); break; + case ORDERED_LIST_GET: +{ + long start = request.getOrderedListGet().getRange().getStart(); + long end = request.getOrderedListGet().getRange().getEnd(); + + KvCoder coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of()); + long sortKey = start; + int index = 0; + if (request.getOrderedListGet().getContinuationToken().size() > 0) { +try { + // The continuation format here is the sort key (long) followed by an index (int) + KV cursor = + coder.decode(request.getOrderedListGet().getContinuationToken().newInput()); + sortKey = cursor.getKey(); + index = cursor.getValue(); +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + ByteString continuationToken; + ByteString returnBlock = ByteString.EMPTY; + ; + try { +if (sortKey < start || sortKey >= end) { + throw new IndexOutOfBoundsException("sort key out of range"); Review Comment: See my comment in https://github.com/apache/beam/pull/30317#discussion_r1492631642
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1492821907 ## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java: ## @@ -174,6 +215,124 @@ public CompletableFuture handle(StateRequest.Builder requestBuild response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance()); break; + case ORDERED_LIST_GET: +{ + long start = request.getOrderedListGet().getRange().getStart(); + long end = request.getOrderedListGet().getRange().getEnd(); + + KvCoder coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of()); + long sortKey = start; + int index = 0; + if (request.getOrderedListGet().getContinuationToken().size() > 0) { Review Comment: The code here checks whether the continuation token is non-empty. An empty continuation token in the get request means the response should return the first block in the query range. A non-empty continuation token identifies which block is being requested.
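The token semantics described in this comment can be sketched in plain Java. This is only an illustration under stated assumptions: `CursorToken` is a hypothetical name, and the fixed-width `DataOutputStream` layout is a stand-in for the `KvCoder.of(VarLongCoder.of(), VarIntCoder.of())` encoding that the fake client in the PR uses, so the bytes are not wire-compatible with it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical cursor: the sort key to resume at plus the block index within it. */
final class CursorToken {
  final long sortKey;
  final int index;

  CursorToken(long sortKey, int index) {
    this.sortKey = sortKey;
    this.index = index;
  }

  /** Encodes the cursor as an 8-byte sort key followed by a 4-byte block index. */
  static byte[] encode(long sortKey, int index) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeLong(sortKey);
      out.writeInt(index);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** An empty token means "first block of the range"; otherwise decode the cursor. */
  static CursorToken decodeOrStart(byte[] token, long rangeStart) {
    if (token.length == 0) {
      return new CursorToken(rangeStart, 0);
    }
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(token))) {
      return new CursorToken(in.readLong(), in.readInt());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A caller would pass the token from the previous response back unchanged; only the side that produced the token needs to understand its layout.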
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1492762927 ## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java: ## @@ -174,6 +215,124 @@ public CompletableFuture handle(StateRequest.Builder requestBuild response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance()); break; + case ORDERED_LIST_GET: +{ + long start = request.getOrderedListGet().getRange().getStart(); + long end = request.getOrderedListGet().getRange().getEnd(); + + KvCoder coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of()); + long sortKey = start; + int index = 0; + if (request.getOrderedListGet().getContinuationToken().size() > 0) { +try { + // The continuation format here is the sort key (long) followed by an index (int) + KV cursor = + coder.decode(request.getOrderedListGet().getContinuationToken().newInput()); + sortKey = cursor.getKey(); + index = cursor.getValue(); +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + ByteString continuationToken; + ByteString returnBlock = ByteString.EMPTY; + ; + try { +if (sortKey < start || sortKey >= end) { + throw new IndexOutOfBoundsException("sort key out of range"); Review Comment: See my comment in a previous thread: https://github.com/apache/beam/pull/30317#discussion_r1492631642
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1492761288 ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## @@ -1075,6 +,42 @@ message StateClearRequest {} // A response to clear state. message StateClearResponse {} +// A message describes a sort key range [start, end). +message OrderedListRange { + int64 start = 1; + int64 end = 2; +} + +// A data entry which is tagged with a sort key. +message OrderedListEntry { + int64 sort_key = 1; + bytes data = 2; +} + +// This request will fetch an ordered list with a sort key range. If the +// timestamp range is not specified, the runner should use [MIN_INT64, +// MAX_INT64) by default. +message OrderedListStateGetRequest { + bytes continuation_token = 1; + OrderedListRange range = 2; +} + +// A response to the get state request for an ordered list. +message OrderedListStateGetResponse { + bytes continuation_token = 1; + bytes data = 2; +} + +// A request to update an ordered list +message OrderedListStateUpdateRequest { + // when the request is processed, deletes should always happen before inserts. Review Comment: Discussion 2 in the design doc covered this, but it reached no conclusion: https://docs.google.com/document/d/1U77sAvE6Iy9XsVruRYHxPdFji7nqS6HPi1XU8fhyrxs/ I am fine with splitting them, though.
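The ordering rule in the quoted proto comment (deletes are processed before inserts within one update request) can be illustrated with a small in-memory sketch. `OrderedListUpdateSketch` and its method names are hypothetical, and a `TreeMap` of strings stands in for the runner's storage; this is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory ordered list keyed by sort key. */
final class OrderedListUpdateSketch {
  private final NavigableMap<Long, List<String>> entries = new TreeMap<>();

  /** Removes every entry whose sort key lies in the half-open range [start, end). */
  void deleteRange(long start, long end) {
    entries.subMap(start, true, end, false).clear();
  }

  /** Appends a value under the given sort key. */
  void insert(long sortKey, String value) {
    entries.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
  }

  /** Applies one update request: all deletes first, then all inserts. */
  void update(long[][] deleteRanges, long[] insertKeys, String[] insertValues) {
    for (long[] range : deleteRanges) {
      deleteRange(range[0], range[1]);
    }
    for (int i = 0; i < insertKeys.length; i++) {
      insert(insertKeys[i], insertValues[i]);
    }
  }

  /** Returns all values in ascending sort key order. */
  List<String> read() {
    List<String> out = new ArrayList<>();
    for (List<String> values : entries.values()) {
      out.addAll(values);
    }
    return out;
  }
}
```

With this ordering, a single request that deletes a range and inserts into the same range leaves the newly inserted values in place, which is why the order matters when both lists share one message.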
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1492753170 ## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ## @@ -1075,6 +,42 @@ message StateClearRequest {} // A response to clear state. message StateClearResponse {} Review Comment: There is a discussion of this in the original design doc: https://docs.google.com/document/d/1U77sAvE6Iy9XsVruRYHxPdFji7nqS6HPi1XU8fhyrxs/edit?disco=H0MsNik I think @kennknowles mentioned he preferred separate messages. Personally, I prefer your proposal to re-use the Get/Append/Clear messages by adding optional fields for ordered list state. However, I hesitate to do that because (1) it would introduce new optional fields to an existing proto message, and (2) for extensibility and flexibility, it may be easier to have separate messages for ordered list state and for any future states that don't fit the current Get/Append/Clear paradigm. WDYT?
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1492734368 ## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java: ## @@ -174,6 +215,124 @@ public CompletableFuture handle(StateRequest.Builder requestBuild response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance()); break; + case ORDERED_LIST_GET: +{ + long start = request.getOrderedListGet().getRange().getStart(); + long end = request.getOrderedListGet().getRange().getEnd(); + + KvCoder coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of()); + long sortKey = start; + int index = 0; + if (request.getOrderedListGet().getContinuationToken().size() > 0) { +try { + // The continuation format here is the sort key (long) followed by an index (int) + KV cursor = + coder.decode(request.getOrderedListGet().getContinuationToken().newInput()); + sortKey = cursor.getKey(); + index = cursor.getValue(); +} catch (IOException e) { + throw new RuntimeException(e); +} + } + + ByteString continuationToken; + ByteString returnBlock = ByteString.EMPTY; + ; Review Comment: Nice catch. Will delete it.
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317: URL: https://github.com/apache/beam/pull/30317#discussion_r1492733026 ## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java: ## @@ -169,6 +172,7 @@ public ResultT get() { } @Override + @SuppressWarnings("unchecked") Review Comment: I got a warning message on each of these binding functions. For example, for the code I wrote at https://github.com/apache/beam/blob/5d7cd5e2e2e6b966ddb47cd78135b58013622d23/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L612 the warning message is as follows: ``` Unchecked cast: 'java.lang.Object' to 'org.apache.beam.sdk.state.OrderedListState' Inspection info: Reports code on which an unchecked warning will be issued by the javac compiler. Every unchecked warning may potentially trigger ClassCastException at runtime ``` Basically, the compiler complains that we cast an object (from the state key cache) to the state class we bind.
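The warning described here arises whenever a value stored as `Object` is cast back to a parameterized type. A minimal sketch of the pattern, assuming a hypothetical `TypedCacheSketch` class in place of the accessor's state key cache (the names and the `String` key type are illustrative only):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical cache that stores values as Object and hands them back typed. */
final class TypedCacheSketch {
  private final Map<String, Object> cache = new HashMap<>();

  // The cast below cannot be checked at runtime because T is erased, so javac
  // reports an unchecked warning unless it is suppressed. It is only safe if
  // every caller uses a single type per key.
  @SuppressWarnings("unchecked")
  <T> T getOrCreate(String key, Function<String, T> factory) {
    return (T) cache.computeIfAbsent(key, factory);
  }
}
```

Keeping the annotation on the one method that performs the cast, rather than on a wider scope, limits how much other code is exempted from the check.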
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492637135

## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java: ##
@@ -598,13 +606,81 @@ public MultimapState bindMultimap(
   }

   @Override
+  @SuppressWarnings("unchecked")
   public <T> OrderedListState<T> bindOrderedList(
       String id, StateSpec<OrderedListState<T>> spec, Coder<T> elemCoder) {
-    throw new UnsupportedOperationException(
-        "TODO: Add support for a sorted-list state to the Fn API.");
+    return (OrderedListState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createOrderedListUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new OrderedListState<T>() {
+                  private final OrderedListUserState<T> impl =
+                      createOrderedListUserState(key, elemCoder);
+
+                  @Override
+                  public void clear() {
+                    clearRange(
+                        BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+                  }
+
+                  @Override
+                  public void add(TimestampedValue<T> value) {
+                    impl.add(value);
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> isEmpty() {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !impl.read().iterator().hasNext();
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Nullable
+                  @Override
+                  public Iterable<TimestampedValue<T>> read() {
+                    return readRange(
+                        BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+                  }
+
+                  @Override
+                  public GroupingState<TimestampedValue<T>, Iterable<TimestampedValue<T>>>
+                      readLater() {
+                    throw new UnsupportedOperationException();

Review Comment:
Ok. Will revise it accordingly.
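On the `readLater()` point in this hunk, a minimal sketch of treating `readLater()` as an optional prefetch hint. It uses a hypothetical `LazyValue` interface rather than Beam's `ReadableState`, and all names are illustrative. An implementation with nothing to prefetch can return `this`, so callers that chain `readLater().read()` keep working instead of hitting an exception.

```java
import java.util.function.Supplier;

// Hypothetical interface mirroring the read()/readLater() shape in the diff above.
interface LazyValue<T> {
  T read();

  // A hint that read() will be called soon; implementations may ignore it.
  LazyValue<T> readLater();
}

final class NoOpReadLaterSketch<T> implements LazyValue<T> {
  private final Supplier<T> source;

  NoOpReadLaterSketch(Supplier<T> source) {
    this.source = source;
  }

  @Override
  public T read() {
    return source.get();
  }

  @Override
  public LazyValue<T> readLater() {
    // Nothing to prefetch in this sketch, so the hint is a no-op.
    return this;
  }

  public static void main(String[] args) {
    LazyValue<String> value = new NoOpReadLaterSketch<>(() -> "hello");
    System.out.println(value.readLater().read());
  }
}
```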
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492631642

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;

Review Comment:
> comment on continuation token ...

Good idea! I will add that when I revise the code.

> Do we want to require returning the range if there's a continuation token involved? Or should they be mutually exclusive?

I don't think the range is a hard requirement. In fact, in my simple implementation of the fake client, I put the current sort key and the current block index into the continuation token:
https://github.com/apache/beam/blob/5d7cd5e2e2e6b966ddb47cd78135b58013622d23/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java#L228
Other implementations may need the range, but I think that is implementation-dependent. That's why I hesitate to put a range/sort key in a separate field when a continuation token is present. My thinking is that the continuation token should let the runner uniquely identify where to find the next block of data. Is that what you mean by "mutually exclusive"?
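The idea of packing the current sort key and block index into an opaque continuation token can be sketched as follows. This is an illustration under stated assumptions: it uses fixed-width `DataOutputStream` encoding instead of the `KvCoder`/`VarLongCoder`/`VarIntCoder` combination the fake client uses, and `ContinuationTokenSketch` and `Cursor` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class ContinuationTokenSketch {
  // Position of the next block to return: a sort key plus a block index within that key.
  static final class Cursor {
    final long sortKey;
    final int blockIndex;

    Cursor(long sortKey, int blockIndex) {
      this.sortKey = sortKey;
      this.blockIndex = blockIndex;
    }
  }

  // Serializes the cursor as an 8-byte sort key followed by a 4-byte block index.
  static byte[] encode(Cursor cursor) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeLong(cursor.sortKey);
      out.writeInt(cursor.blockIndex);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // An empty token means "first request": start at the beginning of the requested range.
  static Cursor decode(byte[] token, long rangeStart) {
    if (token.length == 0) {
      return new Cursor(rangeStart, 0);
    }
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(token))) {
      return new Cursor(in.readLong(), in.readInt());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] token = encode(new Cursor(42L, 3));
    Cursor resumed = decode(token, 0L);
    System.out.println(resumed.sortKey + " / " + resumed.blockIndex);
  }
}
```

Because the token is opaque to the SDK side, the server is free to change what it stores there, which is the implementation-dependence argued for above.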
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1492601187

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;

Review Comment:
> how do we return multiple elements if the request was a range?

We concatenate the encoded entries into a byte array and send them back in chunks, each with a corresponding continuation token.

> should this be repeated OrderedListEntry instead of data?

I have considered this option. Besides the efficiency reason @robertwb mentioned, representing the response as bytes has the advantage of reusing the existing code in
https://github.com/apache/beam/blob/3693174c0421d0ff049042ca283db633431892ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
This iterator is used to parse the returned data blocks (not only for OrderedListState but also for Multimap, Bag, etc.), and it supports blocks whose boundaries are not aligned with entries/elements. I think this is not achievable with OrderedListEntry.

> But the coding should be specified (e.g. is this a concatenation of several encoded KVs

That's right. It is basically the concatenation of encoded TimestampedValue, since TimestampedValue is already in use in the SDK interface of OrderedListState:
https://github.com/apache/beam/blob/3693174c0421d0ff049042ca283db633431892ef/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java#L29
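To illustrate why a plain byte stream tolerates blocks that are not aligned with entry boundaries, here is a minimal sketch. It assumes a hypothetical length-prefixed encoding (8-byte sort key, 4-byte payload length, UTF-8 payload); it is not Beam's TimestampedValue coder, and it does not reproduce `StateFetchingIterators`. The class and method names are illustrative.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ChunkedEntriesSketch {
  // Concatenates entries, each encoded as: 8-byte sort key, 4-byte payload length, UTF-8 payload.
  static byte[] encodeAll(long[] sortKeys, String[] values) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      for (int i = 0; i < sortKeys.length; i++) {
        byte[] payload = values[i].getBytes(StandardCharsets.UTF_8);
        out.writeLong(sortKeys[i]);
        out.writeInt(payload.length);
        out.write(payload);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Cuts the byte array into fixed-size blocks without regard to entry boundaries.
  static List<byte[]> split(byte[] data, int blockSize) {
    List<byte[]> blocks = new ArrayList<>();
    for (int from = 0; from < data.length; from += blockSize) {
      blocks.add(Arrays.copyOfRange(data, from, Math.min(data.length, from + blockSize)));
    }
    return blocks;
  }

  // Reads the blocks as one logical stream, so an entry may straddle two or more blocks.
  static List<String> decodeAll(List<byte[]> blocks) {
    List<InputStream> streams = new ArrayList<>();
    long remaining = 0;
    for (byte[] block : blocks) {
      streams.add(new ByteArrayInputStream(block));
      remaining += block.length;
    }
    List<String> entries = new ArrayList<>();
    try (DataInputStream in =
        new DataInputStream(new SequenceInputStream(Collections.enumeration(streams)))) {
      while (remaining > 0) {
        long sortKey = in.readLong();
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        remaining -= 12 + payload.length;
        entries.add(sortKey + "=" + new String(payload, StandardCharsets.UTF_8));
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return entries;
  }

  public static void main(String[] args) {
    byte[] data = encodeAll(new long[] {1L, 2L, 5L}, new String[] {"a", "bb", "ccc"});
    System.out.println(decodeAll(split(data, 5)));
  }
}
```

With a `repeated OrderedListEntry` field, each response would have to carry whole entries, so the server could not cut a block in the middle of a large element.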
Re: [PR] Implement ordered list state for FnApi. [beam]
robertwb commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1491730995

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1021,6 +1033,29 @@ message StateKey {
     bytes map_key = 5;
   }

+  // Represents a request for an ordered list of values associated with a
+  // specified user key and window for a PTransform. See
+  // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further
+  // details.
+  //
+  // The response data stream will be a concatenation of all entries of sort key
+  // and V's associated with the specified user key and window.
+  // See https://s.apache.org/beam-fn-api-send-and-receive-data for further
+  // details.
+  message OrderedListUserState {
+    // (Required) The id of the PTransform containing user state.
+    string transform_id = 1;
+    // (Required) The id of the user state.
+    string user_state_id = 2;
+    // (Required) The window encoded in a nested context.
+    bytes window = 3;
+    // (Required) The key of the currently executing element encoded in a
+    // nested context.
+    bytes key = 4;
+    // (optional) The sort key encoded in a nested context.
+    int64 sort_key = 5;

Review Comment:
What is the semantics of providing a sort_key? Should we allow a range instead?

## sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java: ##
@@ -174,6 +215,124 @@ public CompletableFuture<StateResponse> handle(StateRequest.Builder requestBuild
         response = StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
         break;
+      case ORDERED_LIST_GET:
+        {
+          long start = request.getOrderedListGet().getRange().getStart();
+          long end = request.getOrderedListGet().getRange().getEnd();
+
+          KvCoder<Long, Integer> coder = KvCoder.of(VarLongCoder.of(), VarIntCoder.of());
+          long sortKey = start;
+          int index = 0;
+          if (request.getOrderedListGet().getContinuationToken().size() > 0) {
+            try {
+              // The continuation format here is the sort key (long) followed by an index (int)
+              KV<Long, Integer> cursor =
+                  coder.decode(request.getOrderedListGet().getContinuationToken().newInput());
+              sortKey = cursor.getKey();
+              index = cursor.getValue();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+
+          ByteString continuationToken;
+          ByteString returnBlock = ByteString.EMPTY;
+          ;
+          try {
+            if (sortKey < start || sortKey >= end) {
+              throw new IndexOutOfBoundsException("sort key out of range");
+            }
+
+            NavigableSet<Long> subset =
+                orderedListKeys
+                    .getOrDefault(request.getStateKey(), new TreeSet<>())
+                    .subSet(sortKey, true, end, false);
+
+            // get the effective sort key currently, can throw NoSuchElementException
+            Long nextSortKey = subset.first();
+
+            StateKey.Builder keyBuilder = request.getStateKey().toBuilder();
+            keyBuilder.getOrderedListUserStateBuilder().setSortKey(nextSortKey);
+            List<ByteString> byteStrings =
+                data.getOrDefault(keyBuilder.build(), Collections.singletonList(ByteString.EMPTY));
+
+            // get the block specified in continuation token, can throw IndexOutOfBoundsException
+            returnBlock = byteStrings.get(index);
+
+            if (byteStrings.size() > index + 1) {
+              // more blocks from this sort key
+              index += 1;
+            } else {
+              // finish navigating the current sort key and need to find the next one,
+              // can throw NoSuchElementException
+              nextSortKey = subset.tailSet(nextSortKey, false).first();
+              index = 0;
+            }
+
+            ByteStringOutputStream outputStream = new ByteStringOutputStream();
+            try {
+              KV<Long, Integer> cursor = KV.of(nextSortKey, index);
+              coder.encode(cursor, outputStream);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+            continuationToken = outputStream.toByteString();
+          } catch (NoSuchElementException | IndexOutOfBoundsException e) {
+            continuationToken = ByteString.EMPTY;
+          }
+          response =
+              StateResponse.newBuilder()
+                  .setOrderedListGet(
+                      OrderedListStateGetResponse.newBuilder()
+                          .setData(returnBlock)
+                          .setContinuationToken(continuationToken));
+        }
+        break;
+
+      case ORDERED_LIST_UPDATE:
+        for (OrderedListRange r : request.getOrderedListUpdate().getDeletesList()) {
+          List keysToRemove =
Re: [PR] Implement ordered list state for FnApi. [beam]
scwhittle commented on code in PR #30317:
URL: https://github.com/apache/beam/pull/30317#discussion_r1491549772

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;

Review Comment:
comment on continuation token, i.e. it should be the one returned by the previous response, and the key/range should match the previous request that generated the token

## sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java: ##
@@ -598,13 +606,81 @@ public MultimapState bindMultimap(
   }

   @Override
+  @SuppressWarnings("unchecked")
   public <T> OrderedListState<T> bindOrderedList(
       String id, StateSpec<OrderedListState<T>> spec, Coder<T> elemCoder) {
-    throw new UnsupportedOperationException(
-        "TODO: Add support for a sorted-list state to the Fn API.");
+    return (OrderedListState<T>)
+        stateKeyObjectCache.computeIfAbsent(
+            createOrderedListUserStateKey(id),
+            new Function<StateKey, Object>() {
+              @Override
+              public Object apply(StateKey key) {
+                return new OrderedListState<T>() {
+                  private final OrderedListUserState<T> impl =
+                      createOrderedListUserState(key, elemCoder);
+
+                  @Override
+                  public void clear() {
+                    clearRange(
+                        BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+                  }
+
+                  @Override
+                  public void add(TimestampedValue<T> value) {
+                    impl.add(value);
+                  }
+
+                  @Override
+                  public ReadableState<Boolean> isEmpty() {
+                    return new ReadableState<Boolean>() {
+                      @Override
+                      public @Nullable Boolean read() {
+                        return !impl.read().iterator().hasNext();
+                      }
+
+                      @Override
+                      public ReadableState<Boolean> readLater() {
+                        return this;
+                      }
+                    };
+                  }
+
+                  @Nullable
+                  @Override
+                  public Iterable<TimestampedValue<T>> read() {
+                    return readRange(
+                        BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+                  }
+
+                  @Override
+                  public GroupingState<TimestampedValue<T>, Iterable<TimestampedValue<T>>>
+                      readLater() {
+                    throw new UnsupportedOperationException();

Review Comment:
just doing nothing is valid here and seems better than an exception, ditto below

## model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto: ##
@@ -1075,6 +,42 @@ message StateClearRequest {}
 // A response to clear state.
 message StateClearResponse {}

+// A message describes a sort key range [start, end).
+message OrderedListRange {
+  int64 start = 1;
+  int64 end = 2;
+}
+
+// A data entry which is tagged with a sort key.
+message OrderedListEntry {
+  int64 sort_key = 1;
+  bytes data = 2;
+}
+
+// This request will fetch an ordered list with a sort key range. If the
+// timestamp range is not specified, the runner should use [MIN_INT64,
+// MAX_INT64) by default.
+message OrderedListStateGetRequest {
+  bytes continuation_token = 1;
+  OrderedListRange range = 2;
+}
+
+// A response to the get state request for an ordered list.
+message OrderedListStateGetResponse {
+  bytes continuation_token = 1;
+  bytes data = 2;

Review Comment:
how do we return multiple elements if the request was a range?

should we return the sort-key for elements? That seems like part of the user data; for example, if it's some id/timestamp, the user might want it back instead of having to duplicate it in the payload as well.

should this be repeated OrderedListEntry instead of data?
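The half-open `[start, end)` range in `OrderedListRange` maps directly onto the JDK's navigable collections. The sketch below is an illustration only: it assumes a hypothetical in-memory store keyed by sort key, with string payloads, and the class and method names are not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class OrderedListRangeSketch {
  // Several values may share one sort key, so each key maps to a list.
  private final NavigableMap<Long, List<String>> entries = new TreeMap<>();

  void add(long sortKey, String value) {
    entries.computeIfAbsent(sortKey, key -> new ArrayList<>()).add(value);
  }

  // Returns values whose sort key k satisfies start <= k < end, in sort-key order.
  List<String> readRange(long start, long end) {
    List<String> result = new ArrayList<>();
    for (List<String> values : entries.subMap(start, true, end, false).values()) {
      result.addAll(values);
    }
    return result;
  }

  // Removes every entry with start <= k < end; the sub-map is a live view of the map.
  void clearRange(long start, long end) {
    entries.subMap(start, true, end, false).clear();
  }

  public static void main(String[] args) {
    OrderedListRangeSketch state = new OrderedListRangeSketch();
    state.add(1L, "a");
    state.add(3L, "b");
    state.add(3L, "c");
    state.add(5L, "d");
    System.out.println(state.readRange(3L, 5L));
    state.clearRange(3L, 4L);
    System.out.println(state.readRange(Long.MIN_VALUE, Long.MAX_VALUE));
  }
}
```

One consequence of the exclusive upper bound: with the default range `[MIN_INT64, MAX_INT64)`, an entry whose sort key equals `MAX_INT64` would fall outside the range.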
Re: [PR] Implement ordered list state for FnApi. [beam]
shunping commented on PR #30317:
URL: https://github.com/apache/beam/pull/30317#issuecomment-1946576345

Run Python PreCommit 3.9