Repository: beam
Updated Branches:
  refs/heads/master 4f56acbba -> 2c71354d0


Use strings for ids in Fn API


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9933f271
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9933f271
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9933f271

Branch: refs/heads/master
Commit: 9933f27140ddfe5b9ded4a0688a9c0506ef94113
Parents: 4f56acb
Author: Kenneth Knowles <k...@google.com>
Authored: Sun Feb 12 22:23:32 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Thu Feb 23 17:35:22 2017 -0800

----------------------------------------------------------------------
 .../fn-api/src/main/proto/beam_fn_api.proto     | 48 +++++-----
 .../fn/harness/control/BeamFnControlClient.java |  3 +-
 .../harness/control/ProcessBundleHandler.java   |  8 +-
 .../fn/harness/control/RegisterHandler.java     |  8 +-
 .../BeamFnDataBufferingOutboundObserver.java    |  4 +-
 .../beam/fn/harness/data/BeamFnDataClient.java  |  4 +-
 .../fn/harness/data/BeamFnDataGrpcClient.java   |  4 +-
 .../harness/data/BeamFnDataGrpcMultiplexer.java | 11 +--
 .../fn/harness/logging/BeamFnLoggingClient.java |  4 +-
 .../beam/runners/core/BeamFnDataReadRunner.java |  4 +-
 .../runners/core/BeamFnDataWriteRunner.java     |  4 +-
 .../apache/beam/fn/harness/FnHarnessTest.java   |  8 +-
 .../control/BeamFnControlClientTest.java        | 12 +--
 .../control/ProcessBundleHandlerTest.java       | 95 ++++++++++----------
 .../fn/harness/control/RegisterHandlerTest.java | 18 ++--
 ...BeamFnDataBufferingOutboundObserverTest.java |  9 +-
 .../harness/data/BeamFnDataGrpcClientTest.java  | 21 +++--
 .../data/BeamFnDataGrpcMultiplexerTest.java     | 12 +--
 .../data/BeamFnDataInboundObserverTest.java     |  4 +-
 .../runners/core/BeamFnDataReadRunnerTest.java  |  8 +-
 .../runners/core/BeamFnDataWriteRunnerTest.java |  8 +-
 21 files changed, 159 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
index 3ac0fbf..80bae2e 100644
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -59,7 +59,7 @@ message Target {
   }
 
   // (Required) The id of the PrimitiveTransform which is the target.
-  int64 primitive_transform_reference = 1;
+  string primitive_transform_reference = 1;
 
   // (Required) The local name of an input or output defined on the primitive
   // transform.
@@ -69,7 +69,7 @@ message Target {
 // Information defining a PCollection
 message PCollection {
   // (Required) A reference to a coder.
-  int64 coder_reference = 1;
+  string coder_reference = 1;
 
   // TODO: Windowing strategy, ...
 }
@@ -78,7 +78,7 @@ message PCollection {
 message PrimitiveTransform {
   // (Required) A pipeline level unique id which can be used as a reference to
   // refer to this.
-  int64 id = 1;
+  string id = 1;
 
   // (Required) A function spec that is used by this primitive
   // transform to process data.
@@ -117,7 +117,7 @@ message PrimitiveTransform {
 message FunctionSpec {
   // (Required) A pipeline level unique id which can be used as a reference to
   // refer to this.
-  int64 id = 1;
+  string id = 1;
 
   // (Required) A globally unique name representing this user definable
   // function.
@@ -131,7 +131,7 @@ message FunctionSpec {
 
   // (Required) Reference to specification of execution environment required to
   // invoke this function.
-  int64 environment_reference = 3;
+  string environment_reference = 3;
 
   // Data used to parameterize this function. Depending on the urn, this may be
   // optional or required.
@@ -179,7 +179,7 @@ message Coder {
   //
   // TODO: Perhaps this is redundant with the data of the FunctionSpec
   // for known coders?
-  repeated int64 component_coder_reference = 2;
+  repeated string component_coder_reference = 2;
 }
 
 // A descriptor for connecting to a remote port using the Beam Fn Data API.
@@ -218,7 +218,7 @@ service BeamFnControl {
 message InstructionRequest {
   // (Required) An unique identifier provided by the runner which represents
   // this requests execution. The InstructionResponse MUST have the matching id.
-  int64 instruction_id = 1;
+  string instruction_id = 1;
 
   // (Required) A request that the SDK Harness needs to interpret.
   oneof request {
@@ -235,7 +235,7 @@ message InstructionResponse {
   // (Required) A reference provided by the runner which represents a requests
   // execution. The InstructionResponse MUST have the matching id when
   // responding to the runner.
-  int64 instruction_id = 1;
+  string instruction_id = 1;
 
   // If this is specified, then this instruction has failed.
   // A human readable string representing the reason as to why processing has
@@ -269,7 +269,7 @@ message RegisterResponse {
 message ProcessBundleDescriptor {
   // (Required) A pipeline level unique id which can be used as a reference to
   // refer to this.
-  int64 id = 1;
+  string id = 1;
 
   // (Required) A list of primitive transforms that should
   // be used to construct the bundle processing graph.
@@ -282,7 +282,7 @@ message ProcessBundleDescriptor {
 // A request to process a given bundle.
 // Stable
 message ProcessBundleRequest {
-  int64 process_bundle_descriptor_reference = 1;
+  string process_bundle_descriptor_reference = 1;
 }
 
 // Stable
@@ -292,7 +292,7 @@ message ProcessBundleResponse {
 message ProcessBundleProgressRequest {
   // (Required) A reference to an active process bundle request with the given
   // instruction id.
-  int64 instruction_reference = 1;
+  string instruction_reference = 1;
 }
 
 message ProcessBundleProgressResponse {
@@ -309,7 +309,7 @@ message ProcessBundleProgressResponse {
 message ProcessBundleSplitRequest {
   // (Required) A reference to an active process bundle request with the given
   // instruction id.
-  int64 instruction_reference = 1;
+  string instruction_reference = 1;
 
   // (Required) The fraction of work (when compared to the known amount of work)
   // the process bundle request should try to split at.
@@ -344,7 +344,7 @@ message PrimitiveTransformSplit {
   // (Required) A reference to a primitive transform with the given id that
   // is part of the active process bundle request with the given instruction
   // id.
-  int64 primitive_transform_reference = 1;
+  string primitive_transform_reference = 1;
 
   // (Required) A function specification describing the restriction
   // that has been completed by the primitive transform.
@@ -425,7 +425,7 @@ message Elements {
   message Data {
     // (Required) A reference to an active instruction request with the given
     // instruction id.
-    int64 instruction_reference = 1;
+    string instruction_reference = 1;
 
     // (Required) A definition representing a consumer or producer of this data.
     // If received by a harness, this represents the consumer within that
@@ -475,12 +475,12 @@ service BeamFnData {
 message StateRequest {
   // (Required) An unique identifier provided by the SDK which represents this
   // requests execution. The StateResponse must have the matching id.
-  int64 id = 1;
+  string id = 1;
 
   // (Required) The associated instruction id of the work that is currently
   // being processed. This allows for the runner to associate any modifications
   // to state to be committed with the appropriate work execution.
-  int64 instruction_reference = 2;
+  string instruction_reference = 2;
 
   // At least one of the following fields should be populated.
   // Also, no request should use a state key referred to in another state key.
@@ -499,11 +499,11 @@ message StateResponse {
   // (Required) A reference provided by the SDK which represents a requests
   // execution. The StateResponse must have the matching id when responding
   // to the SDK.
-  int64 id = 1;
+  string id = 1;
 
   // (Required) The associated instruction id of the work that is currently
   // being processed.
-  int64 instruction_reference = 2;
+  string instruction_reference = 2;
 
   // (Required) A key to associate with the version of this state. Allows for
   // SDKs to share state across work items if they have the same cache key and
@@ -563,7 +563,7 @@ message StateKey {
   // (Required) Represents the namespace for the state. If this state is for a
   // DoFn, then this reference is expected to point to the DoFn. If this state
   // is for a side input, then this is expected to reference the ViewFn.
-  int64 function_spec_reference = 1;
+  string function_spec_reference = 1;
 
   // (Required) The bytes of the window which this state request is for encoded
   // in the outer context.
@@ -693,11 +693,11 @@ message LogEntry {
 
   // (Optional) A reference to the instruction this log statement is associated
   // with.
-  int64 instruction_reference = 5;
+  string instruction_reference = 5;
 
   // (Optional) A reference to the primitive transform this log statement is
   // associated with.
-  int64 primitive_transform_reference = 6;
+  string primitive_transform_reference = 6;
 
   // (Optional) Human-readable name of the function or method being invoked,
   // with optional context such as the class or package name. The format can
@@ -734,7 +734,7 @@ service BeamFnLogging {
 message ApiServiceDescriptor {
   // (Required) A pipeline level unique id which can be used as a reference to
   // refer to this.
-  int64 id = 1;
+  string id = 1;
 
   // (Required) The URL to connect to.
   string url = 2;
@@ -758,7 +758,7 @@ message OAuth2ClientCredentialsGrant {
 message DockerContainer {
   // (Required) A pipeline level unique id which can be used as a reference to
   // refer to this.
-  int64 id = 1;
+  string id = 1;
 
   // (Required) The Docker container URI
   // For example "dataflow.gcr.io/v1beta3/java-batch:1.5.1"
@@ -767,5 +767,5 @@ message DockerContainer {
   // (Optional) Docker registry specification.
   // If unspecified, the uri is expected to be able to be fetched without
   // requiring additional configuration by a runner.
-  int64 registry_reference = 3;
+  string registry_reference = 3;
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
index 6d75315..e40bb2f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
@@ -52,9 +52,10 @@ import org.slf4j.LoggerFactory;
  * {@link org.apache.beam.fn.v1.BeamFnApi.InstructionRequest}s.
  */
 public class BeamFnControlClient {
+  private static final String FAKE_INSTRUCTION_ID = "FAKE_INSTRUCTION_ID";
   private static final Logger LOG = LoggerFactory.getLogger(BeamFnControlClient.class);
   private static final BeamFnApi.InstructionRequest POISON_PILL =
-      BeamFnApi.InstructionRequest.newBuilder().setInstructionId(Long.MIN_VALUE).build();
+      BeamFnApi.InstructionRequest.newBuilder().setInstructionId(FAKE_INSTRUCTION_ID).build();
 
   private final StreamObserver<BeamFnApi.InstructionResponse> outboundObserver;
   private final BlockingDeque<BeamFnApi.InstructionRequest> bufferedInstructions;

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index d764a95..1f82085 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -85,12 +85,12 @@ public class ProcessBundleHandler {
   private static final Logger LOG = LoggerFactory.getLogger(ProcessBundleHandler.class);
 
   private final PipelineOptions options;
-  private final Function<Long, Message> fnApiRegistry;
+  private final Function<String, Message> fnApiRegistry;
   private final BeamFnDataClient beamFnDataClient;
 
   public ProcessBundleHandler(
       PipelineOptions options,
-      Function<Long, Message> fnApiRegistry,
+      Function<String, Message> fnApiRegistry,
       BeamFnDataClient beamFnDataClient) {
     this.options = options;
     this.fnApiRegistry = fnApiRegistry;
@@ -99,7 +99,7 @@ public class ProcessBundleHandler {
 
   protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
       BeamFnApi.PrimitiveTransform primitiveTransform,
-      Supplier<Long> processBundleInstructionId,
+      Supplier<String> processBundleInstructionId,
       Function<BeamFnApi.Target, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> consumers,
       BiConsumer<BeamFnApi.Target, ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
       Consumer<ThrowingRunnable> addStartFunction,
@@ -209,7 +209,7 @@ public class ProcessBundleHandler {
         BeamFnApi.InstructionResponse.newBuilder()
             .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance());
 
-    long bundleId = request.getProcessBundle().getProcessBundleDescriptorReference();
+    String bundleId = request.getProcessBundle().getProcessBundleDescriptorReference();
     BeamFnApi.ProcessBundleDescriptor bundleDescriptor =
         (BeamFnApi.ProcessBundleDescriptor) fnApiRegistry.apply(bundleId);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
index be75b50..fb06231 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
@@ -37,13 +37,13 @@ import org.slf4j.LoggerFactory;
  */
 public class RegisterHandler {
   private static final Logger LOG = LoggerFactory.getLogger(RegisterHandler.class);
-  private final ConcurrentMap<Long, CompletableFuture<Message>> idToObject;
+  private final ConcurrentMap<String, CompletableFuture<Message>> idToObject;
 
   public RegisterHandler() {
     idToObject = new ConcurrentHashMap<>();
   }
 
-  public <T extends Message> T getById(long id) {
+  public <T extends Message> T getById(String id) {
     try {
       @SuppressWarnings("unchecked")
       CompletableFuture<T> returnValue = (CompletableFuture<T>) computeIfAbsent(id);
@@ -86,7 +86,7 @@ public class RegisterHandler {
     return response;
   }
 
-  private CompletableFuture<Message> computeIfAbsent(long id) {
-    return idToObject.computeIfAbsent(id, (Long ignored) -> new CompletableFuture<>());
+  private CompletableFuture<Message> computeIfAbsent(String id) {
+    return idToObject.computeIfAbsent(id, (String ignored) -> new CompletableFuture<>());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
index 25560ef..18e0d95 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserver.java
@@ -61,13 +61,13 @@ public class BeamFnDataBufferingOutboundObserver<T>
   private long counter;
   private final int bufferLimit;
   private final Coder<WindowedValue<T>> coder;
-  private final KV<Long, BeamFnApi.Target> outputLocation;
+  private final KV<String, BeamFnApi.Target> outputLocation;
   private final StreamObserver<BeamFnApi.Elements> outboundObserver;
   private final ByteString.Output bufferedElements;
 
   public BeamFnDataBufferingOutboundObserver(
       PipelineOptions options,
-      KV<Long, BeamFnApi.Target> outputLocation,
+      KV<String, BeamFnApi.Target> outputLocation,
       Coder<WindowedValue<T>> coder,
       StreamObserver<BeamFnApi.Elements> outboundObserver) {
     this.bufferLimit = getBufferLimit(options);

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
index 27b1acb..7be96b6 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java
@@ -44,7 +44,7 @@ public interface BeamFnDataClient {
    */
   <T> CompletableFuture<Void> forInboundConsumer(
       BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
-      KV<Long, BeamFnApi.Target> inputLocation,
+      KV<String, BeamFnApi.Target> inputLocation,
       Coder<WindowedValue<T>> coder,
       ThrowingConsumer<WindowedValue<T>> consumer);
 
@@ -59,6 +59,6 @@ public interface BeamFnDataClient {
    */
   <T> CloseableThrowingConsumer<WindowedValue<T>> forOutboundConsumer(
       BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
-      KV<Long, BeamFnApi.Target> outputLocation,
+      KV<String, BeamFnApi.Target> outputLocation,
       Coder<WindowedValue<T>> coder);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
index 8db1f48..4137cd7 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java
@@ -75,7 +75,7 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient {
   @Override
   public <T> CompletableFuture<Void> forInboundConsumer(
       BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
-      KV<Long, BeamFnApi.Target> inputLocation,
+      KV<String, BeamFnApi.Target> inputLocation,
       Coder<WindowedValue<T>> coder,
       ThrowingConsumer<WindowedValue<T>> consumer) {
     LOG.debug("Registering consumer instruction {} for target {}",
@@ -102,7 +102,7 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient {
   @Override
   public <T> CloseableThrowingConsumer<WindowedValue<T>> forOutboundConsumer(
       BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
-      KV<Long, BeamFnApi.Target> outputLocation,
+      KV<String, BeamFnApi.Target> outputLocation,
       Coder<WindowedValue<T>> coder) {
     BeamFnDataGrpcMultiplexer client = getClientFor(apiServiceDescriptor);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
index fe3a693..53dfe11 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java
@@ -50,8 +50,9 @@ public class BeamFnDataGrpcMultiplexer {
   private final StreamObserver<BeamFnApi.Elements> inboundObserver;
   private final StreamObserver<BeamFnApi.Elements> outboundObserver;
   @VisibleForTesting
-  final ConcurrentMap<KV<Long, BeamFnApi.Target>,
-                              CompletableFuture<Consumer<BeamFnApi.Elements.Data>>> consumers;
+  final ConcurrentMap<
+          KV<String, BeamFnApi.Target>, CompletableFuture<Consumer<BeamFnApi.Elements.Data>>>
+      consumers;
 
   public BeamFnDataGrpcMultiplexer(
       BeamFnApi.ApiServiceDescriptor apiServiceDescriptor,
@@ -80,10 +81,10 @@ public class BeamFnDataGrpcMultiplexer {
   }
 
   public CompletableFuture<Consumer<BeamFnApi.Elements.Data>> futureForKey(
-      KV<Long, BeamFnApi.Target> key) {
+      KV<String, BeamFnApi.Target> key) {
     return consumers.computeIfAbsent(
         key,
-        (KV<Long, BeamFnApi.Target> providedKey) -> new CompletableFuture<>());
+        (KV<String, BeamFnApi.Target> providedKey) -> new CompletableFuture<>());
   }
 
   /**
@@ -99,7 +100,7 @@ public class BeamFnDataGrpcMultiplexer {
     public void onNext(BeamFnApi.Elements value) {
       for (BeamFnApi.Elements.Data data : value.getDataList()) {
         try {
-          KV<Long, BeamFnApi.Target> key =
+          KV<String, BeamFnApi.Target> key =
               KV.of(data.getInstructionReference(), data.getTarget());
           futureForKey(key).get().accept(data);
           if (data.getData().isEmpty()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index d74d9fa..e1ec03d 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -79,9 +79,11 @@ public class BeamFnLoggingClient implements AutoCloseable {
 
   private static final Formatter FORMATTER = new SimpleFormatter();
 
+  private static final String FAKE_INSTRUCTION_ID = "FAKE_INSTRUCTION_ID";
+
   /* Used to signal to a thread processing a queue to finish its work gracefully. */
   private static final BeamFnApi.LogEntry POISON_PILL =
-      BeamFnApi.LogEntry.newBuilder().setInstructionReference(Long.MIN_VALUE).build();
+      BeamFnApi.LogEntry.newBuilder().setInstructionReference(FAKE_INSTRUCTION_ID).build();
 
   /**
    * The number of log messages that will be buffered. Assuming log messages are at most 1 KiB,

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
index a6b8b33..034ef84 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
@@ -51,7 +51,7 @@ public class BeamFnDataReadRunner<OutputT> {
 
   private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor;
   private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers;
-  private final Supplier<Long> processBundleInstructionIdSupplier;
+  private final Supplier<String> processBundleInstructionIdSupplier;
   private final BeamFnDataClient beamFnDataClientFactory;
   private final Coder<WindowedValue<OutputT>> coder;
   private final BeamFnApi.Target inputTarget;
@@ -60,7 +60,7 @@ public class BeamFnDataReadRunner<OutputT> {
 
   public BeamFnDataReadRunner(
       BeamFnApi.FunctionSpec functionSpec,
-      Supplier<Long> processBundleInstructionIdSupplier,
+      Supplier<String> processBundleInstructionIdSupplier,
       BeamFnApi.Target inputTarget,
       BeamFnApi.Coder coderSpec,
       BeamFnDataClient beamFnDataClientFactory,

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
index 596afe5..54fd626 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
@@ -44,13 +44,13 @@ public class BeamFnDataWriteRunner<InputT> {
   private final BeamFnApi.Target outputTarget;
   private final Coder<WindowedValue<InputT>> coder;
   private final BeamFnDataClient beamFnDataClientFactory;
-  private final Supplier<Long> processBundleInstructionIdSupplier;
+  private final Supplier<String> processBundleInstructionIdSupplier;
 
   private CloseableThrowingConsumer<WindowedValue<InputT>> consumer;
 
   public BeamFnDataWriteRunner(
       BeamFnApi.FunctionSpec functionSpec,
-      Supplier<Long> processBundleInstructionIdSupplier,
+      Supplier<String> processBundleInstructionIdSupplier,
       BeamFnApi.Target outputTarget,
       BeamFnApi.Coder coderSpec,
       BeamFnDataClient beamFnDataClientFactory)

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
index ff05225..6a45647 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnHarnessTest.java
@@ -48,12 +48,12 @@ import org.junit.runners.JUnit4;
 public class FnHarnessTest {
   private static final BeamFnApi.InstructionRequest INSTRUCTION_REQUEST =
       BeamFnApi.InstructionRequest.newBuilder()
-      .setInstructionId(999L)
+      .setInstructionId("999L")
       .setRegister(BeamFnApi.RegisterRequest.getDefaultInstance())
       .build();
   private static final BeamFnApi.InstructionResponse INSTRUCTION_RESPONSE =
       BeamFnApi.InstructionResponse.newBuilder()
-      .setInstructionId(999L)
+      .setInstructionId("999L")
       .setRegister(BeamFnApi.RegisterResponse.getDefaultInstance())
       .build();
 
@@ -108,12 +108,12 @@ public class FnHarnessTest {
       try {
         BeamFnApi.ApiServiceDescriptor loggingDescriptor = BeamFnApi.ApiServiceDescriptor
             .newBuilder()
-            .setId(1L)
+            .setId("1L")
             .setUrl("localhost:" + loggingServer.getPort())
             .build();
         BeamFnApi.ApiServiceDescriptor controlDescriptor = BeamFnApi.ApiServiceDescriptor
             .newBuilder()
-            .setId(2L)
+            .setId("2L")
             .setUrl("localhost:" + controlServer.getPort())
             .build();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
index fc3af49..edb7903 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
@@ -51,33 +51,33 @@ import org.junit.runners.JUnit4;
 public class BeamFnControlClientTest {
   private static final BeamFnApi.InstructionRequest SUCCESSFUL_REQUEST =
       BeamFnApi.InstructionRequest.newBuilder()
-      .setInstructionId(1L)
+      .setInstructionId("1L")
       .setProcessBundle(BeamFnApi.ProcessBundleRequest.getDefaultInstance())
       .build();
   private static final BeamFnApi.InstructionResponse SUCCESSFUL_RESPONSE =
       BeamFnApi.InstructionResponse.newBuilder()
-      .setInstructionId(1L)
+      .setInstructionId("1L")
       .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance())
       .build();
   private static final BeamFnApi.InstructionRequest UNKNOWN_HANDLER_REQUEST =
       BeamFnApi.InstructionRequest.newBuilder()
-      .setInstructionId(2L)
+      .setInstructionId("2L")
       .build();
   private static final BeamFnApi.InstructionResponse UNKNOWN_HANDLER_RESPONSE =
       BeamFnApi.InstructionResponse.newBuilder()
-      .setInstructionId(2L)
+      .setInstructionId("2L")
       .setError("Unknown InstructionRequest type "
           + BeamFnApi.InstructionRequest.RequestCase.REQUEST_NOT_SET)
       .build();
   private static final RuntimeException FAILURE = new RuntimeException("TestFailure");
   private static final BeamFnApi.InstructionRequest FAILURE_REQUEST =
       BeamFnApi.InstructionRequest.newBuilder()
-      .setInstructionId(3L)
+      .setInstructionId("3L")
       .setRegister(BeamFnApi.RegisterRequest.getDefaultInstance())
       .build();
   private static final BeamFnApi.InstructionResponse FAILURE_RESPONSE =
       BeamFnApi.InstructionResponse.newBuilder()
-      .setInstructionId(3L)
+      .setInstructionId("3L")
       .setError(getStackTraceAsString(FAILURE))
       .build();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 1d451b5..de105d7 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.beam.fn.harness.control;
 import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -93,11 +94,11 @@ public class ProcessBundleHandlerTest {
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   private static final Coder<WindowedValue<String>> STRING_CODER =
       WindowedValue.getFullCoder(StringUtf8Coder.of(), 
GlobalWindow.Coder.INSTANCE);
-  private static final long LONG_CODER_SPEC_ID = 998L;
-  private static final long STRING_CODER_SPEC_ID = 999L;
+  private static final String LONG_CODER_SPEC_ID = "998L";
+  private static final String STRING_CODER_SPEC_ID = "999L";
   private static final BeamFnApi.RemoteGrpcPort REMOTE_PORT = 
BeamFnApi.RemoteGrpcPort.newBuilder()
       .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.newBuilder()
-          .setId(58L)
+          .setId("58L")
           .setUrl("TestUrl"))
       .build();
   private static final BeamFnApi.Coder LONG_CODER_SPEC;
@@ -141,10 +142,10 @@ public class ProcessBundleHandlerTest {
   public void testOrderOfStartAndFinishCalls() throws Exception {
     BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
         BeamFnApi.ProcessBundleDescriptor.newBuilder()
-        
.addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(2L))
-        
.addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(3L))
+        
.addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("2L"))
+        
.addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("3L"))
         .build();
-    Map<Long, Message> fnApiRegistry = ImmutableMap.of(1L, 
processBundleDescriptor);
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", 
processBundleDescriptor);
 
     List<BeamFnApi.PrimitiveTransform> transformsProcessed = new ArrayList<>();
     List<String> orderOfOperations = new ArrayList<>();
@@ -156,7 +157,7 @@ public class ProcessBundleHandlerTest {
       @Override
       protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
           BeamFnApi.PrimitiveTransform primitiveTransform,
-          Supplier<Long> processBundleInstructionId,
+          Supplier<String> processBundleInstructionId,
           Function<BeamFnApi.Target,
                    Collection<ThrowingConsumer<WindowedValue<OutputT>>>> 
consumers,
           BiConsumer<BeamFnApi.Target, 
ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
@@ -164,7 +165,7 @@ public class ProcessBundleHandlerTest {
           Consumer<ThrowingRunnable> addFinishFunction)
           throws IOException {
 
-        assertEquals((Long) 999L, processBundleInstructionId.get());
+        assertThat(processBundleInstructionId.get(), equalTo("999L"));
 
         transformsProcessed.add(primitiveTransform);
         addStartFunction.accept(
@@ -174,9 +175,9 @@ public class ProcessBundleHandlerTest {
       }
     };
     handler.processBundle(BeamFnApi.InstructionRequest.newBuilder()
-        .setInstructionId(999L)
+        .setInstructionId("999L")
         .setProcessBundle(
-            
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference(1L))
+            
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L"))
         .build());
 
     // Processing of primitive transforms is performed in reverse order.
@@ -184,17 +185,17 @@ public class ProcessBundleHandlerTest {
         processBundleDescriptor.getPrimitiveTransform(1),
         processBundleDescriptor.getPrimitiveTransform(0)));
     // Start should occur in reverse order while finish calls should occur in 
forward order
-    assertThat(orderOfOperations, contains("Start3", "Start2", "Finish2", 
"Finish3"));
+    assertThat(orderOfOperations, contains("Start3L", "Start2L", "Finish2L", 
"Finish3L"));
   }
 
   @Test
   public void testCreatingPrimitiveTransformExceptionsArePropagated() throws 
Exception {
     BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
         BeamFnApi.ProcessBundleDescriptor.newBuilder()
-        
.addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(2L))
-        
.addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(3L))
+        
.addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("2L"))
+        
.addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("3L"))
         .build();
-    Map<Long, Message> fnApiRegistry = ImmutableMap.of(1L, 
processBundleDescriptor);
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", 
processBundleDescriptor);
 
     ProcessBundleHandler handler = new ProcessBundleHandler(
         PipelineOptionsFactory.create(),
@@ -203,7 +204,7 @@ public class ProcessBundleHandlerTest {
       @Override
       protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
           BeamFnApi.PrimitiveTransform primitiveTransform,
-          Supplier<Long> processBundleInstructionId,
+          Supplier<String> processBundleInstructionId,
           Function<BeamFnApi.Target,
                    Collection<ThrowingConsumer<WindowedValue<OutputT>>>> 
consumers,
           BiConsumer<BeamFnApi.Target, 
ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
@@ -217,7 +218,7 @@ public class ProcessBundleHandlerTest {
     };
     handler.processBundle(
         BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(
-            
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference(1L))
+            
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L"))
         .build());
   }
 
@@ -225,10 +226,10 @@ public class ProcessBundleHandlerTest {
   public void testPrimitiveTransformStartExceptionsArePropagated() throws 
Exception {
     BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
         BeamFnApi.ProcessBundleDescriptor.newBuilder()
-        
.addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(2L))
-        
.addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(3L))
+        
.addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("2L"))
+        
.addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("3L"))
         .build();
-    Map<Long, Message> fnApiRegistry = ImmutableMap.of(1L, 
processBundleDescriptor);
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", 
processBundleDescriptor);
 
     ProcessBundleHandler handler = new ProcessBundleHandler(
         PipelineOptionsFactory.create(),
@@ -237,7 +238,7 @@ public class ProcessBundleHandlerTest {
       @Override
       protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
           BeamFnApi.PrimitiveTransform primitiveTransform,
-          Supplier<Long> processBundleInstructionId,
+          Supplier<String> processBundleInstructionId,
           Function<BeamFnApi.Target,
                    Collection<ThrowingConsumer<WindowedValue<OutputT>>>> 
consumers,
           BiConsumer<BeamFnApi.Target, 
ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
@@ -255,7 +256,7 @@ public class ProcessBundleHandlerTest {
     };
     handler.processBundle(
         BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(
-            
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference(1L))
+            
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L"))
         .build());
   }
 
@@ -263,10 +264,10 @@ public class ProcessBundleHandlerTest {
   public void testPrimitiveTransformFinishExceptionsArePropagated() throws 
Exception {
     BeamFnApi.ProcessBundleDescriptor processBundleDescriptor =
         BeamFnApi.ProcessBundleDescriptor.newBuilder()
-        
.addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(2L))
-        
.addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId(3L))
+        
.addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("2L"))
+        
.addPrimitiveTransform(BeamFnApi.PrimitiveTransform.newBuilder().setId("3L"))
         .build();
-    Map<Long, Message> fnApiRegistry = ImmutableMap.of(1L, 
processBundleDescriptor);
+    Map<String, Message> fnApiRegistry = ImmutableMap.of("1L", 
processBundleDescriptor);
 
     ProcessBundleHandler handler = new ProcessBundleHandler(
         PipelineOptionsFactory.create(),
@@ -275,7 +276,7 @@ public class ProcessBundleHandlerTest {
       @Override
       protected <InputT, OutputT> void createConsumersForPrimitiveTransform(
           BeamFnApi.PrimitiveTransform primitiveTransform,
-          Supplier<Long> processBundleInstructionId,
+          Supplier<String> processBundleInstructionId,
           Function<BeamFnApi.Target,
                    Collection<ThrowingConsumer<WindowedValue<OutputT>>>> 
consumers,
           BiConsumer<BeamFnApi.Target, 
ThrowingConsumer<WindowedValue<InputT>>> addConsumer,
@@ -293,7 +294,7 @@ public class ProcessBundleHandlerTest {
     };
     handler.processBundle(
         BeamFnApi.InstructionRequest.newBuilder().setProcessBundle(
-            
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference(1L))
+            
BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference("1L"))
         .build());
   }
 
@@ -325,8 +326,8 @@ public class ProcessBundleHandlerTest {
    */
   @Test
   public void testCreatingAndProcessingDoFn() throws Exception {
-    Map<Long, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, 
STRING_CODER_SPEC);
-    long primitiveTransformId = 100L;
+    Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, 
STRING_CODER_SPEC);
+    String primitiveTransformId = "100L";
     long mainOutputId = 101L;
     long sideOutputId = 102L;
 
@@ -340,22 +341,22 @@ public class ProcessBundleHandlerTest {
             mainOutputId, TestDoFn.mainOutput,
             sideOutputId, TestDoFn.sideOutput));
     BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
-        .setId(1L)
+        .setId("1L")
         .setUrn(JAVA_DO_FN_URN)
         .setData(Any.pack(BytesValue.newBuilder()
             
.setValue(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo)))
             .build()))
         .build();
     BeamFnApi.Target inputATarget1 = BeamFnApi.Target.newBuilder()
-        .setPrimitiveTransformReference(1000L)
+        .setPrimitiveTransformReference("1000L")
         .setName("inputATarget1")
         .build();
     BeamFnApi.Target inputATarget2 = BeamFnApi.Target.newBuilder()
-        .setPrimitiveTransformReference(1001L)
+        .setPrimitiveTransformReference("1001L")
         .setName("inputATarget1")
         .build();
     BeamFnApi.Target inputBTarget = BeamFnApi.Target.newBuilder()
-        .setPrimitiveTransformReference(1002L)
+        .setPrimitiveTransformReference("1002L")
         .setName("inputBTarget")
         .build();
     BeamFnApi.PrimitiveTransform primitiveTransform = 
BeamFnApi.PrimitiveTransform.newBuilder()
@@ -401,7 +402,7 @@ public class ProcessBundleHandlerTest {
         beamFnDataClient);
     handler.createConsumersForPrimitiveTransform(
         primitiveTransform,
-        Suppliers.ofInstance(57L)::get,
+        Suppliers.ofInstance("57L")::get,
         existingConsumers::get,
         newConsumers::put,
         startFunctions::add,
@@ -435,12 +436,12 @@ public class ProcessBundleHandlerTest {
 
   @Test
   public void testCreatingAndProcessingSource() throws Exception {
-    Map<Long, Message> fnApiRegistry = ImmutableMap.of(LONG_CODER_SPEC_ID, 
LONG_CODER_SPEC);
-    long primitiveTransformId = 100L;
+    Map<String, Message> fnApiRegistry = ImmutableMap.of(LONG_CODER_SPEC_ID, 
LONG_CODER_SPEC);
+    String primitiveTransformId = "100L";
     long outputId = 101L;
 
     BeamFnApi.Target inputTarget = BeamFnApi.Target.newBuilder()
-        .setPrimitiveTransformReference(1000L)
+        .setPrimitiveTransformReference("1000L")
         .setName("inputTarget")
         .build();
 
@@ -459,7 +460,7 @@ public class ProcessBundleHandlerTest {
     List<ThrowingRunnable> finishFunctions = new ArrayList<>();
 
     BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
-        .setId(1L)
+        .setId("1L")
         .setUrn(JAVA_SOURCE_URN)
         .setData(Any.pack(BytesValue.newBuilder()
             .setValue(ByteString.copyFrom(
@@ -483,7 +484,7 @@ public class ProcessBundleHandlerTest {
 
     handler.createConsumersForPrimitiveTransform(
         primitiveTransform,
-        Suppliers.ofInstance(57L)::get,
+        Suppliers.ofInstance("57L")::get,
         existingConsumers::get,
         newConsumers::put,
         startFunctions::add,
@@ -511,9 +512,9 @@ public class ProcessBundleHandlerTest {
 
   @Test
   public void testCreatingAndProcessingBeamFnDataReadRunner() throws Exception 
{
-    Map<Long, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, 
STRING_CODER_SPEC);
-    long bundleId = 57L;
-    long primitiveTransformId = 100L;
+    Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, 
STRING_CODER_SPEC);
+    String bundleId = "57L";
+    String primitiveTransformId = "100L";
     long outputId = 101L;
 
     List<WindowedValue<String>> outputValues = new ArrayList<>();
@@ -530,7 +531,7 @@ public class ProcessBundleHandlerTest {
     List<ThrowingRunnable> finishFunctions = new ArrayList<>();
 
     BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
-        .setId(1L)
+        .setId("1L")
         .setUrn(DATA_INPUT_URN)
         .setData(Any.pack(REMOTE_PORT))
         .build();
@@ -585,13 +586,13 @@ public class ProcessBundleHandlerTest {
 
   @Test
   public void testCreatingAndProcessingBeamFnDataWriteRunner() throws 
Exception {
-    Map<Long, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, 
STRING_CODER_SPEC);
-    long bundleId = 57L;
-    long primitiveTransformId = 100L;
+    Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, 
STRING_CODER_SPEC);
+    String bundleId = "57L";
+    String primitiveTransformId = "100L";
     long outputId = 101L;
 
     BeamFnApi.Target inputTarget = BeamFnApi.Target.newBuilder()
-        .setPrimitiveTransformReference(1000L)
+        .setPrimitiveTransformReference("1000L")
         .setName("inputTarget")
         .build();
 
@@ -603,7 +604,7 @@ public class ProcessBundleHandlerTest {
     List<ThrowingRunnable> finishFunctions = new ArrayList<>();
 
     BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
-        .setId(1L)
+        .setId("1L")
         .setUrn(DATA_OUTPUT_URN)
         .setData(Any.pack(REMOTE_PORT))
         .build();

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
index 7b07a08..c32fcc4 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/RegisterHandlerTest.java
@@ -39,14 +39,14 @@ public class RegisterHandlerTest {
 
   private static final BeamFnApi.InstructionRequest REGISTER_REQUEST =
       BeamFnApi.InstructionRequest.newBuilder()
-      .setInstructionId(1L)
+      .setInstructionId("1L")
       .setRegister(BeamFnApi.RegisterRequest.newBuilder()
-          
.addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(1L)
+          
.addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("1L")
               .addCoders(BeamFnApi.Coder.newBuilder().setFunctionSpec(
-                  BeamFnApi.FunctionSpec.newBuilder().setId(10L)).build()))
-          
.addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(2L)
+                  BeamFnApi.FunctionSpec.newBuilder().setId("10L")).build()))
+          
.addProcessBundleDescriptor(BeamFnApi.ProcessBundleDescriptor.newBuilder().setId("2L")
               .addCoders(BeamFnApi.Coder.newBuilder().setFunctionSpec(
-                  BeamFnApi.FunctionSpec.newBuilder().setId(20L)).build()))
+                  BeamFnApi.FunctionSpec.newBuilder().setId("20L")).build()))
           .build())
       .build();
   private static final BeamFnApi.InstructionResponse REGISTER_RESPONSE =
@@ -68,13 +68,13 @@ public class RegisterHandlerTest {
           }
     });
     assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(0),
-        handler.getById(1L));
+        handler.getById("1L"));
     assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1),
-        handler.getById(2L));
+        handler.getById("2L"));
     
assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(0).getCoders(0),
-        handler.getById(10L));
+        handler.getById("10L"));
     
assertEquals(REGISTER_REQUEST.getRegister().getProcessBundleDescriptor(1).getCoders(0),
-        handler.getById(20L));
+        handler.getById("20L"));
     assertEquals(REGISTER_RESPONSE, responseFuture.get());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
index 64a0e11..7cbf8eb 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataBufferingOutboundObserverTest.java
@@ -46,8 +46,13 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class BeamFnDataBufferingOutboundObserverTest {
   private static final int DEFAULT_BUFFER_LIMIT = 1_000_000;
-  private static final KV<Long, BeamFnApi.Target> OUTPUT_LOCATION = KV.of(777L,
-      
BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(555L).setName("Test").build());
+  private static final KV<String, BeamFnApi.Target> OUTPUT_LOCATION =
+      KV.of(
+          "777L",
+          BeamFnApi.Target.newBuilder()
+              .setPrimitiveTransformReference("555L")
+              .setName("Test")
+              .build());
   private static final Coder<WindowedValue<byte[]>> CODER =
       
LengthPrefixCoder.of(WindowedValue.getValueOnlyCoder(ByteArrayCoder.of()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
index 20566ea..31eb0db 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java
@@ -66,12 +66,21 @@ public class BeamFnDataGrpcClientTest {
       LengthPrefixCoder.of(
           WindowedValue.getFullCoder(StringUtf8Coder.of(),
               GlobalWindow.Coder.INSTANCE));
-  private static final KV<Long, BeamFnApi.Target> KEY_A = KV.of(
-      12L,
-      
BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(34L).setName("targetA").build());
-  private static final KV<Long, BeamFnApi.Target> KEY_B = KV.of(
-      56L,
-      
BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(78L).setName("targetB").build());
+  private static final KV<String, BeamFnApi.Target> KEY_A =
+      KV.of(
+          "12L",
+          BeamFnApi.Target.newBuilder()
+              .setPrimitiveTransformReference("34L")
+              .setName("targetA")
+              .build());
+
+  private static final KV<String, BeamFnApi.Target> KEY_B =
+      KV.of(
+          "56L",
+          BeamFnApi.Target.newBuilder()
+              .setPrimitiveTransformReference("78L")
+              .setName("targetB")
+              .build());
 
   private static final BeamFnApi.Elements ELEMENTS_A_1;
   private static final BeamFnApi.Elements ELEMENTS_A_2;

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
index 38d9e2c..a9095ae 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexerTest.java
@@ -39,11 +39,13 @@ import org.junit.Test;
 public class BeamFnDataGrpcMultiplexerTest {
   private static final BeamFnApi.ApiServiceDescriptor DESCRIPTOR =
       BeamFnApi.ApiServiceDescriptor.newBuilder().setUrl("test").build();
-  private static final KV<Long, BeamFnApi.Target> OUTPUT_LOCATION = KV.of(777L,
-      BeamFnApi.Target.newBuilder()
-      .setName("name")
-      .setPrimitiveTransformReference(888L)
-      .build());
+  private static final KV<String, BeamFnApi.Target> OUTPUT_LOCATION =
+      KV.of(
+          "777L",
+          BeamFnApi.Target.newBuilder()
+              .setName("name")
+              .setPrimitiveTransformReference("888L")
+              .build());
   private static final BeamFnApi.Elements ELEMENTS = 
BeamFnApi.Elements.newBuilder()
       .addData(BeamFnApi.Elements.Data.newBuilder()
           .setInstructionReference(OUTPUT_LOCATION.getKey())

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
index ff0e083..c53f99d 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataInboundObserverTest.java
@@ -102,9 +102,9 @@ public class BeamFnDataInboundObserverTest {
 
   private BeamFnApi.Elements.Data dataWith(String ... values) throws Exception 
{
     BeamFnApi.Elements.Data.Builder builder = 
BeamFnApi.Elements.Data.newBuilder()
-        .setInstructionReference(777L)
+        .setInstructionReference("777L")
         .setTarget(BeamFnApi.Target.newBuilder()
-            .setPrimitiveTransformReference(999L)
+            .setPrimitiveTransformReference("999L")
             .setName("Test"));
     ByteString.Output output = ByteString.newOutput();
     for (String value : values) {

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
index 511cc3f..0cc5ef9 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
@@ -42,7 +42,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.test.TestExecutors;
@@ -85,7 +85,7 @@ public class BeamFnDataReadRunnerTest {
     }
   }
   private static final BeamFnApi.Target INPUT_TARGET = 
BeamFnApi.Target.newBuilder()
-      .setPrimitiveTransformReference(1)
+      .setPrimitiveTransformReference("1")
       .setName("out")
       .build();
 
@@ -112,7 +112,7 @@ public class BeamFnDataReadRunnerTest {
     Map<String, Collection<ThrowingConsumer<WindowedValue<String>>>> outputMap 
= ImmutableMap.of(
         "outA", ImmutableList.of(valuesA::add),
         "outB", ImmutableList.of(valuesB::add));
-    AtomicLong bundleId = new AtomicLong(0);
+    AtomicReference<String> bundleId = new AtomicReference<>("0");
     BeamFnDataReadRunner<String> readRunner = new BeamFnDataReadRunner<>(
         FUNCTION_SPEC,
         bundleId::get,
@@ -151,7 +151,7 @@ public class BeamFnDataReadRunnerTest {
     assertThat(valuesB, contains(valueInGlobalWindow("ABC"), 
valueInGlobalWindow("DEF")));
 
     // Process for bundle id 1
-    bundleId.incrementAndGet();
+    bundleId.set("1");
     valuesA.clear();
     valuesB.clear();
     readRunner.registerInputLocation();

http://git-wip-us.apache.org/repos/asf/beam/blob/9933f271/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
index ed67b14..378567a 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
@@ -34,7 +34,7 @@ import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.v1.BeamFnApi;
@@ -73,7 +73,7 @@ public class BeamFnDataWriteRunnerTest {
     }
   }
   private static final BeamFnApi.Target OUTPUT_TARGET = 
BeamFnApi.Target.newBuilder()
-      .setPrimitiveTransformReference(1)
+      .setPrimitiveTransformReference("1")
       .setName("out")
       .build();
 
@@ -92,7 +92,7 @@ public class BeamFnDataWriteRunnerTest {
         any(),
         any(),
         
Matchers.<Coder<WindowedValue<String>>>any())).thenReturn(valuesA).thenReturn(valuesB);
-    AtomicLong bundleId = new AtomicLong(0);
+    AtomicReference<String> bundleId = new AtomicReference<>("0");
     BeamFnDataWriteRunner<String> writeRunner = new BeamFnDataWriteRunner<>(
         FUNCTION_SPEC,
         bundleId::get,
@@ -116,7 +116,7 @@ public class BeamFnDataWriteRunnerTest {
     assertThat(valuesA, contains(valueInGlobalWindow("ABC"), 
valueInGlobalWindow("DEF")));
 
     // Process for bundle id 1
-    bundleId.incrementAndGet();
+    bundleId.set("1");
     valuesA.clear();
     valuesB.clear();
     writeRunner.registerForOutput();

Reply via email to