http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala new file mode 100644 index 0000000..177a9ee --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -0,0 +1,573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala + +import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, + SingleOutputStreamOperator, GroupedDataStream} +import scala.reflect.ClassTag +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.streaming.api.invokable.operator.MapInvokable +import org.apache.flink.util.Collector +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable +import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.streaming.api.invokable.StreamInvokable +import org.apache.flink.streaming.api.invokable.operator.{ GroupedReduceInvokable, StreamReduceInvokable } +import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.streaming.api.function.sink.SinkFunction +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean +import org.apache.flink.streaming.api.windowing.helper.WindowingHelper +import org.apache.flink.streaming.api.windowing.policy.{ EvictionPolicy, TriggerPolicy } +import org.apache.flink.streaming.api.collector.OutputSelector +import scala.collection.JavaConversions._ +import java.util.HashMap +import org.apache.flink.streaming.api.function.aggregation.SumFunction +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase +import org.apache.flink.streaming.api.function.aggregation.AggregationFunction +import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator +import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy + +class DataStream[T](javaStream: JavaStream[T]) { + + /** + * Gets the 
underlying Java DataStream object.
+   */
+  def getJavaStream: JavaStream[T] = javaStream
+
+  /**
+   * Sets the degree of parallelism of this operation. This must be at least 1.
+   */
+  def setParallelism(dop: Int): DataStream[T] = {
+    javaStream match {
+      case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop)
+      case _ =>
+        throw new UnsupportedOperationException(
+          "Operator " + javaStream.toString + " cannot have parallelism.")
+    }
+    this
+  }
+
+  /**
+   * Returns the degree of parallelism of this operation.
+   */
+  def getParallelism: Int = javaStream match {
+    case op: SingleOutputStreamOperator[_, _] => op.getParallelism
+    case _ =>
+      throw new UnsupportedOperationException(
+        "Operator " + javaStream.toString + " does not have parallelism.")
+  }
+
+  def setChainingStrategy(strategy: ChainingStrategy): DataStream[T] = {
+    javaStream match {
+      case ds: SingleOutputStreamOperator[_, _] => ds.setChainingStrategy(strategy)
+      case _ =>
+        throw new UnsupportedOperationException("Only supported for operators.")
+    }
+    this
+  }
+
+  /**
+   * Creates a new DataStream by merging DataStream outputs of
+   * the same type with each other. The DataStreams merged using this operator
+   * will be transformed simultaneously.
+   */
+  def merge(dataStreams: DataStream[T]*): DataStream[T] =
+    javaStream.merge(dataStreams.map(_.getJavaStream): _*)
+
+  /**
+   * Creates a new ConnectedDataStream by connecting
+   * DataStream outputs of different types with each other. The
+   * DataStreams connected using this operator can be used with CoFunctions.
+   */
+  def connect[T2](dataStream: DataStream[T2]): ConnectedDataStream[T, T2] =
+    javaStream.connect(dataStream.getJavaStream)
+
+  /**
+   * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
+   * be used with grouped operators like grouped reduce or grouped aggregations.
+   */
+  def groupBy(fields: Int*): DataStream[T] = javaStream.groupBy(fields: _*)
+
+  /**
+   * Groups the elements of a DataStream by the given field expressions to
+   * be used with grouped operators like grouped reduce or grouped aggregations.
+   */
+  def groupBy(firstField: String, otherFields: String*): DataStream[T] =
+    javaStream.groupBy(firstField +: otherFields.toArray: _*)
+
+  /**
+   * Groups the elements of a DataStream by the given K key to
+   * be used with grouped operators like grouped reduce or grouped aggregations.
+   */
+  def groupBy[K: TypeInformation](fun: T => K): DataStream[T] = {
+
+    val keyExtractor = new KeySelector[T, K] {
+      val cleanFun = clean(fun)
+      def getKey(in: T) = cleanFun(in)
+    }
+    javaStream.groupBy(keyExtractor)
+  }
+
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are broadcast to every parallel instance of the next component. This
+   * setting only affects how the outputs will be distributed between the
+   * parallel instances of the next processing operator.
+   */
+  def broadcast: DataStream[T] = javaStream.broadcast()
+
+  /**
+   * Sets the partitioning of the DataStream so that the output values all go to
+   * the first instance of the next processing operator. Use this setting with care
+   * since it might cause a serious performance bottleneck in the application.
+   */
+  def global: DataStream[T] = javaStream.global()
+
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are shuffled to the next component.
This setting only affects how the
+   * outputs will be distributed between the parallel instances of the next
+   * processing operator.
+   */
+  def shuffle: DataStream[T] = javaStream.shuffle()
+
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are forwarded to the local subtask of the next component (whenever
+   * possible). This is the default partitioner setting. This setting only
+   * affects how the outputs will be distributed between the parallel
+   * instances of the next processing operator.
+   */
+  def forward: DataStream[T] = javaStream.forward()
+
+  /**
+   * Sets the partitioning of the DataStream so that the output tuples
+   * are distributed evenly to the next component. This setting only affects
+   * how the outputs will be distributed between the parallel instances of
+   * the next processing operator.
+   */
+  def distribute: DataStream[T] = javaStream.distribute()
+
+  /**
+   * Initiates an iterative part of the program that creates a loop by feeding
+   * back data streams. To create a streaming iteration the user needs to define
+   * a transformation that creates two DataStreams. The first one is the output
+   * that will be fed back to the start of the iteration and the second is the output
+   * stream of the iterative part.
+   * <p>
+   * stepfunction: initialStream => (feedback, output)
+   * <p>
+   * A common pattern is to use output splitting to create feedback and output DataStreams.
+   * Please refer to the .split(...) method of the DataStream.
+   * <p>
+   * By default a DataStream with iteration will never terminate, but the user
+   * can use the maxWaitTime parameter to set a max waiting time for the iteration head.
+   * If no data is received within the set time, the stream terminates.
+   */
+  def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),
+      maxWaitTimeMillis: Long = 0): DataStream[R] = {
+    val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
+
+    val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
+    iterativeStream.closeWith(feedback.getJavaStream)
+    output
+  }
+
+  /**
+   * Applies an aggregation that gives the current maximum of the data stream at
+   * the given position.
+   */
+  def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
+
+  /**
+   * Applies an aggregation that gives the current maximum of the data stream at
+   * the given field.
+   */
+  def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
+
+  /**
+   * Applies an aggregation that gives the current minimum of the data stream at
+   * the given position.
+   */
+  def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+
+  /**
+   * Applies an aggregation that gives the current minimum of the data stream at
+   * the given field.
+   */
+  def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
+
+  /**
+   * Applies an aggregation that sums the data stream at the given position.
+   */
+  def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
+
+  /**
+   * Applies an aggregation that sums the data stream at the given field.
+   */
+  def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
+
+  /**
+   * Applies an aggregation that gives the current minimum element of the data stream by
+   * the given position. In case of a tie, the first element with the minimal value is returned.
+   */
+  def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY, position)
+
+  /**
+   * Applies an aggregation that gives the current minimum element of the data stream by
+   * the given field. In case of a tie, the first element with the minimal value is returned.
+   */
+  def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY, field)
+
+  /**
+   * Applies an aggregation that gives the current maximum element of the data stream by
+   * the given position. In case of a tie, the first element with the maximal value is returned.
+   */
+  def maxBy(position: Int): DataStream[T] =
+    aggregate(AggregationType.MAXBY, position)
+
+  /**
+   * Applies an aggregation that gives the current maximum element of the data stream by
+   * the given field. In case of a tie, the first element with the maximal value is returned.
+   */
+  def maxBy(field: String): DataStream[T] =
+    aggregate(AggregationType.MAXBY, field)
+
+  private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
+    val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
+    aggregate(aggregationType, position)
+  }
+
+  private def aggregate(aggregationType: AggregationType, position: Int): DataStream[T] = {
+
+    val jStream = javaStream.asInstanceOf[JavaStream[Product]]
+    val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
+
+    val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position)
+
+    val reducer = aggregationType match {
+      case AggregationType.SUM => new agg.Sum(
+        SumFunction.getForClass(outType.getTypeAt(position).getTypeClass()))
+      case _ => new agg.ProductComparableAggregator(aggregationType, true)
+    }
+
+    val invokable = jStream match {
+      case groupedStream: GroupedDataStream[_] => new GroupedReduceInvokable(reducer,
+        groupedStream.getKeySelector())
+      case _ => new StreamReduceInvokable(reducer)
+    }
+    new DataStream[Product](jStream.transform("aggregation", jStream.getType(),
+      invokable)).asInstanceOf[DataStream[T]]
+  }
+
+  /**
+   * Creates a new DataStream containing the current number (count) of
+   * received records.
+   */
+  def count: DataStream[Long] = new DataStream[java.lang.Long](
+    javaStream.count()).asInstanceOf[DataStream[Long]]
+
+  /**
+   * Creates a new DataStream by applying the given function to every element of this DataStream.
+   */
+  def map[R: TypeInformation: ClassTag](fun: T => R): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+    val mapper = new MapFunction[T, R] {
+      val cleanFun = clean(fun)
+      def map(in: T): R = cleanFun(in)
+    }
+
+    javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element of this DataStream.
+   */
+  def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataStream[R] = {
+    if (mapper == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+
+    javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper))
+  }
+
+  /**
+   * Creates a new DataStream by applying the given function to every element and flattening
+   * the results.
+ */ + def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataStream[R] = { + if (flatMapper == null) { + throw new NullPointerException("FlatMap function must not be null.") + } + javaStream.transform("flatMap", implicitly[TypeInformation[R]], + new FlatMapInvokable[T, R](flatMapper)) + } + + /** + * Creates a new DataStream by applying the given function to every element and flattening + * the results. + */ + def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataStream[R] = { + if (fun == null) { + throw new NullPointerException("FlatMap function must not be null.") + } + val flatMapper = new FlatMapFunction[T, R] { + val cleanFun = clean(fun) + def flatMap(in: T, out: Collector[R]) { cleanFun(in, out) } + } + flatMap(flatMapper) + } + + /** + * Creates a new DataStream by applying the given function to every element and flattening + * the results. + */ + def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataStream[R] = { + if (fun == null) { + throw new NullPointerException("FlatMap function must not be null.") + } + val flatMapper = new FlatMapFunction[T, R] { + val cleanFun = clean(fun) + def flatMap(in: T, out: Collector[R]) { cleanFun(in) foreach out.collect } + } + flatMap(flatMapper) + } + + /** + * Creates a new [[DataStream]] by reducing the elements of this DataStream + * using an associative reduce function. + */ + def reduce(reducer: ReduceFunction[T]): DataStream[T] = { + if (reducer == null) { + throw new NullPointerException("Reduce function must not be null.") + } + javaStream match { + case ds: GroupedDataStream[_] => javaStream.transform("reduce", + javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector())) + case _ => javaStream.transform("reduce", javaStream.getType(), + new StreamReduceInvokable[T](reducer)) + } + } + + /** + * Creates a new [[DataStream]] by reducing the elements of this DataStream + * using an associative reduce function. + */ + def reduce(fun: (T, T) => T): DataStream[T] = { + if (fun == null) { + throw new NullPointerException("Reduce function must not be null.") + } + val reducer = new ReduceFunction[T] { + val cleanFun = clean(fun) + def reduce(v1: T, v2: T) = { cleanFun(v1, v2) } + } + reduce(reducer) + } + + /** + * Creates a new DataStream that contains only the elements satisfying the given filter predicate. + */ + def filter(filter: FilterFunction[T]): DataStream[T] = { + if (filter == null) { + throw new NullPointerException("Filter function must not be null.") + } + javaStream.filter(filter) + } + + /** + * Creates a new DataStream that contains only the elements satisfying the given filter predicate. + */ + def filter(fun: T => Boolean): DataStream[T] = { + if (fun == null) { + throw new NullPointerException("Filter function must not be null.") + } + val filter = new FilterFunction[T] { + val cleanFun = clean(fun) + def filter(in: T) = cleanFun(in) + } + this.filter(filter) + } + + /** + * Create a WindowedDataStream that can be used to apply + * transformation like .reduce(...) or aggregations on + * preset chunks(windows) of the data stream. To define the windows one or + * more WindowingHelper-s such as Time, Count and + * Delta can be used.</br></br> When applied to a grouped data + * stream, the windows (evictions) and slide sizes (triggers) will be + * computed on a per group basis. 
</br></br> For more advanced control over
+   * the trigger and eviction policies please use the
+   * window(List(triggers), List(evicters)) method.
+   */
+  def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
+    javaStream.window(windowingHelper: _*)
+
+  /**
+   * Create a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s.
+   * Windowing can be used to apply transformations like .reduce(...) or aggregations on
+   * preset chunks (windows) of the data stream.</br></br>For most common
+   * use-cases please refer to window(WindowingHelper[_]*)
+   */
+  def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]):
+    WindowedDataStream[T] = javaStream.window(triggers, evicters)
+
+  /**
+   * Operator used for directing tuples to specific named outputs using an
+   * OutputSelector. Calling this method on an operator creates a new
+   * SplitDataStream.
+   */
+  def split(selector: OutputSelector[T]): SplitDataStream[T] = javaStream.split(selector)
+
+  /**
+   * Creates a new SplitDataStream that contains only the elements satisfying the
+   * given output selector predicate.
+   */
+  def split(fun: T => String): SplitDataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("OutputSelector must not be null.")
+    }
+    val selector = new OutputSelector[T] {
+      val cleanFun = clean(fun)
+      def select(in: T): java.lang.Iterable[String] = {
+        List(cleanFun(in))
+      }
+    }
+    split(selector)
+  }
+
+  /**
+   * Initiates a temporal Join transformation that joins the elements of two
+   * data streams on key equality over a specified time window.
+   *
+   * This method returns a StreamJoinOperator on which
+   * .onWindow(..) should be called to define the
+   * window, and then the .where(..) and .equalTo(..) methods can be used to define
+   * the join keys.</p> The user can also use the apply method of the returned JoinedStream
+   * to apply a custom join function.
+   */
+  def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] =
+    new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
+
+  /**
+   * Initiates a temporal cross transformation that builds all pair
+   * combinations of elements of both DataStreams, i.e., it builds a Cartesian
+   * product.
+   *
+   * This method returns a StreamCrossOperator on which
+   * .onWindow(..) should be called to define the
+   * window. The user can also use the apply method of the returned CrossWindow
+   * to apply a custom cross function.
+   */
+  def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] =
+    new StreamCrossOperator[T, R](javaStream, stream.getJavaStream)
+
+  /**
+   * Writes a DataStream to the standard output stream (stdout). For each
+   * element of the DataStream the result of .toString is
+   * written.
+   */
+  def print(): DataStream[T] = javaStream.print()
+
+  /**
+   * Writes a DataStream to the file specified by path in text format. The
+   * writing is performed periodically, every millis milliseconds. For
+   * every element of the DataStream the result of .toString
+   * is written.
+   */
+  def writeAsText(path: String, millis: Long = 0): DataStream[T] =
+    javaStream.writeAsText(path, millis)
+
+  /**
+   * Writes a DataStream to the file specified by path in csv format. The
+   * writing is performed periodically, every millis milliseconds. For
+   * every element of the DataStream the result of .toString
+   * is written.
+   */
+  def writeAsCsv(path: String, millis: Long = 0): DataStream[T] =
+    javaStream.writeAsCsv(path, millis)
+
+  /**
+   * Adds the given sink to this DataStream. Only streams with sinks added
+   * will be executed once the StreamExecutionEnvironment.execute(...)
+   * method is called.
+   */
+  def addSink(sinkFunction: SinkFunction[T]): DataStream[T] =
+    javaStream.addSink(sinkFunction)
+
+  /**
+   * Adds the given sink to this DataStream. Only streams with sinks added
+   * will be executed once the StreamExecutionEnvironment.execute(...)
+   * method is called.
+   */
+  def addSink(fun: T => Unit): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Sink function must not be null.")
+    }
+    val sinkFunction = new SinkFunction[T] {
+      val cleanFun = clean(fun)
+      def invoke(in: T) = cleanFun(in)
+    }
+    this.addSink(sinkFunction)
+  }
+
+}
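As a quick orientation for reviewers, here is a minimal word-count style sketch of how the Scala DataStream API above is meant to be used end to end. The input strings are invented, fromElements and getExecutionEnvironment come from the StreamExecutionEnvironment wrapper added later in this patch, and the implicit TypeInformation instances are assumed to come from the package object (truncated at the end of this message):

    import org.apache.flink.streaming.api.scala._

    // A minimal sketch, assuming the implicits from the scala streaming package object are in scope.
    object WordCountSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Hypothetical in-memory input.
        val text = env.fromElements("to be or not to be", "that is the question")

        // flatMap, map, groupBy and sum are the DataStream methods defined in this file.
        val counts = text
          .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
          .map((_, 1))
          .groupBy(0)
          .sum(1)

        counts.print()
        env.execute("WordCount sketch")
      }
    }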
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala new file mode 100644 index 0000000..9e33f80 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitDataStream.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala + +import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } + +/** + * The SplitDataStream represents an operator that has been split using an + * {@link OutputSelector}. Named outputs can be selected using the + * {@link #select} function. To apply a transformation on the whole output simply call + * the appropriate method on this stream. + * + * @param <OUT> + * The type of the output. + */ +class SplitDataStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaStream){ + + /** + * Sets the output names for which the next operator will receive values. + */ + def select(outputNames: String*): DataStream[T] = javaStream.select(outputNames: _*) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala new file mode 100644 index 0000000..a408ec0 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala + +import scala.reflect.ClassTag +import org.apache.commons.lang.Validate +import org.apache.flink.api.common.functions.CrossFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.scala.typeutils.CaseClassSerializer +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream} +import org.apache.flink.streaming.api.function.co.CrossWindowFunction +import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean +import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow +import java.util.concurrent.TimeUnit + +class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends + TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) { + + override def createNextWindowOperator(): StreamCrossOperator.CrossWindow[I1, I2] = { + + val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this, + (l: I1, r: I2) => (l, r)) + + val returnType = new CaseClassTypeInfo[(I1, I2)]( + + classOf[(I1, I2)], Seq(input1.getType, input2.getType), Array("_1", "_2")) { + + override def createSerializer: TypeSerializer[(I1, I2)] = { + val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity) + for (i <- 0 until getArity) { + fieldSerializers(i) = types(i).createSerializer + } + + new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) { + override def createInstance(fields: Array[AnyRef]) = { + (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2]) + } + } + } + } + + val javaStream = input1.connect(input2).addGeneralWindowCombine( + crossWindowFunction, + returnType, windowSize, + slideInterval, timeStamp1, timeStamp2) + + new StreamCrossOperator.CrossWindow[I1, I2](this, javaStream) + } +} +object StreamCrossOperator { + + private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2], + javaStream: JavaStream[(I1, I2)]) extends + DataStream[(I1, I2)](javaStream) with TemporalWindow[CrossWindow[I1, I2]] { + + /** + * Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf + * call will be emitted. 
+ * + */ + def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = { + + val invokable = new CoWindowInvokable[I1, I2, R]( + clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1, + op.timeStamp2) + + javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(), + invokable) + + javaStream.setType(implicitly[TypeInformation[R]]) + } + + override def every(length: Long, timeUnit: TimeUnit): CrossWindow[I1, I2] = { + every(timeUnit.toMillis(length)) + } + + override def every(length: Long): CrossWindow[I1, I2] = { + val builder = javaStream.getExecutionEnvironment().getStreamGraph() + val invokable = builder.getInvokable(javaStream.getId()) + invokable.asInstanceOf[CoWindowInvokable[_,_,_]].setSlideSize(length) + this + } + } + + private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2], + crossFunction: (I1, I2) => R): + CrossWindowFunction[I1, I2, R] = { + Validate.notNull(crossFunction, "Join function must not be null.") + + val crossFun = new CrossFunction[I1, I2, R] { + val cleanFun = op.input1.clean(crossFunction) + + override def cross(first: I1, second: I2): R = { + cleanFun(first, second) + } + } + + new CrossWindowFunction[I1, I2, R](crossFun) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala new file mode 100644 index 0000000..394673c --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala + +import scala.reflect.ClassTag +import org.apache.commons.lang.Validate +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv} +import org.apache.flink.streaming.api.function.source.{ FromElementsFunction, SourceFunction } +import org.apache.flink.util.Collector +import org.apache.flink.api.scala.ClosureCleaner +import org.apache.flink.streaming.api.function.source.FileMonitoringFunction.WatchType + +class StreamExecutionEnvironment(javaEnv: JavaEnv) { + + /** + * Sets the degree of parallelism (DOP) for operations executed through this environment. + * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with + * x parallel instances. This value can be overridden by specific operations using + * [[DataStream.setParallelism]]. + */ + def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = { + javaEnv.setDegreeOfParallelism(degreeOfParallelism) + } + + /** + * Returns the default degree of parallelism for this execution environment. Note that this + * value can be overridden by individual operations using [[DataStream.setParallelism]] + */ + def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism + + /** + * Sets the maximum time frequency (milliseconds) for the flushing of the + * output buffers. By default the output buffers flush frequently to provide + * low latency and to aid smooth developer experience. Setting the parameter + * can result in three logical modes: + * + * <ul> + * <li> + * A positive integer triggers flushing periodically by that integer</li> + * <li> + * 0 triggers flushing after every record thus minimizing latency</li> + * <li> + * -1 triggers flushing only when the output buffer is full thus maximizing + * throughput</li> + * </ul> + * + */ + def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = { + javaEnv.setBufferTimeout(timeoutMillis) + this + } + + /** + * Gets the default buffer timeout set for this environment + */ + def getBufferTimout: Long = javaEnv.getBufferTimeout() + + /** + * Creates a DataStream that represents the Strings produced by reading the + * given file line wise. The file will be read with the system's default + * character set. + * + */ + def readTextFile(filePath: String): DataStream[String] = + javaEnv.readTextFile(filePath) + + /** + * Creates a DataStream that contains the contents of file created while + * system watches the given path. The file will be read with the system's + * default character set. The user can check the monitoring interval in milliseconds, + * and the way file modifications are handled. By default it checks for only new files + * every 100 milliseconds. + * + */ + def readFileStream(StreamPath: String, intervalMillis: Long = 100, watchType: WatchType = + WatchType.ONLY_NEW_FILES): DataStream[String] = + javaEnv.readFileStream(StreamPath, intervalMillis, watchType) + + /** + * Creates a new DataStream that contains the strings received infinitely + * from socket. Received strings are decoded by the system's default + * character set. + * + */ + def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] = + javaEnv.socketTextStream(hostname, port, delimiter) + + /** + * Creates a new DataStream that contains the strings received infinitely + * from socket. Received strings are decoded by the system's default + * character set, uses '\n' as delimiter. 
+ * + */ + def socketTextStream(hostname: String, port: Int): DataStream[String] = + javaEnv.socketTextStream(hostname, port) + + /** + * Creates a new DataStream that contains a sequence of numbers. + * + */ + def generateSequence(from: Long, to: Long): DataStream[Long] = { + new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)). + asInstanceOf[DataStream[Long]] + } + + /** + * Creates a DataStream that contains the given elements. The elements must all be of the + * same type and must be serializable. + * + * * Note that this operation will result in a non-parallel data source, i.e. a data source with + * a degree of parallelism of one. + */ + def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = { + val typeInfo = implicitly[TypeInformation[T]] + fromCollection(data)(implicitly[ClassTag[T]], typeInfo) + } + + /** + * Creates a DataStream from the given non-empty [[Seq]]. The elements need to be serializable + * because the framework may move the elements into the cluster if needed. + * + * Note that this operation will result in a non-parallel data source, i.e. a data source with + * a degree of parallelism of one. + */ + def fromCollection[T: ClassTag: TypeInformation]( + data: Seq[T]): DataStream[T] = { + Validate.notNull(data, "Data must not be null.") + val typeInfo = implicitly[TypeInformation[T]] + + val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions + .asJavaCollection(data)) + + javaEnv.addSource(sourceFunction, typeInfo) + } + + /** + * Create a DataStream using a user defined source function for arbitrary + * source functionality. By default sources have a parallelism of 1. + * To enable parallel execution, the user defined source should implement + * ParallelSourceFunction or extend RichParallelSourceFunction. + * In these cases the resulting source will have the parallelism of the environment. + * To change this afterwards call DataStreamSource.setParallelism(int) + * + */ + def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = { + Validate.notNull(function, "Function must not be null.") + val cleanFun = StreamExecutionEnvironment.clean(function) + val typeInfo = implicitly[TypeInformation[T]] + javaEnv.addSource(cleanFun, typeInfo) + } + + /** + * Create a DataStream using a user defined source function for arbitrary + * source functionality. + * + */ + def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = { + Validate.notNull(function, "Function must not be null.") + val sourceFunction = new SourceFunction[T] { + val cleanFun = StreamExecutionEnvironment.clean(function) + override def invoke(out: Collector[T]) { + cleanFun(out) + } + } + addSource(sourceFunction) + } + + /** + * Triggers the program execution. The environment will execute all parts of + * the program that have resulted in a "sink" operation. Sink operations are + * for example printing results or forwarding them to a message queue. + * <p> + * The program execution will be logged and displayed with a generated + * default name. + * + */ + def execute() = javaEnv.execute() + + /** + * Triggers the program execution. The environment will execute all parts of + * the program that have resulted in a "sink" operation. Sink operations are + * for example printing results or forwarding them to a message queue. 
+ * <p> + * The program execution will be logged and displayed with the provided name + * + */ + def execute(jobName: String) = javaEnv.execute(jobName) + + /** + * Creates the plan with which the system will execute the program, and + * returns it as a String using a JSON representation of the execution data + * flow graph. Note that this needs to be called, before the plan is + * executed. + * + */ + def getExecutionPlan() = javaEnv.getStreamGraph.getStreamingPlanAsJSON + +} + +object StreamExecutionEnvironment { + + private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { + ClosureCleaner.clean(f, checkSerializable) + f + } + + /** + * Creates an execution environment that represents the context in which the program is + * currently executed. If the program is invoked standalone, this method returns a local + * execution environment. If the program is invoked from within the command line client + * to be submitted to a cluster, this method returns the execution environment of this cluster. + */ + def getExecutionEnvironment: StreamExecutionEnvironment = { + new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment) + } + + /** + * Creates a local execution environment. The local execution environment will run the program in + * a multi-threaded fashion in the same JVM as the environment was created in. The default degree + * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads). + */ + def createLocalEnvironment( + degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()): + StreamExecutionEnvironment = { + new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism)) + } + + /** + * Creates a remote execution environment. The remote environment sends (parts of) the program to + * a cluster for execution. Note that all file paths used in the program must be accessible from + * the cluster. The execution will use the cluster's default degree of parallelism, unless the + * parallelism is set explicitly via [[StreamExecutionEnvironment.setDegreeOfParallelism()]]. + * + * @param host The host name or address of the master (JobManager), + * where the program should be executed. + * @param port The port of the master (JobManager), where the program should be executed. + * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the + * program uses + * user-defined functions, user-defined input formats, or any libraries, + * those must be + * provided in the JAR files. + */ + def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): + StreamExecutionEnvironment = { + new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)) + } + + /** + * Creates a remote execution environment. The remote environment sends (parts of) the program + * to a cluster for execution. Note that all file paths used in the program must be accessible + * from the cluster. The execution will use the specified degree of parallelism. + * + * @param host The host name or address of the master (JobManager), + * where the program should be executed. + * @param port The port of the master (JobManager), where the program should be executed. + * @param degreeOfParallelism The degree of parallelism to use during the execution. + * @param jarFiles The JAR files with code that needs to be shipped to the cluster. 
If the + * program uses + * user-defined functions, user-defined input formats, or any libraries, + * those must be + * provided in the JAR files. + */ + def createRemoteEnvironment( + host: String, + port: Int, + degreeOfParallelism: Int, + jarFiles: String*): StreamExecutionEnvironment = { + val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*) + javaEnv.setDegreeOfParallelism(degreeOfParallelism) + new StreamExecutionEnvironment(javaEnv) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala new file mode 100644 index 0000000..1bd1bfb --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala + +import scala.Array.canBuildFrom +import scala.reflect.ClassTag +import org.apache.commons.lang.Validate +import org.apache.flink.api.common.functions.JoinFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.operators.Keys +import org.apache.flink.api.scala.typeutils.CaseClassSerializer +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } +import org.apache.flink.streaming.api.function.co.JoinWindowFunction +import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean +import org.apache.flink.streaming.util.keys.KeySelectorUtil +import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow +import java.util.concurrent.TimeUnit + +class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends +TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) { + + override def createNextWindowOperator() = { + new StreamJoinOperator.JoinWindow[I1, I2](this) + } +} + +object StreamJoinOperator { + + class JoinWindow[I1, I2](private[flink]op: StreamJoinOperator[I1, I2]) extends + TemporalWindow[JoinWindow[I1, I2]] { + + private[flink] val type1 = op.input1.getType() + + /** + * Continues a temporal Join transformation by defining + * the fields in the first stream to be used as keys for the join. + * The resulting incomplete join can be completed by JoinPredicate.equalTo() + * to define the second key. + */ + def where(fields: Int*) = { + new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys( + new Keys.ExpressionKeys(fields.toArray, type1), type1)) + } + + /** + * Continues a temporal Join transformation by defining + * the fields in the first stream to be used as keys for the join. + * The resulting incomplete join can be completed by JoinPredicate.equalTo() + * to define the second key. + */ + def where(firstField: String, otherFields: String*) = + new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys( + new Keys.ExpressionKeys(firstField +: otherFields.toArray, type1), type1)) + + /** + * Continues a temporal Join transformation by defining + * the keyselector function that will be used to extract keys from the first stream + * for the join. + * The resulting incomplete join can be completed by JoinPredicate.equalTo() + * to define the second key. + */ + def where[K: TypeInformation](fun: (I1) => K) = { + val keyType = implicitly[TypeInformation[K]] + val keyExtractor = new KeySelector[I1, K] { + val cleanFun = op.input1.clean(fun) + def getKey(in: I1) = cleanFun(in) + } + new JoinPredicate[I1, I2](op, keyExtractor) + } + + override def every(length: Long, timeUnit: TimeUnit): JoinWindow[I1, I2] = { + every(timeUnit.toMillis(length)) + } + + override def every(length: Long): JoinWindow[I1, I2] = { + op.slideInterval = length + this + } + + } + + class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2], + private[flink] val keys1: KeySelector[I1, _]) { + private[flink] var keys2: KeySelector[I2, _] = null + private[flink] val type2 = op.input2.getType() + + /** + * Creates a temporal join transformation by defining the second join key. 
+   * The returned transformation wraps each joined element pair in a tuple2:
+   * (first, second)
+   * To define a custom wrapping, use JoinedStream.apply(...)
+   */
+  def equalTo(fields: Int*): JoinedStream[I1, I2] = {
+    finish(KeySelectorUtil.getSelectorForKeys(
+      new Keys.ExpressionKeys(fields.toArray, type2), type2))
+  }
+
+  /**
+   * Creates a temporal join transformation by defining the second join key.
+   * The returned transformation wraps each joined element pair in a tuple2:
+   * (first, second)
+   * To define a custom wrapping, use JoinedStream.apply(...)
+   */
+  def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] =
+    finish(KeySelectorUtil.getSelectorForKeys(
+      new Keys.ExpressionKeys(firstField +: otherFields.toArray, type2), type2))
+
+  /**
+   * Creates a temporal join transformation by defining the second join key.
+   * The returned transformation wraps each joined element pair in a tuple2:
+   * (first, second)
+   * To define a custom wrapping, use JoinedStream.apply(...)
+   */
+  def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = {
+    val keyType = implicitly[TypeInformation[K]]
+    val keyExtractor = new KeySelector[I2, K] {
+      val cleanFun = op.input1.clean(fun)
+      def getKey(in: I2) = cleanFun(in)
+    }
+    finish(keyExtractor)
+  }
+
+  private def finish(keys2: KeySelector[I2, _]): JoinedStream[I1, I2] = {
+    this.keys2 = keys2
+    new JoinedStream[I1, I2](this, createJoinOperator())
+  }
+
+  private def createJoinOperator(): JavaStream[(I1, I2)] = {
+
+    val returnType = new CaseClassTypeInfo[(I1, I2)](
+      classOf[(I1, I2)], Seq(op.input1.getType, op.input2.getType), Array("_1", "_2")) {
+
+      override def createSerializer: TypeSerializer[(I1, I2)] = {
+        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
+        for (i <- 0 until getArity) {
+          fieldSerializers(i) = types(i).createSerializer
+        }
+
+        new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
+          override def createInstance(fields: Array[AnyRef]) = {
+            (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
+          }
+        }
+      }
+    }
+
+    op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
+      .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
+        returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
+  }
+}
+
+class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends
+  DataStream[(I1, I2)](javaStream) {
+
+  private val op = jp.op
+
+  /**
+   * Sets a wrapper for the joined elements. For each joined pair, the result of the
+   * udf call will be emitted.
+ */ + def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = { + + val invokable = new CoWindowInvokable[I1, I2, R]( + clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1, + op.timeStamp2) + + javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(), + invokable) + + javaStream.setType(implicitly[TypeInformation[R]]) + } + } + + private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2], + joinFunction: (I1, I2) => R) = { + Validate.notNull(joinFunction, "Join function must not be null.") + + val joinFun = new JoinFunction[I1, I2, R] { + + val cleanFun = jp.op.input1.clean(joinFunction) + + override def join(first: I1, second: I2): R = { + cleanFun(first, second) + } + } + + new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala new file mode 100644 index 0000000..fd3a4a9 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala + +import org.apache.flink.streaming.api.datastream.temporaloperator.{ TemporalOperator => JTempOp } +import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } +import org.apache.flink.streaming.api.datastream.temporaloperator.TemporalWindow +import org.apache.flink.streaming.api.windowing.helper.Timestamp +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment._ + +abstract class TemporalOperator[I1, I2, OP <: TemporalWindow[OP]]( + i1: JavaStream[I1], i2: JavaStream[I2]) extends JTempOp[I1, I2, OP](i1, i2) { + + def onWindow(length: Long, ts1: I1 => Long, ts2: I2 => Long, startTime: Long = 0): OP = { + val timeStamp1 = getTS(ts1) + val timeStamp2 = getTS(ts2) + onWindow(length, timeStamp1, timeStamp2, startTime) + } + + def getTS[R](ts: R => Long): Timestamp[R] = { + new Timestamp[R] { + val cleanFun = clean(ts, true) + def getTimestamp(in: R) = cleanFun(in) + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala new file mode 100644 index 0000000..5c734bf --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala + +import scala.Array.canBuildFrom +import scala.collection.JavaConversions.iterableAsScalaIterable +import scala.reflect.ClassTag + +import org.apache.flink.api.common.functions.GroupReduceFunction +import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase +import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator +import org.apache.flink.streaming.api.datastream.{WindowedDataStream => JavaWStream} +import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType +import org.apache.flink.streaming.api.function.aggregation.SumFunction +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean +import org.apache.flink.streaming.api.windowing.helper.WindowingHelper +import org.apache.flink.streaming.api.windowing.helper._ +import org.apache.flink.util.Collector + +class WindowedDataStream[T](javaStream: JavaWStream[T]) { + + /** + * Defines the slide size (trigger frequency) for the windowed data stream. + * This controls how often the user defined function will be triggered on + * the window. + */ + def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = + javaStream.every(windowingHelper: _*) + + /** + * Groups the elements of the WindowedDataStream using the given + * field positions. The window sizes (evictions) and slide sizes + * (triggers) will be calculated on the whole stream (in a central fashion), + * but the user defined functions will be applied on a per group basis. + * </br></br> To get windows and triggers on a per group basis apply the + * DataStream.window(...) operator on an already grouped data stream. + * + */ + def groupBy(fields: Int*): WindowedDataStream[T] = javaStream.groupBy(fields: _*) + + /** + * Groups the elements of the WindowedDataStream using the given + * field expressions. The window sizes (evictions) and slide sizes + * (triggers) will be calculated on the whole stream (in a central fashion), + * but the user defined functions will be applied on a per group basis. + * </br></br> To get windows and triggers on a per group basis apply the + * DataStream.window(...) operator on an already grouped data stream. + * + */ + def groupBy(firstField: String, otherFields: String*): WindowedDataStream[T] = + javaStream.groupBy(firstField +: otherFields.toArray: _*) + + /** + * Groups the elements of the WindowedDataStream using the given + * KeySelector function. The window sizes (evictions) and slide sizes + * (triggers) will be calculated on the whole stream (in a central fashion), + * but the user defined functions will be applied on a per group basis. + * </br></br> To get windows and triggers on a per group basis apply the + * DataStream.window(...) operator on an already grouped data stream. + * + */ + def groupBy[K: TypeInformation](fun: T => K): WindowedDataStream[T] = { + + val keyExtractor = new KeySelector[T, K] { + val cleanFun = clean(fun) + def getKey(in: T) = cleanFun(in) + } + javaStream.groupBy(keyExtractor) + } + + /** + * Applies a reduce transformation on the windowed data stream by reducing + * the current window at every trigger. 
+ + /** + * Applies a reduce transformation on the windowed data stream by reducing + * the current window at every trigger. + * + */ + def reduce(reducer: ReduceFunction[T]): DataStream[T] = { + if (reducer == null) { + throw new NullPointerException("Reduce function must not be null.") + } + javaStream.reduce(reducer) + } + + /** + * Applies a reduce transformation on the windowed data stream by reducing + * the current window at every trigger. + * + */ + def reduce(fun: (T, T) => T): DataStream[T] = { + if (fun == null) { + throw new NullPointerException("Reduce function must not be null.") + } + val reducer = new ReduceFunction[T] { + val cleanFun = clean(fun) + def reduce(v1: T, v2: T) = { cleanFun(v1, v2) } + } + reduce(reducer) + } + + /** + * Applies a reduceGroup transformation on the windowed data stream by reducing + * the current window at every trigger. In contrast with the simple binary reduce operator, + * reduceGroup exposes the whole window through the Iterable interface. + * <br/><br/> + * Whenever possible, try to use reduce instead of reduceGroup for increased efficiency. + */ + def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]): + DataStream[R] = { + if (reducer == null) { + throw new NullPointerException("GroupReduce function must not be null.") + } + javaStream.reduceGroup(reducer, implicitly[TypeInformation[R]]) + } + + /** + * Applies a reduceGroup transformation on the windowed data stream by reducing + * the current window at every trigger. In contrast with the simple binary reduce operator, + * reduceGroup exposes the whole window through the Iterable interface. + * <br/><br/> + * Whenever possible, try to use reduce instead of reduceGroup for increased efficiency. + */ + def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit): + DataStream[R] = { + if (fun == null) { + throw new NullPointerException("GroupReduce function must not be null.") + } + val reducer = new GroupReduceFunction[T, R] { + val cleanFun = clean(fun) + def reduce(in: java.lang.Iterable[T], out: Collector[R]) = { cleanFun(in, out) } + } + reduceGroup(reducer) + }
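To sketch the contrast between the two transformations (continuing the hypothetical windowed stream from the earlier sketch):

    import org.apache.flink.util.Collector

    // reduce: combines the window elements pairwise and incrementally
    val sums: DataStream[(String, Int)] =
      windowed.reduce((a, b) => (a._1, a._2 + b._2))

    // reduceGroup: receives the whole window as an Iterable at each trigger
    val sizes: DataStream[Int] =
      windowed.reduceGroup((in: Iterable[(String, Int)], out: Collector[Int]) =>
        out.collect(in.size))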
+ + /** + * Applies an aggregation that gives the maximum of the elements in the window at + * the given position. + * + */ + def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position) + + /** + * Applies an aggregation that gives the maximum of the elements in the window at + * the given field. + * + */ + def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field) + + /** + * Applies an aggregation that gives the minimum of the elements in the window at + * the given position. + * + */ + def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position) + + /** + * Applies an aggregation that gives the minimum of the elements in the window at + * the given field. + * + */ + def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field) + + /** + * Applies an aggregation that sums the elements in the window at the given position. + * + */ + def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position) + + /** + * Applies an aggregation that sums the elements in the window at the given field. + * + */ + def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field) + + /** + * Applies an aggregation that gives the maximum element of the window by + * the given position. In case of equality, the first element is returned. + * + */ + def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY, + position) + + /** + * Applies an aggregation that gives the maximum element of the window by + * the given field. In case of equality, the first element is returned. + * + */ + def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY, + field) + + /** + * Applies an aggregation that gives the minimum element of the window by + * the given position. In case of equality, the first element is returned. + * + */ + def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY, + position) + + /** + * Applies an aggregation that gives the minimum element of the window by + * the given field. In case of equality, the first element is returned. + * + */ + def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY, + field) + + private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = { + val position = fieldNames2Indices(javaStream.getType(), Array(field))(0) + aggregate(aggregationType, position) + } + + def aggregate(aggregationType: AggregationType, position: Int): + DataStream[T] = { + + val jStream = javaStream.asInstanceOf[JavaWStream[Product]] + val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]] + + val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position) + + val reducer = aggregationType match { + case AggregationType.SUM => new agg.Sum(SumFunction.getForClass( + outType.getTypeAt(position).getTypeClass())) + case _ => new agg.ProductComparableAggregator(aggregationType, true) + } + + new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]] + } + +}
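Continuing the same hypothetical stream, the positional aggregations each compose in a single call:

    val maxCounts = windowed.max(1)    // largest value of field 1 in each window
    val totals    = windowed.sum(1)    // sum of field 1 in each window
    val topPairs  = windowed.maxBy(1)  // the whole tuple carrying the largest field 1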
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala new file mode 100644 index 0000000..222eb6d --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api + +import _root_.scala.reflect.ClassTag +import language.experimental.macros +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils} +import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } +import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream } +import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } +import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream } + +package object scala { + // We have this here so that we always have generated TypeInformation instances when + // using the Scala API + implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T] + + implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] = + new DataStream[R](javaStream) + + implicit def javaToScalaWindowedStream[R](javaWStream: JavaWStream[R]): WindowedDataStream[R] = + new WindowedDataStream[R](javaWStream) + + implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitDataStream[R] = + new SplitDataStream[R](javaStream) + + implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]): + ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream) + + implicit def seqToFlinkSource[T: ClassTag: TypeInformation](scalaSeq: Seq[T]): DataStream[T] = + StreamExecutionEnvironment.getExecutionEnvironment.fromCollection(scalaSeq) + + + private[flink] def fieldNames2Indices( + typeInfo: TypeInformation[_], + fields: Array[String]): Array[Int] = { + typeInfo match { + case ti: CaseClassTypeInfo[_] => + val result = ti.getFieldIndices(fields) + + if (result.contains(-1)) { + throw new IllegalArgumentException("Fields '" + fields.mkString(", ") + + "' are not valid for '" + ti.toString + "'.") + } + + result + + case _ => + throw new UnsupportedOperationException("Specifying fields by name is only " + + "supported on Case Classes (for now).") + } + } +}
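In practice a single wildcard import activates all of these conversions; a small sketch (values hypothetical):

    import org.apache.flink.streaming.api.scala._

    // seqToFlinkSource: a plain Scala Seq becomes a streaming source
    val numbers: DataStream[Int] = Seq(1, 2, 3, 4)

    // createTypeInformation is supplied implicitly for the mapped element type
    val doubled: DataStream[Int] = numbers.map(_ * 2)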
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala new file mode 100644 index 0000000..eedee0e --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.windowing + +import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta } +import org.apache.commons.lang.Validate +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean +import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction + +object Delta { + + /** + * Creates a delta helper representing a delta trigger or eviction policy. + * <br/><br/> This policy calculates a delta between the data point which + * triggered last and the currently arrived data point. It triggers if the + * delta is higher than a specified threshold. <br/><br/> In case it is + * used for eviction, this policy starts from the first element of the + * buffer and removes all elements from the buffer which have a higher delta + * than the threshold. As soon as there is an element with a lower delta, + * the eviction stops. + */ + def of[T](threshold: Double, deltaFunction: (T, T) => Double, initVal: T): JavaDelta[T] = { + Validate.notNull(deltaFunction, "Delta function must not be null") + val df = new DeltaFunction[T] { + val cleanFun = clean(deltaFunction) + override def getDelta(first: T, second: T) = cleanFun(first, second) + } + JavaDelta.of(threshold, df, initVal) + } + +}
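A sketch of Delta used as a trigger (the readings stream is hypothetical, reusing env from the earlier sketch; window(...) is the DataStream operator referenced in the docs above):

    import org.apache.flink.streaming.api.scala.windowing.Delta

    // Fire whenever the current reading moves more than 10.0 away from the
    // value that caused the last trigger; 0.0 is the initial reference point.
    val readings: DataStream[Double] = env.fromCollection(Seq(1.0, 1.2, 12.5))
    val deltaWindowed = readings.window(
      Delta.of[Double](10.0, (last, current) => math.abs(current - last), 0.0))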
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala new file mode 100644 index 0000000..9a69369 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Time.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.windowing + +import java.util.concurrent.TimeUnit +import org.apache.flink.streaming.api.windowing.helper.{ Time => JavaTime } + +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean +import org.apache.flink.streaming.api.windowing.helper.Timestamp +import org.apache.commons.lang.Validate + +object Time { + + /** + * Creates a helper representing a time trigger which triggers every given + * length (slide size) or a time eviction which evicts all elements older + * than length (window size) using System time. + * + */ + def of(windowSize: Long, timeUnit: TimeUnit): JavaTime[_] = + JavaTime.of(windowSize, timeUnit) + + /** + * Creates a helper representing a time trigger which triggers every given + * length (slide size) or a time eviction which evicts all elements older + * than length (window size) using a user-defined timestamp extractor. + * + */ + def of[R](windowSize: Long, timestamp: R => Long, startTime: Long = 0): JavaTime[R] = { + Validate.notNull(timestamp, "Timestamp must not be null.") + val ts = new Timestamp[R] { + val fun = clean(timestamp, true) + override def getTimestamp(in: R) = fun(in) + } + JavaTime.of(windowSize, ts, startTime) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/pom.xml b/flink-staging/flink-streaming/pom.xml new file mode 100644 index 0000000..43b8181 --- /dev/null +++ b/flink-staging/flink-streaming/pom.xml @@ -0,0 +1,75 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License.
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-staging</artifactId> + <version>0.9-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-streaming-parent</artifactId> + <name>flink-streaming</name> + <packaging>pom</packaging> + + <modules> + <module>flink-streaming-core</module> + <module>flink-streaming-scala</module> + <module>flink-streaming-examples</module> + <module>flink-streaming-connectors</module> + </modules> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-compiler</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-tachyon/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tachyon/pom.xml b/flink-staging/flink-tachyon/pom.xml new file mode 100644 index 0000000..65aa295 --- /dev/null +++ b/flink-staging/flink-tachyon/pom.xml @@ -0,0 +1,117 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-staging</artifactId> + <version>0.9-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-tachyon</artifactId> + <name>flink-tachyon</name> + + <packaging>jar</packaging> + + <!-- + This is a Hadoop2-only Flink module.
+ --> + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java-examples</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.tachyonproject</groupId> + <artifactId>tachyon</artifactId> + <version>0.5.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.tachyonproject</groupId> + <artifactId>tachyon</artifactId> + <version>0.5.0</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + <version>7.6.8.v20121106</version><!--$NO-MVN-MAN-VER$--> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>test</scope> + <type>test-jar</type> + <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$--> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>test</scope> + <type>test-jar</type> + <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$--> + </dependency> + </dependencies> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + <version>7.6.8.v20121106</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + <version>7.6.8.v20121106</version> + <scope>test</scope> + </dependency> + </dependencies> + </dependencyManagement> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java new file mode 100644 index 0000000..7318894 --- /dev/null +++ b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tachyon; + + +import org.apache.commons.io.IOUtils; +import org.apache.flink.api.common.io.FileOutputFormat; +import org.apache.flink.api.java.io.AvroOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.examples.java.wordcount.WordCount; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; + +/** + * This test should logically be located in the 'flink-runtime' tests. However, this project + * already has all the required dependencies (flink-java-examples). Also, the DopOneTestEnvironment is here. + */ +public class HDFSTest { + + private String hdfsURI; + private MiniDFSCluster hdfsCluster; + private org.apache.hadoop.fs.Path hdPath; + private org.apache.hadoop.fs.FileSystem hdfs; + + @Before + public void createHDFS() { + try { + Configuration hdConf = new Configuration(); + + File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile(); + FileUtil.fullyDelete(baseDir); + hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf); + hdfsCluster = builder.build(); + + hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() + "/"; + + hdPath = new org.apache.hadoop.fs.Path("/test"); + hdfs = hdPath.getFileSystem(hdConf); + FSDataOutputStream stream = hdfs.create(hdPath); + for(int i = 0; i < 10; i++) { + stream.write("Hello HDFS\n".getBytes()); + } + stream.close(); + + } catch(Throwable e) { + e.printStackTrace(); + Assert.fail("Test failed: " + e.getMessage()); + } + } + + @After + public void destroyHDFS() { + try { + hdfs.delete(hdPath, false); + hdfsCluster.shutdown(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + } + + @Test + public void testHDFS() { + + Path file = new Path(hdfsURI + hdPath); + org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result"); + try { + FileSystem fs = file.getFileSystem(); + Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem); + new TachyonFileSystemWrapperTest.DopOneTestEnvironment(); + try { + WordCount.main(new String[]{file.toString(), result.toString()}); + } catch(Throwable t) { + t.printStackTrace(); + Assert.fail("Test failed with " + t.getMessage()); + } + Assert.assertTrue("No result file present", hdfs.exists(result)); + // validate output: + org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result); + StringWriter writer = new StringWriter(); + IOUtils.copy(inStream, writer); + String resultString = writer.toString(); + + Assert.assertEquals("hdfs 10\n" + + "hello 10\n", resultString); + inStream.close(); + + } catch (IOException e) { + e.printStackTrace(); + Assert.fail("Error in test: " + e.getMessage()); + } + } + + @Test + public void testAvroOut() { + String type = "one"; + AvroOutputFormat<String> avroOut = + new
AvroOutputFormat<String>( String.class ); + + org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/avroTest"); + + avroOut.setOutputFilePath(new Path(result.toString())); + avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE); + avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS); + + try { + avroOut.open(0, 2); + avroOut.writeRecord(type); + avroOut.close(); + + avroOut.open(1, 2); + avroOut.writeRecord(type); + avroOut.close(); + + + Assert.assertTrue("No result file present", hdfs.exists(result)); + FileStatus[] files = hdfs.listStatus(result); + Assert.assertEquals(2, files.length); + for(FileStatus file : files) { + Assert.assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName())); + } + + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } +}
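Stepping back from the tests: the Scala streaming pieces introduced above compose into jobs of roughly the following shape. This is a sketch under the same assumptions as the earlier fragments; the class name and the execute call mirror the Java API and are not code from this commit. Grouping before window(...) gives per-key windows and triggers, as the WindowedDataStream docs recommend:

    import java.util.concurrent.TimeUnit
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.windowing.Time

    object GroupedWindowJob {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Grouping first makes windows and triggers per key; sum(1) then
        // emits one running total per key at every trigger.
        env.fromCollection(Seq(("a", 1), ("b", 2), ("a", 3)))
          .groupBy(0)
          .window(Time.of(10, TimeUnit.SECONDS))
          .sum(1)

        env.execute("Grouped window sum")
      }
    }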