Repository: incubator-beam
Updated Branches:
  refs/heads/master d627266d8 -> cc64d654c
[flink] replace obsolete reflection call

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f630002
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f630002
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f630002

Branch: refs/heads/master
Commit: 9f630002e235f02042c309e57ea44a163ede8bdf
Parents: d627266
Author: Maximilian Michels <m...@apache.org>
Authored: Tue May 17 19:12:02 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Tue May 17 19:12:02 2016 +0200

----------------------------------------------------------------------
 .../flink/translation/wrappers/SinkOutputFormat.java | 15 +--------------
 1 file changed, 1 insertion(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f630002/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
index 2766a87..53e544d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
@@ -46,23 +46,10 @@ public class SinkOutputFormat<T> implements OutputFormat<T> {
   private AbstractID uid = new AbstractID();
 
   public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) {
-    this.sink = extractSink(transform);
+    this.sink = transform.getSink();
     this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
   }
 
-  private Sink<T> extractSink(Write.Bound<T> transform) {
-    // TODO possibly add a getter in the upstream
-    try {
-      Field sinkField = transform.getClass().getDeclaredField("sink");
-      sinkField.setAccessible(true);
-      @SuppressWarnings("unchecked")
-      Sink<T> extractedSink = (Sink<T>) sinkField.get(transform);
-      return extractedSink;
-    } catch (NoSuchFieldException | IllegalAccessException e) {
-      throw new RuntimeException("Could not acquire custom sink field.", e);
-    }
-  }
-
   @Override
   public void configure(Configuration configuration) {
     writeOperation = sink.createWriteOperation(serializedOptions.getPipelineOptions());