http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java new file mode 100644 index 0000000..1c273d3 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.function.aggregation; + +import org.apache.flink.api.common.functions.ReduceFunction; + +public abstract class AggregationFunction<T> implements ReduceFunction<T> { + private static final long serialVersionUID = 1L; + + public int position; + + public AggregationFunction(int pos) { + this.position = pos; + } + + public static enum AggregationType { + SUM, MIN, MAX, MINBY, MAXBY, + } + +}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java new file mode 100644 index 0000000..226c45a --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.function.aggregation; + +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.List; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.runtime.PojoComparator; + +public abstract class ComparableAggregator<T> extends AggregationFunction<T> { + + private static final long serialVersionUID = 1L; + + public Comparator comparator; + public boolean byAggregate; + public boolean first; + + public ComparableAggregator(int pos, AggregationType aggregationType, boolean first) { + super(pos); + this.comparator = Comparator.getForAggregation(aggregationType); + this.byAggregate = (aggregationType == AggregationType.MAXBY) + || (aggregationType == AggregationType.MINBY); + this.first = first; + } + + public static <R> AggregationFunction<R> getAggregator(int positionToAggregate, + TypeInformation<R> typeInfo, AggregationType aggregationType) { + return getAggregator(positionToAggregate, typeInfo, aggregationType, false); + } + + public static <R> AggregationFunction<R> getAggregator(int positionToAggregate, + TypeInformation<R> typeInfo, AggregationType aggregationType, boolean first) { + + if (typeInfo.isTupleType()) { + return new TupleComparableAggregator<R>(positionToAggregate, aggregationType, first); + } else if (typeInfo instanceof BasicArrayTypeInfo + || typeInfo instanceof PrimitiveArrayTypeInfo) { + return new ArrayComparableAggregator<R>(positionToAggregate, aggregationType, first); + } else { + return new SimpleComparableAggregator<R>(aggregationType); + } + } 
+ + public static <R> AggregationFunction<R> getAggregator(String field, + TypeInformation<R> typeInfo, AggregationType aggregationType, boolean first) { + + return new PojoComparableAggregator<R>(field, typeInfo, aggregationType, first); + } + + private static class TupleComparableAggregator<T> extends ComparableAggregator<T> { + + private static final long serialVersionUID = 1L; + + public TupleComparableAggregator(int pos, AggregationType aggregationType, boolean first) { + super(pos, aggregationType, first); + } + + @SuppressWarnings("unchecked") + @Override + public T reduce(T value1, T value2) throws Exception { + Tuple tuple1 = (Tuple) value1; + Tuple tuple2 = (Tuple) value2; + + Comparable<Object> o1 = tuple1.getField(position); + Object o2 = tuple2.getField(position); + + int c = comparator.isExtremal(o1, o2); + + if (byAggregate) { + if (c == 1) { + return (T) tuple1; + } + if (first) { + if (c == 0) { + return (T) tuple1; + } + } + + return (T) tuple2; + + } else { + if (c == 1) { + tuple2.setField(o1, position); + } + return (T) tuple2; + } + + } + } + + private static class ArrayComparableAggregator<T> extends ComparableAggregator<T> { + + private static final long serialVersionUID = 1L; + + public ArrayComparableAggregator(int pos, AggregationType aggregationType, boolean first) { + super(pos, aggregationType, first); + } + + @SuppressWarnings("unchecked") + @Override + public T reduce(T array1, T array2) throws Exception { + + Object v1 = Array.get(array1, position); + Object v2 = Array.get(array2, position); + + int c = comparator.isExtremal((Comparable<Object>) v1, v2); + + if (byAggregate) { + if (c == 1) { + return array1; + } + if (first) { + if (c == 0) { + return array1; + } + } + + return array2; + } else { + if (c == 1) { + Array.set(array2, position, v1); + } + + return array2; + } + } + + } + + private static class SimpleComparableAggregator<T> extends ComparableAggregator<T> { + + private static final long serialVersionUID = 1L; + + 
public SimpleComparableAggregator(AggregationType aggregationType) { + super(0, aggregationType, false); + } + + @SuppressWarnings("unchecked") + @Override + public T reduce(T value1, T value2) throws Exception { + + if (comparator.isExtremal((Comparable<Object>) value1, value2) == 1) { + return value1; + } else { + return value2; + } + } + + } + + private static class PojoComparableAggregator<T> extends ComparableAggregator<T> { + + private static final long serialVersionUID = 1L; + PojoComparator<T> pojoComparator; + + public PojoComparableAggregator(String field, TypeInformation<?> typeInfo, + AggregationType aggregationType, boolean first) { + super(0, aggregationType, first); + if (!(typeInfo instanceof CompositeType<?>)) { + throw new IllegalArgumentException( + "Key expressions are only supported on POJO types and Tuples. " + + "A type is considered a POJO if all its fields are public, or have both getters and setters defined"); + } + + @SuppressWarnings("unchecked") + CompositeType<T> cType = (CompositeType<T>) typeInfo; + + List<FlatFieldDescriptor> fieldDescriptors = cType.getFlatFields(field); + int logicalKeyPosition = fieldDescriptors.get(0).getPosition(); + + if (cType instanceof PojoTypeInfo) { + pojoComparator = (PojoComparator<T>) cType.createComparator( + new int[] { logicalKeyPosition }, new boolean[] { false }, 0); + } else { + throw new IllegalArgumentException( + "Key expressions are only supported on POJO types. 
" + + "A type is considered a POJO if all its fields are public, or have both getters and setters defined"); + } + } + + @Override + public T reduce(T value1, T value2) throws Exception { + + Field[] keyFields = pojoComparator.getKeyFields(); + Object field1 = pojoComparator.accessField(keyFields[0], value1); + Object field2 = pojoComparator.accessField(keyFields[0], value2); + + @SuppressWarnings("unchecked") + int c = comparator.isExtremal((Comparable<Object>) field1, field2); + + if (byAggregate) { + if (c == 1) { + return value1; + } + if (first) { + if (c == 0) { + return value1; + } + } + + return value2; + } else { + if (c == 1) { + keyFields[0].set(value2, field1); + } + + return value2; + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/Comparator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/Comparator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/Comparator.java new file mode 100644 index 0000000..f56774b --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/Comparator.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.aggregation; + +import java.io.Serializable; + +import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType; + +public abstract class Comparator implements Serializable { + + private static final long serialVersionUID = 1L; + + public abstract <R> int isExtremal(Comparable<R> o1, R o2); + + public static Comparator getForAggregation(AggregationType type) { + switch (type) { + case MAX: + return new MaxComparator(); + case MIN: + return new MinComparator(); + case MINBY: + return new MinByComparator(); + case MAXBY: + return new MaxByComparator(); + default: + throw new IllegalArgumentException("Unsupported aggregation type."); + } + } + + private static class MaxComparator extends Comparator { + + private static final long serialVersionUID = 1L; + + @Override + public <R> int isExtremal(Comparable<R> o1, R o2) { + return o1.compareTo(o2) > 0 ? 
1 : 0; + } + + } + + private static class MaxByComparator extends Comparator { + + private static final long serialVersionUID = 1L; + + @Override + public <R> int isExtremal(Comparable<R> o1, R o2) { + int c = o1.compareTo(o2); + if (c > 0) { + return 1; + } + if (c == 0) { + return 0; + } else { + return -1; + } + } + + } + + private static class MinByComparator extends Comparator { + + private static final long serialVersionUID = 1L; + + @Override + public <R> int isExtremal(Comparable<R> o1, R o2) { + int c = o1.compareTo(o2); + if (c < 0) { + return 1; + } + if (c == 0) { + return 0; + } else { + return -1; + } + } + + } + + private static class MinComparator extends Comparator { + + private static final long serialVersionUID = 1L; + + @Override + public <R> int isExtremal(Comparable<R> o1, R o2) { + return o1.compareTo(o2) < 0 ? 1 : 0; + } + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java new file mode 100644 index 0000000..142028b --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.aggregation; + +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.List; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.runtime.PojoComparator; + +public abstract class SumAggregator { + + public static <T> ReduceFunction<T> getSumFunction(int pos, Class<?> clazz, + TypeInformation<T> typeInfo) { + + if (typeInfo.isTupleType()) { + return new TupleSumAggregator<T>(pos, SumFunction.getForClass(clazz)); + } else if (typeInfo instanceof BasicArrayTypeInfo + || typeInfo instanceof PrimitiveArrayTypeInfo) { + return new ArraySumAggregator<T>(pos, SumFunction.getForClass(clazz)); + } else { + return new SimpleSumAggregator<T>(SumFunction.getForClass(clazz)); + } + + } + + public static <T> ReduceFunction<T> getSumFunction(String field, TypeInformation<T> typeInfo) { + + return new PojoSumAggregator<T>(field, 
typeInfo); + } + + private static class TupleSumAggregator<T> extends AggregationFunction<T> { + + private static final long serialVersionUID = 1L; + + SumFunction adder; + + public TupleSumAggregator(int pos, SumFunction adder) { + super(pos); + this.adder = adder; + } + + @SuppressWarnings("unchecked") + @Override + public T reduce(T value1, T value2) throws Exception { + + Tuple tuple1 = (Tuple) value1; + Tuple tuple2 = (Tuple) value2; + + tuple2.setField(adder.add(tuple1.getField(position), tuple2.getField(position)), + position); + + return (T) tuple2; + } + + } + + private static class ArraySumAggregator<T> extends AggregationFunction<T> { + + private static final long serialVersionUID = 1L; + + SumFunction adder; + + public ArraySumAggregator(int pos, SumFunction adder) { + super(pos); + this.adder = adder; + } + + @Override + public T reduce(T value1, T value2) throws Exception { + + Object v1 = Array.get(value1, position); + Object v2 = Array.get(value2, position); + Array.set(value2, position, adder.add(v1, v2)); + return value2; + } + + } + + private static class SimpleSumAggregator<T> extends AggregationFunction<T> { + + private static final long serialVersionUID = 1L; + + SumFunction adder; + + public SimpleSumAggregator(SumFunction adder) { + super(0); + this.adder = adder; + } + + @SuppressWarnings("unchecked") + @Override + public T reduce(T value1, T value2) throws Exception { + + return (T) adder.add(value1, value2); + } + + } + + private static class PojoSumAggregator<T> extends AggregationFunction<T> { + + private static final long serialVersionUID = 1L; + SumFunction adder; + PojoComparator<T> comparator; + + public PojoSumAggregator(String field, TypeInformation<?> type) { + super(0); + if (!(type instanceof CompositeType<?>)) { + throw new IllegalArgumentException( + "Key expressions are only supported on POJO types and Tuples. 
" + + "A type is considered a POJO if all its fields are public, or have both getters and setters defined"); + } + + @SuppressWarnings("unchecked") + CompositeType<T> cType = (CompositeType<T>) type; + + List<FlatFieldDescriptor> fieldDescriptors = cType.getFlatFields(field); + + int logicalKeyPosition = fieldDescriptors.get(0).getPosition(); + Class<?> keyClass = fieldDescriptors.get(0).getType().getTypeClass(); + + adder = SumFunction.getForClass(keyClass); + + if (cType instanceof PojoTypeInfo) { + comparator = (PojoComparator<T>) cType.createComparator( + new int[] { logicalKeyPosition }, new boolean[] { false }, 0); + } else { + throw new IllegalArgumentException( + "Key expressions are only supported on POJO types. " + + "A type is considered a POJO if all its fields are public, or have both getters and setters defined"); + } + } + + @Override + public T reduce(T value1, T value2) throws Exception { + + Field[] keyFields = comparator.getKeyFields(); + Object field1 = comparator.accessField(keyFields[0], value1); + Object field2 = comparator.accessField(keyFields[0], value2); + + keyFields[0].set(value2, adder.add(field1, field2)); + + return value2; + } + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java new file mode 100644 index 0000000..2aef19c --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumFunction.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software 
/**
 * Serializable addition of two boxed numbers of the same type.
 * <p>
 * Every implementation returns a value of the same boxed type as its operands,
 * so a sum can be written back into the field it was read from. Overflow wraps
 * around as in primitive arithmetic.
 */
public abstract class SumFunction implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Adds the two values.
     *
     * @param o1
     *            First operand; must be of the boxed type this function was
     *            created for.
     * @param o2
     *            Second operand; same type as {@code o1}.
     * @return The sum, boxed in the same type as the operands.
     * @throws ClassCastException
     *             If an operand is not of the expected type.
     */
    public abstract Object add(Object o1, Object o2);

    /**
     * Returns the sum function for the given boxed numeric class.
     *
     * @param clazz
     *            One of {@code Integer}, {@code Long}, {@code Short},
     *            {@code Double}, {@code Float} or {@code Byte}.
     * @return A new sum function for that class.
     * @throws RuntimeException
     *             If the class is not one of the supported types.
     */
    public static SumFunction getForClass(Class<?> clazz) {

        if (clazz == Integer.class) {
            return new IntSum();
        } else if (clazz == Long.class) {
            return new LongSum();
        } else if (clazz == Short.class) {
            return new ShortSum();
        } else if (clazz == Double.class) {
            return new DoubleSum();
        } else if (clazz == Float.class) {
            return new FloatSum();
        } else if (clazz == Byte.class) {
            return new ByteSum();
        } else {
            throw new RuntimeException("DataStream cannot be summed because the class "
                    + clazz.getSimpleName() + " does not support the + operator.");
        }
    }

    /** Sum of two {@code Integer} values. */
    public static class IntSum extends SumFunction {
        private static final long serialVersionUID = 1L;

        @Override
        public Object add(Object value1, Object value2) {
            return (Integer) value1 + (Integer) value2;
        }
    }

    /** Sum of two {@code Long} values. */
    public static class LongSum extends SumFunction {
        private static final long serialVersionUID = 1L;

        @Override
        public Object add(Object value1, Object value2) {
            return (Long) value1 + (Long) value2;
        }
    }

    /** Sum of two {@code Double} values. */
    public static class DoubleSum extends SumFunction {

        private static final long serialVersionUID = 1L;

        @Override
        public Object add(Object value1, Object value2) {
            return (Double) value1 + (Double) value2;
        }
    }

    /** Sum of two {@code Short} values. */
    public static class ShortSum extends SumFunction {
        private static final long serialVersionUID = 1L;

        @Override
        public Object add(Object value1, Object value2) {
            // short + short is promoted to int; narrow it back so the result is
            // boxed as a Short rather than an Integer.
            return (short) ((Short) value1 + (Short) value2);
        }
    }

    /** Sum of two {@code Float} values. */
    public static class FloatSum extends SumFunction {
        private static final long serialVersionUID = 1L;

        @Override
        public Object add(Object value1, Object value2) {
            return (Float) value1 + (Float) value2;
        }
    }

    /** Sum of two {@code Byte} values. */
    public static class ByteSum extends SumFunction {
        private static final long serialVersionUID = 1L;

        @Override
        public Object add(Object value1, Object value2) {
            // byte + byte is promoted to int; narrow it back so the result is
            // boxed as a Byte rather than an Integer.
            return (byte) ((Byte) value1 + (Byte) value2);
        }
    }
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.co; + +import java.io.Serializable; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.util.Collector; + +/** + * A CoFlatMapFunction represents a FlatMap transformation with two different + * input types. + * + * @param <IN1> + * Type of the first input. + * @param <IN2> + * Type of the second input. + * @param <OUT> + * Output type. 
+ */ +public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable { + + public void flatMap1(IN1 value, Collector<OUT> out) throws Exception; + + public void flatMap2(IN2 value, Collector<OUT> out) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java new file mode 100644 index 0000000..2903828 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.function.co; + +import java.io.Serializable; + +import org.apache.flink.api.common.functions.Function; + +/** + * A CoMapFunction represents a Map transformation with two different input + * types. + * + * @param <IN1> + * Type of the first input. + * @param <IN2> + * Type of the second input. + * @param <OUT> + * Output type. + */ +public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable { + + public OUT map1(IN1 value); + + public OUT map2(IN2 value); +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java new file mode 100644 index 0000000..879a1b4 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoReduceFunction.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.co; + +import java.io.Serializable; + +import org.apache.flink.api.common.functions.Function; + +/** + * The CoReduceFunction interface represents a Reduce transformation with two + * different input streams. The reduce1 function combine groups of elements of + * the first input with the same key to a single value, while reduce2 combine + * groups of elements of the second input with the same key to a single value. + * Each produced values are mapped to the same type by map1 and map2, + * respectively, to form one output stream. + * + * The basic syntax for using a grouped ReduceFunction is as follows: + * + * <pre> + * <blockquote> + * ConnectedDataStream<X> input = ...; + * + * ConnectedDataStream<X> result = input.groupBy(keyPosition1, keyPosition2) + * .reduce(new MyCoReduceFunction(), keyPosition1, keyPosition2).addSink(...); + * </blockquote> + * </pre> + * <p> + * + * @param <IN1> + * Type of the first input. + * @param <IN2> + * Type of the second input. + * @param <OUT> + * Output type. + */ +public interface CoReduceFunction<IN1, IN2, OUT> extends Function, Serializable { + + /** + * The core method of CoReduceFunction, combining two values of the first + * input into one value of the same type. The reduce1 function is + * consecutively applied to all values of a group until only a single value + * remains. + * + * @param value1 + * The first value to combine. + * @param value2 + * The second value to combine. + * @return The combined value of both input values. + * + * @throws Exception + * This method may throw exceptions. Throwing an exception will + * cause the operation to fail and may trigger recovery. + */ + public IN1 reduce1(IN1 value1, IN1 value2); + + /** + * The core method of ReduceFunction, combining two values of the second + * input into one value of the same type. 
The reduce2 function is + * consecutively applied to all values of a group until only a single value + * remains. + * + * @param value1 + * The first value to combine. + * @param value2 + * The second value to combine. + * @return The combined value of both input values. + * + * @throws Exception + * This method may throw exceptions. Throwing an exception will + * cause the operation to fail and may trigger recovery. + */ + public IN2 reduce2(IN2 value1, IN2 value2); + + /** + * Maps the reduced first input to the output type. + * + * @param value + * Type of the first input. + * @return the output type. + */ + public OUT map1(IN1 value); + + /** + * Maps the reduced second input to the output type. + * + * @param value + * Type of the second input. + * @return the output type. + */ + public OUT map2(IN2 value); +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoWindowFunction.java new file mode 100644 index 0000000..2f2514f --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoWindowFunction.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.co; + +import java.io.Serializable; +import java.util.List; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.util.Collector; + +public interface CoWindowFunction<IN1, IN2, O> extends Function, Serializable { + + public void coWindow(List<IN1> first, List<IN2> second, Collector<O> out) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java new file mode 100644 index 0000000..9cafcd1 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CrossWindowFunction.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.co; + +import java.util.List; + +import org.apache.flink.api.common.functions.CrossFunction; +import org.apache.flink.util.Collector; + +public class CrossWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> { + private static final long serialVersionUID = 1L; + + private CrossFunction<IN1, IN2, OUT> crossFunction; + + public CrossWindowFunction(CrossFunction<IN1, IN2, OUT> crossFunction) { + this.crossFunction = crossFunction; + } + + @Override + public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception { + for (IN1 firstValue : first) { + for (IN2 secondValue : second) { + out.collect(crossFunction.cross(firstValue, secondValue)); + } + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java new file mode 100644 index 0000000..9b39f33 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/JoinWindowFunction.java @@ -0,0 +1,55 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.co; + +import java.util.List; + +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.util.Collector; + +public class JoinWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> { + private static final long serialVersionUID = 1L; + + private KeySelector<IN1, ?> keySelector1; + private KeySelector<IN2, ?> keySelector2; + private JoinFunction<IN1, IN2, OUT> joinFunction; + + public JoinWindowFunction(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2, + JoinFunction<IN1, IN2, OUT> joinFunction) { + this.keySelector1 = keySelector1; + this.keySelector2 = keySelector2; + this.joinFunction = joinFunction; + } + + @Override + public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out) throws Exception { + for (IN1 item1 : first) { + Object key1 = keySelector1.getKey(item1); + + for (IN2 item2 : second) { + Object key2 = keySelector2.getKey(item2); + + if (key1.equals(key2)) { + out.collect(joinFunction.join(item1, item2)); + } + } + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
new file mode 100644
index 0000000..2458f1b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * A RichCoFlatMapFunction represents a FlatMap transformation with two different
+ * input types. In addition, the user can use the features provided by the
+ * {@link RichFunction} interface.
+ *
+ * <p>
+ * The class extends {@link AbstractRichFunction} and declares no members of
+ * its own besides {@code serialVersionUID}.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public abstract class RichCoFlatMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
+        CoFlatMapFunction<IN1, IN2, OUT> {
+
+    private static final long serialVersionUID = 1L;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
new file mode 100755
index 0000000..20d520c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * A RichCoMapFunction represents a Map transformation with two different input
+ * types. In addition, the user can use the features provided by the
+ * {@link RichFunction} interface.
+ *
+ * <p>
+ * The class extends {@link AbstractRichFunction} and declares no members of
+ * its own besides {@code serialVersionUID}.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public abstract class RichCoMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
+        CoMapFunction<IN1, IN2, OUT> {
+
+    private static final long serialVersionUID = 1L;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
new file mode 100644
index 0000000..655923f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * A RichCoReduceFunction represents a Reduce transformation with two different
+ * input types. In addition, the user can use the features provided by the
+ * {@link RichFunction} interface.
+ *
+ * <p>
+ * The class extends {@link AbstractRichFunction} and declares no members of
+ * its own besides {@code serialVersionUID}; the four methods of
+ * {@link CoReduceFunction} remain to be implemented by subclasses.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public abstract class RichCoReduceFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
+        CoReduceFunction<IN1, IN2, OUT> {
+
+    private static final long serialVersionUID = 1L;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java
new file mode 100644
index 0000000..2709203
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoWindowFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor
license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import java.util.List;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Abstract base class combining {@link CoWindowFunction} with
+ * {@link AbstractRichFunction}.
+ *
+ * @param <IN1>
+ *            Element type of the first list.
+ * @param <IN2>
+ *            Element type of the second list.
+ * @param <O>
+ *            Type of the emitted results.
+ */
+public abstract class RichCoWindowFunction<IN1, IN2, O> extends AbstractRichFunction implements
+        CoWindowFunction<IN1, IN2, O> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Redeclares the interface method as abstract; subclasses must implement
+     * it.
+     *
+     * @param first
+     *            Elements of the first input.
+     * @param second
+     *            Elements of the second input.
+     * @param out
+     *            Collector that receives the results.
+     * @throws Exception
+     *             Implementations may throw any exception.
+     */
+    @Override
+    public abstract void coWindow(List<IN1> first, List<IN2> second, Collector<O> out)
+            throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
new file mode 100644
index 0000000..24beba1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.sink; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.flink.api.common.io.CleanupWhenUnsuccessful; +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple implementation of the SinkFunction writing tuples in the specified + * OutputFormat format. Tuples are collected to a list and written to the file + * periodically. The target path and the overwrite mode are pre-packaged in + * format. 
+ * + * @param <IN> + * Input type + */ +public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(FileSinkFunction.class); + protected ArrayList<IN> tupleList = new ArrayList<IN>(); + protected volatile OutputFormat<IN> format; + protected volatile boolean cleanupCalled = false; + protected int indexInSubtaskGroup; + protected int currentNumberOfSubtasks; + + public FileSinkFunction(OutputFormat<IN> format) { + this.format = format; + } + + @Override + public void open(Configuration parameters) throws Exception { + StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); + format.configure(context.getTaskStubParameters()); + indexInSubtaskGroup = context.getIndexOfThisSubtask(); + currentNumberOfSubtasks = context.getNumberOfParallelSubtasks(); + format.open(indexInSubtaskGroup, currentNumberOfSubtasks); + } + + @Override + public void invoke(IN record) throws Exception { + tupleList.add(record); + if (updateCondition()) { + flush(); + } + } + + @Override + public void close() throws IOException { + if (!tupleList.isEmpty()) { + flush(); + } + try { + format.close(); + } catch (Exception ex) { + try { + if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) { + cleanupCalled = true; + ((CleanupWhenUnsuccessful) format).tryCleanupOnError(); + } + } catch (Throwable t) { + LOG.error("Cleanup on error failed.", t); + } + } + } + + protected void flush() { + try { + for (IN rec : tupleList) { + format.writeRecord(rec); + } + } catch (Exception ex) { + try { + if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) { + cleanupCalled = true; + ((CleanupWhenUnsuccessful) format).tryCleanupOnError(); + } + } catch (Throwable t) { + LOG.error("Cleanup on error failed.", t); + } + } + resetParameters(); + } + + /** + * Condition for writing the contents of tupleList and clearing it. 
+ * + * @return value of the updating condition + */ + protected abstract boolean updateCondition(); + + /** + * Statements to be executed after writing a batch goes here. + */ + protected abstract void resetParameters(); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java new file mode 100644 index 0000000..f049a32 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunctionByMillis.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.sink; + +import org.apache.flink.api.common.io.OutputFormat; + +/** + * Implementation of FileSinkFunction. 
Writes tuples to file in every millis + * milliseconds. + * + * @param <IN> + * Input type + */ +public class FileSinkFunctionByMillis<IN> extends FileSinkFunction<IN> { + private static final long serialVersionUID = 1L; + + private final long millis; + private long lastTime; + + public FileSinkFunctionByMillis(OutputFormat<IN> format, long millis) { + super(format); + this.millis = millis; + lastTime = System.currentTimeMillis(); + } + + /** + * Condition for writing the contents of tupleList and clearing it. + * + * @return value of the updating condition + */ + @Override + protected boolean updateCondition() { + return System.currentTimeMillis() - lastTime >= millis; + } + + /** + * Statements to be executed after writing a batch goes here. + */ + @Override + protected void resetParameters() { + tupleList.clear(); + lastTime = System.currentTimeMillis(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java new file mode 100755 index 0000000..d460749 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.sink; + +import java.io.PrintStream; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext; + +/** + * Implementation of the SinkFunction writing every tuple to the standard + * output or standard error stream. + * + * @param <IN> + * Input record type + */ +public class PrintSinkFunction<IN> extends RichSinkFunction<IN> { + private static final long serialVersionUID = 1L; + + private static final boolean STD_OUT = false; + private static final boolean STD_ERR = true; + + private boolean target; + private transient PrintStream stream; + private transient String prefix; + + /** + * Instantiates a print sink function that prints to standard out. + */ + public PrintSinkFunction() {} + + /** + * Instantiates a print sink function that prints to standard out. + * + * @param stdErr True, if the format should print to standard error instead of standard out. 
+ */ + public PrintSinkFunction(boolean stdErr) { + target = stdErr; + } + + public void setTargetToStandardOut() { + target = STD_OUT; + } + + public void setTargetToStandardErr() { + target = STD_ERR; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); + // get the target stream + stream = target == STD_OUT ? System.out : System.err; + + // set the prefix if we have a >1 DOP + prefix = (context.getNumberOfParallelSubtasks() > 1) ? + ((context.getIndexOfThisSubtask() + 1) + "> ") : null; + } + + @Override + public void invoke(IN record) { + if (prefix != null) { + stream.println(prefix + record.toString()); + } + else { + stream.println(record.toString()); + } + } + + @Override + public void close() throws Exception { + this.stream = null; + this.prefix = null; + super.close(); + } + + @Override + public String toString() { + return "Print to " + (target == STD_OUT ? "System.out" : "System.err"); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java new file mode 100755 index 0000000..3b8a4db --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/RichSinkFunction.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+/**
+ * Abstract base class combining {@link SinkFunction} with
+ * {@link AbstractRichFunction}.
+ *
+ * @param <IN>
+ *            Type of the values handed to the sink.
+ */
+public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Redeclares {@link SinkFunction#invoke(Object)} as abstract; subclasses
+     * must implement it.
+     *
+     * @param value
+     *            The value to consume.
+     * @throws Exception
+     *             Implementations may throw any exception.
+     */
+    public abstract void invoke(IN value) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
new file mode 100644
index 0000000..6097603
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.sink; + +import java.io.Serializable; + +import org.apache.flink.api.common.functions.Function; + +public interface SinkFunction<IN> extends Function, Serializable { + + public abstract void invoke(IN value) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java new file mode 100644 index 0000000..a606302 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+/**
+ * Abstract class for formatting the output of the writeAsText and writeAsCsv
+ * functions. The class is serializable and leaves the actual writing to its
+ * subclasses.
+ *
+ * @param <IN>
+ *            Input tuple type
+ */
+public abstract class WriteFormat<IN> implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Writes the contents of tupleList to the file specified by path.
+     *
+     * @param path
+     *            the path of the file the tuples are written to
+     * @param tupleList
+     *            the list of tuples to be written
+     */
+    protected abstract void write(String path, ArrayList<IN> tupleList);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
new file mode 100644
index 0000000..b22fd80
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.streaming.api.function.sink; + +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; + +/** + * Writes tuples in csv format. + * + * @param <IN> + * Input tuple type + */ +public class WriteFormatAsCsv<IN> extends WriteFormat<IN> { + private static final long serialVersionUID = 1L; + + @Override + protected void write(String path, ArrayList<IN> tupleList) { + try { + PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true))); + for (IN tupleToWrite : tupleList) { + String strTuple = tupleToWrite.toString(); + outStream.println(strTuple.substring(1, strTuple.length() - 1)); + } + outStream.close(); + } catch (IOException e) { + throw new RuntimeException("Exception occured while writing file " + path, e); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java new file mode 100644 index 0000000..5891104 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.sink; + +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; + +/** + * Writes tuples in text format. + * + * @param <IN> + * Input tuple type + */ +public class WriteFormatAsText<IN> extends WriteFormat<IN> { + private static final long serialVersionUID = 1L; + + @Override + public void write(String path, ArrayList<IN> tupleList) { + try { + PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true))); + for (IN tupleToWrite : tupleList) { + outStream.println(tupleToWrite); + } + outStream.close(); + } catch (IOException e) { + throw new RuntimeException("Exception occured while writing file " + path, e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java new file mode 100644 index 0000000..0c52afc --- /dev/null +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.sink; + +import java.io.FileNotFoundException; +import java.io.PrintWriter; +import java.util.ArrayList; + +/** + * Simple implementation of the SinkFunction writing tuples as simple text to + * the file specified by path. Tuples are collected to a list and written to the + * file periodically. The file specified by path is created if it does not + * exist, cleared if it exists before the writing. + * + * @param <IN> + * Input tuple type + */ +public abstract class WriteSinkFunction<IN> implements SinkFunction<IN> { + private static final long serialVersionUID = 1L; + + protected final String path; + protected ArrayList<IN> tupleList = new ArrayList<IN>(); + protected WriteFormat<IN> format; + + public WriteSinkFunction(String path, WriteFormat<IN> format) { + this.path = path; + this.format = format; + cleanFile(path); + } + + /** + * Creates target file if it does not exist, cleans it if it exists. 
+ * + * @param path + * is the path to the location where the tuples are written + */ + protected void cleanFile(String path) { + try { + PrintWriter writer; + writer = new PrintWriter(path); + writer.print(""); + writer.close(); + } catch (FileNotFoundException e) { + throw new RuntimeException("File not found " + path, e); + } + } + + /** + * Condition for writing the contents of tupleList and clearing it. + * + * @return value of the updating condition + */ + protected abstract boolean updateCondition(); + + /** + * Statements to be executed after writing a batch goes here. + */ + protected abstract void resetParameters(); + + /** + * Implementation of the invoke method of the SinkFunction class. Collects + * the incoming tuples in tupleList and appends the list to the end of the + * target file if updateCondition() is true or the current tuple is the + * endTuple. + */ + @Override + public void invoke(IN tuple) { + + tupleList.add(tuple); + if (updateCondition()) { + format.write(path, tupleList); + resetParameters(); + } + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java new file mode 100644 index 0000000..ee6df94 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.sink; + +/** + * Implementation of WriteSinkFunction. Writes tuples to file in every millis + * milliseconds. + * + * @param <IN> + * Input tuple type + */ +public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> { + private static final long serialVersionUID = 1L; + + private final long millis; + private long lastTime; + + public WriteSinkFunctionByMillis(String path, WriteFormat<IN> format, long millis) { + super(path, format); + this.millis = millis; + lastTime = System.currentTimeMillis(); + } + + @Override + protected boolean updateCondition() { + return System.currentTimeMillis() - lastTime >= millis; + } + + @Override + protected void resetParameters() { + tupleList.clear(); + lastTime = System.currentTimeMillis(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java new file mode 100644 index 0000000..05a2489 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.source; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Long, Long>> { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FileMonitoringFunction.class); + + public enum WatchType { + ONLY_NEW_FILES, // Only new files will be processed. 
+ REPROCESS_WITH_APPENDED, // When some files are appended, all contents of the files will be processed. + PROCESS_ONLY_APPENDED // When some files are appended, only appended contents will be processed. + } + + private String path; + private long interval; + private WatchType watchType; + + private FileSystem fileSystem; + private Map<String, Long> offsetOfFiles; + private Map<String, Long> modificationTimes; + + public FileMonitoringFunction(String path, long interval, WatchType watchType) { + this.path = path; + this.interval = interval; + this.watchType = watchType; + this.modificationTimes = new HashMap<String, Long>(); + this.offsetOfFiles = new HashMap<String, Long>(); + } + + @Override + public void invoke(Collector<Tuple3<String, Long, Long>> collector) throws Exception { + fileSystem = FileSystem.get(new URI(path)); + + while (true) { + List<String> files = listNewFiles(); + for (String filePath : files) { + if (watchType == WatchType.ONLY_NEW_FILES + || watchType == WatchType.REPROCESS_WITH_APPENDED) { + collector.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L)); + offsetOfFiles.put(filePath, -1L); + } else if (watchType == WatchType.PROCESS_ONLY_APPENDED) { + long offset = 0; + long fileSize = fileSystem.getFileStatus(new Path(filePath)).getLen(); + if (offsetOfFiles.containsKey(filePath)) { + offset = offsetOfFiles.get(filePath); + } + + collector.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize)); + offsetOfFiles.put(filePath, fileSize); + + LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize); + } + } + + Thread.sleep(interval); + } + } + + private List<String> listNewFiles() throws IOException { + List<String> files = new ArrayList<String>(); + + FileStatus[] statuses = fileSystem.listStatus(new Path(path)); + + for (FileStatus status : statuses) { + Path filePath = status.getPath(); + String fileName = filePath.getName(); + long modificationTime = status.getModificationTime(); + + if (!isFiltered(fileName, 
modificationTime)) { + files.add(filePath.toString()); + modificationTimes.put(fileName, modificationTime); + } + } + return files; + } + + private boolean isFiltered(String fileName, long modificationTime) { + + if ((watchType == WatchType.ONLY_NEW_FILES && modificationTimes.containsKey(fileName)) + || fileName.startsWith(".") || fileName.contains("_COPYING_")) { + return true; + } else { + Long lastModification = modificationTimes.get(fileName); + if (lastModification == null) { + return false; + } else { + return lastModification >= modificationTime; + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java new file mode 100644 index 0000000..0882d9e --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileReadFunction.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.source; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Collector; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.URI; + +public class FileReadFunction implements FlatMapFunction<Tuple3<String, Long, Long>, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(Tuple3<String, Long, Long> value, Collector<String> out) throws Exception { + FSDataInputStream stream = FileSystem.get(new URI(value.f0)).open(new Path(value.f0)); + stream.seek(value.f1); + + BufferedReader reader = new BufferedReader(new InputStreamReader(stream)); + String line; + + try { + while ((line = reader.readLine()) != null && (value.f2 == -1L || stream.getPos() <= value.f2)) { + out.collect(line); + } + } finally { + reader.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java new file mode 100644 index 0000000..5dfe4b2 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.function.source; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext; +import org.apache.flink.util.Collector; + +public class FileSourceFunction extends RichSourceFunction<String> { + private static final long serialVersionUID = 1L; + + private InputSplitProvider provider; + + private InputFormat<String, ?> inputFormat; + + private TypeSerializerFactory<String> serializerFactory; + + public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) { + this.inputFormat = format; + this.serializerFactory = createSerializer(typeInfo); + } + + private static TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) { + TypeSerializer<String> serializer = typeInfo.createSerializer(); + + if (serializer.isStateful()) { + return new RuntimeStatefulSerializerFactory<String>(serializer, typeInfo.getTypeClass()); + } else { + return new RuntimeStatelessSerializerFactory<String>(serializer, + typeInfo.getTypeClass()); + } + } + + @Override + public void open(Configuration parameters) throws Exception { + StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); + this.provider = context.getInputSplitProvider(); + inputFormat.configure(context.getTaskStubParameters()); + } + + @Override + 
public void invoke(Collector<String> collector) throws Exception { + final TypeSerializer<String> serializer = serializerFactory.getSerializer(); + final Iterator<InputSplit> splitIterator = getInputSplits(); + @SuppressWarnings("unchecked") + final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.inputFormat; + try { + while (splitIterator.hasNext()) { + + final InputSplit split = splitIterator.next(); + String record = serializer.createInstance(); + + format.open(split); + try { + while (!format.reachedEnd()) { + if ((record = format.nextRecord(record)) != null) { + collector.collect(record); + } + } + } finally { + format.close(); + } + } + collector.close(); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + + private Iterator<InputSplit> getInputSplits() { + + return new Iterator<InputSplit>() { + + private InputSplit nextSplit; + + private boolean exhausted; + + @Override + public boolean hasNext() { + if (exhausted) { + return false; + } + + if (nextSplit != null) { + return true; + } + + InputSplit split = provider.getNextInputSplit(); + + if (split != null) { + this.nextSplit = split; + return true; + } else { + exhausted = true; + return false; + } + } + + @Override + public InputSplit next() { + if (this.nextSplit == null && !hasNext()) { + throw new NoSuchElementException(); + } + + final InputSplit tmp = this.nextSplit; + this.nextSplit = null; + return tmp; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } +}