URNs for DirectRunner TransformEvaluator and RootInputProvider

This makes two of the Java DirectRunner's registries key off a transform's
URN instead of its Java class. A root TransformEvaluator requires shards
generated by its associated RootInputProvider, so both registries are
changed at once.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e29cc52
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e29cc52
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e29cc52

Branch: refs/heads/master
Commit: 0e29cc52a3e4b0d9ae5ff3907f10e4e87b734186
Parents: 663ad88
Author: Kenneth Knowles <k...@google.com>
Authored: Fri May 19 20:53:32 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Thu May 25 11:17:08 2017 -0700

----------------------------------------------------------------------
 runners/direct-java/pom.xml                     |   5 +
 .../direct/BoundedReadEvaluatorFactory.java     |  14 +-
 .../beam/runners/direct/DirectGroupByKey.java   |  21 ++-
 .../beam/runners/direct/EmptyInputProvider.java |   8 +-
 .../direct/ParDoMultiOverrideFactory.java       |  13 +-
 .../runners/direct/ReadEvaluatorFactory.java    |  97 ++++++++++++++
 .../beam/runners/direct/RootInputProvider.java  |   7 +-
 .../runners/direct/RootProviderRegistry.java    |  28 ++--
 .../apache/beam/runners/direct/SourceShard.java |  33 +++++
 .../direct/TestStreamEvaluatorFactory.java      |  28 ++--
 .../direct/TransformEvaluatorRegistry.java      | 128 ++++++++++++++-----
 .../direct/UnboundedReadEvaluatorFactory.java   |  31 +++--
 .../runners/direct/ViewOverrideFactory.java     |  12 +-
 .../src/main/resources/beam/findbugs-filter.xml |   7 +
 14 files changed, 344 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index bec2113..cba4b09 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -208,6 +208,11 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 76db861..fcaaa84 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -33,10 +33,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.direct.StepTransformResult.Builder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -180,16 +180,17 @@ final class BoundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
   }
 
   @AutoValue
-  abstract static class BoundedSourceShard<T> {
+  abstract static class BoundedSourceShard<T> implements SourceShard<T> {
     static <T> BoundedSourceShard<T> of(BoundedSource<T> source) {
       return new 
AutoValue_BoundedReadEvaluatorFactory_BoundedSourceShard<>(source);
     }
 
-    abstract BoundedSource<T> getSource();
+    @Override
+    public abstract BoundedSource<T> getSource();
   }
 
   static class InputProvider<T>
-      implements RootInputProvider<T, BoundedSourceShard<T>, PBegin, 
Read.Bounded<T>> {
+      implements RootInputProvider<T, BoundedSourceShard<T>, PBegin> {
     private final EvaluationContext evaluationContext;
 
     InputProvider(EvaluationContext evaluationContext) {
@@ -198,9 +199,10 @@ final class BoundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
 
     @Override
     public Collection<CommittedBundle<BoundedSourceShard<T>>> getInitialInputs(
-        AppliedPTransform<PBegin, PCollection<T>, Read.Bounded<T>> transform, 
int targetParallelism)
+        AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, 
PCollection<T>>> transform,
+        int targetParallelism)
         throws Exception {
-      BoundedSource<T> source = transform.getTransform().getSource();
+      BoundedSource<T> source = 
ReadTranslation.boundedSourceFromTransform(transform);
       PipelineOptions options = evaluationContext.getPipelineOptions();
       long estimatedBytes = source.getEstimatedSizeBytes(options);
       long bytesPerBundle = estimatedBytes / targetParallelism;

http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 791615a..f239070 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -20,9 +20,11 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.protobuf.Message;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.construction.ForwardingPTransform;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -37,6 +39,9 @@ class DirectGroupByKey<K, V>
     extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, 
Iterable<V>>>> {
   private final GroupByKey<K, V> original;
 
+  static final String DIRECT_GBKO_URN = 
"urn:beam:directrunner:transforms:gbko:v1";
+  static final String DIRECT_GABW_URN = 
"urn:beam:directrunner:transforms:gabw:v1";
+
   DirectGroupByKey(GroupByKey<K, V> from) {
     this.original = from;
   }
@@ -68,7 +73,8 @@ class DirectGroupByKey<K, V>
   }
 
   static final class DirectGroupByKeyOnly<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, 
V>>> {
+      extends PTransformTranslation.RawPTransform<
+          PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>, Message> {
     @Override
     public PCollection<KeyedWorkItem<K, V>> expand(PCollection<KV<K, V>> 
input) {
       return PCollection.createPrimitiveOutputInternal(
@@ -86,10 +92,16 @@ class DirectGroupByKey<K, V>
           GroupByKey.getInputValueCoder(input.getCoder()),
           input.getWindowingStrategy().getWindowFn().windowCoder());
     }
+
+    @Override
+    public String getUrn() {
+      return DIRECT_GBKO_URN;
+    }
   }
 
   static final class DirectGroupAlsoByWindow<K, V>
-      extends PTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, 
Iterable<V>>>> {
+      extends PTransformTranslation.RawPTransform<
+          PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, 
Message> {
 
     private final WindowingStrategy<?, ?> inputWindowingStrategy;
     private final WindowingStrategy<?, ?> outputWindowingStrategy;
@@ -135,5 +147,10 @@ class DirectGroupByKey<K, V>
       return PCollection.createPrimitiveOutputInternal(
           input.getPipeline(), outputWindowingStrategy, input.isBounded());
     }
+
+    @Override
+    public String getUrn() {
+      return DIRECT_GABW_URN;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
index c36879a..a5a53bc 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
@@ -20,13 +20,12 @@ package org.apache.beam.runners.direct;
 import java.util.Collection;
 import java.util.Collections;
 import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 
 /** A {@link RootInputProvider} that provides a singleton empty bundle. */
-class EmptyInputProvider<T>
-    implements RootInputProvider<T, Void, PCollectionList<T>, 
Flatten.PCollections<T>> {
+class EmptyInputProvider<T> implements RootInputProvider<T, Void, 
PCollectionList<T>> {
   EmptyInputProvider() {}
 
   /**
@@ -36,7 +35,8 @@ class EmptyInputProvider<T>
    */
   @Override
   public Collection<CommittedBundle<Void>> getInitialInputs(
-      AppliedPTransform<PCollectionList<T>, PCollection<T>, 
Flatten.PCollections<T>>
+      AppliedPTransform<
+              PCollectionList<T>, PCollection<T>, 
PTransform<PCollectionList<T>, PCollection<T>>>
           transform,
       int targetParallelism) {
     return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index be433db..df2054b 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -19,11 +19,13 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.protobuf.Message;
 import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
@@ -165,8 +167,12 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
     }
   }
 
+  static final String DIRECT_STATEFUL_PAR_DO_URN =
+      "urn:beam:directrunner:transforms:stateful_pardo:v1";
+
   static class StatefulParDo<K, InputT, OutputT>
-      extends PTransform<PCollection<? extends KeyedWorkItem<K, KV<K, 
InputT>>>, PCollectionTuple> {
+      extends PTransformTranslation.RawPTransform<
+          PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, 
PCollectionTuple, Message> {
     private final transient MultiOutput<KV<K, InputT>, OutputT> 
underlyingParDo;
     private final transient PCollection<KV<K, InputT>> originalInput;
 
@@ -201,6 +207,11 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
 
       return outputs;
     }
+
+    @Override
+    public String getUrn() {
+      return DIRECT_STATEFUL_PAR_DO_URN;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ReadEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ReadEvaluatorFactory.java
new file mode 100644
index 0000000..8521706
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ReadEvaluatorFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.ReadTranslation;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
+ * for the {@link Read Read} primitives, whether bounded or unbounded.
+ */
+final class ReadEvaluatorFactory implements TransformEvaluatorFactory {
+
+  final BoundedReadEvaluatorFactory boundedFactory;
+  final UnboundedReadEvaluatorFactory unboundedFactory;
+
+  public ReadEvaluatorFactory(EvaluationContext context) {
+    boundedFactory = new BoundedReadEvaluatorFactory(context);
+    unboundedFactory = new UnboundedReadEvaluatorFactory(context);
+  }
+
+  @Nullable
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) 
throws Exception {
+    switch (ReadTranslation.sourceIsBounded(application)) {
+      case BOUNDED:
+        return boundedFactory.forApplication(application, inputBundle);
+      case UNBOUNDED:
+        return unboundedFactory.forApplication(application, inputBundle);
+      default:
+        throw new IllegalArgumentException("PCollection is neither bounded nor 
unbounded?!?");
+    }
+  }
+
+  @Override
+  public void cleanup() throws Exception {
+    boundedFactory.cleanup();
+    unboundedFactory.cleanup();
+  }
+
+  static <T> InputProvider<T> inputProvider(EvaluationContext context) {
+    return new InputProvider(context);
+  }
+
+  private static class InputProvider<T> implements RootInputProvider<T, 
SourceShard<T>, PBegin> {
+
+    private final UnboundedReadEvaluatorFactory.InputProvider<T> 
unboundedInputProvider;
+    private final BoundedReadEvaluatorFactory.InputProvider<T> 
boundedInputProvider;
+
+    InputProvider(EvaluationContext context) {
+      this.unboundedInputProvider = new 
UnboundedReadEvaluatorFactory.InputProvider<T>(context);
+      this.boundedInputProvider = new 
BoundedReadEvaluatorFactory.InputProvider<T>(context);
+    }
+
+    @Override
+    public Collection<CommittedBundle<SourceShard<T>>> getInitialInputs(
+        AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, 
PCollection<T>>>
+            appliedTransform,
+        int targetParallelism)
+        throws Exception {
+      switch (ReadTranslation.sourceIsBounded(appliedTransform)) {
+        case BOUNDED:
+          // This cast could be made unnecessary, but too much bounded polymorphism
+          return (Collection)
+              boundedInputProvider.getInitialInputs(appliedTransform, 
targetParallelism);
+        case UNBOUNDED:
+          // This cast could be made unnecessary, but too much bounded polymorphism
+          return (Collection)
+              unboundedInputProvider.getInitialInputs(appliedTransform, 
targetParallelism);
+        default:
+          throw new IllegalArgumentException("PCollection is neither bounded 
nor unbounded?!?");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
index ce69518..0b3de32 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
@@ -29,8 +29,7 @@ import org.apache.beam.sdk.values.PInput;
  * Provides {@link CommittedBundle bundles} that will be provided to the {@link PTransform
  * PTransforms} that are at the root of a {@link Pipeline}.
  */
-interface RootInputProvider<
-    T, ShardT, InputT extends PInput, TransformT extends PTransform<InputT, 
PCollection<T>>> {
+interface RootInputProvider<T, ShardT, InputT extends PInput> {
   /**
    * Get the initial inputs for the {@link AppliedPTransform}. The {@link AppliedPTransform} will be
    * provided with these {@link CommittedBundle bundles} as input when the {@link Pipeline} runs.
@@ -44,6 +43,8 @@ interface RootInputProvider<
    *     greater than or equal to 1.
    */
   Collection<CommittedBundle<ShardT>> getInitialInputs(
-      AppliedPTransform<InputT, PCollection<T>, TransformT> transform, int 
targetParallelism)
+      AppliedPTransform<InputT, PCollection<T>, PTransform<InputT, 
PCollection<T>>>
+          transform,
+      int targetParallelism)
       throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
index 4b0c06d..5cbeab7 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
@@ -18,13 +18,14 @@
 package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.beam.runners.core.construction.PTransformTranslation.FLATTEN_TRANSFORM_URN;
+import static 
org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory.DIRECT_TEST_STREAM_URN;
 
 import com.google.common.collect.ImmutableMap;
 import java.util.Collection;
 import java.util.Map;
-import org.apache.beam.sdk.io.Read;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Flatten.PCollections;
 import org.apache.beam.sdk.transforms.PTransform;
 
 /**
@@ -33,34 +34,31 @@ import org.apache.beam.sdk.transforms.PTransform;
  */
 class RootProviderRegistry {
   public static RootProviderRegistry defaultRegistry(EvaluationContext 
context) {
-    ImmutableMap.Builder<Class<? extends PTransform>, RootInputProvider<?, ?, 
?, ?>>
+    ImmutableMap.Builder<String, RootInputProvider<?, ?, ?>>
         defaultProviders = ImmutableMap.builder();
     defaultProviders
-        .put(Read.Bounded.class, new 
BoundedReadEvaluatorFactory.InputProvider(context))
-        .put(Read.Unbounded.class, new 
UnboundedReadEvaluatorFactory.InputProvider(context))
-        .put(
-            
TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream.class,
-            new TestStreamEvaluatorFactory.InputProvider(context))
-        .put(PCollections.class, new EmptyInputProvider());
+        .put(PTransformTranslation.READ_TRANSFORM_URN, 
ReadEvaluatorFactory.inputProvider(context))
+        .put(DIRECT_TEST_STREAM_URN, new 
TestStreamEvaluatorFactory.InputProvider(context))
+        .put(FLATTEN_TRANSFORM_URN, new EmptyInputProvider());
     return new RootProviderRegistry(defaultProviders.build());
   }
 
-  private final Map<Class<? extends PTransform>, RootInputProvider<?, ?, ?, 
?>> providers;
+  private final Map<String, RootInputProvider<?, ?, ?>> providers;
 
   private RootProviderRegistry(
-      Map<Class<? extends PTransform>, RootInputProvider<?, ?, ?, ?>> 
providers) {
+      Map<String, RootInputProvider<?, ?, ?>> providers) {
     this.providers = providers;
   }
 
   public Collection<CommittedBundle<?>> getInitialInputs(
       AppliedPTransform<?, ?, ?> transform, int targetParallelism) throws 
Exception {
-    Class<? extends PTransform> transformClass = 
transform.getTransform().getClass();
+    String transformUrn = 
PTransformTranslation.urnForTransform(transform.getTransform());
     RootInputProvider provider =
         checkNotNull(
-            providers.get(transformClass),
-            "Tried to get a %s for a Transform of type %s, but there is no 
such provider",
+            providers.get(transformUrn),
+            "Tried to get a %s for a transform \"%s\", but there is no such 
provider",
             RootInputProvider.class.getSimpleName(),
-            transformClass.getSimpleName());
+            transformUrn);
     return provider.getInitialInputs(transform, targetParallelism);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SourceShard.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SourceShard.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SourceShard.java
new file mode 100644
index 0000000..a054333
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SourceShard.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+
+/**
+ * A shard for a source in the {@link Read} transform.
+ *
+ * <p>Since {@link UnboundedSource} and {@link BoundedSource} have radically different needs, this
+ * is a mostly-empty interface.
+ */
+interface SourceShard<T> {
+  Source<T> getSource();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 8b21d5a..b1db58f 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -22,12 +22,14 @@ import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Iterables;
+import com.google.protobuf.Message;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -180,7 +182,10 @@ class TestStreamEvaluatorFactory implements 
TransformEvaluatorFactory {
       return ReplacementOutputs.singleton(outputs, newOutput);
     }
 
-    static class DirectTestStream<T> extends PTransform<PBegin, 
PCollection<T>> {
+    static final String DIRECT_TEST_STREAM_URN = 
"urn:beam:directrunner:transforms:test_stream:v1";
+
+    static class DirectTestStream<T>
+        extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>, 
Message> {
       private final transient DirectRunner runner;
       private final TestStream<T> original;
 
@@ -197,12 +202,15 @@ class TestStreamEvaluatorFactory implements 
TransformEvaluatorFactory {
                 input.getPipeline(), WindowingStrategy.globalDefault(), 
IsBounded.UNBOUNDED)
             .setCoder(original.getValueCoder());
       }
+
+      @Override
+      public String getUrn() {
+        return DIRECT_TEST_STREAM_URN;
+      }
     }
   }
 
-  static class InputProvider<T>
-      implements RootInputProvider<
-          T, TestStreamIndex<T>, PBegin, 
DirectTestStreamFactory.DirectTestStream<T>> {
+  static class InputProvider<T> implements RootInputProvider<T, 
TestStreamIndex<T>, PBegin> {
     private final EvaluationContext evaluationContext;
 
     InputProvider(EvaluationContext evaluationContext) {
@@ -211,15 +219,17 @@ class TestStreamEvaluatorFactory implements 
TransformEvaluatorFactory {
 
     @Override
     public Collection<CommittedBundle<TestStreamIndex<T>>> getInitialInputs(
-        AppliedPTransform<PBegin, PCollection<T>, 
DirectTestStreamFactory.DirectTestStream<T>>
-            transform,
+        AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, 
PCollection<T>>> transform,
         int targetParallelism) {
+
+      // This will always be run on an execution-time transform, so it can be downcast
+      DirectTestStreamFactory.DirectTestStream<T> testStream =
+          (DirectTestStreamFactory.DirectTestStream<T>) 
transform.getTransform();
+
       CommittedBundle<TestStreamIndex<T>> initialBundle =
           evaluationContext
               .<TestStreamIndex<T>>createRootBundle()
-              .add(
-                  WindowedValue.valueInGlobalWindow(
-                      TestStreamIndex.of(transform.getTransform().original)))
+              
.add(WindowedValue.valueInGlobalWindow(TestStreamIndex.of(testStream.original)))
               .commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
       return Collections.singleton(initialBundle);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 718cca2..d144b20 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -19,23 +19,33 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
+import static 
org.apache.beam.runners.core.construction.PTransformTranslation.FLATTEN_TRANSFORM_URN;
+import static 
org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN;
+import static 
org.apache.beam.runners.core.construction.PTransformTranslation.READ_TRANSFORM_URN;
+import static 
org.apache.beam.runners.core.construction.PTransformTranslation.WINDOW_TRANSFORM_URN;
+import static 
org.apache.beam.runners.core.construction.SplittableParDo.SPLITTABLE_PROCESS_URN;
+import static org.apache.beam.runners.direct.DirectGroupByKey.DIRECT_GABW_URN;
+import static org.apache.beam.runners.direct.DirectGroupByKey.DIRECT_GBKO_URN;
+import static 
org.apache.beam.runners.direct.ParDoMultiOverrideFactory.DIRECT_STATEFUL_PAR_DO_URN;
+import static 
org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory.DIRECT_TEST_STREAM_URN;
+import static 
org.apache.beam.runners.direct.ViewOverrideFactory.DIRECT_WRITE_VIEW_URN;
 
+import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
-import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
-import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
-import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView;
-import org.apache.beam.sdk.io.Read;
+import 
org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.runners.core.construction.SdkComponents;
+import 
org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
+import 
org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Flatten.PCollections;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.Window;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,43 +55,93 @@ import org.slf4j.LoggerFactory;
  */
 class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(TransformEvaluatorRegistry.class);
+
   public static TransformEvaluatorRegistry defaultRegistry(EvaluationContext 
ctxt) {
-    @SuppressWarnings({"rawtypes"})
-    ImmutableMap<Class<? extends PTransform>, TransformEvaluatorFactory> 
primitives =
-        ImmutableMap.<Class<? extends PTransform>, 
TransformEvaluatorFactory>builder()
-            .put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt))
-            .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt))
+    ImmutableMap<String, TransformEvaluatorFactory> primitives =
+        ImmutableMap.<String, TransformEvaluatorFactory>builder()
+            // Beam primitives
+            .put(READ_TRANSFORM_URN, new ReadEvaluatorFactory(ctxt))
             .put(
-                ParDo.MultiOutput.class,
+                PAR_DO_TRANSFORM_URN,
                 new ParDoEvaluatorFactory<>(ctxt, 
ParDoEvaluator.defaultRunnerFactory()))
-            .put(StatefulParDo.class, new 
StatefulParDoEvaluatorFactory<>(ctxt))
-            .put(PCollections.class, new FlattenEvaluatorFactory(ctxt))
-            .put(WriteView.class, new ViewEvaluatorFactory(ctxt))
-            .put(Window.Assign.class, new WindowEvaluatorFactory(ctxt))
-            // Runner-specific primitives used in expansion of GroupByKey
-            .put(DirectGroupByKeyOnly.class, new 
GroupByKeyOnlyEvaluatorFactory(ctxt))
-            .put(DirectGroupAlsoByWindow.class, new 
GroupAlsoByWindowEvaluatorFactory(ctxt))
-            .put(
-                
TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream.class,
-                new TestStreamEvaluatorFactory(ctxt))
-            // Runner-specific primitive used in expansion of SplittableParDo
-            .put(
-                SplittableParDoViaKeyedWorkItems.ProcessElements.class,
-                new SplittableProcessElementsEvaluatorFactory<>(ctxt))
+            .put(FLATTEN_TRANSFORM_URN, new FlattenEvaluatorFactory(ctxt))
+            .put(WINDOW_TRANSFORM_URN, new WindowEvaluatorFactory(ctxt))
+
+            // Runner-specific primitives
+            .put(DIRECT_WRITE_VIEW_URN, new ViewEvaluatorFactory(ctxt))
+            .put(DIRECT_STATEFUL_PAR_DO_URN, new 
StatefulParDoEvaluatorFactory<>(ctxt))
+            .put(DIRECT_GBKO_URN, new GroupByKeyOnlyEvaluatorFactory(ctxt))
+            .put(DIRECT_GABW_URN, new GroupAlsoByWindowEvaluatorFactory(ctxt))
+            .put(DIRECT_TEST_STREAM_URN, new TestStreamEvaluatorFactory(ctxt))
+
+            // Runners-core primitives
+            .put(SPLITTABLE_PROCESS_URN, new 
SplittableProcessElementsEvaluatorFactory<>(ctxt))
             .build();
     return new TransformEvaluatorRegistry(primitives);
   }
 
+  /** Registers payload translators for transforms specialized to the direct runner. */
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class DirectTransformsRegistrar implements 
TransformPayloadTranslatorRegistrar {
+    @Override
+    public Map<
+            ? extends Class<? extends PTransform>,
+            ? extends PTransformTranslation.TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return ImmutableMap
+          .<Class<? extends PTransform>, 
PTransformTranslation.TransformPayloadTranslator>builder()
+          .put(
+              DirectGroupByKey.DirectGroupByKeyOnly.class,
+              new PTransformTranslation.RawPTransformTranslator<>())
+          .put(
+              DirectGroupByKey.DirectGroupAlsoByWindow.class,
+              new PTransformTranslation.RawPTransformTranslator<>())
+          .put(
+              ParDoMultiOverrideFactory.StatefulParDo.class,
+              new PTransformTranslation.RawPTransformTranslator<>())
+          .put(
+              ViewOverrideFactory.WriteView.class,
+              new PTransformTranslation.RawPTransformTranslator<>())
+          .put(DirectTestStream.class, new 
PTransformTranslation.RawPTransformTranslator<>())
+          .put(
+              SplittableParDoViaKeyedWorkItems.ProcessElements.class,
+              new SplittableParDoProcessElementsTranslator())
+          .build();
+    }
+  }
+
+  /**
+   * A translator just to vend the URN. This will need to be moved to 
runners-core-construction-java
+   * once SDF is reorganized appropriately. {@link #translate} always throws.
+   */
+  private static class SplittableParDoProcessElementsTranslator
+      implements TransformPayloadTranslator<ProcessElements<?, ?, ?, ?>> {
+
+    private SplittableParDoProcessElementsTranslator() {}
+
+    @Override
+    public String getUrn(ProcessElements<?, ?, ?, ?> transform) {
+      return SPLITTABLE_PROCESS_URN;
+    }
+
+    @Override
+    public FunctionSpec translate(
+        AppliedPTransform<?, ?, ProcessElements<?, ?, ?, ?>> transform, 
SdkComponents components) {
+      throw new UnsupportedOperationException(
+          String.format("%s should never be translated",
+          ProcessElements.class.getCanonicalName()));
+    }
+  }
+
   // the TransformEvaluatorFactories can construct instances of all generic 
types of transform,
   // so all instances of a primitive can be handled with the same evaluator 
factory.
-  @SuppressWarnings("rawtypes")
-  private final Map<Class<? extends PTransform>, TransformEvaluatorFactory> 
factories;
+  private final Map<String, TransformEvaluatorFactory> factories;
 
   private final AtomicBoolean finished = new AtomicBoolean(false);
 
   private TransformEvaluatorRegistry(
       @SuppressWarnings("rawtypes")
-      Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories) {
+      Map<String, TransformEvaluatorFactory> factories) {
     this.factories = factories;
   }
 
@@ -91,10 +151,12 @@ class TransformEvaluatorRegistry implements 
TransformEvaluatorFactory {
       throws Exception {
     checkState(
         !finished.get(), "Tried to get an evaluator for a finished 
TransformEvaluatorRegistry");
-    Class<? extends PTransform> transformClass = 
application.getTransform().getClass();
+
+    String urn = 
PTransformTranslation.urnForTransform(application.getTransform());
+
     TransformEvaluatorFactory factory =
         checkNotNull(
-            factories.get(transformClass), "No evaluator for PTransform type 
%s", transformClass);
+            factories.get(urn), "No evaluator for PTransform \"%s\"", urn);
     return factory.forApplication(application, inputBundle);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index cba826c..7d4bba1 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.ReadTranslation;
 import 
org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Read.Unbounded;
@@ -253,7 +254,8 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
   }
 
   @AutoValue
-  abstract static class UnboundedSourceShard<T, CheckpointT extends 
CheckpointMark> {
+  abstract static class UnboundedSourceShard<T, CheckpointT extends 
CheckpointMark>
+      implements SourceShard<T> {
     static <T, CheckpointT extends CheckpointMark> UnboundedSourceShard<T, 
CheckpointT> unstarted(
         UnboundedSource<T, CheckpointT> source, UnboundedReadDeduplicator 
deduplicator) {
       return of(source, deduplicator, null, null);
@@ -268,7 +270,8 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
           source, deduplicator, reader, checkpoint);
     }
 
-    abstract UnboundedSource<T, CheckpointT> getSource();
+    @Override
+    public abstract UnboundedSource<T, CheckpointT> getSource();
 
     abstract UnboundedReadDeduplicator getDeduplicator();
 
@@ -283,9 +286,8 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
     }
   }
 
-  static class InputProvider<OutputT>
-      implements RootInputProvider<
-          OutputT, UnboundedSourceShard<OutputT, ?>, PBegin, 
Unbounded<OutputT>> {
+  static class InputProvider<T>
+      implements RootInputProvider<T, UnboundedSourceShard<T, ?>, PBegin> {
     private final EvaluationContext evaluationContext;
 
     InputProvider(EvaluationContext evaluationContext) {
@@ -293,27 +295,28 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
     }
 
     @Override
-    public Collection<CommittedBundle<UnboundedSourceShard<OutputT, ?>>> 
getInitialInputs(
-        AppliedPTransform<PBegin, PCollection<OutputT>, Unbounded<OutputT>> 
transform,
+    public Collection<CommittedBundle<UnboundedSourceShard<T, ?>>> 
getInitialInputs(
+        AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, 
PCollection<T>>>
+            transform,
         int targetParallelism)
         throws Exception {
-      UnboundedSource<OutputT, ?> source = 
transform.getTransform().getSource();
-      List<? extends UnboundedSource<OutputT, ?>> splits =
+      UnboundedSource<T, ?> source = 
ReadTranslation.unboundedSourceFromTransform(transform);
+      List<? extends UnboundedSource<T, ?>> splits =
           source.split(targetParallelism, 
evaluationContext.getPipelineOptions());
       UnboundedReadDeduplicator deduplicator =
           source.requiresDeduping()
               ? UnboundedReadDeduplicator.CachedIdDeduplicator.create()
               : NeverDeduplicator.create();
 
-      ImmutableList.Builder<CommittedBundle<UnboundedSourceShard<OutputT, ?>>> 
initialShards =
+      ImmutableList.Builder<CommittedBundle<UnboundedSourceShard<T, ?>>> 
initialShards =
           ImmutableList.builder();
-      for (UnboundedSource<OutputT, ?> split : splits) {
-        UnboundedSourceShard<OutputT, ?> shard =
+      for (UnboundedSource<T, ?> split : splits) {
+        UnboundedSourceShard<T, ?> shard =
             UnboundedSourceShard.unstarted(split, deduplicator);
         initialShards.add(
             evaluationContext
-                .<UnboundedSourceShard<OutputT, ?>>createRootBundle()
-                .add(WindowedValue.<UnboundedSourceShard<OutputT, 
?>>valueInGlobalWindow(shard))
+                .<UnboundedSourceShard<T, ?>>createRootBundle()
+                .add(WindowedValue.<UnboundedSourceShard<T, 
?>>valueInGlobalWindow(shard))
                 .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
       }
       return initialShards.build();

http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index b3bbac8..501b436 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -18,10 +18,12 @@
 
 package org.apache.beam.runners.direct;
 
+import com.google.protobuf.Message;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.ForwardingPTransform;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
+import 
org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -93,7 +95,7 @@ class ViewOverrideFactory<ElemT, ViewT>
    * to {@link ViewT}.
    */
   static final class WriteView<ElemT, ViewT>
-      extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> 
{
+      extends RawPTransform<PCollection<Iterable<ElemT>>, 
PCollectionView<ViewT>, Message> {
     private final CreatePCollectionView<ElemT, ViewT> og;
 
     WriteView(CreatePCollectionView<ElemT, ViewT> og) {
@@ -110,5 +112,13 @@ class ViewOverrideFactory<ElemT, ViewT>
     public PCollectionView<ViewT> getView() {
       return og.getView();
     }
+
+    @Override
+    public String getUrn() {
+      return DIRECT_WRITE_VIEW_URN;
+    }
   }
+
+  public static final String DIRECT_WRITE_VIEW_URN =
+      "urn:beam:directrunner:transforms:write_view:v1";
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e29cc52/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml 
b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index 8ff0cb0..3430750 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -405,4 +405,11 @@
     <Bug pattern="NM_CLASS_NOT_EXCEPTION"/>
     <!-- It is clear from the name that this class holds either StorageObject 
or IOException. -->
   </Match>
+
+  <Match>
+    <Class 
name="org.apache.beam.runners.direct.ParDoMultiOverrideFactory$StatefulParDo"/>
+    <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
+    <!-- PTransforms do not actually support serialization. -->
+  </Match>
+
 </FindBugsFilter>

Reply via email to