This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d9318bff08 Remove deprecated first implementations of data channel 
connector code on the runner side (#25104)
3d9318bff08 is described below

commit 3d9318bff0873fb7426cff7cf0ee4eeedbf5796f
Author: Luke Cwik <lc...@google.com>
AuthorDate: Tue Jan 24 09:59:04 2023 -0800

    Remove deprecated first implementations of data channel connector code on 
the runner side (#25104)
    
    This allows us to cleanup the existing code base now that the Dataflow JRH 
is gone and should improve the stability and performance for Java based 
portable runners built on top of this code since we have had a lot more use of 
the SDK harness variants at scale.
---
 .../fnexecution/control/SdkHarnessClient.java      | 135 ++++++-----
 .../runners/fnexecution/data/FnDataService.java    |  53 +++--
 .../runners/fnexecution/data/GrpcDataService.java  |  65 +++---
 .../control/DefaultJobBundleFactoryTest.java       |   6 +-
 .../fnexecution/control/SdkHarnessClientTest.java  | 259 +++++++++++----------
 .../fnexecution/data/GrpcDataServiceTest.java      |  37 +--
 .../sdk/fn/data/BeamFnDataGrpcMultiplexer.java     | 227 ------------------
 .../sdk/fn/data/BeamFnDataGrpcMultiplexer2.java    |   2 -
 .../sdk/fn/data/BeamFnDataInboundObserver.java     | 103 --------
 .../sdk/fn/data/BeamFnDataOutboundObserver.java    |  87 -------
 .../data/CompletableFutureInboundDataClient.java   |  78 -------
 .../sdk/fn/data/BeamFnDataGrpcMultiplexerTest.java | 127 ----------
 .../sdk/fn/data/BeamFnDataInboundObserverTest.java | 108 ---------
 .../CompletableFutureInboundDataClientTest.java    | 163 -------------
 .../beam/fn/harness/data/BeamFnDataClient.java     |   9 +-
 15 files changed, 298 insertions(+), 1161 deletions(-)

diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
index 99cc2f793dc..609988b7d4e 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
@@ -20,10 +20,12 @@ package org.apache.beam.runners.fnexecution.control;
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Phaser;
@@ -46,12 +48,16 @@ import 
org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
 import org.apache.beam.runners.fnexecution.state.StateDelegator;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2;
+import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
+import org.apache.beam.sdk.fn.data.DataEndpoint;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
-import org.apache.beam.sdk.fn.data.InboundDataClient;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
+import org.apache.beam.sdk.fn.data.TimerEndpoint;
 import org.apache.beam.sdk.util.MoreFutures;
 import org.apache.beam.sdk.values.KV;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -262,44 +268,64 @@ public class SdkHarnessClient implements AutoCloseable {
 
       CompletionStage<BeamFnApi.ProcessBundleResponse> specificResponse =
           genericResponse.thenApply(InstructionResponse::getProcessBundle);
-      Map<LogicalEndpoint, InboundDataClient> outputClients = new HashMap<>();
-      for (Map.Entry<String, RemoteOutputReceiver<?>> receiver : 
outputReceivers.entrySet()) {
-        LogicalEndpoint endpoint = LogicalEndpoint.data(bundleId, 
receiver.getKey());
-        InboundDataClient outputClient =
-            attachReceiver(endpoint, (RemoteOutputReceiver) 
receiver.getValue());
-        outputClients.put(endpoint, outputClient);
-      }
-      for (Map.Entry<KV<String, String>, RemoteOutputReceiver<Timer<?>>> 
timerReceiver :
-          timerReceivers.entrySet()) {
-        LogicalEndpoint endpoint =
-            LogicalEndpoint.timer(
-                bundleId, timerReceiver.getKey().getKey(), 
timerReceiver.getKey().getValue());
-        InboundDataClient outputClient = attachReceiver(endpoint, 
timerReceiver.getValue());
-        outputClients.put(endpoint, outputClient);
+      Optional<BeamFnDataInboundObserver2> beamFnDataInboundObserver;
+      if (outputReceivers.isEmpty() && timerReceivers.isEmpty()) {
+        beamFnDataInboundObserver = Optional.empty();
+      } else {
+        List<DataEndpoint<?>> dataEndpoints = new 
ArrayList<>(outputReceivers.size());
+        for (Map.Entry<String, RemoteOutputReceiver<?>> receiver : 
outputReceivers.entrySet()) {
+          dataEndpoints.add(
+              DataEndpoint.create(
+                  receiver.getKey(),
+                  (Coder<Object>) receiver.getValue().getCoder(),
+                  (FnDataReceiver<Object>) receiver.getValue().getReceiver()));
+        }
+        List<TimerEndpoint<?>> timerEndpoints = new 
ArrayList<>(timerReceivers.size());
+        for (Map.Entry<KV<String, String>, RemoteOutputReceiver<Timer<?>>> 
timerReceiver :
+            timerReceivers.entrySet()) {
+          timerEndpoints.add(
+              TimerEndpoint.create(
+                  timerReceiver.getKey().getKey(),
+                  timerReceiver.getKey().getValue(),
+                  timerReceiver.getValue().getCoder(),
+                  timerReceiver.getValue().getReceiver()));
+        }
+        beamFnDataInboundObserver =
+            Optional.of(BeamFnDataInboundObserver2.forConsumers(dataEndpoints, 
timerEndpoints));
+        fnApiDataService.registerReceiver(bundleId, 
beamFnDataInboundObserver.get());
       }
 
-      ImmutableMap.Builder<LogicalEndpoint, CloseableFnDataReceiver> 
receiverBuilder =
+      ImmutableMap.Builder<LogicalEndpoint, FnDataReceiver<?>> receiverBuilder 
=
           ImmutableMap.builder();
+      BeamFnDataOutboundAggregator beamFnDataOutboundAggregator =
+          fnApiDataService.createOutboundAggregator(() -> bundleId, false);
       for (RemoteInputDestination remoteInput : remoteInputs) {
         LogicalEndpoint endpoint = LogicalEndpoint.data(bundleId, 
remoteInput.getPTransformId());
         receiverBuilder.put(
             endpoint,
-            new CountingFnDataReceiver(fnApiDataService.send(endpoint, 
remoteInput.getCoder())));
+            new CountingFnDataReceiver(
+                beamFnDataOutboundAggregator.registerOutputDataLocation(
+                    remoteInput.getPTransformId(), remoteInput.getCoder())));
       }
 
       for (Map.Entry<String, Map<String, TimerSpec>> entry : 
timerSpecs.entrySet()) {
         for (TimerSpec timerSpec : entry.getValue().values()) {
           LogicalEndpoint endpoint =
               LogicalEndpoint.timer(bundleId, timerSpec.transformId(), 
timerSpec.timerId());
-          receiverBuilder.put(endpoint, fnApiDataService.send(endpoint, 
timerSpec.coder()));
+          receiverBuilder.put(
+              endpoint,
+              beamFnDataOutboundAggregator.registerOutputTimersLocation(
+                  timerSpec.transformId(), timerSpec.timerId(), 
timerSpec.coder()));
         }
       }
+      beamFnDataOutboundAggregator.start();
 
       return new ActiveBundle(
           bundleId,
           specificResponse,
+          beamFnDataOutboundAggregator,
           receiverBuilder.build(),
-          outputClients,
+          beamFnDataInboundObserver,
           stateDelegator.registerForProcessBundleInstructionId(bundleId, 
stateRequestHandler),
           progressHandler,
           splitHandler,
@@ -307,17 +333,13 @@ public class SdkHarnessClient implements AutoCloseable {
           finalizationHandler);
     }
 
-    private <OutputT> InboundDataClient attachReceiver(
-        LogicalEndpoint endpoint, RemoteOutputReceiver<OutputT> receiver) {
-      return fnApiDataService.receive(endpoint, receiver.getCoder(), 
receiver.getReceiver());
-    }
-
     /** An active bundle for a particular {@link 
BeamFnApi.ProcessBundleDescriptor}. */
     public class ActiveBundle implements RemoteBundle {
       private final String bundleId;
       private final CompletionStage<BeamFnApi.ProcessBundleResponse> response;
-      private final Map<LogicalEndpoint, CloseableFnDataReceiver> 
inputReceivers;
-      private final Map<LogicalEndpoint, InboundDataClient> outputClients;
+      private final BeamFnDataOutboundAggregator beamFnDataOutboundAggregator;
+      private final Map<LogicalEndpoint, FnDataReceiver<?>> inputReceivers;
+      private final Optional<BeamFnDataInboundObserver2> 
beamFnDataInboundObserver;
       private final StateDelegator.Registration stateRegistration;
       private final BundleProgressHandler progressHandler;
       private final BundleSplitHandler splitHandler;
@@ -330,8 +352,9 @@ public class SdkHarnessClient implements AutoCloseable {
       private ActiveBundle(
           String bundleId,
           CompletionStage<ProcessBundleResponse> response,
-          Map<LogicalEndpoint, CloseableFnDataReceiver> inputReceivers,
-          Map<LogicalEndpoint, InboundDataClient> outputClients,
+          BeamFnDataOutboundAggregator beamFnDataOutboundAggregator,
+          Map<LogicalEndpoint, FnDataReceiver<?>> inputReceivers,
+          Optional<BeamFnDataInboundObserver2> beamFnDataInboundObserver,
           StateDelegator.Registration stateRegistration,
           BundleProgressHandler progressHandler,
           BundleSplitHandler splitHandler,
@@ -339,8 +362,9 @@ public class SdkHarnessClient implements AutoCloseable {
           BundleFinalizationHandler finalizationHandler) {
         this.bundleId = bundleId;
         this.response = response;
+        this.beamFnDataOutboundAggregator = beamFnDataOutboundAggregator;
         this.inputReceivers = inputReceivers;
-        this.outputClients = outputClients;
+        this.beamFnDataInboundObserver = beamFnDataInboundObserver;
         this.stateRegistration = stateRegistration;
         this.progressHandler = progressHandler;
         this.splitHandler = splitHandler;
@@ -371,8 +395,7 @@ public class SdkHarnessClient implements AutoCloseable {
       @Override
       public Map<String, FnDataReceiver> getInputReceivers() {
         ImmutableMap.Builder<String, FnDataReceiver> rval = 
ImmutableMap.builder();
-        for (Map.Entry<LogicalEndpoint, CloseableFnDataReceiver> entry :
-            inputReceivers.entrySet()) {
+        for (Map.Entry<LogicalEndpoint, FnDataReceiver<?>> entry : 
inputReceivers.entrySet()) {
           if (!entry.getKey().isTimer()) {
             rval.put(entry.getKey().getTransformId(), entry.getValue());
           }
@@ -384,12 +407,11 @@ public class SdkHarnessClient implements AutoCloseable {
       public Map<KV<String, String>, FnDataReceiver<Timer>> 
getTimerReceivers() {
         ImmutableMap.Builder<KV<String, String>, FnDataReceiver<Timer>> rval =
             ImmutableMap.builder();
-        for (Map.Entry<LogicalEndpoint, CloseableFnDataReceiver> entry :
-            inputReceivers.entrySet()) {
+        for (Map.Entry<LogicalEndpoint, FnDataReceiver<?>> entry : 
inputReceivers.entrySet()) {
           if (entry.getKey().isTimer()) {
             rval.put(
                 KV.of(entry.getKey().getTransformId(), 
entry.getKey().getTimerFamilyId()),
-                entry.getValue());
+                (FnDataReceiver<Timer>) entry.getValue());
           }
         }
         return rval.build();
@@ -432,7 +454,7 @@ public class SdkHarnessClient implements AutoCloseable {
           outstandingRequests.register();
         }
         Map<String, DesiredSplit> splits = new HashMap<>();
-        for (Map.Entry<LogicalEndpoint, CloseableFnDataReceiver> 
ptransformToInput :
+        for (Map.Entry<LogicalEndpoint, FnDataReceiver<?>> ptransformToInput :
             inputReceivers.entrySet()) {
           if (!ptransformToInput.getKey().isTimer()) {
             splits.put(
@@ -487,16 +509,12 @@ public class SdkHarnessClient implements AutoCloseable {
         }
 
         Exception exception = null;
-        for (CloseableFnDataReceiver<?> inputReceiver : 
inputReceivers.values()) {
-          try {
-            inputReceiver.close();
-          } catch (Exception e) {
-            if (exception == null) {
-              exception = e;
-            } else {
-              exception.addSuppressed(e);
-            }
-          }
+        try {
+          
beamFnDataOutboundAggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
+        } catch (Exception e) {
+          exception = e;
+        } finally {
+          beamFnDataOutboundAggregator.discard();
         }
         try {
           // We don't have to worry about the completion stage.
@@ -537,12 +555,13 @@ public class SdkHarnessClient implements AutoCloseable {
             exception.addSuppressed(e);
           }
         }
-        for (InboundDataClient outputClient : outputClients.values()) {
+        if (beamFnDataInboundObserver.isPresent()) {
           try {
             if (exception == null) {
-              outputClient.awaitCompletion();
+              beamFnDataInboundObserver.get().awaitCompletion();
+              fnApiDataService.unregisterReceiver(bundleId);
             } else {
-              outputClient.cancel();
+              beamFnDataInboundObserver.get().close();
             }
           } catch (Exception e) {
             if (exception == null) {
@@ -696,14 +715,12 @@ public class SdkHarnessClient implements AutoCloseable {
     }
   }
 
-  /**
-   * A {@link CloseableFnDataReceiver} which counts the number of elements 
that have been accepted.
-   */
-  private static class CountingFnDataReceiver<T> implements 
CloseableFnDataReceiver<T> {
-    private final CloseableFnDataReceiver delegate;
+  /** A {@link FnDataReceiver} which counts the number of elements that have 
been accepted. */
+  private static class CountingFnDataReceiver<T> implements FnDataReceiver<T> {
+    private final FnDataReceiver<T> delegate;
     private long count;
 
-    private CountingFnDataReceiver(CloseableFnDataReceiver delegate) {
+    private CountingFnDataReceiver(FnDataReceiver<T> delegate) {
       this.delegate = delegate;
     }
 
@@ -716,16 +733,6 @@ public class SdkHarnessClient implements AutoCloseable {
       delegate.accept(input);
       count += 1;
     }
-
-    @Override
-    public void flush() throws Exception {
-      delegate.flush();
-    }
-
-    @Override
-    public void close() throws Exception {
-      delegate.close();
-    }
   }
 
   /** Registers a {@link BeamFnApi.ProcessBundleDescriptor} for future 
processing. */
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
index 1c33446993c..7c5f110eab2 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
@@ -17,12 +17,11 @@
  */
 package org.apache.beam.runners.fnexecution.data;
 
-import org.apache.beam.sdk.coders.Coder;
+import java.util.function.Supplier;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
 import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
-import org.apache.beam.sdk.fn.data.InboundDataClient;
-import org.apache.beam.sdk.fn.data.LogicalEndpoint;
 
 /**
  * The {@link FnDataService} is able to forward inbound elements to a consumer 
and is also a
@@ -32,33 +31,43 @@ import org.apache.beam.sdk.fn.data.LogicalEndpoint;
 public interface FnDataService {
 
   /**
-   * Registers a receiver to be notified upon any incoming elements.
+   * Registers a receiver for the provided instruction id.
    *
-   * <p>The provided coder is used to decode inbound elements. The decoded 
elements are passed to
-   * the provided receiver.
+   * <p>The receiver is not required to be thread safe.
    *
-   * <p>Any failure during decoding or processing of the element will put the 
{@link
-   * InboundDataClient} into an error state such that {@link 
InboundDataClient#awaitCompletion()}
-   * will throw an exception.
+   * <p>Receivers for successfully processed bundles must be unregistered. See 
{@link
+   * #unregisterReceiver} for details.
    *
-   * <p>The provided receiver is not required to be thread safe.
+   * <p>Any failure during {@link FnDataReceiver#accept} will mark the 
provided {@code
+   * instructionId} as invalid and will ignore any future data. It is expected 
that if a bundle
+   * fails during processing then the failure will become visible to the 
{@link FnDataService}
+   * during a future {@link FnDataReceiver#accept} invocation.
    */
-  <T> InboundDataClient receive(
-      LogicalEndpoint inputLocation, Coder<T> coder, FnDataReceiver<T> 
listener);
+  void registerReceiver(String instructionId, 
CloseableFnDataReceiver<Elements> observer);
 
   /**
-   * Creates a receiver to which you can write data values and have them sent 
over this data plane
-   * service.
+   * Receivers are only expected to be unregistered when bundle processing has 
completed
+   * successfully.
    *
-   * <p>The provided coder is used to encode elements on the outbound stream.
-   *
-   * <p>Closing the returned receiver signals the end of the stream.
+   * <p>It is expected that if a bundle fails during processing then the 
failure will become visible
+   * to the {@link FnDataService} during a future {@link 
FnDataReceiver#accept} invocation.
+   */
+  void unregisterReceiver(String instructionId);
+
+  /**
+   * Creates a {@link BeamFnDataOutboundAggregator} for buffering and sending 
outbound data and
+   * timers over the data plane. It is important that {@link
+   * 
BeamFnDataOutboundAggregator#sendOrCollectBufferedDataAndFinishOutboundStreams()}
 is called on
+   * the returned BeamFnDataOutboundAggregator at the end of each bundle. If
+   * collectElementsIfNoFlushes is set to true, {@link
+   * 
BeamFnDataOutboundAggregator#sendOrCollectBufferedDataAndFinishOutboundStreams()}
 returns the
+   * buffered elements instead of sending it through the outbound 
StreamObserver if there's no
+   * previous flush.
    *
-   * <p>The returned receiver is not thread safe.
+   * <p>Closing the returned aggregator signals the end of the streams.
    *
-   * @deprecated Migrate to use {@link BeamFnDataOutboundAggregator} directly 
for sending outbound
-   *     data.
+   * <p>The returned aggregator is not thread safe.
    */
-  @Deprecated
-  <T> CloseableFnDataReceiver<T> send(LogicalEndpoint outputLocation, Coder<T> 
coder);
+  BeamFnDataOutboundAggregator createOutboundAggregator(
+      Supplier<String> processBundleRequestIdSupplier, boolean 
collectElementsIfNoFlushes);
 }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
index 47430530c8a..883d72103a9 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
@@ -23,17 +23,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
 import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer;
-import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
-import org.apache.beam.sdk.fn.data.BeamFnDataOutboundObserver;
+import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer2;
+import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
-import org.apache.beam.sdk.fn.data.DecodingFnDataReceiver;
-import org.apache.beam.sdk.fn.data.FnDataReceiver;
-import org.apache.beam.sdk.fn.data.InboundDataClient;
-import org.apache.beam.sdk.fn.data.LogicalEndpoint;
 import org.apache.beam.sdk.fn.server.FnService;
 import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -64,7 +60,7 @@ public class GrpcDataService extends 
BeamFnDataGrpc.BeamFnDataImplBase
     return new GrpcDataService(options, executor, outboundObserverFactory);
   }
 
-  private final SettableFuture<BeamFnDataGrpcMultiplexer> connectedClient;
+  private final SettableFuture<BeamFnDataGrpcMultiplexer2> connectedClient;
   /**
    * A collection of multiplexers which are not used to send data. A handle to 
these multiplexers is
    * maintained in order to perform an orderly shutdown.
@@ -72,7 +68,7 @@ public class GrpcDataService extends 
BeamFnDataGrpc.BeamFnDataImplBase
    * <p>TODO: (BEAM-3811) Replace with some cancellable collection, to ensure 
that new clients of a
    * closed {@link GrpcDataService} are closed with that {@link 
GrpcDataService}.
    */
-  private final Queue<BeamFnDataGrpcMultiplexer> additionalMultiplexers;
+  private final Queue<BeamFnDataGrpcMultiplexer2> additionalMultiplexers;
 
   private final PipelineOptions options;
   private final ExecutorService executor;
@@ -103,8 +99,8 @@ public class GrpcDataService extends 
BeamFnDataGrpc.BeamFnDataImplBase
   public StreamObserver<BeamFnApi.Elements> data(
       final StreamObserver<BeamFnApi.Elements> outboundElementObserver) {
     LOG.info("Beam Fn Data client connected.");
-    BeamFnDataGrpcMultiplexer multiplexer =
-        new BeamFnDataGrpcMultiplexer(
+    BeamFnDataGrpcMultiplexer2 multiplexer =
+        new BeamFnDataGrpcMultiplexer2(
             null, outboundObserverFactory, inbound -> outboundElementObserver);
     // First client that connects completes this future.
     if (!connectedClient.set(multiplexer)) {
@@ -125,7 +121,7 @@ public class GrpcDataService extends 
BeamFnDataGrpc.BeamFnDataImplBase
     // Multiplexer, but if there isn't any multiplexer it prevents callers 
blocking forever.
     connectedClient.cancel(true);
     // Close any other open connections
-    for (BeamFnDataGrpcMultiplexer additional : additionalMultiplexers) {
+    for (BeamFnDataGrpcMultiplexer2 additional : additionalMultiplexers) {
       try {
         additional.close();
       } catch (Exception ignored) {
@@ -139,18 +135,11 @@ public class GrpcDataService extends 
BeamFnDataGrpc.BeamFnDataImplBase
 
   @Override
   @SuppressWarnings("FutureReturnValueIgnored")
-  public <T> InboundDataClient receive(
-      final LogicalEndpoint inputLocation, Coder<T> coder, FnDataReceiver<T> 
listener) {
-    LOG.debug(
-        "Registering receiver for instruction {} and transform {}",
-        inputLocation.getInstructionId(),
-        inputLocation.getTransformId());
-    final BeamFnDataInboundObserver observer =
-        BeamFnDataInboundObserver.forConsumer(
-            inputLocation, new DecodingFnDataReceiver<T>(coder, listener));
+  public void registerReceiver(String instructionId, 
CloseableFnDataReceiver<Elements> observer) {
+    LOG.debug("Registering observer for instruction {}", instructionId);
     if (connectedClient.isDone()) {
       try {
-        connectedClient.get().registerConsumer(inputLocation, observer);
+        connectedClient.get().registerConsumer(instructionId, observer);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RuntimeException(e);
@@ -161,7 +150,7 @@ public class GrpcDataService extends 
BeamFnDataGrpc.BeamFnDataImplBase
       executor.submit(
           () -> {
             try {
-              connectedClient.get().registerConsumer(inputLocation, observer);
+              connectedClient.get().registerConsumer(instructionId, observer);
             } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new RuntimeException(e);
@@ -170,21 +159,29 @@ public class GrpcDataService extends 
BeamFnDataGrpc.BeamFnDataImplBase
             }
           });
     }
-    return observer;
   }
 
   @Override
-  public <T> CloseableFnDataReceiver<T> send(LogicalEndpoint outputLocation, 
Coder<T> coder) {
-    LOG.debug(
-        "Creating sender for instruction {} and transform {}",
-        outputLocation.getInstructionId(),
-        outputLocation.getTransformId());
+  public void unregisterReceiver(String instructionId) {
     try {
-      return new BeamFnDataOutboundObserver<>(
-          outputLocation,
-          coder,
+      connectedClient.get().unregisterConsumer(instructionId);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e.getCause());
+    }
+  }
+
+  @Override
+  public BeamFnDataOutboundAggregator createOutboundAggregator(
+      Supplier<String> processBundleRequestIdSupplier, boolean 
collectElementsIfNoFlushes) {
+    try {
+      return new BeamFnDataOutboundAggregator(
+          options,
+          processBundleRequestIdSupplier,
           connectedClient.get(3, TimeUnit.MINUTES).getOutboundObserver(),
-          options);
+          collectElementsIfNoFlushes);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(e);
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
index e0d41ba55d7..44ae7e6bca9 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.fnexecution.control;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -62,7 +63,7 @@ import 
org.apache.beam.runners.fnexecution.state.StateDelegator;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
 import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.IdGenerators;
-import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
+import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
 import org.apache.beam.sdk.fn.server.GrpcFnServer;
 import org.apache.beam.sdk.fn.server.ServerFactory;
 import org.apache.beam.sdk.options.ExperimentalOptions;
@@ -123,7 +124,8 @@ public class DefaultJobBundleFactoryTest {
     when(dataServer.getApiServiceDescriptor())
         .thenReturn(ApiServiceDescriptor.getDefaultInstance());
     GrpcDataService dataService = mock(GrpcDataService.class);
-    when(dataService.send(any(), 
any())).thenReturn(mock(CloseableFnDataReceiver.class));
+    when(dataService.createOutboundAggregator(any(), anyBoolean()))
+        .thenReturn(mock(BeamFnDataOutboundAggregator.class));
     when(dataServer.getService()).thenReturn(dataService);
     when(stateServer.getApiServiceDescriptor())
         .thenReturn(ApiServiceDescriptor.getDefaultInstance());
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
index 6c41dd4c290..52f5e2067c5 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
@@ -24,14 +24,17 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
@@ -46,6 +49,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
 import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
@@ -66,12 +70,13 @@ import 
org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
 import org.apache.beam.runners.fnexecution.state.StateDelegator;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
-import org.apache.beam.sdk.fn.data.InboundDataClient;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
 import org.apache.beam.sdk.transforms.Create;
@@ -93,6 +98,7 @@ import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.Mockito;
@@ -107,7 +113,7 @@ public class SdkHarnessClientTest {
 
   @Mock public FnApiControlClient fnApiControlClient;
   @Mock public FnDataService dataService;
-
+  @Captor ArgumentCaptor<CloseableFnDataReceiver<BeamFnApi.Elements>> 
outputReceiverCaptor;
   @Rule public EmbeddedSdkHarness harness = EmbeddedSdkHarness.create();
 
   @Rule public ExpectedException thrown = ExpectedException.none();
@@ -240,7 +246,8 @@ public class SdkHarnessClientTest {
             Collections.singletonList(
                 RemoteInputDestination.of(
                     (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM)));
-    when(dataService.send(any(), 
eq(coder))).thenReturn(mock(CloseableFnDataReceiver.class));
+    when(dataService.createOutboundAggregator(any(), anyBoolean()))
+        .thenReturn(mock(BeamFnDataOutboundAggregator.class));
 
     try (RemoteBundle activeBundle =
         processor.newBundle(Collections.emptyMap(), 
BundleProgressHandler.ignored())) {
@@ -270,7 +277,8 @@ public class SdkHarnessClientTest {
             Collections.singletonList(
                 RemoteInputDestination.of(
                     (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM)));
-    when(dataService.send(any(), 
eq(coder))).thenReturn(mock(CloseableFnDataReceiver.class));
+    when(dataService.createOutboundAggregator(any(), anyBoolean()))
+        .thenReturn(mock(BeamFnDataOutboundAggregator.class));
 
     RemoteBundle activeBundle =
         processor.newBundle(Collections.emptyMap(), 
BundleProgressHandler.ignored());
@@ -302,7 +310,8 @@ public class SdkHarnessClientTest {
             Collections.singletonList(
                 RemoteInputDestination.of(
                     (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM)));
-    when(dataService.send(any(), 
eq(coder))).thenReturn(mock(CloseableFnDataReceiver.class));
+    when(dataService.createOutboundAggregator(any(), anyBoolean()))
+        .thenReturn(mock(BeamFnDataOutboundAggregator.class));
 
     RemoteBundle activeBundle =
         processor.newBundle(Collections.emptyMap(), 
BundleProgressHandler.ignored());
@@ -352,7 +361,8 @@ public class SdkHarnessClientTest {
             Collections.singletonList(
                 RemoteInputDestination.of(
                     (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM)));
-    when(dataService.send(any(), 
eq(coder))).thenReturn(mock(CloseableFnDataReceiver.class));
+    when(dataService.createOutboundAggregator(any(), anyBoolean()))
+        .thenReturn(mock(BeamFnDataOutboundAggregator.class));
 
     BundleProgressHandler mockProgressHandler = 
mock(BundleProgressHandler.class);
 
@@ -423,7 +433,8 @@ public class SdkHarnessClientTest {
             Collections.singletonList(
                 RemoteInputDestination.of(
                     (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM)));
-    when(dataService.send(any(), 
eq(coder))).thenReturn(mock(CloseableFnDataReceiver.class));
+    when(dataService.createOutboundAggregator(any(), anyBoolean()))
+        .thenReturn(mock(BeamFnDataOutboundAggregator.class));
 
     BundleCheckpointHandler mockCheckpointHandler = 
mock(BundleCheckpointHandler.class);
     BundleSplitHandler mockSplitHandler = mock(BundleSplitHandler.class);
@@ -519,10 +530,9 @@ public class SdkHarnessClientTest {
 
   @Test
   public void handleCleanupWhenInputSenderFails() throws Exception {
-    Exception testException = new Exception();
+    RuntimeException testException = new RuntimeException();
 
-    InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
-    CloseableFnDataReceiver mockInputSender = 
mock(CloseableFnDataReceiver.class);
+    BeamFnDataOutboundAggregator mockInputSender = 
mock(BeamFnDataOutboundAggregator.class);
 
     CompletableFuture<InstructionResponse> processBundleResponseFuture = new 
CompletableFuture<>();
     when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
@@ -536,18 +546,21 @@ public class SdkHarnessClientTest {
             Collections.singletonList(
                 RemoteInputDestination.of(
                     (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM)));
-    when(dataService.receive(any(), any(), 
any())).thenReturn(mockOutputReceiver);
-    when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender);
+    doNothing().when(dataService).registerReceiver(any(), 
outputReceiverCaptor.capture());
+    when(dataService.createOutboundAggregator(any(), 
anyBoolean())).thenReturn(mockInputSender);
 
-    doThrow(testException).when(mockInputSender).close();
+    doThrow(testException)
+        .when(mockInputSender)
+        .sendOrCollectBufferedDataAndFinishOutboundStreams();
 
-    RemoteOutputReceiver mockRemoteOutputReceiver = 
mock(RemoteOutputReceiver.class);
     BundleProgressHandler mockProgressHandler = 
mock(BundleProgressHandler.class);
 
     try {
       try (RemoteBundle activeBundle =
           processor.newBundle(
-              ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, 
mockRemoteOutputReceiver),
+              ImmutableMap.of(
+                  SDK_GRPC_WRITE_TRANSFORM,
+                  RemoteOutputReceiver.of(ByteArrayCoder.of(), 
mock(FnDataReceiver.class))),
               mockProgressHandler)) {
         // We shouldn't be required to complete the process bundle response 
future.
       }
@@ -555,17 +568,21 @@ public class SdkHarnessClientTest {
     } catch (Exception e) {
       assertEquals(testException, e);
 
-      verify(mockOutputReceiver).cancel();
-      verifyNoMoreInteractions(mockOutputReceiver);
+      // We expect that we don't register the receiver and the next accept 
call will raise an error
+      // making the data service aware of the error.
+      verify(dataService, never()).unregisterReceiver(any());
+      assertThrows(
+          "Inbound observer closed.",
+          Exception.class,
+          () -> 
outputReceiverCaptor.getValue().accept(Elements.getDefaultInstance()));
     }
   }
 
   @Test
   public void handleCleanupWithStateWhenInputSenderFails() throws Exception {
-    Exception testException = new Exception();
+    RuntimeException testException = new RuntimeException();
 
-    InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
-    CloseableFnDataReceiver mockInputSender = 
mock(CloseableFnDataReceiver.class);
+    BeamFnDataOutboundAggregator mockInputSender = 
mock(BeamFnDataOutboundAggregator.class);
 
     StateDelegator mockStateDelegator = mock(StateDelegator.class);
     StateDelegator.Registration mockStateRegistration = 
mock(StateDelegator.Registration.class);
@@ -587,17 +604,18 @@ public class SdkHarnessClientTest {
             Collections.singletonList(
                 RemoteInputDestination.of((FullWindowedValueCoder) coder, 
SDK_GRPC_READ_TRANSFORM)),
             mockStateDelegator);
-    when(dataService.receive(any(), any(), 
any())).thenReturn(mockOutputReceiver);
-    when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender);
-
-    doThrow(testException).when(mockInputSender).close();
+    when(dataService.createOutboundAggregator(any(), 
anyBoolean())).thenReturn(mockInputSender);
 
-    RemoteOutputReceiver mockRemoteOutputReceiver = 
mock(RemoteOutputReceiver.class);
+    doThrow(testException)
+        .when(mockInputSender)
+        .sendOrCollectBufferedDataAndFinishOutboundStreams();
 
     try {
       try (RemoteBundle activeBundle =
           processor.newBundle(
-              ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, 
mockRemoteOutputReceiver),
+              ImmutableMap.of(
+                  SDK_GRPC_WRITE_TRANSFORM,
+                  RemoteOutputReceiver.of(ByteArrayCoder.of(), 
mock(FnDataReceiver.class))),
               mockStateHandler,
               mockProgressHandler)) {
         // We shouldn't be required to complete the process bundle response 
future.
@@ -607,17 +625,21 @@ public class SdkHarnessClientTest {
       assertEquals(testException, e);
 
       verify(mockStateRegistration).abort();
-      verify(mockOutputReceiver).cancel();
-      verifyNoMoreInteractions(mockStateRegistration, mockOutputReceiver);
+      // We expect that we don't register the receiver and the next accept 
call will raise an error
+      // making the data service aware of the error.
+      verify(dataService, never()).unregisterReceiver(any());
+      assertThrows(
+          "Inbound observer closed.",
+          Exception.class,
+          () -> 
outputReceiverCaptor.getValue().accept(Elements.getDefaultInstance()));
     }
   }
 
   @Test
   public void handleCleanupWhenProcessingBundleFails() throws Exception {
-    Exception testException = new Exception();
+    RuntimeException testException = new RuntimeException();
 
-    InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
-    CloseableFnDataReceiver mockInputSender = 
mock(CloseableFnDataReceiver.class);
+    BeamFnDataOutboundAggregator mockInputSender = 
mock(BeamFnDataOutboundAggregator.class);
 
     CompletableFuture<InstructionResponse> processBundleResponseFuture = new 
CompletableFuture<>();
     when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
@@ -631,16 +653,16 @@ public class SdkHarnessClientTest {
             Collections.singletonList(
                 RemoteInputDestination.of(
                     (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM)));
-    when(dataService.receive(any(), any(), 
any())).thenReturn(mockOutputReceiver);
-    when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender);
+    when(dataService.createOutboundAggregator(any(), 
anyBoolean())).thenReturn(mockInputSender);
 
-    RemoteOutputReceiver mockRemoteOutputReceiver = 
mock(RemoteOutputReceiver.class);
     BundleProgressHandler mockProgressHandler = 
mock(BundleProgressHandler.class);
 
     try {
       try (RemoteBundle activeBundle =
           processor.newBundle(
-              ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, 
mockRemoteOutputReceiver),
+              ImmutableMap.of(
+                  SDK_GRPC_WRITE_TRANSFORM,
+                  RemoteOutputReceiver.of(ByteArrayCoder.of(), 
mock(FnDataReceiver.class))),
               mockProgressHandler)) {
         processBundleResponseFuture.completeExceptionally(testException);
       }
@@ -648,17 +670,21 @@ public class SdkHarnessClientTest {
     } catch (ExecutionException e) {
       assertEquals(testException, e.getCause());
 
-      verify(mockOutputReceiver).cancel();
-      verifyNoMoreInteractions(mockOutputReceiver);
+      // We expect that we don't register the receiver and the next accept 
call will raise an error
+      // making the data service aware of the error.
+      verify(dataService, never()).unregisterReceiver(any());
+      assertThrows(
+          "Inbound observer closed.",
+          Exception.class,
+          () -> 
outputReceiverCaptor.getValue().accept(Elements.getDefaultInstance()));
     }
   }
 
   @Test
   public void handleCleanupWithStateWhenProcessingBundleFails() throws 
Exception {
-    Exception testException = new Exception();
+    RuntimeException testException = new RuntimeException();
 
-    InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
-    CloseableFnDataReceiver mockInputSender = 
mock(CloseableFnDataReceiver.class);
+    BeamFnDataOutboundAggregator mockInputSender = 
mock(BeamFnDataOutboundAggregator.class);
     StateDelegator mockStateDelegator = mock(StateDelegator.class);
     StateDelegator.Registration mockStateRegistration = 
mock(StateDelegator.Registration.class);
     when(mockStateDelegator.registerForProcessBundleInstructionId(any(), 
any()))
@@ -679,15 +705,14 @@ public class SdkHarnessClientTest {
             Collections.singletonList(
                 RemoteInputDestination.of((FullWindowedValueCoder) coder, 
SDK_GRPC_READ_TRANSFORM)),
             mockStateDelegator);
-    when(dataService.receive(any(), any(), 
any())).thenReturn(mockOutputReceiver);
-    when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender);
-
-    RemoteOutputReceiver mockRemoteOutputReceiver = 
mock(RemoteOutputReceiver.class);
+    when(dataService.createOutboundAggregator(any(), 
anyBoolean())).thenReturn(mockInputSender);
 
     try {
       try (RemoteBundle activeBundle =
           processor.newBundle(
-              ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, 
mockRemoteOutputReceiver),
+              ImmutableMap.of(
+                  SDK_GRPC_WRITE_TRANSFORM,
+                  RemoteOutputReceiver.of(ByteArrayCoder.of(), 
mock(FnDataReceiver.class))),
               mockStateHandler,
               mockProgressHandler)) {
         processBundleResponseFuture.completeExceptionally(testException);
@@ -697,17 +722,19 @@ public class SdkHarnessClientTest {
       assertEquals(testException, e.getCause());
 
       verify(mockStateRegistration).abort();
-      verify(mockOutputReceiver).cancel();
-      verifyNoMoreInteractions(mockStateRegistration, mockOutputReceiver);
+      // We expect that we don't register the receiver and the next accept 
call will raise an error
+      // making the data service aware of the error.
+      verify(dataService, never()).unregisterReceiver(any());
+      assertThrows(
+          "Inbound observer closed.",
+          Exception.class,
+          () -> 
outputReceiverCaptor.getValue().accept(Elements.getDefaultInstance()));
     }
   }
 
   @Test
   public void handleCleanupWhenAwaitingOnClosingOutputReceivers() throws 
Exception {
-    Exception testException = new Exception();
-
-    InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
-    CloseableFnDataReceiver mockInputSender = 
mock(CloseableFnDataReceiver.class);
+    BeamFnDataOutboundAggregator mockInputSender = 
mock(BeamFnDataOutboundAggregator.class);
 
     CompletableFuture<InstructionResponse> processBundleResponseFuture = new 
CompletableFuture<>();
     when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
@@ -721,41 +748,36 @@ public class SdkHarnessClientTest {
             Collections.singletonList(
                 RemoteInputDestination.of(
                     (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM)));
-    when(dataService.receive(any(), any(), 
any())).thenReturn(mockOutputReceiver);
-    when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender);
-    doThrow(testException).when(mockOutputReceiver).awaitCompletion();
+    doNothing().when(dataService).registerReceiver(any(), 
outputReceiverCaptor.capture());
+    when(dataService.createOutboundAggregator(any(), 
anyBoolean())).thenReturn(mockInputSender);
 
-    RemoteOutputReceiver mockRemoteOutputReceiver = 
mock(RemoteOutputReceiver.class);
     BundleProgressHandler mockProgressHandler = 
mock(BundleProgressHandler.class);
 
-    try {
-      try (RemoteBundle activeBundle =
-          processor.newBundle(
-              ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, 
mockRemoteOutputReceiver),
-              mockProgressHandler)) {
-        // Correlating the ProcessBundleRequest and ProcessBundleResponse is 
owned by the underlying
-        // FnApiControlClient. The SdkHarnessClient owns just wrapping the 
request and unwrapping
-        // the response.
-        //
-        // Currently there are no fields so there's nothing to check. This 
test is formulated
-        // to match the pattern it should have if/when the response is 
meaningful.
-        BeamFnApi.ProcessBundleResponse response =
-            BeamFnApi.ProcessBundleResponse.getDefaultInstance();
-        processBundleResponseFuture.complete(
-            
BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build());
-      }
-      fail("Exception expected");
-    } catch (Exception e) {
-      assertEquals(testException, e);
-    }
+    RemoteBundle activeBundle =
+        processor.newBundle(
+            ImmutableMap.of(
+                SDK_GRPC_WRITE_TRANSFORM,
+                RemoteOutputReceiver.of(ByteArrayCoder.of(), 
mock(FnDataReceiver.class))),
+            mockProgressHandler);
+    // Correlating the ProcessBundleRequest and ProcessBundleResponse is owned 
by the underlying
+    // FnApiControlClient. The SdkHarnessClient owns just wrapping the request 
and unwrapping
+    // the response.
+    //
+    // Currently there are no fields so there's nothing to check. This test is 
formulated
+    // to match the pattern it should have if/when the response is meaningful.
+    BeamFnApi.ProcessBundleResponse response = 
BeamFnApi.ProcessBundleResponse.getDefaultInstance();
+    processBundleResponseFuture.complete(
+        
BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build());
+    // Inject an error before we close the bundle as if the data service 
closed the stream
+    // explicitly
+    outputReceiverCaptor.getValue().close();
+
+    assertThrows("Inbound observer closed.", Exception.class, 
activeBundle::close);
   }
 
   @Test
   public void handleCleanupWithStateWhenAwaitingOnClosingOutputReceivers() 
throws Exception {
-    Exception testException = new Exception();
-
-    InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
-    CloseableFnDataReceiver mockInputSender = 
mock(CloseableFnDataReceiver.class);
+    BeamFnDataOutboundAggregator mockInputSender = 
mock(BeamFnDataOutboundAggregator.class);
     StateDelegator mockStateDelegator = mock(StateDelegator.class);
     StateDelegator.Registration mockStateRegistration = 
mock(StateDelegator.Registration.class);
     when(mockStateDelegator.registerForProcessBundleInstructionId(any(), 
any()))
@@ -776,37 +798,34 @@ public class SdkHarnessClientTest {
             Collections.singletonList(
                 RemoteInputDestination.of((FullWindowedValueCoder) coder, 
SDK_GRPC_READ_TRANSFORM)),
             mockStateDelegator);
-    when(dataService.receive(any(), any(), 
any())).thenReturn(mockOutputReceiver);
-    when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender);
-    doThrow(testException).when(mockOutputReceiver).awaitCompletion();
-
-    RemoteOutputReceiver mockRemoteOutputReceiver = 
mock(RemoteOutputReceiver.class);
+    doNothing().when(dataService).registerReceiver(any(), 
outputReceiverCaptor.capture());
+    when(dataService.createOutboundAggregator(any(), 
anyBoolean())).thenReturn(mockInputSender);
 
-    try {
-      try (RemoteBundle activeBundle =
-          processor.newBundle(
-              ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, 
mockRemoteOutputReceiver),
-              mockStateHandler,
-              mockProgressHandler)) {
-        // Correlating the ProcessBundleRequest and ProcessBundleResponse is 
owned by the underlying
-        // FnApiControlClient. The SdkHarnessClient owns just wrapping the 
request and unwrapping
-        // the response.
-        //
-        // Currently there are no fields so there's nothing to check. This 
test is formulated
-        // to match the pattern it should have if/when the response is 
meaningful.
-        BeamFnApi.ProcessBundleResponse response =
-            BeamFnApi.ProcessBundleResponse.getDefaultInstance();
-        processBundleResponseFuture.complete(
-            
BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build());
-      }
-      fail("Exception expected");
-    } catch (Exception e) {
-      assertEquals(testException, e);
-    }
+    RemoteBundle activeBundle =
+        processor.newBundle(
+            ImmutableMap.of(
+                SDK_GRPC_WRITE_TRANSFORM,
+                RemoteOutputReceiver.of(ByteArrayCoder.of(), 
mock(FnDataReceiver.class))),
+            mockStateHandler,
+            mockProgressHandler);
+    // Correlating the ProcessBundleRequest and ProcessBundleResponse is owned 
by the underlying
+    // FnApiControlClient. The SdkHarnessClient owns just wrapping the request 
and unwrapping
+    // the response.
+    //
+    // Currently there are no fields so there's nothing to check. This test is 
formulated
+    // to match the pattern it should have if/when the response is meaningful.
+    BeamFnApi.ProcessBundleResponse response = 
BeamFnApi.ProcessBundleResponse.getDefaultInstance();
+    processBundleResponseFuture.complete(
+        
BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build());
+    // Inject an error before we close the bundle as if the data service 
closed the stream
+    // explicitly.
+    outputReceiverCaptor.getValue().close();
+    assertThrows("Inbound observer closed.", Exception.class, 
activeBundle::close);
   }
 
   @Test
   public void verifyCacheTokensAreUsedInNewBundleRequest() throws 
InterruptedException {
+    BeamFnDataOutboundAggregator mockInputSender = 
mock(BeamFnDataOutboundAggregator.class);
     when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
         .thenReturn(
             CompletableFuture.<InstructionResponse>completedFuture(
@@ -822,7 +841,7 @@ public class SdkHarnessClientTest {
                 SDK_GRPC_READ_TRANSFORM));
 
     BundleProcessor processor1 = sdkHarnessClient.getProcessor(descriptor1, 
remoteInputs);
-    when(dataService.send(any(), 
any())).thenReturn(mock(CloseableFnDataReceiver.class));
+    when(dataService.createOutboundAggregator(any(), 
anyBoolean())).thenReturn(mockInputSender);
 
     StateRequestHandler stateRequestHandler = 
Mockito.mock(StateRequestHandler.class);
     List<BeamFnApi.ProcessBundleRequest.CacheToken> cacheTokens =
@@ -831,7 +850,9 @@ public class SdkHarnessClientTest {
     when(stateRequestHandler.getCacheTokens()).thenReturn(cacheTokens);
 
     processor1.newBundle(
-        ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, 
mock(RemoteOutputReceiver.class)),
+        ImmutableMap.of(
+            SDK_GRPC_WRITE_TRANSFORM,
+            RemoteOutputReceiver.of(ByteArrayCoder.of(), 
mock(FnDataReceiver.class))),
         stateRequestHandler,
         BundleProgressHandler.ignored());
 
@@ -850,8 +871,7 @@ public class SdkHarnessClientTest {
 
   @Test
   public void testBundleCheckpointCallback() throws Exception {
-    InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
-    CloseableFnDataReceiver mockInputSender = 
mock(CloseableFnDataReceiver.class);
+    BeamFnDataOutboundAggregator mockInputSender = 
mock(BeamFnDataOutboundAggregator.class);
 
     CompletableFuture<InstructionResponse> processBundleResponseFuture = new 
CompletableFuture<>();
     when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
@@ -865,10 +885,8 @@ public class SdkHarnessClientTest {
             Collections.singletonList(
                 RemoteInputDestination.of(
                     (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM)));
-    when(dataService.receive(any(), any(), 
any())).thenReturn(mockOutputReceiver);
-    when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender);
+    when(dataService.createOutboundAggregator(any(), 
anyBoolean())).thenReturn(mockInputSender);
 
-    RemoteOutputReceiver mockRemoteOutputReceiver = 
mock(RemoteOutputReceiver.class);
     BundleProgressHandler mockProgressHandler = 
mock(BundleProgressHandler.class);
     BundleSplitHandler mockSplitHandler = mock(BundleSplitHandler.class);
     BundleCheckpointHandler mockCheckpointHandler = 
mock(BundleCheckpointHandler.class);
@@ -880,7 +898,7 @@ public class SdkHarnessClientTest {
             .build();
     try (ActiveBundle activeBundle =
         processor.newBundle(
-            ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, 
mockRemoteOutputReceiver),
+            Collections.emptyMap(),
             Collections.emptyMap(),
             (request) -> {
               throw new UnsupportedOperationException();
@@ -895,13 +913,12 @@ public class SdkHarnessClientTest {
 
     verify(mockProgressHandler).onCompleted(response);
     verify(mockCheckpointHandler).onCheckpoint(response);
-    verifyZeroInteractions(mockFinalizationHandler, mockSplitHandler);
+    verifyNoMoreInteractions(mockFinalizationHandler, mockSplitHandler);
   }
 
   @Test
   public void testBundleFinalizationCallback() throws Exception {
-    InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
-    CloseableFnDataReceiver mockInputSender = 
mock(CloseableFnDataReceiver.class);
+    BeamFnDataOutboundAggregator mockInputSender = 
mock(BeamFnDataOutboundAggregator.class);
 
     CompletableFuture<InstructionResponse> processBundleResponseFuture = new 
CompletableFuture<>();
     when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
@@ -915,10 +932,8 @@ public class SdkHarnessClientTest {
             Collections.singletonList(
                 RemoteInputDestination.of(
                     (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM)));
-    when(dataService.receive(any(), any(), 
any())).thenReturn(mockOutputReceiver);
-    when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender);
+    when(dataService.createOutboundAggregator(any(), 
anyBoolean())).thenReturn(mockInputSender);
 
-    RemoteOutputReceiver mockRemoteOutputReceiver = 
mock(RemoteOutputReceiver.class);
     BundleProgressHandler mockProgressHandler = 
mock(BundleProgressHandler.class);
     BundleSplitHandler mockSplitHandler = mock(BundleSplitHandler.class);
     BundleCheckpointHandler mockCheckpointHandler = 
mock(BundleCheckpointHandler.class);
@@ -929,7 +944,7 @@ public class SdkHarnessClientTest {
     String bundleId;
     try (ActiveBundle activeBundle =
         processor.newBundle(
-            ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, 
mockRemoteOutputReceiver),
+            Collections.emptyMap(),
             Collections.emptyMap(),
             (request) -> {
               throw new UnsupportedOperationException();
@@ -945,7 +960,7 @@ public class SdkHarnessClientTest {
 
     verify(mockProgressHandler).onCompleted(response);
     verify(mockFinalizationHandler).requestsFinalization(bundleId);
-    verifyZeroInteractions(mockCheckpointHandler, mockSplitHandler);
+    verifyNoMoreInteractions(mockCheckpointHandler, mockSplitHandler);
   }
 
   private static class TestFn extends DoFn<String, String> {
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/data/GrpcDataServiceTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/data/GrpcDataServiceTest.java
index ae79d909c2c..86bf247fa34 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/data/GrpcDataServiceTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/data/GrpcDataServiceTest.java
@@ -24,7 +24,9 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -38,9 +40,10 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
-import org.apache.beam.sdk.fn.data.InboundDataClient;
-import org.apache.beam.sdk.fn.data.LogicalEndpoint;
+import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2;
+import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
+import org.apache.beam.sdk.fn.data.DataEndpoint;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.fn.server.GrpcFnServer;
 import org.apache.beam.sdk.fn.server.InProcessServerFactory;
 import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
@@ -93,13 +96,16 @@ public class GrpcDataServiceTest {
       }
 
       for (int i = 0; i < 3; ++i) {
-        CloseableFnDataReceiver<WindowedValue<String>> consumer =
-            service.send(LogicalEndpoint.data(Integer.toString(i), 
TRANSFORM_ID), CODER);
-
+        final String instructionId = Integer.toString(i);
+        BeamFnDataOutboundAggregator aggregator =
+            service.createOutboundAggregator(() -> instructionId, false);
+        aggregator.start();
+        FnDataReceiver<WindowedValue<String>> consumer =
+            aggregator.registerOutputDataLocation(TRANSFORM_ID, CODER);
         consumer.accept(WindowedValue.valueInGlobalWindow("A" + i));
         consumer.accept(WindowedValue.valueInGlobalWindow("B" + i));
         consumer.accept(WindowedValue.valueInGlobalWindow("C" + i));
-        consumer.close();
+        aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
       }
       waitForInboundElements.countDown();
       for (Future<Void> clientFuture : clientFutures) {
@@ -144,18 +150,19 @@ public class GrpcDataServiceTest {
       }
 
       List<Collection<WindowedValue<String>>> serverInboundValues = new 
ArrayList<>();
-      Collection<InboundDataClient> readFutures = new ArrayList<>();
+      Collection<BeamFnDataInboundObserver2> inboundObservers = new 
ArrayList<>();
       for (int i = 0; i < 3; ++i) {
         final Collection<WindowedValue<String>> serverInboundValue = new 
ArrayList<>();
         serverInboundValues.add(serverInboundValue);
-        readFutures.add(
-            service.receive(
-                LogicalEndpoint.data(Integer.toString(i), TRANSFORM_ID),
-                CODER,
-                serverInboundValue::add));
+        BeamFnDataInboundObserver2 inboundObserver =
+            BeamFnDataInboundObserver2.forConsumers(
+                Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, 
serverInboundValue::add)),
+                Collections.emptyList());
+        service.registerReceiver(Integer.toString(i), inboundObserver);
+        inboundObservers.add(inboundObserver);
       }
-      for (InboundDataClient readFuture : readFutures) {
-        readFuture.awaitCompletion();
+      for (BeamFnDataInboundObserver2 inboundObserver : inboundObservers) {
+        inboundObserver.awaitCompletion();
       }
       waitForInboundElements.countDown();
       for (Future<Void> clientFuture : clientFutures) {
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java
deleted file mode 100644
index d71c87a80de..00000000000
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.fn.data;
-
-import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
-import org.apache.beam.model.pipeline.v1.Endpoints;
-import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
-import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
-import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Status;
-import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.StreamObserver;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A gRPC multiplexer for a specific {@link Endpoints.ApiServiceDescriptor}.
- *
- * <p>Multiplexes data for inbound consumers based upon their individual {@link
- * org.apache.beam.model.fnexecution.v1.BeamFnApi.Target}s.
- *
- * <p>Multiplexing inbound and outbound streams is as thread safe as the 
consumers of those streams.
- * For inbound streams, this is as thread safe as the inbound observers. For 
outbound streams, this
- * is as thread safe as the underlying stream observer.
- *
- * <p>TODO: Add support for multiplexing over multiple outbound observers by 
stickying the output
- * location with a specific outbound observer.
- *
- * @deprecated Migrate to {@link BeamFnDataGrpcMultiplexer2}.
- */
-@Deprecated
-public class BeamFnDataGrpcMultiplexer implements AutoCloseable {
-  private static final Logger LOG = 
LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer.class);
-  private final Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor;
-  private final StreamObserver<BeamFnApi.Elements> inboundObserver;
-  private final StreamObserver<BeamFnApi.Elements> outboundObserver;
-  private final ConcurrentMap<LogicalEndpoint, 
CompletableFuture<BiConsumer<ByteString, Boolean>>>
-      consumers;
-
-  public BeamFnDataGrpcMultiplexer(
-      Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor,
-      OutboundObserverFactory outboundObserverFactory,
-      OutboundObserverFactory.BasicFactory<Elements, Elements> 
baseOutboundObserverFactory) {
-    this.apiServiceDescriptor = apiServiceDescriptor;
-    this.consumers = new ConcurrentHashMap<>();
-    this.inboundObserver = new InboundObserver();
-    this.outboundObserver =
-        
outboundObserverFactory.outboundObserverFor(baseOutboundObserverFactory, 
inboundObserver);
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-        .omitNullValues()
-        .add("apiServiceDescriptor", apiServiceDescriptor)
-        .add("consumers", consumers)
-        .toString();
-  }
-
-  public StreamObserver<BeamFnApi.Elements> getInboundObserver() {
-    return inboundObserver;
-  }
-
-  public StreamObserver<BeamFnApi.Elements> getOutboundObserver() {
-    return outboundObserver;
-  }
-
-  private CompletableFuture<BiConsumer<ByteString, Boolean>> receiverFuture(
-      LogicalEndpoint endpoint) {
-    return consumers.computeIfAbsent(
-        endpoint, (LogicalEndpoint unused) -> new CompletableFuture<>());
-  }
-
-  public <T> void registerConsumer(
-      LogicalEndpoint inputLocation, BiConsumer<ByteString, Boolean> 
bytesReceiver) {
-    receiverFuture(inputLocation).complete(bytesReceiver);
-  }
-
-  @VisibleForTesting
-  boolean hasConsumer(LogicalEndpoint outputLocation) {
-    return consumers.containsKey(outputLocation);
-  }
-
-  @Override
-  public void close() {
-    for (CompletableFuture<BiConsumer<ByteString, Boolean>> receiver :
-        ImmutableList.copyOf(consumers.values())) {
-      // Cancel any observer waiting for the client to complete. If the 
receiver has already been
-      // completed or cancelled, this call will be ignored.
-      receiver.cancel(true);
-    }
-    // Cancel any outbound calls and complete any inbound calls, as this 
multiplexer is hanging up
-    outboundObserver.onError(
-        Status.CANCELLED.withDescription("Multiplexer hanging 
up").asException());
-    inboundObserver.onCompleted();
-  }
-
-  /**
-   * A multiplexing {@link StreamObserver} that selects the inbound {@link 
Consumer} to pass the
-   * elements to.
-   *
-   * <p>The inbound observer blocks until the {@link Consumer} is bound 
allowing for the sending
-   * harness to initiate transmitting data without needing for the receiving 
harness to signal that
-   * it is ready to consume that data.
-   */
-  private final class InboundObserver implements 
StreamObserver<BeamFnApi.Elements> {
-    @Override
-    public void onNext(BeamFnApi.Elements value) {
-      for (BeamFnApi.Elements.Data maybeData : value.getDataList()) {
-        BeamFnApi.Elements.Data data = checkArgumentNotNull(maybeData);
-        try {
-          LogicalEndpoint key =
-              LogicalEndpoint.data(data.getInstructionId(), 
data.getTransformId());
-          CompletableFuture<BiConsumer<ByteString, Boolean>> consumer = 
receiverFuture(key);
-          if (!consumer.isDone()) {
-            LOG.debug(
-                "Received data for key {} without consumer ready. "
-                    + "Waiting for consumer to be registered.",
-                key);
-          }
-          consumer.get().accept(data.getData(), data.getIsLast());
-          if (data.getIsLast()) {
-            consumers.remove(key);
-          }
-          /*
-           * TODO: On failure we should fail any bundles that were impacted 
eagerly
-           * instead of relying on the Runner harness to do all the failure 
handling.
-           */
-        } catch (ExecutionException | InterruptedException e) {
-          LOG.error(
-              "Client interrupted during handling of data for instruction {} 
and transform {}",
-              data.getInstructionId(),
-              data.getTransformId(),
-              e);
-          outboundObserver.onError(e);
-        } catch (RuntimeException e) {
-          LOG.error(
-              "Client failed to handle data for instruction {} and transform 
{}",
-              data.getInstructionId(),
-              data.getTransformId(),
-              e);
-          outboundObserver.onError(e);
-        }
-      }
-
-      for (BeamFnApi.Elements.Timers timer : value.getTimersList()) {
-        try {
-          LogicalEndpoint key =
-              LogicalEndpoint.timer(
-                  timer.getInstructionId(), timer.getTransformId(), 
timer.getTimerFamilyId());
-          CompletableFuture<BiConsumer<ByteString, Boolean>> consumer = 
receiverFuture(key);
-          if (!consumer.isDone()) {
-            LOG.debug(
-                "Received data for key {} without consumer ready. "
-                    + "Waiting for consumer to be registered.",
-                key);
-          }
-          consumer.get().accept(timer.getTimers(), timer.getIsLast());
-          if (timer.getIsLast()) {
-            consumers.remove(key);
-          }
-          /*
-           * TODO: On failure we should fail any bundles that were impacted 
eagerly
-           * instead of relying on the Runner harness to do all the failure 
handling.
-           */
-        } catch (ExecutionException | InterruptedException e) {
-          LOG.error(
-              "Client interrupted during handling of timer for instruction {}, 
transform {}, and timer family {}",
-              timer.getInstructionId(),
-              timer.getTransformId(),
-              timer.getTimerFamilyId(),
-              e);
-          outboundObserver.onError(e);
-        } catch (RuntimeException e) {
-          LOG.error(
-              "Client failed to handle timer for instruction {}, transform {}, 
and timer family {}",
-              timer.getInstructionId(),
-              timer.getTransformId(),
-              timer.getTimerFamilyId(),
-              e);
-          outboundObserver.onError(e);
-        }
-      }
-    }
-
-    @Override
-    public void onError(Throwable t) {
-      LOG.error(
-          "Failed to handle for {}",
-          apiServiceDescriptor == null ? "unknown endpoint" : 
apiServiceDescriptor,
-          t);
-    }
-
-    @Override
-    public void onCompleted() {
-      LOG.warn(
-          "Hanged up for {}.",
-          apiServiceDescriptor == null ? "unknown endpoint" : 
apiServiceDescriptor);
-    }
-  }
-}
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java
index 3b80f272a71..bc552dce08d 100644
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java
@@ -253,7 +253,6 @@ public class BeamFnDataGrpcMultiplexer2 implements 
AutoCloseable {
           "Failed to handle for {}",
           apiServiceDescriptor == null ? "unknown endpoint" : 
apiServiceDescriptor,
           t);
-      outboundObserver.onCompleted();
     }
 
     @Override
@@ -261,7 +260,6 @@ public class BeamFnDataGrpcMultiplexer2 implements 
AutoCloseable {
       LOG.warn(
           "Hanged up for {}.",
           apiServiceDescriptor == null ? "unknown endpoint" : 
apiServiceDescriptor);
-      outboundObserver.onCompleted();
     }
   }
 }
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver.java
deleted file mode 100644
index 324113be4c2..00000000000
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.fn.data;
-
-import java.util.function.BiConsumer;
-import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Decodes individually consumed {@link ByteString}s with the provided {@link 
Coder} passing the
- * individual decoded elements to the provided consumer.
- *
- * @deprecated Migrate to {@link BeamFnDataInboundObserver2}.
- */
-@Deprecated
-public class BeamFnDataInboundObserver
-    implements BiConsumer<ByteString, Boolean>, InboundDataClient {
-  private static final Logger LOG = 
LoggerFactory.getLogger(BeamFnDataInboundObserver.class);
-
-  public static BeamFnDataInboundObserver forConsumer(
-      LogicalEndpoint endpoint, FnDataReceiver<ByteString> receiver) {
-    return new BeamFnDataInboundObserver(
-        endpoint, receiver, CompletableFutureInboundDataClient.create());
-  }
-
-  private final LogicalEndpoint endpoint;
-  private final FnDataReceiver<ByteString> consumer;
-  private final InboundDataClient readFuture;
-  private long byteCounter;
-
-  public BeamFnDataInboundObserver(
-      LogicalEndpoint endpoint, FnDataReceiver<ByteString> consumer, 
InboundDataClient readFuture) {
-    this.endpoint = endpoint;
-    this.consumer = consumer;
-    this.readFuture = readFuture;
-  }
-
-  @Override
-  public void accept(ByteString payload, Boolean isLast) {
-    if (readFuture.isDone()) {
-      // Drop any incoming data if the stream processing has finished.
-      return;
-    }
-    try {
-      if (isLast) {
-        LOG.debug("Closing stream for {} having consumed {} bytes", endpoint, 
byteCounter);
-        readFuture.complete();
-        return;
-      }
-
-      byteCounter += payload.size();
-      consumer.accept(payload);
-    } catch (Exception e) {
-      readFuture.fail(e);
-    }
-  }
-
-  @Override
-  public void awaitCompletion() throws Exception {
-    readFuture.awaitCompletion();
-  }
-
-  @Override
-  public void runWhenComplete(Runnable completeRunnable) {
-    readFuture.runWhenComplete(completeRunnable);
-  }
-
-  @Override
-  public boolean isDone() {
-    return readFuture.isDone();
-  }
-
-  @Override
-  public void cancel() {
-    readFuture.cancel();
-  }
-
-  @Override
-  public void complete() {
-    readFuture.complete();
-  }
-
-  @Override
-  public void fail(Throwable t) {
-    readFuture.fail(t);
-  }
-}
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundObserver.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundObserver.java
deleted file mode 100644
index 2fca907940d..00000000000
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundObserver.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.fn.data;
-
-import java.io.IOException;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.StreamObserver;
-
-/**
- * An outbound {@link FnDataReceiver} for the Beam Fn Data API.
- *
- * <p>TODO: Handle outputting large elements (&gt; 2GiBs). Note that this also 
applies to the input
- * side as well.
- *
- * <p>TODO: Handle outputting elements that are zero bytes by outputting a 
single byte as a marker,
- * detect on the input side that no bytes were read and force reading a single 
byte.
- *
- * @deprecated Migrate to use {@link BeamFnDataOutboundAggregator} directly.
- */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-@Deprecated
-public class BeamFnDataOutboundObserver<T> implements 
CloseableFnDataReceiver<T> {
-
-  private boolean closed;
-  private final BeamFnDataOutboundAggregator aggregator;
-  private final FnDataReceiver<T> dataReceiver;
-
-  public BeamFnDataOutboundObserver(
-      LogicalEndpoint outputLocation,
-      Coder<T> coder,
-      StreamObserver<Elements> outboundObserver,
-      PipelineOptions options) {
-    this.aggregator =
-        new BeamFnDataOutboundAggregator(
-            options, outputLocation::getInstructionId, outboundObserver, 
false);
-    this.dataReceiver =
-        outputLocation.isTimer()
-            ? (FnDataReceiver<T>)
-                this.aggregator.registerOutputTimersLocation(
-                    outputLocation.getTransformId(),
-                    outputLocation.getTimerFamilyId(),
-                    (Coder<Object>) coder)
-            : (FnDataReceiver<T>)
-                aggregator.registerOutputDataLocation(
-                    outputLocation.getTransformId(), (Coder<Object>) coder);
-    aggregator.start();
-    this.closed = false;
-  }
-
-  @Override
-  public void close() throws Exception {
-    this.aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams();
-    this.closed = true;
-  }
-
-  @Override
-  public void flush() throws IOException {
-    aggregator.flush();
-  }
-
-  @Override
-  public void accept(T t) throws Exception {
-    if (closed) {
-      throw new IllegalStateException("Already closed.");
-    }
-    dataReceiver.accept(t);
-  }
-}
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClient.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClient.java
deleted file mode 100644
index 73b68222cfa..00000000000
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClient.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.fn.data;
-
-import java.util.concurrent.CompletableFuture;
-
-/** An {@link InboundDataClient} backed by a {@link CompletableFuture}. */
-public class CompletableFutureInboundDataClient implements InboundDataClient {
-  private static final Object COMPLETED = new Object();
-  /**
-   * Create a new {@link CompletableFutureInboundDataClient} using a new 
{@link CompletableFuture}.
-   */
-  public static InboundDataClient create() {
-    return forBackingFuture(new CompletableFuture<>());
-  }
-
-  /**
-   * Create a new {@link CompletableFutureInboundDataClient} wrapping the 
provided {@link
-   * CompletableFuture}.
-   */
-  static InboundDataClient forBackingFuture(CompletableFuture<Object> future) {
-    return new CompletableFutureInboundDataClient(future);
-  }
-
-  private final CompletableFuture<Object> future;
-
-  private CompletableFutureInboundDataClient(CompletableFuture<Object> future) 
{
-    this.future = future;
-  }
-
-  @Override
-  public void awaitCompletion() throws Exception {
-    future.get();
-  }
-
-  @Override
-  @SuppressWarnings("FutureReturnValueIgnored")
-  public void runWhenComplete(Runnable completeRunnable) {
-    future.whenComplete((result, throwable) -> completeRunnable.run());
-  }
-
-  @Override
-  public boolean isDone() {
-    return future.isDone();
-  }
-
-  @Override
-  public void cancel() {
-    future.cancel(true);
-  }
-
-  @Override
-  public void complete() {
-    future.complete(COMPLETED);
-  }
-
-  @Override
-  public void fail(Throwable t) {
-    // Use obtrudeException instead of CompleteExceptionally, forcing any 
future calls to .get()
-    // to raise the execption, even if the future is already compelted.
-    future.obtrudeException(t);
-  }
-}
diff --git 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexerTest.java
 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexerTest.java
deleted file mode 100644
index c82fd98bde6..00000000000
--- 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexerTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.fn.data;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.model.pipeline.v1.Endpoints;
-import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
-import org.apache.beam.sdk.fn.test.TestStreams;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
-import org.junit.Test;
-
-/** Tests for {@link BeamFnDataGrpcMultiplexer}. */
-public class BeamFnDataGrpcMultiplexerTest {
-  private static final Endpoints.ApiServiceDescriptor DESCRIPTOR =
-      Endpoints.ApiServiceDescriptor.newBuilder().setUrl("test").build();
-  private static final LogicalEndpoint DATA_LOCATION = 
LogicalEndpoint.data("777L", "888L");
-  private static final LogicalEndpoint TIMER_LOCATION =
-      LogicalEndpoint.timer("999L", "555L", "333L");
-  private static final BeamFnApi.Elements ELEMENTS =
-      BeamFnApi.Elements.newBuilder()
-          .addData(
-              BeamFnApi.Elements.Data.newBuilder()
-                  .setInstructionId(DATA_LOCATION.getInstructionId())
-                  .setTransformId(DATA_LOCATION.getTransformId())
-                  .setData(ByteString.copyFrom(new byte[1])))
-          .addTimers(
-              BeamFnApi.Elements.Timers.newBuilder()
-                  .setInstructionId(TIMER_LOCATION.getInstructionId())
-                  .setTransformId(TIMER_LOCATION.getTransformId())
-                  .setTimerFamilyId(TIMER_LOCATION.getTimerFamilyId())
-                  .setTimers(ByteString.copyFrom(new byte[2])))
-          .build();
-  private static final BeamFnApi.Elements TERMINAL_ELEMENTS =
-      BeamFnApi.Elements.newBuilder()
-          .addData(
-              BeamFnApi.Elements.Data.newBuilder()
-                  .setInstructionId(DATA_LOCATION.getInstructionId())
-                  .setTransformId(DATA_LOCATION.getTransformId())
-                  .setIsLast(true))
-          .addTimers(
-              BeamFnApi.Elements.Timers.newBuilder()
-                  .setInstructionId(TIMER_LOCATION.getInstructionId())
-                  .setTransformId(TIMER_LOCATION.getTransformId())
-                  .setTimerFamilyId(TIMER_LOCATION.getTimerFamilyId())
-                  .setIsLast(true))
-          .build();
-
-  @Test
-  public void testOutboundObserver() {
-    final Collection<BeamFnApi.Elements> values = new ArrayList<>();
-    BeamFnDataGrpcMultiplexer multiplexer =
-        new BeamFnDataGrpcMultiplexer(
-            DESCRIPTOR,
-            OutboundObserverFactory.clientDirect(),
-            inboundObserver -> TestStreams.withOnNext(values::add).build());
-    multiplexer.getOutboundObserver().onNext(ELEMENTS);
-    assertThat(values, contains(ELEMENTS));
-  }
-
-  @Test
-  public void testInboundObserverBlocksTillConsumerConnects() throws Exception 
{
-    final Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>();
-    final Collection<KV<ByteString, Boolean>> dataInboundValues = new 
ArrayList<>();
-    final Collection<KV<ByteString, Boolean>> timerInboundValues = new 
ArrayList<>();
-    final BeamFnDataGrpcMultiplexer multiplexer =
-        new BeamFnDataGrpcMultiplexer(
-            DESCRIPTOR,
-            OutboundObserverFactory.clientDirect(),
-            inboundObserver -> 
TestStreams.withOnNext(outboundValues::add).build());
-    ExecutorService executorService = Executors.newCachedThreadPool();
-    executorService
-        .submit(
-            () -> {
-              // Purposefully sleep to simulate a delay in a consumer 
connecting.
-              Uninterruptibles.sleepUninterruptibly(100, 
TimeUnit.MILLISECONDS);
-              multiplexer.registerConsumer(
-                  DATA_LOCATION,
-                  (payload, isLast) -> dataInboundValues.add(KV.of(payload, 
isLast)));
-              multiplexer.registerConsumer(
-                  TIMER_LOCATION,
-                  (payload, isLast) -> timerInboundValues.add(KV.of(payload, 
isLast)));
-            })
-        .get();
-    multiplexer.getInboundObserver().onNext(ELEMENTS);
-    assertTrue(multiplexer.hasConsumer(DATA_LOCATION));
-    assertTrue(multiplexer.hasConsumer(TIMER_LOCATION));
-    // Ensure that when we see a terminal Elements object, we remove the 
consumer
-    multiplexer.getInboundObserver().onNext(TERMINAL_ELEMENTS);
-    assertFalse(multiplexer.hasConsumer(DATA_LOCATION));
-    assertFalse(multiplexer.hasConsumer(TIMER_LOCATION));
-
-    // Assert that normal and terminal Elements are passed to the consumer
-    assertThat(
-        dataInboundValues,
-        contains(KV.of(ELEMENTS.getData(0).getData(), false), 
KV.of(ByteString.EMPTY, true)));
-    assertThat(
-        timerInboundValues,
-        contains(KV.of(ELEMENTS.getTimers(0).getTimers(), false), 
KV.of(ByteString.EMPTY, true)));
-  }
-}
diff --git 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java
 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java
deleted file mode 100644
index cf6747c9754..00000000000
--- 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.fn.data;
-
-import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.ExecutionException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.ByteStringOutputStream;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link BeamFnDataInboundObserver}. */
-@RunWith(JUnit4.class)
-public class BeamFnDataInboundObserverTest {
-  private static final Coder<WindowedValue<String>> CODER =
-      WindowedValue.getFullCoder(StringUtf8Coder.of(), 
GlobalWindow.Coder.INSTANCE);
-  private static final LogicalEndpoint DATA_ENDPOINT = 
LogicalEndpoint.data("777L", "999L");
-
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void testDecodingElements() throws Exception {
-    Collection<WindowedValue<String>> values = new ArrayList<>();
-    InboundDataClient readFuture = CompletableFutureInboundDataClient.create();
-    BeamFnDataInboundObserver observer =
-        new BeamFnDataInboundObserver(
-            DATA_ENDPOINT, DecodingFnDataReceiver.create(CODER, values::add), 
readFuture);
-
-    // Test decoding multiple messages
-    observer.accept(dataWith("ABC", "DEF", "GHI"), false);
-    assertThat(
-        values,
-        contains(
-            valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF"), 
valueInGlobalWindow("GHI")));
-    values.clear();
-
-    // Test empty message signaling end of stream
-    assertFalse(readFuture.isDone());
-    observer.accept(ByteString.EMPTY, true);
-    assertTrue(readFuture.isDone());
-
-    // Test messages after stream is finished are discarded
-    observer.accept(dataWith("ABC", "DEF", "GHI"), false);
-    assertThat(values, empty());
-  }
-
-  @Test
-  public void testConsumptionFailureCompletesReadFutureAndDiscardsMessages() 
throws Exception {
-    InboundDataClient readClient = CompletableFutureInboundDataClient.create();
-    BeamFnDataInboundObserver observer =
-        new BeamFnDataInboundObserver(
-            DATA_ENDPOINT, DecodingFnDataReceiver.create(CODER, 
this::throwOnDefValue), readClient);
-
-    assertFalse(readClient.isDone());
-    observer.accept(dataWith("ABC", "DEF", "GHI"), false);
-    assertTrue(readClient.isDone());
-
-    thrown.expect(ExecutionException.class);
-    thrown.expectCause(instanceOf(RuntimeException.class));
-    thrown.expectMessage("Failure");
-    readClient.awaitCompletion();
-  }
-
-  private void throwOnDefValue(WindowedValue<String> value) {
-    if ("DEF".equals(value.getValue())) {
-      throw new RuntimeException("Failure");
-    }
-  }
-
-  private ByteString dataWith(String... values) throws Exception {
-    ByteStringOutputStream output = new ByteStringOutputStream();
-    for (String value : values) {
-      CODER.encode(valueInGlobalWindow(value), output);
-    }
-    return output.toByteString();
-  }
-}
diff --git 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClientTest.java
 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClientTest.java
deleted file mode 100644
index b11dc5d78db..00000000000
--- 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClientTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.fn.data;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.isA;
-import static org.junit.Assert.fail;
-
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link CompletableFutureInboundDataClient}. */
-@RunWith(JUnit4.class)
-public class CompletableFutureInboundDataClientTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void testComplete() throws Exception {
-    InboundDataClient client = CompletableFutureInboundDataClient.create();
-
-    assertThat(client.isDone(), is(false));
-
-    client.complete();
-
-    assertThat(client.isDone(), is(true));
-    // Should return immediately
-    client.awaitCompletion();
-  }
-
-  @Test
-  public void testCanceled() throws Exception {
-    InboundDataClient client = CompletableFutureInboundDataClient.create();
-
-    assertThat(client.isDone(), is(false));
-
-    client.cancel();
-
-    assertThat(client.isDone(), is(true));
-
-    thrown.expect(CancellationException.class);
-    // Should return immediately
-    client.awaitCompletion();
-  }
-
-  @Test
-  public void testFailed() throws Exception {
-    InboundDataClient client = CompletableFutureInboundDataClient.create();
-
-    assertThat(client.isDone(), is(false));
-
-    client.fail(new UnsupportedOperationException("message"));
-
-    assertThat(client.isDone(), is(true));
-
-    thrown.expect(ExecutionException.class);
-    thrown.expectCause(isA(UnsupportedOperationException.class));
-    thrown.expectMessage("message");
-    client.awaitCompletion();
-  }
-
-  @Test
-  public void testCompleteMultithreaded() throws Exception {
-    InboundDataClient client = CompletableFutureInboundDataClient.create();
-    Future<Void> waitingFuture =
-        Executors.newSingleThreadExecutor()
-            .submit(
-                () -> {
-                  client.awaitCompletion();
-                  return null;
-                });
-
-    try {
-      waitingFuture.get(50, TimeUnit.MILLISECONDS);
-      fail();
-    } catch (TimeoutException expected) {
-      // This should time out, as the client should never complete without 
external completion
-    }
-
-    client.complete();
-    // Blocks forever if the thread does not continue
-    waitingFuture.get();
-  }
-
-  @Test
-  public void testCompleteBackingFuture() throws Exception {
-    CompletableFuture<Object> future = new CompletableFuture<>();
-    InboundDataClient client = 
CompletableFutureInboundDataClient.forBackingFuture(future);
-
-    assertThat(future.isDone(), is(false));
-    assertThat(client.isDone(), is(false));
-
-    client.complete();
-
-    assertThat(future.isDone(), is(true));
-    assertThat(client.isDone(), is(true));
-    // Should return immediately
-    client.awaitCompletion();
-  }
-
-  @Test
-  public void testCancelBackingFuture() throws Exception {
-    CompletableFuture<Object> future = new CompletableFuture<>();
-    InboundDataClient client = 
CompletableFutureInboundDataClient.forBackingFuture(future);
-
-    assertThat(future.isDone(), is(false));
-    assertThat(client.isDone(), is(false));
-
-    client.cancel();
-
-    assertThat(future.isDone(), is(true));
-    assertThat(client.isDone(), is(true));
-    assertThat(future.isCancelled(), is(true));
-    thrown.expect(CancellationException.class);
-    // Should return immediately
-    future.get();
-  }
-
-  @Test
-  public void testFailBackingFuture() throws Exception {
-    CompletableFuture<Object> future = new CompletableFuture<>();
-    InboundDataClient client = 
CompletableFutureInboundDataClient.forBackingFuture(future);
-
-    assertThat(future.isDone(), is(false));
-    assertThat(client.isDone(), is(false));
-
-    client.fail(new UnsupportedOperationException("message"));
-
-    assertThat(client.isDone(), is(true));
-    assertThat(future.isDone(), is(true));
-    assertThat(future.isCompletedExceptionally(), is(true));
-
-    thrown.expect(ExecutionException.class);
-    thrown.expectCause(isA(UnsupportedOperationException.class));
-    thrown.expectMessage("message");
-    client.awaitCompletion();
-  }
-}
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
index 5ef1ed836e6..75f3a24301c 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
@@ -33,12 +33,7 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver;
  */
 public interface BeamFnDataClient {
   /**
-   * Registers the following inbound receiver for the provided instruction id.
-   *
-   * <p>The provided coder is used to decode inbound elements. The decoded 
elements are passed to
-   * the provided receiver. Any failure during decoding or processing of the 
element will complete
-   * the returned future exceptionally. On successful termination of the 
stream, the returned future
-   * is completed successfully.
+   * Registers a receiver for the provided instruction id.
    *
    * <p>The receiver is not required to be thread safe.
    *
@@ -69,7 +64,7 @@ public interface BeamFnDataClient {
    * timers over the data plane. It is important that {@link
    * 
BeamFnDataOutboundAggregator#sendOrCollectBufferedDataAndFinishOutboundStreams()}
 is called on
    * the returned BeamFnDataOutboundAggregator at the end of each bundle. If
-   * collectElementsIfNoFlushes is set to true, {@link *
+   * collectElementsIfNoFlushes is set to true, {@link
    * 
BeamFnDataOutboundAggregator#sendOrCollectBufferedDataAndFinishOutboundStreams()}
 returns the
    * buffered elements instead of sending it through the outbound 
StreamObserver if there's no
    * previous flush.

Reply via email to