http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
new file mode 100644
index 0000000..2116d1f
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
@@ -0,0 +1,264 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.functions.MapFunction import MapFunction
+from flink.functions.FlatMapFunction import FlatMapFunction
+from flink.functions.FilterFunction import FilterFunction
+from flink.functions.MapPartitionFunction import MapPartitionFunction
+from flink.functions.ReduceFunction import ReduceFunction
+from flink.functions.CrossFunction import CrossFunction
+from flink.functions.JoinFunction import JoinFunction
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+from flink.functions.CoGroupFunction import CoGroupFunction
+from flink.plan.Constants import INT, STRING, FLOAT, BOOL, Order
+
+
+class Mapper(MapFunction):
+    def map(self, value):
+        return value * value
+
+
+class Filter(FilterFunction):
+    def __init__(self, limit):
+        super(Filter, self).__init__()
+        self.limit = limit
+
+    def filter(self, value):
+        return value > self.limit
+
+
+class FlatMap(FlatMapFunction):
+    def flat_map(self, value, collector):
+        collector.collect(value)
+        collector.collect(value * 2)
+
+
+class MapPartition(MapPartitionFunction):
+    def map_partition(self, iterator, collector):
+        for value in iterator:
+            collector.collect(value * 2)
+
+
+class Reduce(ReduceFunction):
+    def reduce(self, value1, value2):
+        return value1 + value2
+
+
+class Reduce2(ReduceFunction):
+    def reduce(self, value1, value2):
+        return (value1[0] + value2[0], value1[1] + value2[1], value1[2], value1[3] or value2[3])
+
+
+class Cross(CrossFunction):
+    def cross(self, value1, value2):
+        return (value1, value2[3])
+
+
+class MapperBcv(MapFunction):
+    def map(self, value):
+        factor = self.context.get_broadcast_variable("test")[0][0]
+        return value * factor
+
+
+class Join(JoinFunction):
+    def join(self, value1, value2):
+        if value1[3]:
+            return value2[0] + str(value1[0])
+        else:
+            return value2[0] + str(value1[1])
+
+
+class GroupReduce(GroupReduceFunction):
+    def reduce(self, iterator, collector):
+        if iterator.has_next():
+            i, f, s, b = iterator.next()
+            for value in iterator:
+                i += value[0]
+                f += value[1]
+                b |= value[3]
+            collector.collect((i, f, s, b))
+
+
+class GroupReduce2(GroupReduceFunction):
+    def reduce(self, iterator, collector):
+        for value in iterator:
+            collector.collect(value)
+
+
+class GroupReduce3(GroupReduceFunction):
+    def reduce(self, iterator, collector):
+        collector.collect(iterator.next())
+
+    def combine(self, iterator, collector):
+        if iterator.has_next():
+            v1 = iterator.next()
+            if iterator.has_next():
+                v2 = iterator.next()
+                if v1[0] < v2[0]:
+                    collector.collect(v1)
+                else:
+                    collector.collect(v2)
+
+
+class CoGroup(CoGroupFunction):
+    def co_group(self, iterator1, iterator2, collector):
+        while iterator1.has_next() and iterator2.has_next():
+            collector.collect((iterator1.next(), iterator2.next()))
+
+
+class Id(MapFunction):
+    def map(self, value):
+        return value
+
+
+class Verify(MapPartitionFunction):
+    def __init__(self, expected, name):
+        super(Verify, self).__init__()
+        self.expected = expected
+        self.name = name
+
+    def map_partition(self, iterator, collector):
+        index = 0
+        for value in iterator:
+            if value != self.expected[index]:
+                print(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value))
+                raise Exception(self.name + " failed!")
+            index += 1
+        collector.collect(self.name + " successful!")
+
+
+class Verify2(MapPartitionFunction):
+    def __init__(self, expected, name):
+        super(Verify2, self).__init__()
+        self.expected = expected
+        self.name = name
+
+    def map_partition(self, iterator, collector):
+        for value in iterator:
+            if value in self.expected:
+                try:
+                    self.expected.remove(value)
+                except Exception:
+                    raise Exception(self.name + " failed!")
+        collector.collect(self.name + " successful!")
+
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(1, 6, 12)
+
+    d2 = env.from_elements((1, 0.5, "hello", True), (2, 0.4, "world", False))
+
+    d3 = env.from_elements(("hello",), ("world",))
+
+    d4 = env.from_elements((1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False))
+
+    d5 = env.from_elements((4.4, 4.3, 1), (4.3, 4.4, 1), (4.2, 4.1, 3), (4.1, 4.1, 3))
+
+    d1 \
+        .map((lambda x: x * x), INT).map(Mapper(), INT) \
+        .map_partition(Verify([1, 1296, 20736], "Map"), STRING).output()
+
+    d1 \
+        .map(Mapper(), INT).map((lambda x: x * x), INT) \
+        .map_partition(Verify([1, 1296, 20736], "Chained Lambda"), STRING).output()
+
+    d1 \
+        .filter(Filter(5)).filter(Filter(8)) \
+        .map_partition(Verify([12], "Filter"), STRING).output()
+
+    d1 \
+        .flat_map(FlatMap(), INT).flat_map(FlatMap(), INT) \
+        .map_partition(Verify([1, 2, 2, 4, 6, 12, 12, 24, 12, 24, 24, 48], "FlatMap"), STRING).output()
+
+    d1 \
+        .map_partition(MapPartition(), INT) \
+        .map_partition(Verify([2, 12, 24], "MapPartition"), STRING).output()
+
+    d1 \
+        .reduce(Reduce()) \
+        .map_partition(Verify([19], "AllReduce"), STRING).output()
+
+    d4 \
+        .group_by(2).reduce(Reduce2()) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineReduce"), STRING).output()
+
+    d4 \
+        .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce(Reduce2()) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedReduce"), STRING).output()
+
+    d1 \
+        .map(MapperBcv(), INT).with_broadcast_set("test", d2) \
+        .map_partition(Verify([1, 6, 12], "Broadcast"), STRING).output()
+
+    d1 \
+        .cross(d2).using(Cross(), (INT, BOOL)) \
+        .map_partition(Verify([(1, True), (1, False), (6, True), (6, False), (12, True), (12, False)], "Cross"), STRING).output()
+
+    d1 \
+        .cross(d3) \
+        .map_partition(Verify([(1, ("hello",)), (1, ("world",)), (6, ("hello",)), (6, ("world",)), (12, ("hello",)), (12, ("world",))], "Default Cross"), STRING).output()
+
+    d2 \
+        .cross(d3).project_second(0).project_first(0, 1) \
+        .map_partition(Verify([("hello", 1, 0.5), ("world", 1, 0.5), ("hello", 2, 0.4), ("world", 2, 0.4)], "Project Cross"), STRING).output()
+
+    d2 \
+        .join(d3).where(2).equal_to(0).using(Join(), STRING) \
+        .map_partition(Verify(["hello1", "world0.4"], "Join"), STRING).output()
+
+    d2 \
+        .join(d3).where(2).equal_to(0).project_first(0, 3).project_second(0) \
+        .map_partition(Verify([(1, True, "hello"), (2, False, "world")], "Project Join"), STRING).output()
+
+    d2 \
+        .join(d3).where(2).equal_to(0) \
+        .map_partition(Verify([((1, 0.5, "hello", True), ("hello",)), ((2, 0.4, "world", False), ("world",))], "Default Join"), STRING).output()
+
+    d2 \
+        .project(0, 1, 2) \
+        .map_partition(Verify([(1, 0.5, "hello"), (2, 0.4, "world")], "Project"), STRING).output()
+
+    d2 \
+        .union(d4) \
+        .map_partition(Verify2([(1, 0.5, "hello", True), (2, 0.4, "world", False), (1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "Union"), STRING).output()
+
+    d4 \
+        .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=False) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "AllGroupReduce"), STRING).output()
+
+    d4 \
+        .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedGroupReduce"), STRING).output()
+
+    d4 \
+        .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineGroupReduce"), STRING).output()
+
+    d5 \
+        .group_by(2).sort_group(0, Order.DESCENDING).sort_group(1, Order.ASCENDING).reduce_group(GroupReduce3(), (FLOAT, FLOAT, INT), combinable=True) \
+        .map_partition(Verify([(4.3, 4.4, 1), (4.1, 4.1, 3)], "ChainedSortedGroupReduce"), STRING).output()
+
+    d4 \
+        .co_group(d5).where(0).equal_to(2).using(CoGroup(), ((INT, FLOAT, STRING, BOOL), (FLOAT, FLOAT, INT))) \
+        .map_partition(Verify([((1, 0.5, "hello", True), (4.4, 4.3, 1)), ((1, 0.4, "hello", False), (4.3, 4.4, 1))], "CoGroup"), STRING).output()
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py
new file mode 100644
index 0000000..1f90587
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py
@@ -0,0 +1,30 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import WriteMode
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.read_text("src/test/python/org/apache/flink/python/api/data_text")
+
+    d1.write_text("/tmp/flink/result", WriteMode.OVERWRITE)
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
new file mode 100644
index 0000000..a3b8d07
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
@@ -0,0 +1,63 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import _Fields
+from flink.plan.Constants import INT, STRING, BOOL, FLOAT
+import sys
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(("hello", 4, 3.2, True))
+
+    d2 = env.from_elements("world")
+
+    direct_from_source = d1.filter(lambda x: True)
+
+    if direct_from_source._info[_Fields.TYPES] != ("hello", 4, 3.2, True):
+        sys.exit("Error deducing type directly from source.")
+
+    from_common_udf = d1.map(lambda x: x[3], BOOL).filter(lambda x: True)
+
+    if from_common_udf._info[_Fields.TYPES] != BOOL:
+        sys.exit("Error deducing type from common udf.")
+
+    through_projection = d1.project(3, 2).filter(lambda x: True)
+
+    if through_projection._info[_Fields.TYPES] != (True, 3.2):
+        sys.exit("Error deducing type through projection.")
+
+    through_default_op = d1.cross(d2).filter(lambda x: True)
+
+    if through_default_op._info[_Fields.TYPES] != (("hello", 4, 3.2, True), "world"):
+        sys.exit("Error deducing type through default J/C. " + str(through_default_op._info[_Fields.TYPES]))
+
+    through_prj_op = d1.cross(d2).project_first(1, 0).project_second().project_first(3, 2).filter(lambda x: True)
+
+    if through_prj_op._info[_Fields.TYPES] != (4, "hello", "world", True, 3.2):
+        sys.exit("Error deducing type through projection J/C. " + str(through_prj_op._info[_Fields.TYPES]))
+
+
+    env = get_environment()
+
+    msg = env.from_elements("Type deduction test successful.")
+
+    msg.output()
+
+    env.execute()
+
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py
new file mode 100644
index 0000000..f5f3ee4
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py
@@ -0,0 +1,70 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.functions.MapFunction import MapFunction
+from flink.functions.MapPartitionFunction import MapPartitionFunction
+from flink.plan.Constants import BOOL, INT, FLOAT, STRING, BYTES
+
+
+class Verify(MapPartitionFunction):
+    def __init__(self, expected, name):
+        super(Verify, self).__init__()
+        self.expected = expected
+        self.name = name
+
+    def map_partition(self, iterator, collector):
+        index = 0
+        for value in iterator:
+            if value != self.expected[index]:
+                print(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value))
+                raise Exception(self.name + " failed!")
+            index += 1
+        collector.collect(self.name + " successful!")
+
+
+class Id(MapFunction):
+    def map(self, value):
+        return value
+
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(bytearray(b"hello"), bytearray(b"world"))
+
+    d1.map(Id(), BYTES).map_partition(Verify([bytearray(b"hello"), bytearray(b"world")], "Byte"), STRING).output()
+
+    d2 = env.from_elements(1, 2, 3, 4, 5)
+
+    d2.map(Id(), INT).map_partition(Verify([1, 2, 3, 4, 5], "Int"), STRING).output()
+
+    d3 = env.from_elements(True, True, False)
+
+    d3.map(Id(), BOOL).map_partition(Verify([True, True, False], "Bool"), STRING).output()
+
+    d4 = env.from_elements(1.4, 1.7, 12312.23)
+
+    d4.map(Id(), FLOAT).map_partition(Verify([1.4, 1.7, 12312.23], "Float"), STRING).output()
+
+    d5 = env.from_elements("hello", "world")
+
+    d5.map(Id(), STRING).map_partition(Verify(["hello", "world"], "String"), STRING).output()
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/pom.xml b/flink-libraries/pom.xml
index 065127b..e1dc2d9 100644
--- a/flink-libraries/pom.xml
+++ b/flink-libraries/pom.xml
@@ -36,5 +36,6 @@ under the License.
 	<modules>
 		<module>flink-gelly</module>
 		<module>flink-gelly-scala</module>
+		<module>flink-python</module>
 	</modules>
 </project>
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml b/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml
deleted file mode 100644
index 7a8f8af..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml
+++ /dev/null
@@ -1,61 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-language-binding-parent</artifactId> - <version>1.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-language-binding-generic</artifactId> - <name>flink-language-binding-generic</name> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-optimizer</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-clients</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java deleted file mode 100644 index bdb0444..0000000 --- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java +++ /dev/null @@ -1,242 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ -package org.apache.flink.languagebinding.api.java.common; - -import java.io.IOException; -import java.util.Arrays; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.tuple.Tuple; -import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject; -import org.apache.flink.core.fs.FileSystem.WriteMode; -import org.apache.flink.languagebinding.api.java.common.PlanBinder.Operation; -import static org.apache.flink.languagebinding.api.java.common.PlanBinder.normalizeKeys; -import static org.apache.flink.languagebinding.api.java.common.PlanBinder.toIntArray; -import org.apache.flink.languagebinding.api.java.common.streaming.Receiver; - -/** - * Container for all generic information related to operations. This class contains the absolute minimum fields that are - * required for all operations. This class should be extended to contain any additional fields required on a - * per-language basis. - */ -public class OperationInfo { - public int parentID; //DataSet that an operation is applied on - public int otherID; //secondary DataSet - public int setID; //ID for new DataSet - public String[] keys; - public String[] keys1; //join/cogroup keys - public String[] keys2; //join/cogroup keys - public TypeInformation<?> types; //typeinformation about output type - public AggregationEntry[] aggregates; - public ProjectionEntry[] projections; //projectFirst/projectSecond - public Object[] values; - public int count; - public int field; - public int[] fields; - public Order order; - public String path; - public String fieldDelimiter; - public String lineDelimiter; - public long from; - public long to; - public WriteMode writeMode; - public boolean toError; - public String name; - - public OperationInfo() { - } - - public OperationInfo(Receiver receiver, Operation identifier) throws IOException { - Object tmpType; - switch (identifier) { - case SOURCE_CSV: - setID = (Integer) receiver.getRecord(true); - path = (String) receiver.getRecord(); - fieldDelimiter = (String) receiver.getRecord(); - lineDelimiter = (String) receiver.getRecord(); - tmpType = (Tuple) receiver.getRecord(); - types = tmpType == null ? null : getForObject(tmpType); - return; - case SOURCE_TEXT: - setID = (Integer) receiver.getRecord(true); - path = (String) receiver.getRecord(); - return; - case SOURCE_VALUE: - setID = (Integer) receiver.getRecord(true); - int valueCount = (Integer) receiver.getRecord(true); - values = new Object[valueCount]; - for (int x = 0; x < valueCount; x++) { - values[x] = receiver.getRecord(); - } - return; - case SOURCE_SEQ: - setID = (Integer) receiver.getRecord(true); - from = (Long) receiver.getRecord(); - to = (Long) receiver.getRecord(); - return; - case SINK_CSV: - parentID = (Integer) receiver.getRecord(true); - path = (String) receiver.getRecord(); - fieldDelimiter = (String) receiver.getRecord(); - lineDelimiter = (String) receiver.getRecord(); - writeMode = ((Integer) receiver.getRecord(true)) == 1 - ? WriteMode.OVERWRITE - : WriteMode.NO_OVERWRITE; - return; - case SINK_TEXT: - parentID = (Integer) receiver.getRecord(true); - path = (String) receiver.getRecord(); - writeMode = ((Integer) receiver.getRecord(true)) == 1 - ? 
WriteMode.OVERWRITE - : WriteMode.NO_OVERWRITE; - return; - case SINK_PRINT: - parentID = (Integer) receiver.getRecord(true); - toError = (Boolean) receiver.getRecord(); - return; - case BROADCAST: - parentID = (Integer) receiver.getRecord(true); - otherID = (Integer) receiver.getRecord(true); - name = (String) receiver.getRecord(); - return; - } - setID = (Integer) receiver.getRecord(true); - parentID = (Integer) receiver.getRecord(true); - switch (identifier) { - case AGGREGATE: - count = (Integer) receiver.getRecord(true); - aggregates = new AggregationEntry[count]; - for (int x = 0; x < count; x++) { - int encodedAgg = (Integer) receiver.getRecord(true); - int field = (Integer) receiver.getRecord(true); - aggregates[x] = new AggregationEntry(encodedAgg, field); - } - return; - case FIRST: - count = (Integer) receiver.getRecord(true); - return; - case DISTINCT: - case GROUPBY: - case PARTITION_HASH: - keys = normalizeKeys(receiver.getRecord(true)); - return; - case PROJECTION: - fields = toIntArray(receiver.getRecord(true)); - return; - case REBALANCE: - return; - case SORT: - field = (Integer) receiver.getRecord(true); - int encodedOrder = (Integer) receiver.getRecord(true); - switch (encodedOrder) { - case 0: - order = Order.NONE; - break; - case 1: - order = Order.ASCENDING; - break; - case 2: - order = Order.DESCENDING; - break; - case 3: - order = Order.ANY; - break; - default: - order = Order.NONE; - break; - } - return; - case UNION: - otherID = (Integer) receiver.getRecord(true); - return; - } - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("SetID: ").append(setID).append("\n"); - sb.append("ParentID: ").append(parentID).append("\n"); - sb.append("OtherID: ").append(otherID).append("\n"); - sb.append("Name: ").append(name).append("\n"); - sb.append("Types: ").append(types).append("\n"); - sb.append("Keys1: ").append(Arrays.toString(keys1)).append("\n"); - sb.append("Keys2: ").append(Arrays.toString(keys2)).append("\n"); - sb.append("Keys: ").append(Arrays.toString(keys)).append("\n"); - sb.append("Aggregates: ").append(Arrays.toString(aggregates)).append("\n"); - sb.append("Projections: ").append(Arrays.toString(projections)).append("\n"); - sb.append("Count: ").append(count).append("\n"); - sb.append("Field: ").append(field).append("\n"); - sb.append("Order: ").append(order.toString()).append("\n"); - sb.append("Path: ").append(path).append("\n"); - sb.append("FieldDelimiter: ").append(fieldDelimiter).append("\n"); - sb.append("LineDelimiter: ").append(lineDelimiter).append("\n"); - sb.append("From: ").append(from).append("\n"); - sb.append("To: ").append(to).append("\n"); - sb.append("WriteMode: ").append(writeMode).append("\n"); - sb.append("toError: ").append(toError).append("\n"); - return sb.toString(); - } - - public static class AggregationEntry { - public Aggregations agg; - public int field; - - public AggregationEntry(int encodedAgg, int field) { - switch (encodedAgg) { - case 0: - agg = Aggregations.MAX; - break; - case 1: - agg = Aggregations.MIN; - break; - case 2: - agg = Aggregations.SUM; - break; - } - this.field = field; - } - - @Override - public String toString() { - return agg.toString() + " - " + field; - } - } - - public static class ProjectionEntry { - public ProjectionSide side; - public int[] keys; - - public ProjectionEntry(ProjectionSide side, int[] keys) { - this.side = side; - this.keys = keys; - } - - @Override - public String toString() { - return side + " - " + Arrays.toString(keys); - } - } - 
- public enum ProjectionSide { - FIRST, - SECOND - } - - public enum DatasizeHint { - NONE, - TINY, - HUGE - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java deleted file mode 100644 index ca252f8..0000000 --- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java +++ /dev/null @@ -1,582 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.flink.languagebinding.api.java.common; - -import java.io.IOException; -import java.util.HashMap; - -import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.CsvInputFormat; -import org.apache.flink.api.java.io.PrintingOutputFormat; -import org.apache.flink.api.java.operators.AggregateOperator; -import org.apache.flink.api.java.operators.CrossOperator.DefaultCross; -import org.apache.flink.api.java.operators.CrossOperator.ProjectCross; -import org.apache.flink.api.java.operators.Grouping; -import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin; -import org.apache.flink.api.java.operators.JoinOperator.ProjectJoin; -import org.apache.flink.api.java.operators.SortedGrouping; -import org.apache.flink.api.java.operators.UdfOperator; -import org.apache.flink.api.java.operators.UnsortedGrouping; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.Path; -import org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint; -import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.HUGE; -import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.NONE; -import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.TINY; -import org.apache.flink.languagebinding.api.java.common.OperationInfo.ProjectionEntry; -import org.apache.flink.languagebinding.api.java.common.streaming.Receiver; - -/** - * Generic class to construct a Flink plan based on external data. 
- * - * @param <INFO> - */ -public abstract class PlanBinder<INFO extends OperationInfo> { - public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT"; - public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_"; - - protected static String FLINK_HDFS_PATH = "hdfs:/tmp"; - public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + "/flink_data"; - - public static boolean DEBUG = false; - - protected HashMap<Integer, Object> sets = new HashMap(); - public static ExecutionEnvironment env; - protected Receiver receiver; - - public static final int MAPPED_FILE_SIZE = 1024 * 1024 * 64; - - //====Plan========================================================================================================== - protected void receivePlan() throws IOException { - receiveParameters(); - receiveOperations(); - } - - //====Environment=================================================================================================== - /** - * This enum contains the identifiers for all supported environment parameters. - */ - private enum Parameters { - DOP, - MODE, - RETRY, - DEBUG - } - - private void receiveParameters() throws IOException { - Integer parameterCount = (Integer) receiver.getRecord(true); - - for (int x = 0; x < parameterCount; x++) { - Tuple value = (Tuple) receiver.getRecord(true); - switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) { - case DOP: - Integer dop = (Integer) value.getField(1); - env.setParallelism(dop); - break; - case MODE: - FLINK_HDFS_PATH = (Boolean) value.getField(1) ? "file:/tmp/flink" : "hdfs:/tmp/flink"; - break; - case RETRY: - int retry = (Integer) value.getField(1); - env.setNumberOfExecutionRetries(retry); - break; - case DEBUG: - DEBUG = (Boolean) value.getField(1); - break; - } - } - if (env.getParallelism() < 0) { - env.setParallelism(1); - } - } - - //====Operations==================================================================================================== - /** - * This enum contains the identifiers for all supported non-UDF DataSet operations. - */ - protected enum Operation { - SOURCE_CSV, SOURCE_TEXT, SOURCE_VALUE, SOURCE_SEQ, SINK_CSV, SINK_TEXT, SINK_PRINT, - PROJECTION, SORT, UNION, FIRST, DISTINCT, GROUPBY, AGGREGATE, - REBALANCE, PARTITION_HASH, - BROADCAST - } - - /** - * This enum contains the identifiers for all supported UDF DataSet operations. 
- */ - protected enum AbstractOperation { - COGROUP, CROSS, CROSS_H, CROSS_T, FILTER, FLATMAP, GROUPREDUCE, JOIN, JOIN_H, JOIN_T, MAP, REDUCE, MAPPARTITION, - } - - protected void receiveOperations() throws IOException { - Integer operationCount = (Integer) receiver.getRecord(true); - for (int x = 0; x < operationCount; x++) { - String identifier = (String) receiver.getRecord(); - Operation op = null; - AbstractOperation aop = null; - try { - op = Operation.valueOf(identifier.toUpperCase()); - } catch (IllegalArgumentException iae) { - try { - aop = AbstractOperation.valueOf(identifier.toUpperCase()); - } catch (IllegalArgumentException iae2) { - throw new IllegalArgumentException("Invalid operation specified: " + identifier); - } - } - if (op != null) { - switch (op) { - case SOURCE_CSV: - createCsvSource(createOperationInfo(op)); - break; - case SOURCE_TEXT: - createTextSource(createOperationInfo(op)); - break; - case SOURCE_VALUE: - createValueSource(createOperationInfo(op)); - break; - case SOURCE_SEQ: - createSequenceSource(createOperationInfo(op)); - break; - case SINK_CSV: - createCsvSink(createOperationInfo(op)); - break; - case SINK_TEXT: - createTextSink(createOperationInfo(op)); - break; - case SINK_PRINT: - createPrintSink(createOperationInfo(op)); - break; - case BROADCAST: - createBroadcastVariable(createOperationInfo(op)); - break; - case AGGREGATE: - createAggregationOperation(createOperationInfo(op)); - break; - case DISTINCT: - createDistinctOperation(createOperationInfo(op)); - break; - case FIRST: - createFirstOperation(createOperationInfo(op)); - break; - case PARTITION_HASH: - createHashPartitionOperation(createOperationInfo(op)); - break; - case PROJECTION: - createProjectOperation(createOperationInfo(op)); - break; - case REBALANCE: - createRebalanceOperation(createOperationInfo(op)); - break; - case GROUPBY: - createGroupOperation(createOperationInfo(op)); - break; - case SORT: - createSortOperation(createOperationInfo(op)); - break; - case UNION: - createUnionOperation(createOperationInfo(op)); - break; - } - } - if (aop != null) { - switch (aop) { - case COGROUP: - createCoGroupOperation(createOperationInfo(aop)); - break; - case CROSS: - createCrossOperation(NONE, createOperationInfo(aop)); - break; - case CROSS_H: - createCrossOperation(HUGE, createOperationInfo(aop)); - break; - case CROSS_T: - createCrossOperation(TINY, createOperationInfo(aop)); - break; - case FILTER: - createFilterOperation(createOperationInfo(aop)); - break; - case FLATMAP: - createFlatMapOperation(createOperationInfo(aop)); - break; - case GROUPREDUCE: - createGroupReduceOperation(createOperationInfo(aop)); - break; - case JOIN: - createJoinOperation(NONE, createOperationInfo(aop)); - break; - case JOIN_H: - createJoinOperation(HUGE, createOperationInfo(aop)); - break; - case JOIN_T: - createJoinOperation(TINY, createOperationInfo(aop)); - break; - case MAP: - createMapOperation(createOperationInfo(aop)); - break; - case MAPPARTITION: - createMapPartitionOperation(createOperationInfo(aop)); - break; - case REDUCE: - createReduceOperation(createOperationInfo(aop)); - break; - } - } - } - } - - /** - * This method creates an OperationInfo object based on the operation-identifier passed. 
- * - * @param operationIdentifier - * @return - * @throws IOException - */ - protected OperationInfo createOperationInfo(Operation operationIdentifier) throws IOException { - return new OperationInfo(receiver, operationIdentifier); - } - - /** - * This method creates an OperationInfo object based on the operation-identifier passed. - * - * @param operationIdentifier - * @return - * @throws IOException - */ - protected abstract INFO createOperationInfo(AbstractOperation operationIdentifier) throws IOException; - - private void createCsvSource(OperationInfo info) throws IOException { - if (!(info.types instanceof CompositeType)) { - throw new RuntimeException("The output type of a csv source has to be a tuple or a " + - "pojo type. The derived type is " + info); - } - - sets.put(info.setID, env.createInput(new CsvInputFormat(new Path(info.path), - info.lineDelimiter, info.fieldDelimiter, (CompositeType)info.types), info.types) - .name("CsvSource")); - } - - private void createTextSource(OperationInfo info) throws IOException { - sets.put(info.setID, env.readTextFile(info.path).name("TextSource")); - } - - private void createValueSource(OperationInfo info) throws IOException { - sets.put(info.setID, env.fromElements(info.values).name("ValueSource")); - } - - private void createSequenceSource(OperationInfo info) throws IOException { - sets.put(info.setID, env.generateSequence(info.from, info.to).name("SequenceSource")); - } - - private void createCsvSink(OperationInfo info) throws IOException { - DataSet parent = (DataSet) sets.get(info.parentID); - parent.writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).name("CsvSink"); - } - - private void createTextSink(OperationInfo info) throws IOException { - DataSet parent = (DataSet) sets.get(info.parentID); - parent.writeAsText(info.path, info.writeMode).name("TextSink"); - } - - private void createPrintSink(OperationInfo info) throws IOException { - DataSet parent = (DataSet) sets.get(info.parentID); - parent.output(new PrintingOutputFormat(info.toError)); - } - - private void createBroadcastVariable(OperationInfo info) throws IOException { - UdfOperator op1 = (UdfOperator) sets.get(info.parentID); - DataSet op2 = (DataSet) sets.get(info.otherID); - - op1.withBroadcastSet(op2, info.name); - Configuration c = ((UdfOperator) op1).getParameters(); - - if (c == null) { - c = new Configuration(); - } - - int count = c.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0); - c.setInteger(PLANBINDER_CONFIG_BCVAR_COUNT, count + 1); - c.setString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + count, info.name); - - op1.withParameters(c); - } - - private void createAggregationOperation(OperationInfo info) throws IOException { - DataSet op = (DataSet) sets.get(info.parentID); - AggregateOperator ao = op.aggregate(info.aggregates[0].agg, info.aggregates[0].field); - - for (int x = 1; x < info.count; x++) { - ao = ao.and(info.aggregates[x].agg, info.aggregates[x].field); - } - - sets.put(info.setID, ao.name("Aggregation")); - } - - private void createDistinctOperation(OperationInfo info) throws IOException { - DataSet op = (DataSet) sets.get(info.parentID); - sets.put(info.setID, info.keys.length == 0 ? 
op.distinct() : op.distinct(info.keys).name("Distinct")); - } - - private void createFirstOperation(OperationInfo info) throws IOException { - DataSet op = (DataSet) sets.get(info.parentID); - sets.put(info.setID, op.first(info.count).name("First")); - } - - private void createGroupOperation(OperationInfo info) throws IOException { - DataSet op1 = (DataSet) sets.get(info.parentID); - sets.put(info.setID, op1.groupBy(info.keys)); - } - - private void createHashPartitionOperation(OperationInfo info) throws IOException { - DataSet op1 = (DataSet) sets.get(info.parentID); - sets.put(info.setID, op1.partitionByHash(info.keys)); - } - - private void createProjectOperation(OperationInfo info) throws IOException { - DataSet op1 = (DataSet) sets.get(info.parentID); - sets.put(info.setID, op1.project(info.fields).name("Projection")); - } - - private void createRebalanceOperation(OperationInfo info) throws IOException { - DataSet op = (DataSet) sets.get(info.parentID); - sets.put(info.setID, op.rebalance().name("Rebalance")); - } - - private void createSortOperation(OperationInfo info) throws IOException { - Grouping op1 = (Grouping) sets.get(info.parentID); - if (op1 instanceof UnsortedGrouping) { - sets.put(info.setID, ((UnsortedGrouping) op1).sortGroup(info.field, info.order)); - return; - } - if (op1 instanceof SortedGrouping) { - sets.put(info.setID, ((SortedGrouping) op1).sortGroup(info.field, info.order)); - } - } - - private void createUnionOperation(OperationInfo info) throws IOException { - DataSet op1 = (DataSet) sets.get(info.parentID); - DataSet op2 = (DataSet) sets.get(info.otherID); - sets.put(info.setID, op1.union(op2).name("Union")); - } - - private void createCoGroupOperation(INFO info) { - DataSet op1 = (DataSet) sets.get(info.parentID); - DataSet op2 = (DataSet) sets.get(info.otherID); - sets.put(info.setID, applyCoGroupOperation(op1, op2, info.keys1, info.keys2, info)); - } - - private void createCrossOperation(DatasizeHint mode, INFO info) { - DataSet op1 = (DataSet) sets.get(info.parentID); - DataSet op2 = (DataSet) sets.get(info.otherID); - - if (info.types != null && (info.projections == null || info.projections.length == 0)) { - sets.put(info.setID, applyCrossOperation(op1, op2, mode, info)); - } else { - DefaultCross defaultResult; - switch (mode) { - case NONE: - defaultResult = op1.cross(op2); - break; - case HUGE: - defaultResult = op1.crossWithHuge(op2); - break; - case TINY: - defaultResult = op1.crossWithTiny(op2); - break; - default: - throw new IllegalArgumentException("Invalid Cross mode specified: " + mode); - } - if (info.projections.length == 0) { - sets.put(info.setID, defaultResult.name("DefaultCross")); - } else { - ProjectCross project = null; - for (ProjectionEntry pe : info.projections) { - switch (pe.side) { - case FIRST: - project = project == null ? defaultResult.projectFirst(pe.keys) : project.projectFirst(pe.keys); - break; - case SECOND: - project = project == null ? 
defaultResult.projectSecond(pe.keys) : project.projectSecond(pe.keys); - break; - } - } - sets.put(info.setID, project.name("ProjectCross")); - } - } - } - - private void createFilterOperation(INFO info) { - DataSet op1 = (DataSet) sets.get(info.parentID); - sets.put(info.setID, applyFilterOperation(op1, info)); - } - - private void createFlatMapOperation(INFO info) { - DataSet op1 = (DataSet) sets.get(info.parentID); - sets.put(info.setID, applyFlatMapOperation(op1, info)); - } - - private void createGroupReduceOperation(INFO info) { - Object op1 = sets.get(info.parentID); - if (op1 instanceof DataSet) { - sets.put(info.setID, applyGroupReduceOperation((DataSet) op1, info)); - return; - } - if (op1 instanceof UnsortedGrouping) { - sets.put(info.setID, applyGroupReduceOperation((UnsortedGrouping) op1, info)); - return; - } - if (op1 instanceof SortedGrouping) { - sets.put(info.setID, applyGroupReduceOperation((SortedGrouping) op1, info)); - } - } - - private void createJoinOperation(DatasizeHint mode, INFO info) { - DataSet op1 = (DataSet) sets.get(info.parentID); - DataSet op2 = (DataSet) sets.get(info.otherID); - - if (info.types != null && (info.projections == null || info.projections.length == 0)) { - sets.put(info.setID, applyJoinOperation(op1, op2, info.keys1, info.keys2, mode, info)); - } else { - DefaultJoin defaultResult = createDefaultJoin(op1, op2, info.keys1, info.keys2, mode); - if (info.projections.length == 0) { - sets.put(info.setID, defaultResult.name("DefaultJoin")); - } else { - ProjectJoin project = null; - for (ProjectionEntry pe : info.projections) { - switch (pe.side) { - case FIRST: - project = project == null ? defaultResult.projectFirst(pe.keys) : project.projectFirst(pe.keys); - break; - case SECOND: - project = project == null ? 
defaultResult.projectSecond(pe.keys) : project.projectSecond(pe.keys); - break; - } - } - sets.put(info.setID, project.name("ProjectJoin")); - } - } - } - - protected DefaultJoin createDefaultJoin(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode) { - switch (mode) { - case NONE: - return op1.join(op2).where(firstKeys).equalTo(secondKeys); - case HUGE: - return op1.joinWithHuge(op2).where(firstKeys).equalTo(secondKeys); - case TINY: - return op1.joinWithTiny(op2).where(firstKeys).equalTo(secondKeys); - default: - throw new IllegalArgumentException("Invalid join mode specified."); - } - } - - private void createMapOperation(INFO info) { - DataSet op1 = (DataSet) sets.get(info.parentID); - sets.put(info.setID, applyMapOperation(op1, info)); - } - - private void createMapPartitionOperation(INFO info) { - DataSet op1 = (DataSet) sets.get(info.parentID); - sets.put(info.setID, applyMapPartitionOperation(op1, info)); - } - - private void createReduceOperation(INFO info) { - Object op1 = sets.get(info.parentID); - if (op1 instanceof DataSet) { - sets.put(info.setID, applyReduceOperation((DataSet) op1, info)); - return; - } - if (op1 instanceof UnsortedGrouping) { - sets.put(info.setID, applyReduceOperation((UnsortedGrouping) op1, info)); - } - } - - protected abstract DataSet applyCoGroupOperation(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, INFO info); - - protected abstract DataSet applyCrossOperation(DataSet op1, DataSet op2, DatasizeHint mode, INFO info); - - protected abstract DataSet applyFilterOperation(DataSet op1, INFO info); - - protected abstract DataSet applyFlatMapOperation(DataSet op1, INFO info); - - protected abstract DataSet applyGroupReduceOperation(DataSet op1, INFO info); - - protected abstract DataSet applyGroupReduceOperation(UnsortedGrouping op1, INFO info); - - protected abstract DataSet applyGroupReduceOperation(SortedGrouping op1, INFO info); - - protected abstract DataSet applyJoinOperation(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode, INFO info); - - protected abstract DataSet applyMapOperation(DataSet op1, INFO info); - - protected abstract DataSet applyMapPartitionOperation(DataSet op1, INFO info); - - protected abstract DataSet applyReduceOperation(DataSet op1, INFO info); - - protected abstract DataSet applyReduceOperation(UnsortedGrouping op1, INFO info); - - //====Utility======================================================================================================= - protected static String[] normalizeKeys(Object keys) { - if (keys instanceof Tuple) { - Tuple tupleKeys = (Tuple) keys; - if (tupleKeys.getArity() == 0) { - return new String[0]; - } - if (tupleKeys.getField(0) instanceof Integer) { - String[] stringKeys = new String[tupleKeys.getArity()]; - for (int x = 0; x < stringKeys.length; x++) { - stringKeys[x] = "f" + (Integer) tupleKeys.getField(x); - } - return stringKeys; - } - if (tupleKeys.getField(0) instanceof String) { - return tupleToStringArray(tupleKeys); - } - throw new RuntimeException("Key argument contains field that is neither an int nor a String."); - } - if (keys instanceof int[]) { - int[] intKeys = (int[]) keys; - String[] stringKeys = new String[intKeys.length]; - for (int x = 0; x < stringKeys.length; x++) { - stringKeys[x] = "f" + intKeys[x]; - } - return stringKeys; - } - throw new RuntimeException("Key argument is neither an int[] nor a Tuple."); - } - - protected static int[] toIntArray(Object key) { - if (key instanceof Tuple) { 
- Tuple tuple = (Tuple) key; - int[] keys = new int[tuple.getArity()]; - for (int y = 0; y < tuple.getArity(); y++) { - keys[y] = (Integer) tuple.getField(y); - } - return keys; - } - if (key instanceof int[]) { - return (int[]) key; - } - throw new RuntimeException("Key argument is neither an int[] nor a Tuple."); - } - - protected static String[] tupleToStringArray(Tuple tuple) { - String[] keys = new String[tuple.getArity()]; - for (int y = 0; y < tuple.getArity(); y++) { - keys[y] = (String) tuple.getField(y); - } - return keys; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java deleted file mode 100644 index 59ed20c..0000000 --- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java +++ /dev/null @@ -1,360 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.flink.languagebinding.api.java.common.streaming; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.io.Serializable; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; -import org.apache.flink.api.common.functions.AbstractRichFunction; -//CHECKSTYLE.OFF: AvoidStarImport - tuple imports -import org.apache.flink.api.java.tuple.*; -import static org.apache.flink.languagebinding.api.java.common.streaming.Sender.*; -//CHECKSTYLE.ON: AvoidStarImport -import static org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_TMP_DATA_DIR; -import static org.apache.flink.languagebinding.api.java.common.PlanBinder.MAPPED_FILE_SIZE; -import org.apache.flink.util.Collector; - -/** - * General-purpose class to read data from memory-mapped files. 
- */
-public class Receiver implements Serializable {
-	private static final long serialVersionUID = -2474088929850009968L;
-
-	private final AbstractRichFunction function;
-
-	private File inputFile;
-	private RandomAccessFile inputRAF;
-	private FileChannel inputChannel;
-	private MappedByteBuffer fileBuffer;
-
-	private Deserializer<?> deserializer = null;
-
-	public Receiver(AbstractRichFunction function) {
-		this.function = function;
-	}
-
-	//=====Setup========================================================================================================
-	public void open(String path) throws IOException {
-		setupMappedFile(path);
-	}
-
-	private void setupMappedFile(String inputFilePath) throws FileNotFoundException, IOException {
-		File x = new File(FLINK_TMP_DATA_DIR);
-		x.mkdirs();
-
-		inputFile = new File(inputFilePath);
-		if (inputFile.exists()) {
-			inputFile.delete();
-		}
-		inputFile.createNewFile();
-		inputRAF = new RandomAccessFile(inputFilePath, "rw");
-		inputRAF.setLength(MAPPED_FILE_SIZE);
-		inputRAF.seek(MAPPED_FILE_SIZE - 1);
-		inputRAF.writeByte(0);
-		inputRAF.seek(0);
-		inputChannel = inputRAF.getChannel();
-		fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
-	}
-
-	public void close() throws IOException {
-		closeMappedFile();
-	}
-
-	private void closeMappedFile() throws IOException {
-		inputChannel.close();
-		inputRAF.close();
-	}
-
-	//=====Record-API===================================================================================================
-	/**
-	 * Loads a buffer from the memory-mapped file. The records contained within the buffer can be accessed using
-	 * collectRecord(). These records do not necessarily have to be of the same type. This method requires external
-	 * synchronization.
-	 *
-	 * @throws IOException
-	 */
-	private void loadBuffer() throws IOException {
-		int count = 0;
-		while (fileBuffer.get(0) == 0 && count < 10) {
-			try {
-				Thread.sleep(1000);
-			} catch (InterruptedException ie) {
-			}
-			fileBuffer.load();
-			count++;
-		}
-		if (fileBuffer.get(0) == 0) {
-			throw new RuntimeException("External process not responding.");
-		}
-		fileBuffer.position(1);
-	}
-
-	/**
-	 * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or
-	 * similar. The PlanBinder requires a method that can return any kind of object.
-	 *
-	 * @return read record
-	 * @throws IOException
-	 */
-	public Object getRecord() throws IOException {
-		return getRecord(false);
-	}
-
-	/**
-	 * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or
-	 * similar. The PlanBinder requires a method that can return any kind of object.
-	 *
-	 * @param normalized flag indicating whether certain types should be normalized
-	 * @return read record
-	 * @throws IOException
-	 */
-	public Object getRecord(boolean normalized) throws IOException {
-		if (fileBuffer.position() == 0) {
-			loadBuffer();
-		}
-		return receiveField(normalized);
-	}
-
-	/**
-	 * Reads a single primitive value or tuple from the buffer.
-	 *
-	 * @return primitive value or tuple
-	 * @throws IOException
-	 */
-	private Object receiveField(boolean normalized) throws IOException {
-		byte type = fileBuffer.get();
-		switch (type) {
-			case TYPE_TUPLE:
-				int tupleSize = fileBuffer.get();
-				Tuple tuple = createTuple(tupleSize);
-				for (int x = 0; x < tupleSize; x++) {
-					tuple.setField(receiveField(normalized), x);
-				}
-				return tuple;
-			case TYPE_BOOLEAN:
-				return fileBuffer.get() == 1;
-			case TYPE_BYTE:
-				return fileBuffer.get();
-			case TYPE_SHORT:
-				if (normalized) {
-					return (int) fileBuffer.getShort();
-				} else {
-					return fileBuffer.getShort();
-				}
-			case TYPE_INTEGER:
-				return fileBuffer.getInt();
-			case TYPE_LONG:
-				if (normalized) {
-					return new Long(fileBuffer.getLong()).intValue();
-				} else {
-					return fileBuffer.getLong();
-				}
-			case TYPE_FLOAT:
-				if (normalized) {
-					return (double) fileBuffer.getFloat();
-				} else {
-					return fileBuffer.getFloat();
-				}
-			case TYPE_DOUBLE:
-				return fileBuffer.getDouble();
-			case TYPE_STRING:
-				int stringSize = fileBuffer.getInt();
-				byte[] buffer = new byte[stringSize];
-				fileBuffer.get(buffer);
-				return new String(buffer);
-			case TYPE_BYTES:
-				int bytessize = fileBuffer.getInt();
-				byte[] bytebuffer = new byte[bytessize];
-				fileBuffer.get(bytebuffer);
-				return bytebuffer;
-			case TYPE_NULL:
-				return null;
-			default:
-				throw new IllegalArgumentException("Unknown TypeID encountered: " + type);
-		}
-	}
-
-	//=====Buffered-API=================================================================================================
-	/**
-	 * Reads a buffer of the given size from the memory-mapped file, and collects all records contained. This method
-	 * assumes that all values in the buffer are of the same type. This method does NOT take care of synchronization.
-	 * The user must guarantee that the buffer was completely written before calling this method.
-	 *
-	 * @param c Collector to collect records
-	 * @param bufferSize size of the buffer
-	 * @throws IOException
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public void collectBuffer(Collector c, int bufferSize) throws IOException {
-		fileBuffer.position(0);
-
-		if (deserializer == null) {
-			byte type = fileBuffer.get();
-			deserializer = getDeserializer(type);
-		}
-		while (fileBuffer.position() < bufferSize) {
-			c.collect(deserializer.deserialize());
-		}
-	}
-
-	//=====Deserializer=================================================================================================
-	private Deserializer<?> getDeserializer(byte type) {
-		switch (type) {
-			case TYPE_TUPLE:
-				return new TupleDeserializer();
-			case TYPE_BOOLEAN:
-				return new BooleanDeserializer();
-			case TYPE_BYTE:
-				return new ByteDeserializer();
-			case TYPE_BYTES:
-				return new BytesDeserializer();
-			case TYPE_SHORT:
-				return new ShortDeserializer();
-			case TYPE_INTEGER:
-				return new IntDeserializer();
-			case TYPE_LONG:
-				return new LongDeserializer();
-			case TYPE_STRING:
-				return new StringDeserializer();
-			case TYPE_FLOAT:
-				return new FloatDeserializer();
-			case TYPE_DOUBLE:
-				return new DoubleDeserializer();
-			case TYPE_NULL:
-				return new NullDeserializer();
-			default:
-				throw new IllegalArgumentException("Unknown TypeID encountered: " + type);
-
-		}
-	}
-
-	private interface Deserializer<T> {
-		public T deserialize();
-
-	}
-
-	private class BooleanDeserializer implements Deserializer<Boolean> {
-		@Override
-		public Boolean deserialize() {
-			return fileBuffer.get() == 1;
-		}
-	}
-
-	private class ByteDeserializer implements Deserializer<Byte> {
-		@Override
-		public Byte deserialize() {
-			return fileBuffer.get();
-		}
-	}
-
-	private class ShortDeserializer implements Deserializer<Short> {
-		@Override
-		public Short deserialize() {
-			return fileBuffer.getShort();
-		}
-	}
-
-	private class IntDeserializer implements Deserializer<Integer> {
-		@Override
-		public Integer deserialize() {
-			return fileBuffer.getInt();
-		}
-	}
-
-	private class LongDeserializer implements Deserializer<Long> {
-		@Override
-		public Long deserialize() {
-			return fileBuffer.getLong();
-		}
-	}
-
-	private class FloatDeserializer implements Deserializer<Float> {
-		@Override
-		public Float deserialize() {
-			return fileBuffer.getFloat();
-		}
-	}
-
-	private class DoubleDeserializer implements Deserializer<Double> {
-		@Override
-		public Double deserialize() {
-			return fileBuffer.getDouble();
-		}
-	}
-
-	private class StringDeserializer implements Deserializer<String> {
-		private int size;
-
-		@Override
-		public String deserialize() {
-			size = fileBuffer.getInt();
-			byte[] buffer = new byte[size];
-			fileBuffer.get(buffer);
-			return new String(buffer);
-		}
-	}
-
-	private class NullDeserializer implements Deserializer<Object> {
-		@Override
-		public Object deserialize() {
-			return null;
-		}
-	}
-
-	private class BytesDeserializer implements Deserializer<byte[]> {
-		@Override
-		public byte[] deserialize() {
-			int length = fileBuffer.getInt();
-			byte[] result = new byte[length];
-			fileBuffer.get(result);
-			return result;
-		}
-
-	}
-
-	private class TupleDeserializer implements Deserializer<Tuple> {
-		Deserializer<?>[] deserializer = null;
-		Tuple reuse;
-
-		public TupleDeserializer() {
-			int size = fileBuffer.getInt();
-			reuse = createTuple(size);
-			deserializer = new Deserializer[size];
-			for (int x = 0; x < deserializer.length; x++) {
-				deserializer[x] = getDeserializer(fileBuffer.get());
-			}
-		}
-
-		@Override
-		public Tuple deserialize() {
-			for (int x = 0; x < deserializer.length; x++) {
-				reuse.setField(deserializer[x].deserialize(), x);
-			}
-			return reuse;
-		}
-	}
-
-	public static Tuple createTuple(int size) {
-		try {
-			return Tuple.getTupleClass(size).newInstance();
-		} catch (InstantiationException e) {
-			throw new RuntimeException(e);
-		} catch (IllegalAccessException e) {
-			throw new RuntimeException(e);
-		}
-	}
-}
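The wire format consumed by this deleted Receiver is a simple type-tagged framing: every value starts with a one-byte type ID, followed by a fixed-width payload for primitives, a length-prefixed byte run for strings and byte arrays, or an arity header plus nested fields for tuples. A minimal, self-contained sketch of decoding one such value (only a few cases shown; the class and method names are made up for illustration, while the tag values match the TYPE_* constants defined in Sender below):

    import java.nio.ByteBuffer;

    public class FrameReader {
        // Decodes a single type-tagged value, mirroring the shape of
        // Receiver.receiveField(). Tags follow Sender's constants:
        // TYPE_INTEGER = 7, TYPE_STRING = 2, TYPE_NULL = 0.
        public static Object readValue(ByteBuffer buf) {
            byte tag = buf.get();                      // 1-byte type ID
            switch (tag) {
                case 7:                                // TYPE_INTEGER: 4-byte payload
                    return buf.getInt();
                case 2: {                              // TYPE_STRING: int length + bytes
                    byte[] raw = new byte[buf.getInt()];
                    buf.get(raw);
                    return new String(raw);
                }
                case 0:                                // TYPE_NULL: no payload
                    return null;
                default:
                    throw new IllegalArgumentException("Unknown TypeID encountered: " + tag);
            }
        }
    }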
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
deleted file mode 100644
index 3e0c317..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common.streaming;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_TMP_DATA_DIR;
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.MAPPED_FILE_SIZE;
-
-/**
- * General-purpose class to write data to memory-mapped files.
- */
-public class Sender implements Serializable {
-	public static final byte TYPE_TUPLE = (byte) 11;
-	public static final byte TYPE_BOOLEAN = (byte) 10;
-	public static final byte TYPE_BYTE = (byte) 9;
-	public static final byte TYPE_SHORT = (byte) 8;
-	public static final byte TYPE_INTEGER = (byte) 7;
-	public static final byte TYPE_LONG = (byte) 6;
-	public static final byte TYPE_DOUBLE = (byte) 4;
-	public static final byte TYPE_FLOAT = (byte) 5;
-	public static final byte TYPE_CHAR = (byte) 3;
-	public static final byte TYPE_STRING = (byte) 2;
-	public static final byte TYPE_BYTES = (byte) 1;
-	public static final byte TYPE_NULL = (byte) 0;
-
-	private final AbstractRichFunction function;
-
-	private File outputFile;
-	private RandomAccessFile outputRAF;
-	private FileChannel outputChannel;
-	private MappedByteBuffer fileBuffer;
-
-	private final ByteBuffer[] saved = new ByteBuffer[2];
-
-	private final Serializer[] serializer = new Serializer[2];
-
-	public Sender(AbstractRichFunction function) {
-		this.function = function;
-	}
-
-	//=====Setup========================================================================================================
-	public void open(String path) throws IOException {
-		setupMappedFile(path);
-	}
-
-	private void setupMappedFile(String outputFilePath) throws FileNotFoundException, IOException {
-		File x = new File(FLINK_TMP_DATA_DIR);
-		x.mkdirs();
-
-		outputFile = new File(outputFilePath);
-		if (outputFile.exists()) {
-			outputFile.delete();
-		}
-		outputFile.createNewFile();
-		outputRAF = new RandomAccessFile(outputFilePath, "rw");
-		outputRAF.setLength(MAPPED_FILE_SIZE);
-		outputRAF.seek(MAPPED_FILE_SIZE - 1);
-		outputRAF.writeByte(0);
-		outputRAF.seek(0);
-		outputChannel = outputRAF.getChannel();
-		fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
-	}
-
-	public void close() throws IOException {
-		closeMappedFile();
-	}
-
-	private void closeMappedFile() throws IOException {
-		outputChannel.close();
-		outputRAF.close();
-	}
-
-	/**
-	 * Resets this object to the post-configuration state.
-	 */
-	public void reset() {
-		serializer[0] = null;
-		serializer[1] = null;
-		fileBuffer.clear();
-	}
-
-	//=====Serialization================================================================================================
-	/**
-	 * Writes a single record to the memory-mapped file. This method does NOT take care of synchronization. The user
-	 * must guarantee that the file may be written to before calling this method. This method essentially reserves the
-	 * whole buffer for one record. As such it imposes some performance restrictions and should only be used when
-	 * absolutely necessary.
-	 *
-	 * @param value record to send
-	 * @return size of the written buffer
-	 * @throws IOException
-	 */
-	public int sendRecord(Object value) throws IOException {
-		fileBuffer.clear();
-		int group = 0;
-
-		serializer[group] = getSerializer(value);
-		ByteBuffer bb = serializer[group].serialize(value);
-		if (bb.remaining() > MAPPED_FILE_SIZE) {
-			throw new RuntimeException("Serialized object does not fit into a single buffer.");
-		}
-		fileBuffer.put(bb);
-
-		int size = fileBuffer.position();
-
-		reset();
-		return size;
-	}
-
-	public boolean hasRemaining(int group) {
-		return saved[group] != null;
-	}
-
-	/**
-	 * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
-	 * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
-	 * guarantee that the file may be written to before calling this method.
-	 *
-	 * @param i iterator containing records
-	 * @param group group to which the iterator belongs, most notably used by CoGroup-functions.
-	 * @return size of the written buffer
-	 * @throws IOException
-	 */
-	public int sendBuffer(Iterator i, int group) throws IOException {
-		fileBuffer.clear();
-
-		Object value;
-		ByteBuffer bb;
-		if (serializer[group] == null) {
-			value = i.next();
-			serializer[group] = getSerializer(value);
-			bb = serializer[group].serialize(value);
-			if (bb.remaining() > MAPPED_FILE_SIZE) {
-				throw new RuntimeException("Serialized object does not fit into a single buffer.");
-			}
-			fileBuffer.put(bb);
-
-		}
-		if (saved[group] != null) {
-			fileBuffer.put(saved[group]);
-			saved[group] = null;
-		}
-		while (i.hasNext() && saved[group] == null) {
-			value = i.next();
-			bb = serializer[group].serialize(value);
-			if (bb.remaining() > MAPPED_FILE_SIZE) {
-				throw new RuntimeException("Serialized object does not fit into a single buffer.");
-			}
-			if (bb.remaining() <= fileBuffer.remaining()) {
-				fileBuffer.put(bb);
-			} else {
-				saved[group] = bb;
-			}
-		}
-
-		int size = fileBuffer.position();
-		return size;
-	}
-
-	private enum SupportedTypes {
-		TUPLE, BOOLEAN, BYTE, BYTES, CHARACTER, SHORT, INTEGER, LONG, FLOAT, DOUBLE, STRING, OTHER, NULL
-	}
-
-	//=====Serializer===================================================================================================
-	private Serializer getSerializer(Object value) throws IOException {
-		String className = value.getClass().getSimpleName().toUpperCase();
-		if (className.startsWith("TUPLE")) {
-			className = "TUPLE";
-		}
-		if (className.startsWith("BYTE[]")) {
-			className = "BYTES";
-		}
-		SupportedTypes type = SupportedTypes.valueOf(className);
-		switch (type) {
-			case TUPLE:
-				fileBuffer.put(TYPE_TUPLE);
-				fileBuffer.putInt(((Tuple) value).getArity());
-				return new TupleSerializer((Tuple) value);
-			case BOOLEAN:
-				fileBuffer.put(TYPE_BOOLEAN);
-				return new BooleanSerializer();
-			case BYTE:
-				fileBuffer.put(TYPE_BYTE);
-				return new ByteSerializer();
-			case BYTES:
-				fileBuffer.put(TYPE_BYTES);
-				return new BytesSerializer();
-			case CHARACTER:
-				fileBuffer.put(TYPE_CHAR);
-				return new CharSerializer();
-			case SHORT:
-				fileBuffer.put(TYPE_SHORT);
-				return new ShortSerializer();
-			case INTEGER:
-				fileBuffer.put(TYPE_INTEGER);
-				return new IntSerializer();
-			case LONG:
-				fileBuffer.put(TYPE_LONG);
-				return new LongSerializer();
-			case STRING:
-				fileBuffer.put(TYPE_STRING);
-				return new StringSerializer();
-			case FLOAT:
-				fileBuffer.put(TYPE_FLOAT);
-				return new FloatSerializer();
-			case DOUBLE:
-				fileBuffer.put(TYPE_DOUBLE);
-				return new DoubleSerializer();
-			case NULL:
-				fileBuffer.put(TYPE_NULL);
-				return new NullSerializer();
-			default:
-				throw new IllegalArgumentException("Unknown Type encountered: " + type);
-		}
-	}
-
-	private abstract class Serializer<T> {
-		protected ByteBuffer buffer;
-
-		public Serializer(int capacity) {
-			buffer = ByteBuffer.allocate(capacity);
-		}
-
-		public ByteBuffer serialize(T value) {
-			buffer.clear();
-			serializeInternal(value);
-			buffer.flip();
-			return buffer;
-		}
-
-		public abstract void serializeInternal(T value);
-	}
-
-	private class ByteSerializer extends Serializer<Byte> {
-		public ByteSerializer() {
-			super(1);
-		}
-
-		@Override
-		public void serializeInternal(Byte value) {
-			buffer.put(value);
-		}
-	}
-
-	private class BooleanSerializer extends Serializer<Boolean> {
-		public BooleanSerializer() {
-			super(1);
-		}
-
-		@Override
-		public void serializeInternal(Boolean value) {
-			buffer.put(value ? (byte) 1 : (byte) 0);
-		}
-	}
-
-	private class CharSerializer extends Serializer<Character> {
-		public CharSerializer() {
-			super(4);
-		}
-
-		@Override
-		public void serializeInternal(Character value) {
-			buffer.put((value + "").getBytes());
-		}
-	}
-
-	private class ShortSerializer extends Serializer<Short> {
-		public ShortSerializer() {
-			super(2);
-		}
-
-		@Override
-		public void serializeInternal(Short value) {
-			buffer.putShort(value);
-		}
-	}
-
-	private class IntSerializer extends Serializer<Integer> {
-		public IntSerializer() {
-			super(4);
-		}
-
-		@Override
-		public void serializeInternal(Integer value) {
-			buffer.putInt(value);
-		}
-	}
-
-	private class LongSerializer extends Serializer<Long> {
-		public LongSerializer() {
-			super(8);
-		}
-
-		@Override
-		public void serializeInternal(Long value) {
-			buffer.putLong(value);
-		}
-	}
-
-	private class StringSerializer extends Serializer<String> {
-		public StringSerializer() {
-			super(0);
-		}
-
-		@Override
-		public void serializeInternal(String value) {
-			byte[] bytes = value.getBytes();
-			buffer = ByteBuffer.allocate(bytes.length + 4);
-			buffer.putInt(bytes.length);
-			buffer.put(bytes);
-		}
-	}
-
-	private class FloatSerializer extends Serializer<Float> {
-		public FloatSerializer() {
-			super(4);
-		}
-
-		@Override
-		public void serializeInternal(Float value) {
-			buffer.putFloat(value);
-		}
-	}
-
-	private class DoubleSerializer extends Serializer<Double> {
-		public DoubleSerializer() {
-			super(8);
-		}
-
-		@Override
-		public void serializeInternal(Double value) {
-			buffer.putDouble(value);
-		}
-	}
-
-	private class NullSerializer extends Serializer<Object> {
-		public NullSerializer() {
-			super(0);
-		}
-
-		@Override
-		public void serializeInternal(Object value) {
-		}
-	}
-
-	private class BytesSerializer extends Serializer<byte[]> {
-		public BytesSerializer() {
-			super(0);
-		}
-
-		@Override
-		public void serializeInternal(byte[] value) {
-			buffer = ByteBuffer.allocate(4 + value.length);
-			buffer.putInt(value.length);
-			buffer.put(value);
-		}
-	}
-
-	private class TupleSerializer extends Serializer<Tuple> {
-		private final Serializer[] serializer;
-		private final List<ByteBuffer> buffers;
-
-		public TupleSerializer(Tuple value) throws IOException {
-			super(0);
-			serializer = new Serializer[value.getArity()];
-			buffers = new ArrayList();
-			for (int x = 0; x < serializer.length; x++) {
-				serializer[x] = getSerializer(value.getField(x));
-			}
-		}
-
-		@Override
-		public void serializeInternal(Tuple value) {
-			int length = 0;
-			for (int x = 0; x < serializer.length; x++) {
-				serializer[x].buffer.clear();
-				serializer[x].serializeInternal(value.getField(x));
-				length += serializer[x].buffer.position();
-				buffers.add(serializer[x].buffer);
-			}
-			buffer = ByteBuffer.allocate(length);
-			for (ByteBuffer b : buffers) {
-				b.flip();
-				buffer.put(b);
-			}
-			buffers.clear();
-		}
-	}
-}
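Note how the deleted sendBuffer() handles overflow: a record that no longer fits into the mapped region is parked in saved[group] and written at the start of the next call, so a long iterator is streamed across several buffer exchanges. A stripped-down sketch of that fill-then-spill loop, detached from the memory-mapped file (the class, field, and method names here are illustrative, not part of the commit):

    import java.nio.ByteBuffer;
    import java.util.Iterator;

    class SpillWriter {
        // Plays the role of saved[group]: the one record that did not fit last round.
        private ByteBuffer pending;

        // Fills 'target' from pre-serialized records, carrying any overflow
        // record over to the next call; returns the number of bytes written.
        int fill(Iterator<ByteBuffer> records, ByteBuffer target) {
            target.clear();
            if (pending != null) {              // flush the carried-over record first
                target.put(pending);
                pending = null;
            }
            while (records.hasNext() && pending == null) {
                ByteBuffer next = records.next();
                if (next.remaining() <= target.remaining()) {
                    target.put(next);           // fits: write it now
                } else {
                    pending = next;             // does not fit: defer to next call
                }
            }
            return target.position();
        }
    }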
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
deleted file mode 100644
index 1ad0606..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common.streaming;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-/**
- * Simple utility class to print all contents of an inputstream to stdout.
- */
-public class StreamPrinter extends Thread {
-	private final BufferedReader reader;
-	private final boolean wrapInException;
-	private StringBuilder msg;
-
-	public StreamPrinter(InputStream stream) {
-		this(stream, false, null);
-	}
-
-	public StreamPrinter(InputStream stream, boolean wrapInException, StringBuilder msg) {
-		this.reader = new BufferedReader(new InputStreamReader(stream));
-		this.wrapInException = wrapInException;
-		this.msg = msg;
-	}
-
-	@Override
-	public void run() {
-		String line;
-		try {
-			if (wrapInException) {
-				while ((line = reader.readLine()) != null) {
-					msg.append("\n" + line);
-				}
-			} else {
-				while ((line = reader.readLine()) != null) {
-					System.out.println(line);
-					System.out.flush();
-				}
-			}
-		} catch (IOException ex) {
-		}
-	}
-}
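StreamPrinter is the kind of helper one attaches to the stdout and stderr of the spawned external-language process: either echoing output to the JVM's stdout, or collecting it into a StringBuilder via the wrapInException flag so it can be reported in an exception. A hypothetical wiring (the ProcessBuilder call and its arguments are assumptions for illustration; this commit does not show the call site):

    // Assumed usage sketch; "python" and "plan.py" are placeholders.
    Process child = new ProcessBuilder("python", "plan.py").start();
    new StreamPrinter(child.getInputStream()).start();                      // echo child stdout
    StringBuilder errorMessage = new StringBuilder();
    new StreamPrinter(child.getErrorStream(), true, errorMessage).start();  // collect stderr for error reporting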