[GitHub] [beam] HuangLED commented on a change in pull request #11746: [BEAM-10028] Add support for the state backed iterable coder to the Java SDK harness.

2020-05-20 Thread GitBox


HuangLED commented on a change in pull request #11746:
URL: https://github.com/apache/beam/pull/11746#discussion_r428333755



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.runners.core.construction.ModelCoders.STATE_BACKED_ITERABLE_CODER_URN;
+
+import com.google.auto.service.AutoService;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import 
org.apache.beam.runners.core.construction.CoderTranslation.TranslationContext;
+import org.apache.beam.runners.core.construction.CoderTranslator;
+import org.apache.beam.runners.core.construction.CoderTranslatorRegistrar;
+import org.apache.beam.sdk.coders.IterableLikeCoder;
+import org.apache.beam.sdk.fn.stream.DataStreams;
+import org.apache.beam.sdk.util.BufferedElementCountingOutputStream;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
+
+/**
+ * A {@link BeamFnStateClient state} backed iterable which allows for fetching elements over the
+ * portability state API. See <a
+ * href="https://s.apache.org/beam-fn-state-api-and-bundle-processing">remote references</a> for
+ * additional details.
+ *
+ * <p>One must supply a {@link StateBackedIterableTranslationContext} when using {@link
+ * CoderTranslator#fromComponents} to be able to create a {@link StateBackedIterable.Coder}.
+ */
+public class StateBackedIterable<T> implements Iterable<T> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final org.apache.beam.sdk.coders.Coder<T> elemCoder;
+  @VisibleForTesting final StateRequest request;
+  @VisibleForTesting final List<T> prefix;
+
+  public StateBackedIterable(
+  BeamFnStateClient beamFnStateClient,
+  String instructionId,
+  ByteString runnerKey,
+  org.apache.beam.sdk.coders.Coder elemCoder,
+  List prefix) {
+this.beamFnStateClient = beamFnStateClient;
+this.elemCoder = elemCoder;
+
+StateRequest.Builder requestBuilder = StateRequest.newBuilder();
+requestBuilder
+.setInstructionId(instructionId)
+.getStateKeyBuilder()
+.getRunnerBuilder()
+.setKey(runnerKey);
+this.request = requestBuilder.build();
+this.prefix = prefix;
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+return Iterators.concat(
+prefix.iterator(),
+new DataStreams.DataStreamDecoder(
+elemCoder,
+DataStreams.inbound(StateFetchingIterators.forFirstChunk(beamFnStateClient, request))));
+  }
+
+  /**
+   * Decodes an {@link Iterable} that might be backed by state. If the terminator at the end of the
+   * value stream is {@code -1} then we return a {@link StateBackedIterable} otherwise we return an
+   * {@link Iterable}.
+   */
+  public static class Coder<T> extends IterableLikeCoder<T, Iterable<T>> {
+
+private final BeamFnStateClient beamFnStateClient;
+private final Supplier instructionId;
+
+public Coder(
+BeamFnStateClient beamFnStateClient,
+Supplier instructionId,
+org.apache.beam.sdk.coders.Coder elemCoder) {
+  super(elemCoder, "StateBackedIterable");
+  this.beamFnStateClient = beamFnStateClient;
+  this.instructionId = instructionId;
+}
+
+@Override
+protected Iterable decodeToIterable(List decodedElements) {
+  return decodedElements;
+}
+
+@Override

[GitHub] [beam] HuangLED commented on a change in pull request #11746: [BEAM-10028] Add support for the state backed iterable coder to the Java SDK harness.

2020-05-20 Thread GitBox


HuangLED commented on a change in pull request #11746:
URL: https://github.com/apache/beam/pull/11746#discussion_r427657090



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.state;
+
+import static 
org.apache.beam.runners.core.construction.ModelCoders.STATE_BACKED_ITERABLE_CODER_URN;
+
+import com.google.auto.service.AutoService;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import 
org.apache.beam.runners.core.construction.CoderTranslation.TranslationContext;
+import org.apache.beam.runners.core.construction.CoderTranslator;
+import org.apache.beam.runners.core.construction.CoderTranslatorRegistrar;
+import org.apache.beam.sdk.coders.IterableLikeCoder;
+import org.apache.beam.sdk.fn.stream.DataStreams;
+import org.apache.beam.sdk.util.BufferedElementCountingOutputStream;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
+
+/**
+ * A {@link BeamFnStateClient state} backed iterable which allows for fetching elements over the
+ * portability state API. See <a
+ * href="https://s.apache.org/beam-fn-state-api-and-bundle-processing">remote references</a> for
+ * additional details.
+ *
+ * <p>One must supply a {@link StateBackedIterableTranslationContext} when using {@link
+ * CoderTranslator#fromComponents} to be able to create a {@link StateBackedIterable.Coder}.
+ */
+public class StateBackedIterable<T> implements Iterable<T> {
+
+  private final BeamFnStateClient beamFnStateClient;
+  private final org.apache.beam.sdk.coders.Coder<T> elemCoder;
+  @VisibleForTesting final StateRequest request;
+  @VisibleForTesting final List<T> prefix;
+
+  public StateBackedIterable(
+  BeamFnStateClient beamFnStateClient,
+  String instructionId,
+  ByteString runnerKey,
+  org.apache.beam.sdk.coders.Coder elemCoder,
+  List prefix) {
+this.beamFnStateClient = beamFnStateClient;
+this.elemCoder = elemCoder;
+
+StateRequest.Builder requestBuilder = StateRequest.newBuilder();
+requestBuilder
+.setInstructionId(instructionId)
+.getStateKeyBuilder()
+.getRunnerBuilder()
+.setKey(runnerKey);
+this.request = requestBuilder.build();
+this.prefix = prefix;
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+return Iterators.concat(
+prefix.iterator(),
+new DataStreams.DataStreamDecoder(
+elemCoder,
+DataStreams.inbound(StateFetchingIterators.forFirstChunk(beamFnStateClient, request))));
+  }
+
+  /**
+   * Decodes an {@link Iterable} that might be backed by state. If the terminator at the end of the
+   * value stream is {@code -1} then we return a {@link StateBackedIterable} otherwise we return an
+   * {@link Iterable}.
+   */
+  public static class Coder<T> extends IterableLikeCoder<T, Iterable<T>> {
+
+private final BeamFnStateClient beamFnStateClient;
+private final Supplier instructionId;
+
+public Coder(
+BeamFnStateClient beamFnStateClient,
+Supplier instructionId,
+org.apache.beam.sdk.coders.Coder elemCoder) {
+  super(elemCoder, "StateBackedIterable");
+  this.beamFnStateClient = beamFnStateClient;
+  this.instructionId = instructionId;
+}
+
+@Override
+protected Iterable decodeToIterable(List decodedElements) {
+  return decodedElements;
+}
+
+@Override