http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
new file mode 100644
index 0000000..177a9ee
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
+  SingleOutputStreamOperator, GroupedDataStream}
+import scala.reflect.ClassTag
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.streaming.api.invokable.operator.MapInvokable
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.streaming.api.invokable.StreamInvokable
+import org.apache.flink.streaming.api.invokable.operator.{ GroupedReduceInvokable, StreamReduceInvokable }
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.streaming.api.function.sink.SinkFunction
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
+import org.apache.flink.streaming.api.windowing.policy.{ EvictionPolicy, TriggerPolicy }
+import org.apache.flink.streaming.api.collector.OutputSelector
+import scala.collection.JavaConversions._
+import java.util.HashMap
+import org.apache.flink.streaming.api.function.aggregation.SumFunction
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
+import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy
+
+class DataStream[T](javaStream: JavaStream[T]) {
+
+  /**
+   * Gets the underlying java DataStream object.
+   */
+  def getJavaStream: JavaStream[T] = javaStream
+
+  /**
+   * Sets the degree of parallelism of this operation. This must be at least 1.
+   */
+  def setParallelism(dop: Int): DataStream[T] = {
+    javaStream match {
+      case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop)
+      case _ =>
+        throw new UnsupportedOperationException("Operator " + 
javaStream.toString +  " cannot " +
+          "have " +
+          "parallelism.")
+    }
+    this
+  }
+
+  /**
+   * Returns the degree of parallelism of this operation.
+   */
+  def getParallelism: Int = javaStream match {
+    case op: SingleOutputStreamOperator[_, _] => op.getParallelism
+    case _ =>
+      throw new UnsupportedOperationException("Operator " + 
javaStream.toString + " does not have" +
+        " "  +
+        "parallelism.")
+  }
+  
+  def setChainingStrategy(strategy: ChainingStrategy): DataStream[T] = {
+    javaStream match {
+      case ds: SingleOutputStreamOperator[_, _] => ds.setChainingStrategy(strategy)
+      case _ =>
+        throw new UnsupportedOperationException("Only supported for 
operators.")
+    }
+    this
+  }
+
+  /**
+   * Creates a new DataStream by merging DataStream outputs of
+   * the same type with each other. The DataStreams merged using this operator
+   * will be transformed simultaneously.
+   *
+   */
+  def merge(dataStreams: DataStream[T]*): DataStream[T] =
+    javaStream.merge(dataStreams.map(_.getJavaStream): _*)
+
+  /**
+   * Creates a new ConnectedDataStream by connecting
+   * DataStream outputs of different type with each other. The
+   * DataStreams connected using this operator can be used with CoFunctions.
+   *
+   */
+  def connect[T2](dataStream: DataStream[T2]): ConnectedDataStream[T, T2] =
+    javaStream.connect(dataStream.getJavaStream)
+
+  /**
+   * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
+   * be used with grouped operators like grouped reduce or grouped aggregations.
+   *
+   */
+  def groupBy(fields: Int*): DataStream[T] = javaStream.groupBy(fields: _*)
+
+  /**
+   * Groups the elements of a DataStream by the given field expressions to
+   * be used with grouped operators like grouped reduce or grouped aggregations
+   *
+   */
+  def groupBy(firstField: String, otherFields: String*): DataStream[T] = 
+   javaStream.groupBy(firstField +: otherFields.toArray: _*)   
+  
+  /**
+   * Groups the elements of a DataStream by the given K key to
+   * be used with grouped operators like grouped reduce or grouped aggregations
+   *
+   */
+  def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = {
+
+    val keyExtractor = new KeySelector[T, K] {
+      val cleanFun = clean(fun)
+      def getKey(in: T) = cleanFun(in)
+    }
+    javaStream.groupBy(keyExtractor)
+  }
+
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are broadcast to every parallel instance of the next component. This
+   * setting only affects how the outputs will be distributed between the
+   * parallel instances of the next processing operator.
+   *
+   */
+  def broadcast: DataStream[T] = javaStream.broadcast()
+  
+  /**
+   * Sets the partitioning of the DataStream so that the output values all go to
+   * the first instance of the next processing operator. Use this setting with care
+   * since it might cause a serious performance bottleneck in the application.
+   */
+  def global: DataStream[T] = javaStream.global()
+
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are shuffled to the next component. This setting only affects how the
+   * outputs will be distributed between the parallel instances of the next
+   * processing operator.
+   *
+   */
+  def shuffle: DataStream[T] = javaStream.shuffle()
+
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are forwarded to the local subtask of the next component (whenever
+   * possible). This is the default partitioner setting. This setting only
+   * affects how the outputs will be distributed between the parallel
+   * instances of the next processing operator.
+   *
+   */
+  def forward: DataStream[T] = javaStream.forward()
+
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are distributed evenly to the next component. This setting only affects
+   * how the outputs will be distributed between the parallel instances of
+   * the next processing operator.
+   *
+   */
+  def distribute: DataStream[T] = javaStream.distribute()
+
+  /**
+   * Initiates an iterative part of the program that creates a loop by feeding
+   * back data streams. To create a streaming iteration the user needs to define
+   * a transformation that creates two DataStreams. The first one is the output
+   * that will be fed back to the start of the iteration and the second is the
+   * output stream of the iterative part.
+   * <p>
+   * stepfunction: initialStream => (feedback, output)
+   * <p>
+   * A common pattern is to use output splitting to create feedback and output DataStreams.
+   * Please refer to the .split(...) method of the DataStream.
+   * <p>
+   * By default a DataStream with iteration will never terminate, but the user
+   * can use the maxWaitTime parameter to set a max waiting time for the iteration head.
+   * If no data is received within the set time, the stream terminates.
+   *
+   */
+  def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),
+        maxWaitTimeMillis: Long = 0): DataStream[R] = {
+    val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
+
+    val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
+    iterativeStream.closeWith(feedback.getJavaStream)
+    output
+  }
+
+  /**
+   * Applies an aggregation that gives the current maximum of the data stream at
+   * the given position.
+   *
+   */
+  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
+  
+  /**
+   * Applies an aggregation that gives the current maximum of the data stream at
+   * the given field.
+   *
+   */
+  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
+  
+  /**
+   * Applies an aggregation that gives the current minimum of the data stream at
+   * the given position.
+   *
+   */
+  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+  
+  /**
+   * Applies an aggregation that gives the current minimum of the data stream at
+   * the given field.
+   *
+   */
+  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
+
+  /**
+   * Applies an aggregation that sums the data stream at the given position.
+   *
+   */
+  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
+  
+  /**
+   * Applies an aggregation that sums the data stream at the given field.
+   *
+   */
+  def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
+
+  /**
+   * Applies an aggregation that gives the current minimum element of the data stream by
+   * the given position. In case of equality, the first element with the minimal value is returned.
+   *
+   */
+  def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY, position)
+    
+  /**
+   * Applies an aggregation that gives the current minimum element of the data stream by
+   * the given field. In case of equality, the first element with the minimal value is returned.
+   *
+   */
+  def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY, field)
+
+  /**
+   * Applies an aggregation that gives the current maximum element of the data stream by
+   * the given position. In case of equality, the first element with the maximal value is returned.
+   *
+   */
+  def maxBy(position: Int): DataStream[T] =
+    aggregate(AggregationType.MAXBY, position)
+    
+  /**
+   * Applies an aggregation that gives the current maximum element of the data stream by
+   * the given field. In case of equality, the first element with the maximal value is returned.
+   *
+   */
+  def maxBy(field: String): DataStream[T] =
+    aggregate(AggregationType.MAXBY, field)
+    
+  private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
+    val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
+    aggregate(aggregationType, position)
+  }
+
+  private def aggregate(aggregationType: AggregationType, position: Int):
+    DataStream[T] = {
+
+    val jStream = javaStream.asInstanceOf[JavaStream[Product]]
+    val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
+
+    val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position)
+
+    val reducer = aggregationType match {
+      case AggregationType.SUM => new agg.Sum(
+        SumFunction.getForClass(outType.getTypeAt(position).getTypeClass()))
+      case _ => new agg.ProductComparableAggregator(aggregationType, true)
+    }
+
+    val invokable = jStream match {
+      case groupedStream: GroupedDataStream[_] => new GroupedReduceInvokable(reducer,
+        groupedStream.getKeySelector())
+      case _ => new StreamReduceInvokable(reducer)
+    }
+    new DataStream[Product](jStream.transform("aggregation", jStream.getType(),
+      invokable)).asInstanceOf[DataStream[T]]
+  }
+
+  /**
+   * Creates a new DataStream containing the current number (count) of
+   * received records.
+   *
+   */
+  def count: DataStream[Long] = new DataStream[java.lang.Long](
+    javaStream.count()).asInstanceOf[DataStream[Long]]
+
+  /**
+   * Creates a new DataStream by applying the given function to every element of this DataStream.
+   */
+  def map[R: TypeInformation: ClassTag](fun: T => R): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+    val mapper = new MapFunction[T, R] {
+      val cleanFun = clean(fun)
+      def map(in: T): R = cleanFun(in)
+    }
+
+    javaStream.transform("map", implicitly[TypeInformation[R]], new 
MapInvokable[T, R](mapper))
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element of this DataStream.
+   */
+  def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataStream[R] = {
+    if (mapper == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+
+    javaStream.transform("map", implicitly[TypeInformation[R]], new 
MapInvokable[T, R](mapper))
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataStream[R] = {
+    if (flatMapper == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+   javaStream.transform("flatMap", implicitly[TypeInformation[R]], 
+       new FlatMapInvokable[T, R](flatMapper))
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    val flatMapper = new FlatMapFunction[T, R] {
+      val cleanFun = clean(fun)
+      def flatMap(in: T, out: Collector[R]) { cleanFun(in, out) }
+    }
+    flatMap(flatMapper)
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    val flatMapper = new FlatMapFunction[T, R] {
+      val cleanFun = clean(fun)
+      def flatMap(in: T, out: Collector[R]) { cleanFun(in) foreach out.collect }
+    }
+    flatMap(flatMapper)
+  }
+
+  /**
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream
+   * using an associative reduce function.
+   */
+  def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
+    if (reducer == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    javaStream match {
+      case ds: GroupedDataStream[_] => javaStream.transform("reduce",
+        javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector()))
+      case _ => javaStream.transform("reduce", javaStream.getType(),
+        new StreamReduceInvokable[T](reducer))
+    }
+  }
+
+  /**
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream
+   * using an associative reduce function.
+   */
+  def reduce(fun: (T, T) => T): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    val reducer = new ReduceFunction[T] {
+      val cleanFun = clean(fun)
+      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+    }
+    reduce(reducer)
+  }
+
+  /**
+   * Creates a new DataStream that contains only the elements satisfying the given filter predicate.
+   */
+  def filter(filter: FilterFunction[T]): DataStream[T] = {
+    if (filter == null) {
+      throw new NullPointerException("Filter function must not be null.")
+    }
+    javaStream.filter(filter)
+  }
+
+  /**
+   * Creates a new DataStream that contains only the elements satisfying the given filter predicate.
+   */
+  def filter(fun: T => Boolean): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Filter function must not be null.")
+    }
+    val filter = new FilterFunction[T] {
+      val cleanFun = clean(fun)
+      def filter(in: T) = cleanFun(in)
+    }
+    this.filter(filter)
+  }
+
+  /**
+   * Creates a WindowedDataStream that can be used to apply
+   * transformations like .reduce(...) or aggregations on
+   * preset chunks (windows) of the data stream. To define the windows one or
+   * more WindowingHelper-s such as Time, Count and
+   * Delta can be used.</br></br> When applied to a grouped data
+   * stream, the windows (evictions) and slide sizes (triggers) will be
+   * computed on a per group basis. </br></br> For more advanced control over
+   * the trigger and eviction policies please use
+   * window(List(triggers), List(evicters)).
+   */
+  def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
+    javaStream.window(windowingHelper: _*)
+
+  /**
+   * Creates a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s.
+   * Windowing can be used to apply transformations like .reduce(...) or aggregations on
+   * preset chunks (windows) of the data stream.</br></br>For most common
+   * use-cases please refer to window(WindowingHelper[_]*)
+   *
+   */
+  def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]):
+    WindowedDataStream[T] = javaStream.window(triggers, evicters)
+
+  /**
+   *
+   * Operator used for directing tuples to specific named outputs using an
+   * OutputSelector. Calling this method on an operator creates a new
+   * SplitDataStream.
+   */
+  def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream.split(selector)
+
+  /**
+   * Creates a new SplitDataStream that contains only the elements satisfying the
+   * given output selector predicate.
+   */
+  def split(fun: T => String): SplitDataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("OutputSelector must not be null.")
+    }
+    val selector = new OutputSelector[T] {
+      val cleanFun = clean(fun)
+      def select(in: T): java.lang.Iterable[String] = {
+        List(cleanFun(in))
+      }
+    }
+    split(selector)
+  }
+
+  /**
+   * Initiates a temporal Join transformation that joins the elements of two
+   * data streams on key equality over a specified time window.
+   *
+   * This method returns a StreamJoinOperator on which
+   * .onWindow(..) should be called to define the
+   * window, and then the .where(..) and .equalTo(..) methods can be used to define
+   * the join keys. The user can also use the apply method of the returned JoinedStream
+   * to apply a custom join function.
+   *
+   */
+  def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] =
+    new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
+
+  /**
+   * Initiates a temporal cross transformation that builds all pair
+   * combinations of elements of both DataStreams, i.e., it builds a Cartesian
+   * product.
+   *
+   * This method returns a StreamCrossOperator on which
+   * .onWindow(..) should be called to define the
+   * window. The user can also use the apply method of the returned CrossWindow
+   * to apply a custom cross function.
+   *
+   */
+  def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] =
+    new StreamCrossOperator[T, R](javaStream, stream.getJavaStream)
+
+  /**
+   * Writes a DataStream to the standard output stream (stdout). For each
+   * element of the DataStream the result of .toString is
+   * written.
+   *
+   */
+  def print(): DataStream[T] = javaStream.print()
+
+  /**
+   * Writes a DataStream to the file specified by path in text format. The
+   * writing is performed periodically, every millis milliseconds. For
+   * every element of the DataStream the result of .toString
+   * is written.
+   *
+   */
+  def writeAsText(path: String, millis: Long = 0): DataStream[T] =
+    javaStream.writeAsText(path, millis)
+
+  /**
+   * Writes a DataStream to the file specified by path in csv format. The
+   * writing is performed periodically, every millis milliseconds. For
+   * every element of the DataStream the result of .toString
+   * is written.
+   *
+   */
+  def writeAsCsv(path: String, millis: Long = 0): DataStream[T] =
+    javaStream.writeAsCsv(path, millis)
+
+  /**
+   * Adds the given sink to this DataStream. Only streams with sinks added
+   * will be executed once the StreamExecutionEnvironment.execute(...)
+   * method is called.
+   *
+   */
+  def addSink(sinkFunction: SinkFunction[T]): DataStream[T] =
+    javaStream.addSink(sinkFunction)
+
+  /**
+   * Adds the given sink to this DataStream. Only streams with sinks added
+   * will be executed once the StreamExecutionEnvironment.execute(...)
+   * method is called.
+   *
+   */
+  def addSink(fun: T => Unit): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Sink function must not be null.")
+    }
+    val sinkFunction = new SinkFunction[T] {
+      val cleanFun = clean(fun)
+      def invoke(in: T) = cleanFun(in)
+    }
+    this.addSink(sinkFunction)
+  }
+
+}
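
For illustration, a minimal word-count style sketch using the DataStream API above; it assumes the implicit TypeInformation support from the Scala API package object is in scope, and the input strings and job name are arbitrary:

    import org.apache.flink.streaming.api.scala._

    object WordCountSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Split lines into words, build (word, 1) pairs, group by the word
        // (position 0) and keep a running sum of the counts (position 1).
        env.fromElements("to be or not to be", "that is the question")
          .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
          .map { (_, 1) }
          .groupBy(0)
          .sum(1)
          .print()

        env.execute("streaming word count sketch")
      }
    }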

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
new file mode 100644
index 0000000..9e33f80
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
+
+/**
+ * The SplitDataStream represents an operator that has been split using an
+ * {@link OutputSelector}. Named outputs can be selected using the
+ * {@link #select} function. To apply a transformation on the whole output simply call
+ * the appropriate method on this stream.
+ *
+ * @tparam T The type of the output.
+ */
+class SplitDataStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaStream) {
+
+  /**
+   *  Sets the output names for which the next operator will receive values.
+   */
+  def select(outputNames: String*): DataStream[T] =
+    javaStream.select(outputNames: _*)
+  
+}
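
For illustration, a small split/select sketch; `env`, the element values and the output names "even"/"odd" are arbitrary, and the same implicit TypeInformation support is assumed:

    val numbers: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5)

    // Route each element to a named output, then pick one of the named outputs.
    val parity: SplitDataStream[Int] =
      numbers.split((n: Int) => if (n % 2 == 0) "even" else "odd")

    val evens: DataStream[Int] = parity.select("even")
    evens.print()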

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
new file mode 100644
index 0000000..a408ec0
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import scala.reflect.ClassTag
+import org.apache.commons.lang.Validate
+import org.apache.flink.api.common.functions.CrossFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.scala.typeutils.CaseClassSerializer
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
+import org.apache.flink.streaming.api.function.co.CrossWindowFunction
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
+import java.util.concurrent.TimeUnit
+
+class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
+  TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
+
+  override def createNextWindowOperator(): StreamCrossOperator.CrossWindow[I1, I2] = {
+
+    val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this,
+      (l: I1, r: I2) => (l, r))
+
+    val returnType = new CaseClassTypeInfo[(I1, I2)](
+
+      classOf[(I1, I2)], Seq(input1.getType, input2.getType), Array("_1", 
"_2")) {
+
+      override def createSerializer: TypeSerializer[(I1, I2)] = {
+        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
+        for (i <- 0 until getArity) {
+          fieldSerializers(i) = types(i).createSerializer
+        }
+
+        new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
+          override def createInstance(fields: Array[AnyRef]) = {
+            (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
+          }
+        }
+      }
+    }
+
+    val javaStream = input1.connect(input2).addGeneralWindowCombine(
+      crossWindowFunction,
+      returnType, windowSize,
+      slideInterval, timeStamp1, timeStamp2)
+
+    new StreamCrossOperator.CrossWindow[I1, I2](this, javaStream)
+  }
+}
+object StreamCrossOperator {
+
+  private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2],
+                                           javaStream: JavaStream[(I1, I2)]) extends
+    DataStream[(I1, I2)](javaStream) with TemporalWindow[CrossWindow[I1, I2]] {
+
+    /**
+     * Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf
+     * call will be emitted.
+     *
+     */
+    def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
+
+      val invokable = new CoWindowInvokable[I1, I2, R](
+        clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
+        op.timeStamp2)
+
+      javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(),
+        invokable)
+
+      javaStream.setType(implicitly[TypeInformation[R]])
+    }
+    
+    override def every(length: Long, timeUnit: TimeUnit): CrossWindow[I1, I2] = {
+      every(timeUnit.toMillis(length))
+    }
+
+    override def every(length: Long): CrossWindow[I1, I2] = {
+      val builder = javaStream.getExecutionEnvironment().getStreamGraph()
+      val invokable = builder.getInvokable(javaStream.getId())
+      invokable.asInstanceOf[CoWindowInvokable[_,_,_]].setSlideSize(length)
+      this
+    }
+  }
+
+  private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2],
+                                                       crossFunction: (I1, I2) => R):
+  CrossWindowFunction[I1, I2, R] = {
+    Validate.notNull(crossFunction, "Join function must not be null.")
+
+    val crossFun = new CrossFunction[I1, I2, R] {
+      val cleanFun = op.input1.clean(crossFunction)
+
+      override def cross(first: I1, second: I2): R = {
+        cleanFun(first, second)
+      }
+    }
+
+    new CrossWindowFunction[I1, I2, R](crossFun)
+  }
+
+}
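
For illustration, a sketch of the temporal cross: onWindow comes from the TemporalOperator class further below and apply from CrossWindow above; the streams, embedded timestamps and the 1000 ms window length are arbitrary:

    val left: DataStream[(String, Long)] = env.fromElements(("a", 1L), ("b", 2L))
    val right: DataStream[(Int, Long)] = env.fromElements((1, 1L), (2, 2L))

    // Build all pairs of elements whose extracted timestamps fall into the
    // same 1000 ms window, then wrap each pair with a custom function.
    val crossed: DataStream[String] =
      left.cross(right)
        .onWindow(1000, (l: (String, Long)) => l._2, (r: (Int, Long)) => r._2)
        .apply((l, r) => l._1 + "-" + r._1)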

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
new file mode 100644
index 0000000..394673c
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import scala.reflect.ClassTag
+import org.apache.commons.lang.Validate
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
+import org.apache.flink.streaming.api.function.source.{ FromElementsFunction, SourceFunction }
+import org.apache.flink.util.Collector
+import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.streaming.api.function.source.FileMonitoringFunction.WatchType
+
+class StreamExecutionEnvironment(javaEnv: JavaEnv) {
+
+  /**
+   * Sets the degree of parallelism (DOP) for operations executed through this environment.
+   * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
+   * x parallel instances. This value can be overridden by specific operations using
+   * [[DataStream.setParallelism]].
+   */
+  def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
+    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+  }
+
+  /**
+   * Returns the default degree of parallelism for this execution environment. Note that this
+   * value can be overridden by individual operations using [[DataStream.setParallelism]]
+   */
+  def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
+
+  /**
+   * Sets the maximum time frequency (milliseconds) for the flushing of the
+   * output buffers. By default the output buffers flush frequently to provide
+   * low latency and to aid smooth developer experience. Setting the parameter
+   * can result in three logical modes:
+   *
+   * <ul>
+   * <li>
+   * A positive integer triggers flushing periodically with that interval in milliseconds</li>
+   * <li>
+   * 0 triggers flushing after every record thus minimizing latency</li>
+   * <li>
+   * -1 triggers flushing only when the output buffer is full thus maximizing
+   * throughput</li>
+   * </ul>
+   *
+   */
+  def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
+    javaEnv.setBufferTimeout(timeoutMillis)
+    this
+  }
+
+  /**
+   * Gets the default buffer timeout set for this environment
+   */
+  def getBufferTimeout: Long = javaEnv.getBufferTimeout()
+
+  /**
+   * Creates a DataStream that represents the Strings produced by reading the
+   * given file line wise. The file will be read with the system's default
+   * character set.
+   *
+   */
+  def readTextFile(filePath: String): DataStream[String] =
+    javaEnv.readTextFile(filePath)
+
+  /**
+   * Creates a DataStream that contains the contents of the files created while
+   * the system watches the given path. The files will be read with the system's
+   * default character set. The user can set the monitoring interval in milliseconds,
+   * and the way file modifications are handled. By default it checks only for new files
+   * every 100 milliseconds.
+   *
+   */
+  def readFileStream(StreamPath: String, intervalMillis: Long = 100,
+    watchType: WatchType = WatchType.ONLY_NEW_FILES): DataStream[String] =
+    javaEnv.readFileStream(StreamPath, intervalMillis, watchType)
+
+  /**
+   * Creates a new DataStream that contains the strings received infinitely
+   * from the given socket. Received strings are decoded by the system's default
+   * character set.
+   *
+   */
+  def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] =
+    javaEnv.socketTextStream(hostname, port, delimiter)
+
+  /**
+   * Creates a new DataStream that contains the strings received infinitely
+   * from the given socket. Received strings are decoded by the system's default
+   * character set, using '\n' as the delimiter.
+   *
+   */
+  def socketTextStream(hostname: String, port: Int): DataStream[String] =
+    javaEnv.socketTextStream(hostname, port)
+
+  /**
+   * Creates a new DataStream that contains a sequence of numbers.
+   *
+   */
+  def generateSequence(from: Long, to: Long): DataStream[Long] = {
+    new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
+      asInstanceOf[DataStream[Long]]
+  }
+
+  /**
+   * Creates a DataStream that contains the given elements. The elements must all be of the
+   * same type and must be serializable.
+   *
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * a degree of parallelism of one.
+   */
+  def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
+    val typeInfo = implicitly[TypeInformation[T]]
+    fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
+  }
+
+  /**
+   * Creates a DataStream from the given non-empty [[Seq]]. The elements need to be serializable
+   * because the framework may move the elements into the cluster if needed.
+   *
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * a degree of parallelism of one.
+   */
+  def fromCollection[T: ClassTag: TypeInformation](
+    data: Seq[T]): DataStream[T] = {
+    Validate.notNull(data, "Data must not be null.")
+    val typeInfo = implicitly[TypeInformation[T]]
+
+    val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
+        .asJavaCollection(data))
+        
+    javaEnv.addSource(sourceFunction, typeInfo)
+  }
+
+  /**
+   * Create a DataStream using a user defined source function for arbitrary
+   * source functionality. By default sources have a parallelism of 1. 
+   * To enable parallel execution, the user defined source should implement 
+   * ParallelSourceFunction or extend RichParallelSourceFunction. 
+   * In these cases the resulting source will have the parallelism of the environment.
+   * To change this afterwards call DataStreamSource.setParallelism(int).
+   *
+   */
+  def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
+    Validate.notNull(function, "Function must not be null.")
+    val cleanFun = StreamExecutionEnvironment.clean(function)
+    val typeInfo = implicitly[TypeInformation[T]]
+    javaEnv.addSource(cleanFun, typeInfo)
+  }
+  
+   /**
+   * Create a DataStream using a user defined source function for arbitrary
+   * source functionality.
+   *
+   */
+  def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = {
+    Validate.notNull(function, "Function must not be null.")
+    val sourceFunction = new SourceFunction[T] {
+      val cleanFun = StreamExecutionEnvironment.clean(function)
+      override def invoke(out: Collector[T]) {
+        cleanFun(out)
+      }
+    }
+    addSource(sourceFunction)
+  }
+
+  /**
+   * Triggers the program execution. The environment will execute all parts of
+   * the program that have resulted in a "sink" operation. Sink operations are
+   * for example printing results or forwarding them to a message queue.
+   * <p>
+   * The program execution will be logged and displayed with a generated
+   * default name.
+   *
+   */
+  def execute() = javaEnv.execute()
+
+  /**
+   * Triggers the program execution. The environment will execute all parts of
+   * the program that have resulted in a "sink" operation. Sink operations are
+   * for example printing results or forwarding them to a message queue.
+   * <p>
+   * The program execution will be logged and displayed with the provided name
+   *
+   */
+  def execute(jobName: String) = javaEnv.execute(jobName)
+
+  /**
+   * Creates the plan with which the system will execute the program, and
+   * returns it as a String using a JSON representation of the execution data
+   * flow graph. Note that this needs to be called before the plan is
+   * executed.
+   *
+   */
+  def getExecutionPlan() = javaEnv.getStreamGraph.getStreamingPlanAsJSON
+
+}
+
+object StreamExecutionEnvironment {
+  
+  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
+    ClosureCleaner.clean(f, checkSerializable)
+    f
+  }
+
+  /**
+   * Creates an execution environment that represents the context in which the program is
+   * currently executed. If the program is invoked standalone, this method returns a local
+   * execution environment. If the program is invoked from within the command line client
+   * to be submitted to a cluster, this method returns the execution environment of this cluster.
+   */
+  def getExecutionEnvironment: StreamExecutionEnvironment = {
+    new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
+  }
+
+  /**
+   * Creates a local execution environment. The local execution environment will run the program in
+   * a multi-threaded fashion in the same JVM as the environment was created in. The default degree
+   * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
+   */
+  def createLocalEnvironment(
+    degreeOfParallelism: Int =  Runtime.getRuntime.availableProcessors()):
+  StreamExecutionEnvironment = {
+    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism))
+  }
+
+  /**
+   * Creates a remote execution environment. The remote environment sends (parts of) the program to
+   * a cluster for execution. Note that all file paths used in the program must be accessible from
+   * the cluster. The execution will use the cluster's default degree of parallelism, unless the
+   * parallelism is set explicitly via [[StreamExecutionEnvironment.setDegreeOfParallelism()]].
+   *
+   * @param host The host name or address of the master (JobManager),
+   *             where the program should be executed.
+   * @param port The port of the master (JobManager), where the program should be executed.
+   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
+   *                 program uses user-defined functions, user-defined input formats, or any
+   *                 libraries, those must be provided in the JAR files.
+   */
+  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*):
+  StreamExecutionEnvironment = {
+    new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*))
+  }
+
+  /**
+   * Creates a remote execution environment. The remote environment sends (parts of) the program
+   * to a cluster for execution. Note that all file paths used in the program must be accessible
+   * from the cluster. The execution will use the specified degree of parallelism.
+   *
+   * @param host The host name or address of the master (JobManager),
+   *             where the program should be executed.
+   * @param port The port of the master (JobManager), where the program should be executed.
+   * @param degreeOfParallelism The degree of parallelism to use during the execution.
+   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
+   *                 program uses user-defined functions, user-defined input formats, or any
+   *                 libraries, those must be provided in the JAR files.
+   */
+  def createRemoteEnvironment(
+    host: String,
+    port: Int,
+    degreeOfParallelism: Int,
+    jarFiles: String*): StreamExecutionEnvironment = {
+    val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
+    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+    new StreamExecutionEnvironment(javaEnv)
+  }
+}
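
For illustration, a sketch combining the environment factory methods with the collector-based source and the function sink; the degree of parallelism and the generated values are arbitrary, and the implicit TypeInformation support is assumed:

    import org.apache.flink.util.Collector

    val env = StreamExecutionEnvironment.createLocalEnvironment(degreeOfParallelism = 2)

    // A simple source that pushes 0..99 into the collector.
    val ticks: DataStream[Long] = env.addSource { (out: Collector[Long]) =>
      var i = 0L
      while (i < 100) { out.collect(i); i += 1 }
    }

    // A function sink; only streams with sinks attached are executed.
    ticks.addSink((value: Long) => println("tick " + value))
    env.execute("source and sink sketch")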

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
new file mode 100644
index 0000000..1bd1bfb
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import scala.Array.canBuildFrom
+import scala.reflect.ClassTag
+import org.apache.commons.lang.Validate
+import org.apache.flink.api.common.functions.JoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.operators.Keys
+import org.apache.flink.api.scala.typeutils.CaseClassSerializer
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.streaming.api.function.co.JoinWindowFunction
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+import org.apache.flink.streaming.util.keys.KeySelectorUtil
+import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
+import java.util.concurrent.TimeUnit
+
+class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
+TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
+
+  override def createNextWindowOperator() = {
+    new StreamJoinOperator.JoinWindow[I1, I2](this)
+  }
+}
+
+object StreamJoinOperator {
+
+  class JoinWindow[I1, I2](private[flink] op: StreamJoinOperator[I1, I2]) extends
+  TemporalWindow[JoinWindow[I1, I2]] {
+
+    private[flink] val type1 = op.input1.getType()
+
+    /**
+     * Continues a temporal Join transformation by defining
+     * the fields in the first stream to be used as keys for the join.
+     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+     * to define the second key.
+     */
+    def where(fields: Int*) = {
+      new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
+        new Keys.ExpressionKeys(fields.toArray, type1), type1))
+    }
+
+    /**
+     * Continues a temporal Join transformation by defining
+     * the fields in the first stream to be used as keys for the join.
+     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+     * to define the second key.
+     */
+    def where(firstField: String, otherFields: String*) =
+      new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
+        new Keys.ExpressionKeys(firstField +: otherFields.toArray, type1), type1))
+
+    /**
+     * Continues a temporal Join transformation by defining
+     * the keyselector function that will be used to extract keys from the first stream
+     * for the join.
+     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+     * to define the second key.
+     */
+    def where[K: TypeInformation](fun: (I1) => K) = {
+      val keyType = implicitly[TypeInformation[K]]
+      val keyExtractor = new KeySelector[I1, K] {
+        val cleanFun = op.input1.clean(fun)
+        def getKey(in: I1) = cleanFun(in)
+      }
+      new JoinPredicate[I1, I2](op, keyExtractor)
+    }
+
+    override def every(length: Long, timeUnit: TimeUnit): JoinWindow[I1, I2] = {
+      every(timeUnit.toMillis(length))
+    }
+
+    override def every(length: Long): JoinWindow[I1, I2] = {
+      op.slideInterval = length
+      this
+    }
+
+  }
+
+  class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2],
+    private[flink] val keys1: KeySelector[I1, _]) {
+    private[flink] var keys2: KeySelector[I2, _] = null
+    private[flink] val type2 = op.input2.getType()
+
+    /**
+     * Creates a temporal join transformation by defining the second join key.
+     * The returned transformation wraps each joined element pair in a tuple2:
+     * (first, second)
+     * To define a custom wrapping, use JoinedStream.apply(...)
+     */
+    def equalTo(fields: Int*): JoinedStream[I1, I2] = {
+      finish(KeySelectorUtil.getSelectorForKeys(
+        new Keys.ExpressionKeys(fields.toArray, type2), type2))
+    }
+
+    /**
+     * Creates a temporal join transformation by defining the second join key.
+     * The returned transformation wraps each joined element pair in a tuple2:
+     * (first, second)
+     * To define a custom wrapping, use JoinedStream.apply(...)
+     */
+    def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] =
+      finish(KeySelectorUtil.getSelectorForKeys(
+        new Keys.ExpressionKeys(firstField +: otherFields.toArray, type2), type2))
+
+    /**
+     * Creates a temporal join transformation by defining the second join key.
+     * The returned transformation wraps each joined element pair in a tuple2:
+     * (first, second)
+     * To define a custom wrapping, use JoinedStream.apply(...)
+     */
+    def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = {
+      val keyType = implicitly[TypeInformation[K]]
+      val keyExtractor = new KeySelector[I2, K] {
+        val cleanFun = op.input1.clean(fun)
+        def getKey(in: I2) = cleanFun(in)
+      }
+      finish(keyExtractor)
+    }
+
+    private def finish(keys2: KeySelector[I2, _]): JoinedStream[I1, I2] = {
+      this.keys2 = keys2
+      new JoinedStream[I1, I2](this, createJoinOperator())
+    }
+
+    private def createJoinOperator(): JavaStream[(I1, I2)] = {
+
+      val returnType = new CaseClassTypeInfo[(I1, I2)](
+
+        classOf[(I1, I2)], Seq(op.input1.getType, op.input2.getType), Array("_1", "_2")) {
+
+        override def createSerializer: TypeSerializer[(I1, I2)] = {
+          val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
+          for (i <- 0 until getArity) {
+            fieldSerializers(i) = types(i).createSerializer
+          }
+
+          new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
+            override def createInstance(fields: Array[AnyRef]) = {
+              (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
+            }
+          }
+        }
+      }
+
+      return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
+        .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
+          returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
+    }
+  }
+
+  class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends
+  DataStream[(I1, I2)](javaStream) {
+
+    private val op = jp.op
+
+    /**
+     * Sets a wrapper for the joined elements. For each joined pair, the result of the
+     * udf call will be emitted.
+     */
+    def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
+
+      val invokable = new CoWindowInvokable[I1, I2, R](
+        clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
+        op.timeStamp2)
+
+      javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(),
+        invokable)
+
+      javaStream.setType(implicitly[TypeInformation[R]])
+    }
+  }
+
+  private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2],
+    joinFunction: (I1, I2) => R) = {
+    Validate.notNull(joinFunction, "Join function must not be null.")
+
+    val joinFun = new JoinFunction[I1, I2, R] {
+
+      val cleanFun = jp.op.input1.clean(joinFunction)
+
+      override def join(first: I1, second: I2): R = {
+        cleanFun(first, second)
+      }
+    }
+
+    new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
+  }
+
+}
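
For illustration, a sketch of the temporal join above: where/equalTo use tuple field positions, onWindow comes from the TemporalOperator class further below, and the streams, keys and window length are arbitrary:

    val names: DataStream[(Int, String, Long)] = env.fromElements((1, "alice", 1L), (2, "bob", 2L))
    val ages: DataStream[(Int, Int, Long)] = env.fromElements((1, 32, 1L), (2, 27, 2L))

    // Join on the first tuple field over a 1000 ms window; the default wrapping
    // would be (first, second), apply(...) replaces it with a custom result.
    val joined: DataStream[(String, Int)] =
      names.join(ages)
        .onWindow(1000, (n: (Int, String, Long)) => n._3, (a: (Int, Int, Long)) => a._3)
        .where(0)
        .equalTo(0)
        .apply((n, a) => (n._2, a._2))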

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
new file mode 100644
index 0000000..fd3a4a9
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.api.datastream.temporaloperator.{ TemporalOperator => JTempOp }
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow
+import org.apache.flink.streaming.api.windowing.helper.Timestamp
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment._
+
+abstract class TemporalOperator[I1, I2, OP <: TemporalWindow[OP]](
+  i1: JavaStream[I1], i2: JavaStream[I2]) extends JTempOp[I1, I2, OP](i1, i2) {
+
+  def onWindow(length: Long, ts1: I1 => Long, ts2: I2 => Long, startTime: Long = 0): OP = {
+    val timeStamp1 = getTS(ts1)
+    val timeStamp2 = getTS(ts2)
+    onWindow(length, timeStamp1, timeStamp2, startTime)
+  }
+
+  def getTS[R](ts: R => Long): Timestamp[R] = {
+    new Timestamp[R] {
+      val cleanFun = clean(ts, true)
+      def getTimestamp(in: R) = cleanFun(in)
+    }
+  }
+
+}
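
For illustration, a sketch of onWindow with an explicit start time; the two streams are arbitrary, and the timestamp closures are lifted into Timestamp instances by the getTS helper defined above:

    val leftStream: DataStream[(String, Long)] = env.fromElements(("a", 1L))
    val rightStream: DataStream[(Int, Long)] = env.fromElements((7, 1L))

    // onWindow wraps the two Scala functions into Timestamp instances and
    // forwards them, together with the optional start time, to the Java operator.
    val window = leftStream.join(rightStream)
      .onWindow(5000, (l: (String, Long)) => l._2, (r: (Int, Long)) => r._2, startTime = 0)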

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
new file mode 100644
index 0000000..5c734bf
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import scala.Array.canBuildFrom
+import scala.collection.JavaConversions.iterableAsScalaIterable
+import scala.reflect.ClassTag
+
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
+import org.apache.flink.streaming.api.datastream.{WindowedDataStream => JavaWStream}
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
+import org.apache.flink.streaming.api.function.aggregation.SumFunction
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
+import org.apache.flink.streaming.api.windowing.helper._
+import org.apache.flink.util.Collector
+
+class WindowedDataStream[T](javaStream: JavaWStream[T]) {
+
+  /**
+   * Defines the slide size (trigger frequency) for the windowed data stream.
+   * This controls how often the user defined function will be triggered on
+   * the window.
+   */
+  def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
+    javaStream.every(windowingHelper: _*)
+
+  /**
+   * Groups the elements of the WindowedDataStream using the given
+   * field positions. The window sizes (evictions) and slide sizes
+   * (triggers) will be calculated on the whole stream (in a central fashion),
+   * but the user defined functions will be applied on a per group basis.
+   * </br></br> To get windows and triggers on a per group basis apply the
+   * DataStream.window(...) operator on an already grouped data stream.
+   *
+   */
+  def groupBy(fields: Int*): WindowedDataStream[T] = javaStream.groupBy(fields: _*)
+
+  /**
+   * Groups the elements of the WindowedDataStream using the given
+   * field expressions. The window sizes (evictions) and slide sizes
+   * (triggers) will be calculated on the whole stream (in a central fashion),
+   * but the user defined functions will be applied on a per group basis.
+   * </br></br> To get windows and triggers on a per group basis apply the
+   * DataStream.window(...) operator on an already grouped data stream.
+   *
+   */
+  def groupBy(firstField: String, otherFields: String*): WindowedDataStream[T] =
+    javaStream.groupBy(firstField +: otherFields.toArray: _*)
+
+  /**
+   * Groups the elements of the WindowedDataStream using the given
+   * KeySelector function. The window sizes (evictions) and slide sizes
+   * (triggers) will be calculated on the whole stream (in a central fashion),
+   * but the user defined functions will be applied on a per group basis.
+   * </br></br> To get windows and triggers on a per group basis apply the
+   * DataStream.window(...) operator on an already grouped data stream.
+   *
+   */
+  def groupBy[K: TypeInformation](fun: T => K): WindowedDataStream[T] = {
+
+    val keyExtractor = new KeySelector[T, K] {
+      val cleanFun = clean(fun)
+      def getKey(in: T) = cleanFun(in)
+    }
+    javaStream.groupBy(keyExtractor)
+  }
+
+  /**
+   * Applies a reduce transformation on the windowed data stream by reducing
+   * the current window at every trigger.
+   *
+   */
+  def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
+    if (reducer == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    javaStream.reduce(reducer)
+  }
+
+  /**
+   * Applies a reduce transformation on the windowed data stream by reducing
+   * the current window at every trigger.
+   *
+   */
+  def reduce(fun: (T, T) => T): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    val reducer = new ReduceFunction[T] {
+      val cleanFun = clean(fun)
+      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+    }
+    reduce(reducer)
+  }
+
+  /**
+   * Applies a reduceGroup transformation on the windowed data stream by reducing
+   * the current window at every trigger. In contrast with the simple binary reduce operator,
+   * groupReduce exposes the whole window through the Iterable interface.
+   * </br>
+   * </br>
+   * Whenever possible, try to use reduce instead of groupReduce for increased efficiency.
+   */
+  def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]):
+  DataStream[R] = {
+    if (reducer == null) {
+      throw new NullPointerException("GroupReduce function must not be null.")
+    }
+    javaStream.reduceGroup(reducer, implicitly[TypeInformation[R]])
+  }
+
+  /**
+   * Applies a reduceGroup transformation on the windowed data stream by reducing
+   * the current window at every trigger. In contrast with the simple binary reduce operator,
+   * groupReduce exposes the whole window through the Iterable interface.
+   * </br>
+   * </br>
+   * Whenever possible, try to use reduce instead of groupReduce for increased efficiency.
+   */
+  def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit):
+  DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("GroupReduce function must not be null.")
+    }
+    val reducer = new GroupReduceFunction[T, R] {
+      val cleanFun = clean(fun)
+      def reduce(in: java.lang.Iterable[T], out: Collector[R]) = { cleanFun(in, out) }
+    }
+    reduceGroup(reducer)
+  }
+
+  /**
+   * Applies an aggregation that gives the maximum of the elements in the window
+   * at the given position.
+   *
+   */
+  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
+  
+  /**
+   * Applies an aggregation that gives the maximum of the elements in the window at
+   * the given field.
+   *
+   */
+  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
+
+  /**
+   * Applies an aggregation that gives the minimum of the elements in the window
+   * at the given position.
+   *
+   */
+  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+  
+  /**
+   * Applies an aggregation that gives the minimum of the elements in the window at
+   * the given field.
+   *
+   */
+  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
+
+  /**
+   * Applies an aggregation that sums the elements in the window at the given position.
+   *
+   */
+  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
+  
+  /**
+   * Applies an aggregation that sums the elements in the window at the given field.
+   *
+   */
+  def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
+
+  /**
+   * Applies an aggregation that gives the maximum element of the window by
+   * the given position. In case of a tie, the first element is returned.
+   *
+   */
+  def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY,
+    position)
+    
+  /**
+   * Applies an aggregation that gives the maximum element of the window by
+   * the given field. In case of a tie, the first element is returned.
+   *
+   */
+  def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY,
+    field)
+
+  /**
+   * Applies an aggregation that gives the minimum element of the window by
+   * the given position. In case of a tie, the first element is returned.
+   *
+   */
+  def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY,
+    position)
+    
+  /**
+   * Applies an aggregation that gives the minimum element of the window by
+   * the given field. In case of a tie, the first element is returned.
+   *
+   */
+  def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY,
+    field)
+    
+  private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
+    val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
+    aggregate(aggregationType, position)
+  }
+
+  def aggregate(aggregationType: AggregationType, position: Int):
+  DataStream[T] = {
+
+    val jStream = javaStream.asInstanceOf[JavaWStream[Product]]
+    val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
+
+    val agg = new ScalaStreamingAggregator[Product](
+      jStream.getType().createSerializer(), position)
+
+    val reducer = aggregationType match {
+      case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(
+        outType.getTypeAt(position).getTypeClass()))
+      case _ => new agg.ProductComparableAggregator(aggregationType, true)
+    }
+
+    new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
+  }
+
+}
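
For illustration, a hedged usage sketch of the API above. It assumes the DataStream.window(...) operator referenced in the Scaladoc; the environment, sample data, and result names are hypothetical.

    import java.util.concurrent.TimeUnit
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.windowing.Time

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Hypothetical input stream of (word, count) pairs.
    val input = env.fromCollection(Seq(("a", 1), ("b", 2), ("a", 3)))

    val windowedSum = input
      .window(Time.of(5, TimeUnit.SECONDS)) // assumed window(...) operator on DataStream
      .groupBy(0)                           // grouping applied per key inside the central window
      .sum(1)                               // aggregation defined in WindowedDataStream above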

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
new file mode 100644
index 0000000..222eb6d
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api
+
+import _root_.scala.reflect.ClassTag
+import language.experimental.macros
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
+import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
+import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream }
+
+package object scala {
+  // We have this here so that we always have generated TypeInformationS when
+  // using the Scala API
+  implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
+
+  implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
+    new DataStream[R](javaStream)
+
+  implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] =
+    new WindowedDataStream[R](javaWStream)
+
+  implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] =
+    new SplitDataStream[R](javaStream)
+
+  implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]):
+  ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream)
+
+  implicit def seqToFlinkSource[T: ClassTag: TypeInformation](scalaSeq: Seq[T]): DataStream[T] =
+    StreamExecutionEnvironment.getExecutionEnvironment.fromCollection(scalaSeq)
+
+
+  private[flink] def fieldNames2Indices(
+      typeInfo: TypeInformation[_],
+      fields: Array[String]): Array[Int] = {
+    typeInfo match {
+      case ti: CaseClassTypeInfo[_] =>
+        val result = ti.getFieldIndices(fields)
+
+        if (result.contains(-1)) {
+          throw new IllegalArgumentException("Fields '" + fields.mkString(", ") +
+            "' are not valid for '" + ti.toString + "'.")
+        }
+
+        result
+
+      case _ =>
+        throw new UnsupportedOperationException("Specifying fields by name is only " +
+          "supported on Case Classes (for now).")
+    }
+  }
+}
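
For illustration, a small hedged sketch of what the implicits above enable; the values and names are hypothetical.

    import org.apache.flink.streaming.api.scala._

    // seqToFlinkSource: a Scala Seq can be used directly as a streaming source.
    val numbers: DataStream[Int] = Seq(1, 2, 3, 4)

    // createTypeInformation is materialized implicitly for the result type of map,
    // so no explicit TypeInformation has to be passed.
    val doubled = numbers.map(_ * 2)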

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
new file mode 100644
index 0000000..eedee0e
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.windowing
+
+import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta }
+import org.apache.commons.lang.Validate
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction
+
+object Delta {
+
+  /**
+   * Creates a delta helper representing a delta trigger or eviction policy.
+   * </br></br> This policy calculates a delta between the data point which
+   * triggered last and the currently arrived data point. It triggers if the
+   * delta is higher than a specified threshold. </br></br> In case it gets
+   * used for eviction, this policy starts from the first element of the
+   * buffer and removes all elements from the buffer which have a higher delta
+   * than the threshold. As soon as there is an element with a lower delta,
+   * the eviction stops.
+   */
+  def of[T](threshold: Double, deltaFunction: (T, T) => Double, initVal: T): JavaDelta[T] = {
+    Validate.notNull(deltaFunction, "Delta function must not be null")
+    val df = new DeltaFunction[T] {
+      val cleanFun = clean(deltaFunction)
+      override def getDelta(first: T, second: T) = cleanFun(first, second)
+    }
+    JavaDelta.of(threshold, df, initVal)
+  }
+
+}
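
For illustration, a hedged example of building a delta policy with the helper above; the threshold, delta function, and initial value are arbitrary.

    // Triggers (or evicts) once the absolute difference to the last trigger point
    // exceeds 10.0; 0.0 serves as the initial reference value.
    val deltaPolicy = Delta.of[Double](10.0, (last, current) => math.abs(current - last), 0.0)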

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
new file mode 100644
index 0000000..9a69369
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.windowing
+
+import java.util.concurrent.TimeUnit
+import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime }
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
+import org.apache.flink.streaming.api.windowing.helper.Timestamp
+import org.apache.commons.lang.Validate
+
+object Time {
+
+  /**
+   * Creates a helper representing a time trigger which triggers every given
+   * length (slide size) or a time eviction which evicts all elements older
+   * than length (window size) using System time.
+   *
+   */
+  def of(windowSize: Long, timeUnit: TimeUnit): JavaTime[_] =
+    JavaTime.of(windowSize, timeUnit)
+
+  /**
+   * Creates a helper representing a time trigger which triggers every given
+   * length (slide size) or a time eviction which evicts all elements older
+   * than length (window size) using a user defined timestamp extractor.
+   *
+   */
+  def of[R](windowSize: Long, timestamp: R => Long, startTime: Long = 0): JavaTime[R] = {
+    Validate.notNull(timestamp, "Timestamp must not be null.")
+    val ts = new Timestamp[R] {
+      val fun = clean(timestamp, true)
+      override def getTimestamp(in: R) = fun(in)
+    }
+    JavaTime.of(windowSize, ts, startTime)
+  }
+
+}
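
For illustration, hedged examples of the two overloads above; the Event type is hypothetical.

    import java.util.concurrent.TimeUnit

    // System-time based window/slide of 30 seconds.
    val processingTime = Time.of(30, TimeUnit.SECONDS)

    // User-defined timestamps: the window length is interpreted in the same unit
    // as the extracted timestamps (here assumed to be milliseconds).
    case class Event(id: String, ts: Long)
    val eventTime = Time.of[Event](30000, (e: Event) => e.ts)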

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/pom.xml 
b/flink-staging/flink-streaming/pom.xml
new file mode 100644
index 0000000..43b8181
--- /dev/null
+++ b/flink-staging/flink-streaming/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-staging</artifactId>
+               <version>0.9-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-streaming-parent</artifactId>
+       <name>flink-streaming</name>
+       <packaging>pom</packaging>
+
+       <modules>
+               <module>flink-streaming-core</module>
+               <module>flink-streaming-scala</module>
+               <module>flink-streaming-examples</module>
+               <module>flink-streaming-connectors</module>
+       </modules>
+       
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-compiler</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-clients</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+       </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-tachyon/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/pom.xml 
b/flink-staging/flink-tachyon/pom.xml
new file mode 100644
index 0000000..65aa295
--- /dev/null
+++ b/flink-staging/flink-tachyon/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-staging</artifactId>
+               <version>0.9-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-tachyon</artifactId>
+       <name>flink-tachyon</name>
+
+       <packaging>jar</packaging>
+
+       <!--
+               This is a Hadoop2 only flink module.
+       -->
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java-examples</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-avro</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.tachyonproject</groupId>
+                       <artifactId>tachyon</artifactId>
+                       <version>0.5.0</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.tachyonproject</groupId>
+                       <artifactId>tachyon</artifactId>
+                       <version>0.5.0</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.eclipse.jetty</groupId>
+                       <artifactId>jetty-util</artifactId>
+                       <version>7.6.8.v20121106</version><!--$NO-MVN-MAN-VER$-->
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-common</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-hdfs</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-hdfs</artifactId>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+                       <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-common</artifactId>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+                       <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
+               </dependency>
+       </dependencies>
+       <dependencyManagement>
+               <dependencies>
+                       <dependency>
+                               <groupId>org.eclipse.jetty</groupId>
+                               <artifactId>jetty-server</artifactId>
+                               <version>7.6.8.v20121106</version>
+                               <scope>test</scope>
+                       </dependency>
+                       <dependency>
+                               <groupId>org.eclipse.jetty</groupId>
+                               <artifactId>jetty-servlet</artifactId>
+                               <version>7.6.8.v20121106</version>
+                               <scope>test</scope>
+                       </dependency>
+               </dependencies>
+       </dependencyManagement>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
 
b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
new file mode 100644
index 0000000..7318894
--- /dev/null
+++ 
b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tachyon;
+
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.examples.java.wordcount.WordCount;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+
+/**
+ * This test logically belongs in the 'flink-runtime' tests. However, this project
+ * already has all the required dependencies (flink-java-examples). Also, the
+ * DOPOneExecEnv is here.
+ */
+public class HDFSTest {
+
+       private String hdfsURI;
+       private MiniDFSCluster hdfsCluster;
+       private org.apache.hadoop.fs.Path hdPath;
+       private org.apache.hadoop.fs.FileSystem hdfs;
+
+       @Before
+       public void createHDFS() {
+               try {
+                       Configuration hdConf = new Configuration();
+
+                       File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
+                       FileUtil.fullyDelete(baseDir);
+                       hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+                       MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+                       hdfsCluster = builder.build();
+
+                       hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() + "/";
+
+                       hdPath = new org.apache.hadoop.fs.Path("/test");
+                       hdfs = hdPath.getFileSystem(hdConf);
+                       FSDataOutputStream stream = hdfs.create(hdPath);
+                       for(int i = 0; i < 10; i++) {
+                               stream.write("Hello HDFS\n".getBytes());
+                       }
+                       stream.close();
+
+               } catch(Throwable e) {
+                       e.printStackTrace();
+                       Assert.fail("Test failed " + e.getMessage());
+               }
+       }
+
+       @After
+       public void destroyHDFS() {
+               try {
+                       hdfs.delete(hdPath, false);
+                       hdfsCluster.shutdown();
+               } catch (IOException e) {
+                       throw new RuntimeException(e);
+               }
+
+       }
+
+       @Test
+       public void testHDFS() {
+
+               Path file = new Path(hdfsURI + hdPath);
+               org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result");
+               try {
+                       FileSystem fs = file.getFileSystem();
+                       Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem);
+                       new TachyonFileSystemWrapperTest.DopOneTestEnvironment();
+                       try {
+                               WordCount.main(new String[]{file.toString(), result.toString()});
+                       } catch(Throwable t) {
+                               t.printStackTrace();
+                               Assert.fail("Test failed with " + t.getMessage());
+                       }
+                       Assert.assertTrue("No result file present", hdfs.exists(result));
+                       // validate output:
+                       org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result);
+                       StringWriter writer = new StringWriter();
+                       IOUtils.copy(inStream, writer);
+                       String resultString = writer.toString();
+
+                       Assert.assertEquals("hdfs 10\n" +
+                                       "hello 10\n", resultString);
+                       inStream.close();
+
+               } catch (IOException e) {
+                       e.printStackTrace();
+                       Assert.fail("Error in test: " + e.getMessage() );
+               }
+       }
+
+       @Test
+       public void testAvroOut() {
+               String type = "one";
+               AvroOutputFormat<String> avroOut =
+                               new AvroOutputFormat<String>( String.class );
+
+               org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/avroTest");
+
+               avroOut.setOutputFilePath(new Path(result.toString()));
+               avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE);
+               avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS);
+
+               try {
+                       avroOut.open(0, 2);
+                       avroOut.writeRecord(type);
+                       avroOut.close();
+
+                       avroOut.open(1, 2);
+                       avroOut.writeRecord(type);
+                       avroOut.close();
+
+
+                       Assert.assertTrue("No result file present", hdfs.exists(result));
+                       FileStatus[] files = hdfs.listStatus(result);
+                       Assert.assertEquals(2, files.length);
+                       for(FileStatus file : files) {
+                               Assert.assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName()));
+                       }
+
+               } catch (IOException e) {
+                       e.printStackTrace();
+                       Assert.fail(e.getMessage());
+               }
+       }
+}
