Repository: incubator-beam
Updated Branches:
  refs/heads/master d627266d8 -> cc64d654c


[flink] replace obsolete reflection call


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f630002
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f630002
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f630002

Branch: refs/heads/master
Commit: 9f630002e235f02042c309e57ea44a163ede8bdf
Parents: d627266
Author: Maximilian Michels <m...@apache.org>
Authored: Tue May 17 19:12:02 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Tue May 17 19:12:02 2016 +0200

----------------------------------------------------------------------
 .../flink/translation/wrappers/SinkOutputFormat.java | 15 +--------------
 1 file changed, 1 insertion(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f630002/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
index 2766a87..53e544d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
@@ -46,23 +46,10 @@ public class SinkOutputFormat<T> implements OutputFormat<T> {
   private AbstractID uid = new AbstractID();
 
   public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) {
-    this.sink = extractSink(transform);
+    this.sink = transform.getSink();
     this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
   }
 
-  private Sink<T> extractSink(Write.Bound<T> transform) {
-    // TODO possibly add a getter in the upstream
-    try {
-      Field sinkField = transform.getClass().getDeclaredField("sink");
-      sinkField.setAccessible(true);
-      @SuppressWarnings("unchecked")
-      Sink<T> extractedSink = (Sink<T>) sinkField.get(transform);
-      return extractedSink;
-    } catch (NoSuchFieldException | IllegalAccessException e) {
-      throw new RuntimeException("Could not acquire custom sink field.", e);
-    }
-  }
-
   @Override
   public void configure(Configuration configuration) {
     writeOperation = sink.createWriteOperation(serializedOptions.getPipelineOptions());

Reply via email to