[GitHub] [beam] HuangLED commented on a change in pull request #11746: [BEAM-10028] Add support for the state backed iterable coder to the Java SDK harness.
HuangLED commented on a change in pull request #11746: URL: https://github.com/apache/beam/pull/11746#discussion_r428333755 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java ## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.state; + +import static org.apache.beam.runners.core.construction.ModelCoders.STATE_BACKED_ITERABLE_CODER_URN; + +import com.google.auto.service.AutoService; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest; +import org.apache.beam.runners.core.construction.CoderTranslation.TranslationContext; +import org.apache.beam.runners.core.construction.CoderTranslator; +import org.apache.beam.runners.core.construction.CoderTranslatorRegistrar; +import org.apache.beam.sdk.coders.IterableLikeCoder; +import org.apache.beam.sdk.fn.stream.DataStreams; +import org.apache.beam.sdk.util.BufferedElementCountingOutputStream; +import org.apache.beam.sdk.util.VarInt; +import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams; + +/** + * A {@link BeamFnStateClient state} backed iterable which allows for fetching elements over the + * portability state API. See <a href="https://s.apache.org/beam-fn-state-api-and-bundle-processing">remote references</a> for + * additional details. + * + * <p>One must supply a {@link StateBackedIterableTranslationContext} when using {@link + * CoderTranslator#fromComponents} to be able to create a {@link StateBackedIterable.Coder}. 
+ */ +public class StateBackedIterable implements Iterable { + + private final BeamFnStateClient beamFnStateClient; + private final org.apache.beam.sdk.coders.Coder elemCoder; + @VisibleForTesting final StateRequest request; + @VisibleForTesting final List prefix; + + public StateBackedIterable( + BeamFnStateClient beamFnStateClient, + String instructionId, + ByteString runnerKey, + org.apache.beam.sdk.coders.Coder elemCoder, + List prefix) { +this.beamFnStateClient = beamFnStateClient; +this.elemCoder = elemCoder; + +StateRequest.Builder requestBuilder = StateRequest.newBuilder(); +requestBuilder +.setInstructionId(instructionId) +.getStateKeyBuilder() +.getRunnerBuilder() +.setKey(runnerKey); +this.request = requestBuilder.build(); +this.prefix = prefix; + } + + @Override + public Iterator iterator() { +return Iterators.concat( +prefix.iterator(), +new DataStreams.DataStreamDecoder( +elemCoder, + DataStreams.inbound(StateFetchingIterators.forFirstChunk(beamFnStateClient, request; + } + + /** + * Decodes an {@link Iterable} that might be backed by state. If the terminator at the end of the + * value stream is {@code -1} then we return a {@link StateBackedIterable} otherwise we return an + * {@link Iterable}. + */ + public static class Coder extends IterableLikeCoder> { + +private final BeamFnStateClient beamFnStateClient; +private final Supplier instructionId; + +public Coder( +BeamFnStateClient beamFnStateClient, +Supplier instructionId, +org.apache.beam.sdk.coders.Coder elemCoder) { + super(elemCoder, "StateBackedIterable"); + this.beamFnStateClient = beamFnStateClient; + this.instructionId = instructionId; +} + +@Override +protected Iterable decodeToIterable(List decodedElements) { + return decodedElements; +} + +@Override
[GitHub] [beam] HuangLED commented on a change in pull request #11746: [BEAM-10028] Add support for the state backed iterable coder to the Java SDK harness.
HuangLED commented on a change in pull request #11746: URL: https://github.com/apache/beam/pull/11746#discussion_r427657090 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java ## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness.state; + +import static org.apache.beam.runners.core.construction.ModelCoders.STATE_BACKED_ITERABLE_CODER_URN; + +import com.google.auto.service.AutoService; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest; +import org.apache.beam.runners.core.construction.CoderTranslation.TranslationContext; +import org.apache.beam.runners.core.construction.CoderTranslator; +import org.apache.beam.runners.core.construction.CoderTranslatorRegistrar; +import org.apache.beam.sdk.coders.IterableLikeCoder; +import org.apache.beam.sdk.fn.stream.DataStreams; +import org.apache.beam.sdk.util.BufferedElementCountingOutputStream; +import org.apache.beam.sdk.util.VarInt; +import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams; + +/** + * A {@link BeamFnStateClient state} backed iterable which allows for fetching elements over the + * portability state API. See <a href="https://s.apache.org/beam-fn-state-api-and-bundle-processing">remote references</a> for + * additional details. + * + * <p>One must supply a {@link StateBackedIterableTranslationContext} when using {@link + * CoderTranslator#fromComponents} to be able to create a {@link StateBackedIterable.Coder}. 
+ */ +public class StateBackedIterable implements Iterable { + + private final BeamFnStateClient beamFnStateClient; + private final org.apache.beam.sdk.coders.Coder elemCoder; + @VisibleForTesting final StateRequest request; + @VisibleForTesting final List prefix; + + public StateBackedIterable( + BeamFnStateClient beamFnStateClient, + String instructionId, + ByteString runnerKey, + org.apache.beam.sdk.coders.Coder elemCoder, + List prefix) { +this.beamFnStateClient = beamFnStateClient; +this.elemCoder = elemCoder; + +StateRequest.Builder requestBuilder = StateRequest.newBuilder(); +requestBuilder +.setInstructionId(instructionId) +.getStateKeyBuilder() +.getRunnerBuilder() +.setKey(runnerKey); +this.request = requestBuilder.build(); +this.prefix = prefix; + } + + @Override + public Iterator iterator() { +return Iterators.concat( +prefix.iterator(), +new DataStreams.DataStreamDecoder( +elemCoder, + DataStreams.inbound(StateFetchingIterators.forFirstChunk(beamFnStateClient, request; + } + + /** + * Decodes an {@link Iterable} that might be backed by state. If the terminator at the end of the + * value stream is {@code -1} then we return a {@link StateBackedIterable} otherwise we return an + * {@link Iterable}. + */ + public static class Coder extends IterableLikeCoder> { + +private final BeamFnStateClient beamFnStateClient; +private final Supplier instructionId; + +public Coder( +BeamFnStateClient beamFnStateClient, +Supplier instructionId, +org.apache.beam.sdk.coders.Coder elemCoder) { + super(elemCoder, "StateBackedIterable"); + this.beamFnStateClient = beamFnStateClient; + this.instructionId = instructionId; +} + +@Override +protected Iterable decodeToIterable(List decodedElements) { + return decodedElements; +} + +@Override