Repository: beam
Updated Branches:
  refs/heads/master fac4f3e3c -> 686b774ce


[BEAM-1993] Remove special unbounded Flink source/sink


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8213fa6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8213fa6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8213fa6

Branch: refs/heads/master
Commit: d8213fa6b274cd6acbf4da6deffd21ca23fd7f42
Parents: fac4f3e
Author: Ismaël Mejía <ieme...@apache.org>
Authored: Tue Apr 18 16:03:11 2017 +0200
Committer: Ismaël Mejía <ieme...@apache.org>
Committed: Tue Apr 18 16:15:09 2017 +0200

----------------------------------------------------------------------
 .../examples/streaming/KafkaIOExamples.java     | 338 -------------------
 .../KafkaWindowedWordCountExample.java          | 164 ---------
 .../FlinkStreamingTransformTranslators.java     |  87 +----
 .../flink/translation/types/FlinkCoder.java     |  63 ----
 .../streaming/io/UnboundedFlinkSink.java        | 200 -----------
 .../streaming/io/UnboundedFlinkSource.java      | 120 -------
 6 files changed, 12 insertions(+), 960 deletions(-)
----------------------------------------------------------------------
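
Migration note: with the special-cased wrappers gone, Kafka pipelines on the
Flink runner should use Beam's portable KafkaIO connector, which the generic
UnboundedSourceWrapper path (see the translator diff below) already handles.
A minimal sketch of the replacement read path, assuming the KafkaIO API of
this era; p is a Pipeline, and the broker and topic names are placeholders:

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Sketch only: read Kafka records as KVs through the runner-agnostic
    // KafkaIO instead of the removed UnboundedFlinkSource wrapper.
    PCollection<KV<Long, String>> records = p.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("localhost:9092")      // placeholder broker
            .withTopic("input")                          // placeholder topic
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata());                         // keep only the KVs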


http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
deleted file mode 100644
index 616e276..0000000
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.examples.streaming;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Properties;
-import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.Write;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-/**
- * Recipes/Examples that demonstrate how to read/write data from/to Kafka.
- */
-public class KafkaIOExamples {
-
-
-  private static final String KAFKA_TOPIC = "input";  // Default kafka topic to read from
-  private static final String KAFKA_AVRO_TOPIC = "output";  // Default kafka topic for Avro data
-  private static final String KAFKA_BROKER = "localhost:9092";  // Default kafka broker to contact
-  private static final String GROUP_ID = "myGroup";  // Default groupId
-  private static final String ZOOKEEPER = "localhost:2181";  // Default zookeeper to connect to (Kafka)
-
-  /**
-   * Read/Write String data to Kafka.
-   */
-  public static class KafkaString {
-
-    /**
-     * Read String data from Kafka.
-     */
-    public static class ReadStringFromKafka {
-
-      public static void main(String[] args) {
-
-        Pipeline p = initializePipeline(args);
-        KafkaOptions options = getOptions(p);
-
-        FlinkKafkaConsumer08<String> kafkaConsumer =
-            new FlinkKafkaConsumer08<>(options.getKafkaTopic(),
-                new SimpleStringSchema(), getKafkaProps(options));
-
-        p
-            .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer))).setCoder(StringUtf8Coder.of())
-            .apply(ParDo.of(new PrintFn<>()));
-
-        p.run();
-
-      }
-
-    }
-
-    /**
-     * Write String data to Kafka.
-     */
-    public static class WriteStringToKafka {
-
-      public static void main(String[] args) {
-
-        Pipeline p = initializePipeline(args);
-        KafkaOptions options = getOptions(p);
-
-        PCollection<String> words =
-            p.apply(Create.of("These", "are", "some", "words"));
-
-        FlinkKafkaProducer08<String> kafkaSink =
-            new FlinkKafkaProducer08<>(options.getKafkaTopic(),
-                new SimpleStringSchema(), getKafkaProps(options));
-
-        words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
-
-        p.run();
-      }
-
-    }
-  }
-
-  /**
-   * Read/Write Avro data to Kafka.
-   */
-  public static class KafkaAvro {
-
-    /**
-     * Read Avro data from Kafka.
-     */
-    public static class ReadAvroFromKafka {
-
-      public static void main(String[] args) {
-
-        Pipeline p = initializePipeline(args);
-        KafkaOptions options = getOptions(p);
-
-        FlinkKafkaConsumer08<MyType> kafkaConsumer =
-            new FlinkKafkaConsumer08<>(options.getKafkaAvroTopic(),
-                new AvroSerializationDeserializationSchema<>(MyType.class), getKafkaProps(options));
-
-        p
-            .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
-                .setCoder(AvroCoder.of(MyType.class))
-            .apply(ParDo.of(new PrintFn<>()));
-
-        p.run();
-
-      }
-
-    }
-
-    /**
-     * Write Avro data to Kafka.
-     */
-    public static class WriteAvroToKafka {
-
-      public static void main(String[] args) {
-
-        Pipeline p = initializePipeline(args);
-        KafkaOptions options = getOptions(p);
-
-        PCollection<MyType> words =
-            p.apply(Create.of(
-                new MyType("word", 1L),
-                new MyType("another", 2L),
-                new MyType("yet another", 3L)));
-
-        FlinkKafkaProducer08<MyType> kafkaSink =
-            new FlinkKafkaProducer08<>(options.getKafkaAvroTopic(),
-                new AvroSerializationDeserializationSchema<>(MyType.class), getKafkaProps(options));
-
-        words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
-
-        p.run();
-
-      }
-    }
-
-    /**
-     * Serialization/Deserialization schema for Avro types.
-     * @param <T> the type being encoded
-     */
-    static class AvroSerializationDeserializationSchema<T>
-        implements SerializationSchema<T>, DeserializationSchema<T> {
-
-      private final Class<T> avroType;
-
-      private final AvroCoder<T> coder;
-      private transient ByteArrayOutputStream out;
-
-      AvroSerializationDeserializationSchema(Class<T> clazz) {
-        this.avroType = clazz;
-        this.coder = AvroCoder.of(clazz);
-        this.out = new ByteArrayOutputStream();
-      }
-
-      @Override
-      public byte[] serialize(T element) {
-        if (out == null) {
-          out = new ByteArrayOutputStream();
-        }
-        try {
-          out.reset();
-          coder.encode(element, out, Coder.Context.NESTED);
-        } catch (IOException e) {
-          throw new RuntimeException("Avro encoding failed.", e);
-        }
-        return out.toByteArray();
-      }
-
-      @Override
-      public T deserialize(byte[] message) throws IOException {
-        return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED);
-      }
-
-      @Override
-      public boolean isEndOfStream(T nextElement) {
-        return false;
-      }
-
-      @Override
-      public TypeInformation<T> getProducedType() {
-        return TypeExtractor.getForClass(avroType);
-      }
-    }
-
-    /**
-     * Custom type for Avro serialization.
-     */
-    static class MyType implements Serializable {
-
-      public MyType() {}
-
-      MyType(String word, long count) {
-        this.word = word;
-        this.count = count;
-      }
-
-      String word;
-      long count;
-
-      @Override
-      public String toString() {
-        return "MyType{"
-            + "word='" + word + '\''
-            + ", count=" + count
-            + '}';
-      }
-    }
-  }
-
-  // -------------- Utilities --------------
-
-  /**
-   * Custom options for the Pipeline.
-   */
-  public interface KafkaOptions extends FlinkPipelineOptions {
-    @Description("The Kafka topic to read from")
-    @Default.String(KAFKA_TOPIC)
-    String getKafkaTopic();
-
-    void setKafkaTopic(String value);
-
-    void setKafkaAvroTopic(String value);
-
-    @Description("The Kafka topic for Avro data")
-    @Default.String(KAFKA_AVRO_TOPIC)
-    String getKafkaAvroTopic();
-
-    @Description("The Kafka Broker to read from")
-    @Default.String(KAFKA_BROKER)
-    String getBroker();
-
-    void setBroker(String value);
-
-    @Description("The Zookeeper server to connect to")
-    @Default.String(ZOOKEEPER)
-    String getZookeeper();
-
-    void setZookeeper(String value);
-
-    @Description("The groupId")
-    @Default.String(GROUP_ID)
-    String getGroup();
-
-    void setGroup(String value);
-  }
-
-  /**
-   * Initializes some options for the Flink runner.
-   * @param args The command line args
-   * @return the pipeline
-   */
-  private static Pipeline initializePipeline(String[] args) {
-    KafkaOptions options =
-        PipelineOptionsFactory.fromArgs(args).as(KafkaOptions.class);
-
-    options.setStreaming(true);
-    options.setRunner(FlinkRunner.class);
-
-    options.setCheckpointingInterval(1000L);
-    options.setNumberOfExecutionRetries(5);
-    options.setExecutionRetryDelay(3000L);
-
-    return Pipeline.create(options);
-  }
-
-  /**
-   * Gets KafkaOptions from the Pipeline.
-   * @param p the pipeline
-   * @return KafkaOptions
-   */
-  private static KafkaOptions getOptions(Pipeline p) {
-    return p.getOptions().as(KafkaOptions.class);
-  }
-
-  /**
-   * Helper method to set the Kafka props from the pipeline options.
-   * @param options KafkaOptions
-   * @return Kafka props
-   */
-  private static Properties getKafkaProps(KafkaOptions options) {
-
-    Properties props = new Properties();
-    props.setProperty("zookeeper.connect", options.getZookeeper());
-    props.setProperty("bootstrap.servers", options.getBroker());
-    props.setProperty("group.id", options.getGroup());
-
-    return props;
-  }
-
-  /**
-   * Print contents to stdout.
-   * @param <T> type of the input
-   */
-  private static class PrintFn<T> extends DoFn<T, T> {
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      System.out.println(c.element().toString());
-    }
-  }
-
-}
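
The deleted AvroSerializationDeserializationSchema above only bridged Beam's
AvroCoder into Flink's serialization interfaces. The same Avro round-trip can
be expressed with AvroCoder directly; a hedged sketch reusing MyType from the
deleted example:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.coders.Coder;

    AvroCoder<MyType> coder = AvroCoder.of(MyType.class);

    // Encode to bytes and decode back, using the NESTED context exactly as
    // the deleted schema did.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    coder.encode(new MyType("word", 1L), out, Coder.Context.NESTED);
    MyType copy = coder.decode(
        new ByteArrayInputStream(out.toByteArray()), Coder.Context.NESTED);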

http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
deleted file mode 100644
index ee0e874..0000000
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.examples.streaming;
-
-import java.util.Properties;
-import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.joda.time.Duration;
-
-/**
- * Wordcount example using Kafka topic.
- */
-public class KafkaWindowedWordCountExample {
-
-  static final String KAFKA_TOPIC = "test";  // Default kafka topic to read from
-  static final String KAFKA_BROKER = "localhost:9092";  // Default kafka broker to contact
-  static final String GROUP_ID = "myGroup";  // Default groupId
-  static final String ZOOKEEPER = "localhost:2181";  // Default zookeeper to connect to for Kafka
-
-  /**
-   * Function to extract words.
-   */
-  public static class ExtractWordsFn extends DoFn<String, String> {
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", Sum.ofLongs());
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      if (c.element().trim().isEmpty()) {
-        emptyLines.addValue(1L);
-      }
-
-      // Split the line into words.
-      String[] words = c.element().split("[^a-zA-Z']+");
-
-      // Output each word encountered into the output PCollection.
-      for (String word : words) {
-        if (!word.isEmpty()) {
-          c.output(word);
-        }
-      }
-    }
-  }
-
-  /**
-   * Function to format KV as String.
-   */
-  public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      String row = c.element().getKey() + " - " + c.element().getValue() + " @ "
-          + c.timestamp().toString();
-      System.out.println(row);
-      c.output(row);
-    }
-  }
-
-  /**
-   * Pipeline options.
-   */
-  public interface KafkaStreamingWordCountOptions
-      extends WindowedWordCount.StreamingWordCountOptions {
-    @Description("The Kafka topic to read from")
-    @Default.String(KAFKA_TOPIC)
-    String getKafkaTopic();
-
-    void setKafkaTopic(String value);
-
-    @Description("The Kafka Broker to read from")
-    @Default.String(KAFKA_BROKER)
-    String getBroker();
-
-    void setBroker(String value);
-
-    @Description("The Zookeeper server to connect to")
-    @Default.String(ZOOKEEPER)
-    String getZookeeper();
-
-    void setZookeeper(String value);
-
-    @Description("The groupId")
-    @Default.String(GROUP_ID)
-    String getGroup();
-
-    void setGroup(String value);
-
-  }
-
-  public static void main(String[] args) {
-    PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class);
-    KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args)
-        .as(KafkaStreamingWordCountOptions.class);
-    options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
-    options.setStreaming(true);
-    options.setCheckpointingInterval(1000L);
-    options.setNumberOfExecutionRetries(5);
-    options.setExecutionRetryDelay(3000L);
-    options.setRunner(FlinkRunner.class);
-
-    System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " "
-        + options.getBroker() + " " + options.getGroup());
-    Pipeline pipeline = Pipeline.create(options);
-
-    Properties p = new Properties();
-    p.setProperty("zookeeper.connect", options.getZookeeper());
-    p.setProperty("bootstrap.servers", options.getBroker());
-    p.setProperty("group.id", options.getGroup());
-
-    // this is the Flink consumer that reads the input to
-    // the program from a kafka topic.
-    FlinkKafkaConsumer08<String> kafkaConsumer = new FlinkKafkaConsumer08<>(
-        options.getKafkaTopic(),
-        new SimpleStringSchema(), p);
-
-    PCollection<String> words = pipeline
-        .apply("StreamingWordCount", Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
-        .apply(ParDo.of(new ExtractWordsFn()))
-        .apply(Window.<String>into(FixedWindows.of(
-            Duration.standardSeconds(options.getWindowSize())))
-            .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
-            .discardingFiredPanes());
-
-    PCollection<KV<String, Long>> wordCounts =
-        words.apply(Count.<String>perElement());
-
-    wordCounts.apply(ParDo.of(new FormatAsStringFn()))
-        .apply(TextIO.Write.to("./outputKafka.txt"));
-
-    pipeline.run();
-  }
-}
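
The windowing in the deleted word-count example is runner-agnostic and
survives this removal unchanged once the source is swapped for KafkaIO. A
sketch of the same fixed-window count over a streaming PCollection<String>
named words; words and windowSize are placeholders:

    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    // Same trigger/lateness/pane settings as the deleted example.
    PCollection<KV<String, Long>> wordCounts = words
        .apply(Window.<String>into(
                FixedWindows.of(Duration.standardSeconds(windowSize)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        .apply(Count.<String>perElement());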

http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index fbd7620..123d5e7 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -36,7 +36,6 @@ import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.types.FlinkCoder;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
@@ -45,17 +44,13 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDo
 import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -69,7 +64,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.AppliedCombineFn;
@@ -94,12 +88,10 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -124,7 +116,6 @@ class FlinkStreamingTransformTranslators {
   static {
     TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
     TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
-    TRANSLATORS.put(Write.class, new WriteSinkStreamingTranslator());
     TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
 
     TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator());
@@ -203,31 +194,6 @@ class FlinkStreamingTransformTranslators {
     }
   }
 
-  private static class WriteSinkStreamingTranslator<T>
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write<T>> {
-
-    @Override
-    public void translateNode(Write<T> transform, FlinkStreamingTranslationContext context) {
-      String name = transform.getName();
-      PValue input = context.getInput(transform);
-
-      Sink<T> sink = transform.getSink();
-      if (!(sink instanceof UnboundedFlinkSink)) {
-        throw new UnsupportedOperationException(
-            "At this time, only unbounded Flink sinks are supported.");
-      }
-
-      DataStream<WindowedValue<T>> inputDataSet = context.getInputDataStream(input);
-
-      inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
-        @Override
-        public void flatMap(WindowedValue<T> value, Collector<Object> out) throws Exception {
-          out.collect(value.getValue());
-        }
-      }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name);
-    }
-  }
-
   private static class UnboundedReadSourceTranslator<T>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
 
@@ -241,47 +207,18 @@ class FlinkStreamingTransformTranslators {
           context.getTypeInfo(context.getOutput(transform));
 
       DataStream<WindowedValue<T>> source;
-      if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) {
-        @SuppressWarnings("unchecked")
-        UnboundedFlinkSource<T> flinkSourceFunction =
-            (UnboundedFlinkSource<T>) transform.getSource();
-
-        final AssignerWithPeriodicWatermarks<T> flinkAssigner =
-            flinkSourceFunction.getFlinkTimestampAssigner();
-
-        DataStream<T> flinkSource = context.getExecutionEnvironment()
-            .addSource(flinkSourceFunction.getFlinkSource());
-
-        flinkSourceFunction.setCoder(
-            new FlinkCoder<T>(flinkSource.getType(),
-              context.getExecutionEnvironment().getConfig()));
-
-        source = flinkSource
-            .assignTimestampsAndWatermarks(flinkAssigner)
-            .flatMap(new FlatMapFunction<T, WindowedValue<T>>() {
-              @Override
-              public void flatMap(T s, Collector<WindowedValue<T>> collector) throws Exception {
-                collector.collect(
-                    WindowedValue.of(
-                        s,
-                        new Instant(flinkAssigner.extractTimestamp(s, -1)),
-                        GlobalWindow.INSTANCE,
-                        PaneInfo.NO_FIRING));
-              }}).returns(outputTypeInfo);
-      } else {
-        try {
-          UnboundedSourceWrapper<T, ?> sourceWrapper =
-              new UnboundedSourceWrapper<>(
-                  context.getPipelineOptions(),
-                  transform.getSource(),
-                  context.getExecutionEnvironment().getParallelism());
-          source = context
-              .getExecutionEnvironment()
-              .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
-        } catch (Exception e) {
-          throw new RuntimeException(
-              "Error while translating UnboundedSource: " + transform.getSource(), e);
-        }
+      try {
+        UnboundedSourceWrapper<T, ?> sourceWrapper =
+            new UnboundedSourceWrapper<>(
+                context.getPipelineOptions(),
+                transform.getSource(),
+                context.getExecutionEnvironment().getParallelism());
+        source = context
+            .getExecutionEnvironment()
+            .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "Error while translating UnboundedSource: " + transform.getSource(), e);
       }
 
       context.setOutputDataStream(output, source);
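
After this hunk, every Read.Unbounded goes through UnboundedSourceWrapper, so
any Beam UnboundedSource runs on Flink without runner-specific source types.
The user-facing side is unchanged; a sketch, where pipeline and
myUnboundedSource are placeholders for a Pipeline and any UnboundedSource
implementation:

    import org.apache.beam.sdk.io.Read;

    // The FlinkRunner now wraps this source in UnboundedSourceWrapper during
    // translation; no Flink-specific wrapper class is involved.
    pipeline.apply(Read.from(myUnboundedSource));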

http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java
deleted file mode 100644
index 8b90c73..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-
-/**
- * A Coder that uses Flink's serialization system.
- * @param <T> The type of the value to be encoded
- */
-public class FlinkCoder<T> extends StandardCoder<T> {
-
-  private final TypeSerializer<T> typeSerializer;
-
-  public FlinkCoder(TypeInformation<T> typeInformation, ExecutionConfig executionConfig) {
-    this.typeSerializer = typeInformation.createSerializer(executionConfig);
-  }
-
-  @Override
-  public void encode(T value, OutputStream outStream, Context context) throws IOException {
-    typeSerializer.serialize(value, new DataOutputViewStreamWrapper(outStream));
-  }
-
-  @Override
-  public T decode(InputStream inStream, Context context) throws IOException {
-    return typeSerializer.deserialize(new DataInputViewStreamWrapper(inStream));
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-  }
-}
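
FlinkCoder existed only so the removed special source could derive an element
coder from Flink's TypeInformation. Standard Beam coders cover the same need;
a sketch of pinning a coder explicitly, where input and MyType are
placeholders and SerializableCoder is one option among several:

    import org.apache.beam.sdk.coders.SerializableCoder;
    import org.apache.beam.sdk.values.PCollection;

    // Set the element coder directly instead of deriving it from Flink's
    // type system as the removed FlinkCoder did.
    PCollection<MyType> typed = input.setCoder(SerializableCoder.of(MyType.class));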

http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
deleted file mode 100644
index af36b80..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collection;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.io.Sink;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-
-/**
- * A wrapper translating Flink sinks implementing the {@link SinkFunction} interface, into
- * unbounded Beam sinks (see {@link UnboundedSource}).
- * */
-public class UnboundedFlinkSink<T> extends Sink<T> {
-
-  /* The Flink sink function */
-  private final SinkFunction<T> flinkSink;
-
-  private UnboundedFlinkSink(SinkFunction<T> flinkSink) {
-    this.flinkSink = flinkSink;
-  }
-
-  public SinkFunction<T> getFlinkSource() {
-    return this.flinkSink;
-  }
-
-  @Override
-  public void validate(PipelineOptions options) {
-  }
-
-  @Override
-  public WriteOperation<T, ?> createWriteOperation(PipelineOptions options) {
-    return new WriteOperation<T, Object>() {
-      @Override
-      public void initialize(PipelineOptions options) throws Exception {
-
-      }
-
-      @Override
-      public void setWindowedWrites(boolean windowedWrites) {
-      }
-
-      @Override
-      public void finalize(Iterable<Object> writerResults, PipelineOptions options)
-          throws Exception {
-
-      }
-
-      @Override
-      public Coder<Object> getWriterResultCoder() {
-        return new Coder<Object>() {
-          @Override
-          public void encode(Object value, OutputStream outStream, Context context)
-              throws CoderException, IOException {
-
-          }
-
-          @Override
-          public Object decode(InputStream inStream, Context context)
-              throws CoderException, IOException {
-            return null;
-          }
-
-          @Override
-          public List<? extends Coder<?>> getCoderArguments() {
-            return null;
-          }
-
-          @Override
-          public CloudObject asCloudObject() {
-            return null;
-          }
-
-          @Override
-          public void verifyDeterministic() throws NonDeterministicException {
-
-          }
-
-          @Override
-          public boolean consistentWithEquals() {
-            return false;
-          }
-
-          @Override
-          public Object structuralValue(Object value) throws Exception {
-            return null;
-          }
-
-          @Override
-          public boolean isRegisterByteSizeObserverCheap(Object value, Context context) {
-            return false;
-          }
-
-          @Override
-          public void registerByteSizeObserver(Object value,
-                                               ElementByteSizeObserver observer,
-                                               Context context) throws Exception {
-
-          }
-
-          @Override
-          public String getEncodingId() {
-            return null;
-          }
-
-          @Override
-          public Collection<String> getAllowedEncodings() {
-            return null;
-          }
-
-          @Override
-          public TypeDescriptor<Object> getEncodedTypeDescriptor() {
-            return TypeDescriptor.of(Object.class);
-          }
-        };
-      }
-
-      @Override
-      public Writer<T, Object> createWriter(PipelineOptions options) throws Exception {
-        return new Writer<T, Object>() {
-          @Override
-          public void openWindowed(String uId,
-                                   BoundedWindow window,
-                                   PaneInfo paneInfo,
-                                   int shard,
-                                   int numShards) throws Exception {
-          }
-
-          @Override
-          public void openUnwindowed(String uId, int shard, int numShards) throws Exception {
-          }
-
-          @Override
-          public void cleanup() throws Exception {
-
-          }
-
-          @Override
-          public void write(T value) throws Exception {
-
-          }
-
-          @Override
-          public Object close() throws Exception {
-            return null;
-          }
-
-          @Override
-          public WriteOperation<T, Object> getWriteOperation() {
-            return null;
-          }
-
-        };
-      }
-
-      @Override
-      public Sink<T> getSink() {
-        return UnboundedFlinkSink.this;
-      }
-    };
-  }
-
-  /**
-   * Creates a Flink sink to write to using the Write API.
-   * @param flinkSink The Flink sink, e.g. FlinkKafkaProducer09
-   * @param <T> The input type of the sink
-   * @return A Beam sink wrapping a Flink sink
-   */
-  public static <T> Sink<T> of(SinkFunction<T> flinkSink) {
-    return new UnboundedFlinkSink<>(flinkSink);
-  }
-}
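
With UnboundedFlinkSink removed, writes to Kafka likewise go through KafkaIO
rather than Write.to over a wrapped SinkFunction. A hedged sketch of the
replacement write path, again assuming the KafkaIO API of this era; words is
a PCollection<String> and the broker/topic names are placeholders:

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Sketch only: values() lets a PCollection<String> be written without
    // keys, replacing Write.to(UnboundedFlinkSink.of(kafkaSink)).
    words.apply(KafkaIO.<Void, String>write()
        .withBootstrapServers("localhost:9092")          // placeholder broker
        .withTopic("output")                             // placeholder topic
        .withValueSerializer(StringSerializer.class)
        .values());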

http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
deleted file mode 100644
index ac20c34..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-
-/**
- * A wrapper translating Flink Sources implementing the {@link SourceFunction} interface, into
- * unbounded Beam sources (see {@link UnboundedSource}).
- * */
-public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
-
-  private final SourceFunction<T> flinkSource;
-
-  /** Coder set during translation. */
-  private Coder<T> coder;
-
-  /** Timestamp / watermark assigner for source; defaults to ingestion time. */
-  private AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner =
-      new IngestionTimeExtractor<T>();
-
-  public UnboundedFlinkSource(SourceFunction<T> source) {
-    flinkSource = checkNotNull(source);
-  }
-
-  public UnboundedFlinkSource(SourceFunction<T> source,
-                              AssignerWithPeriodicWatermarks<T> timestampAssigner) {
-    flinkSource = checkNotNull(source);
-    flinkTimestampAssigner = checkNotNull(timestampAssigner);
-  }
-
-  public SourceFunction<T> getFlinkSource() {
-    return this.flinkSource;
-  }
-
-  public AssignerWithPeriodicWatermarks<T> getFlinkTimestampAssigner() {
-    return flinkTimestampAssigner;
-  }
-
-  @Override
-  public List<? extends UnboundedSource<T, UnboundedSource.CheckpointMark>> generateInitialSplits(
-      int desiredNumSplits,
-      PipelineOptions options) throws Exception {
-    throw new RuntimeException("Flink Sources are supported only when "
-        + "running with the FlinkRunner.");
-  }
-
-  @Override
-  public UnboundedReader<T> createReader(PipelineOptions options,
-                                         @Nullable CheckpointMark checkpointMark) {
-    throw new RuntimeException("Flink Sources are supported only when "
-        + "running with the FlinkRunner.");
-  }
-
-  @Nullable
-  @Override
-  public Coder<UnboundedSource.CheckpointMark> getCheckpointMarkCoder() {
-    throw new RuntimeException("Flink Sources are supported only when "
-        + "running with the FlinkRunner.");
-  }
-
-
-  @Override
-  public void validate() {
-  }
-
-  @Override
-  public Coder<T> getDefaultOutputCoder() {
-    // The coder derived from the Flink source
-    return coder;
-  }
-
-  public void setCoder(Coder<T> coder) {
-    this.coder = coder;
-  }
-
-  public void setFlinkTimestampAssigner(AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner) {
-    this.flinkTimestampAssigner = flinkTimestampAssigner;
-  }
-
-  /**
-   * Creates a new unbounded source from a Flink source.
-   * @param flinkSource The Flink source function
-   * @param <T> The type that the source function produces.
-   * @return The wrapped source function.
-   */
-  public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of(
-      SourceFunction<T> flinkSource) {
-    return new UnboundedFlinkSource<>(flinkSource);
-  }
-
-  public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of(
-          SourceFunction<T> flinkSource, AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner) {
-    return new UnboundedFlinkSource<>(flinkSource, flinkTimestampAssigner);
-  }
-}
