http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java deleted file mode 100644 index 1d51216..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java +++ /dev/null @@ -1,406 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang3.SerializationException; -import org.apache.commons.lang3.SerializationUtils; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.collector.OutputSelector; -import org.apache.flink.streaming.api.invokable.StreamInvokable; -import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; -import org.apache.flink.streaming.api.streamvertex.StreamVertexException; -import org.apache.flink.streaming.partitioner.ShufflePartitioner; -import org.apache.flink.streaming.partitioner.StreamPartitioner; -import org.apache.flink.streaming.state.OperatorState; -import org.apache.flink.util.InstantiationUtil; - -public class StreamConfig implements Serializable { - - private static final long serialVersionUID = 1L; - - private static final String INPUT_TYPE = "inputType_"; - private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs"; - private static final String NUMBER_OF_INPUTS = "numberOfInputs"; - private static final String CHAINED_OUTPUTS = "chainedOutputs"; - private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_"; - private static final String IS_CHAINED_VERTEX = "isChainedSubtask"; - private static final String OUTPUT_NAME = "outputName_"; - private static final String PARTITIONER_OBJECT = "partitionerObject_"; - private static final String VERTEX_NAME = "vertexName"; - private static final String ITERATION_ID = "iteration-id"; - private static final String OUTPUT_SELECTOR = "outputSelector"; - private static final String DIRECTED_EMIT = "directedEmit"; - private static final String SERIALIZEDUDF = "serializedudf"; - private static final String USER_FUNCTION = "userfunction"; - private static final String BUFFER_TIMEOUT = "bufferTimeout"; - private static final String OPERATOR_STATES = "operatorStates"; 
- private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1"; - private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2"; - private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1"; - private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2"; - private static final String ITERATON_WAIT = "iterationWait"; - private static final String OUTPUTS = "outVertexNames"; - private static final String EDGES_IN_ORDER = "rwOrder"; - - // DEFAULT VALUES - - private static final long DEFAULT_TIMEOUT = 100; - - // CONFIG METHODS - - private Configuration config; - - public StreamConfig(Configuration config) { - this.config = config; - } - - public Configuration getConfiguration() { - return config; - } - - public void setVertexName(String vertexName) { - config.setString(VERTEX_NAME, vertexName); - } - - public String getTaskName() { - return config.getString(VERTEX_NAME, "Missing"); - } - - public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) { - setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer); - } - - public void setTypeSerializerIn2(StreamRecordSerializer<?> serializer) { - setTypeSerializer(TYPE_SERIALIZER_IN_2, serializer); - } - - public void setTypeSerializerOut1(StreamRecordSerializer<?> serializer) { - setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer); - } - - public void setTypeSerializerOut2(StreamRecordSerializer<?> serializer) { - setTypeSerializer(TYPE_SERIALIZER_OUT_2, serializer); - } - - @SuppressWarnings("unchecked") - public <T> StreamRecordSerializer<T> getTypeSerializerIn1(ClassLoader cl) { - try { - return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config, - TYPE_SERIALIZER_IN_1, cl); - } catch (Exception e) { - throw new RuntimeException("Could not instantiate serializer."); - } - } - - @SuppressWarnings("unchecked") - public <T> StreamRecordSerializer<T> getTypeSerializerIn2(ClassLoader cl) { - try { - return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config, - TYPE_SERIALIZER_IN_2, cl); - } catch (Exception e) { - throw new RuntimeException("Could not instantiate serializer."); - } - } - - @SuppressWarnings("unchecked") - public <T> StreamRecordSerializer<T> getTypeSerializerOut1(ClassLoader cl) { - try { - return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config, - TYPE_SERIALIZER_OUT_1, cl); - } catch (Exception e) { - throw new RuntimeException("Could not instantiate serializer."); - } - } - - @SuppressWarnings("unchecked") - public <T> StreamRecordSerializer<T> getTypeSerializerOut2(ClassLoader cl) { - try { - return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config, - TYPE_SERIALIZER_OUT_2, cl); - } catch (Exception e) { - throw new RuntimeException("Could not instantiate serializer."); - } - } - - private void setTypeSerializer(String key, StreamRecordSerializer<?> typeWrapper) { - config.setBytes(key, SerializationUtils.serialize(typeWrapper)); - } - - public void setBufferTimeout(long timeout) { - config.setLong(BUFFER_TIMEOUT, timeout); - } - - public long getBufferTimeout() { - return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT); - } - - public void setUserInvokable(StreamInvokable<?, ?> invokableObject) { - if (invokableObject != null) { - config.setClass(USER_FUNCTION, invokableObject.getClass()); - - try { - config.setBytes(SERIALIZEDUDF, SerializationUtils.serialize(invokableObject)); - } catch (SerializationException e) { - throw new RuntimeException("Cannot 
serialize invokable object " - + invokableObject.getClass(), e); - } - } - } - - @SuppressWarnings({ "unchecked" }) - public <T> T getUserInvokable(ClassLoader cl) { - try { - return (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl); - } catch (Exception e) { - throw new StreamVertexException("Cannot instantiate user function", e); - } - } - - public void setDirectedEmit(boolean directedEmit) { - config.setBoolean(DIRECTED_EMIT, directedEmit); - } - - public boolean isDirectedEmit() { - return config.getBoolean(DIRECTED_EMIT, false); - } - - - public void setOutputSelectors(List<OutputSelector<?>> outputSelector) { - try { - if (outputSelector != null) { - setDirectedEmit(true); - config.setBytes(OUTPUT_SELECTOR, SerializationUtils.serialize((Serializable) outputSelector)); - } - } catch (SerializationException e) { - throw new RuntimeException("Cannot serialize OutputSelector"); - } - } - - @SuppressWarnings("unchecked") - public <T> List<OutputSelector<T>> getOutputSelectors(ClassLoader cl) { - try { - return (List<OutputSelector<T>>) InstantiationUtil.readObjectFromConfig(this.config, - OUTPUT_SELECTOR, cl); - } catch (Exception e) { - throw new StreamVertexException("Cannot deserialize and instantiate OutputSelector", e); - } - } - - public void setIterationId(Integer iterationId) { - config.setInteger(ITERATION_ID, iterationId); - } - - public Integer getIterationId() { - return config.getInteger(ITERATION_ID, 0); - } - - public void setIterationWaitTime(long time) { - config.setLong(ITERATON_WAIT, time); - } - - public long getIterationWaitTime() { - return config.getLong(ITERATON_WAIT, 0); - } - - public <T> void setPartitioner(String output, StreamPartitioner<T> partitionerObject) { - - config.setBytes(PARTITIONER_OBJECT + output, - SerializationUtils.serialize(partitionerObject)); - } - - @SuppressWarnings("unchecked") - public <T> StreamPartitioner<T> getPartitioner(ClassLoader cl, String output) { - StreamPartitioner<T> partitioner = null; - try { - partitioner = (StreamPartitioner<T>) InstantiationUtil.readObjectFromConfig( - this.config, PARTITIONER_OBJECT + output, cl); - } catch (Exception e) { - throw new RuntimeException("Partitioner could not be instantiated."); - } - if (partitioner != null) { - return partitioner; - } else { - return new ShufflePartitioner<T>(); - } - } - - public void setSelectedNames(String output, List<String> selected) { - if (selected != null) { - config.setBytes(OUTPUT_NAME + output, - SerializationUtils.serialize((Serializable) selected)); - } else { - config.setBytes(OUTPUT_NAME + output, - SerializationUtils.serialize((Serializable) new ArrayList<String>())); - } - } - - @SuppressWarnings("unchecked") - public List<String> getSelectedNames(String output) { - return (List<String>) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME + output, - null)); - } - - public void setNumberOfInputs(int numberOfInputs) { - config.setInteger(NUMBER_OF_INPUTS, numberOfInputs); - } - - public int getNumberOfInputs() { - return config.getInteger(NUMBER_OF_INPUTS, 0); - } - - public void setNumberOfOutputs(int numberOfOutputs) { - config.setInteger(NUMBER_OF_OUTPUTS, numberOfOutputs); - } - - public int getNumberOfOutputs() { - return config.getInteger(NUMBER_OF_OUTPUTS, 0); - } - - public void setOutputs(List<String> outputVertexNames) { - config.setBytes(OUTPUTS, SerializationUtils.serialize((Serializable) outputVertexNames)); - } - - @SuppressWarnings("unchecked") - public List<String> getOutputs(ClassLoader cl) { - try { - 
return (List<String>) InstantiationUtil.readObjectFromConfig(this.config, OUTPUTS, cl); - } catch (Exception e) { - throw new RuntimeException("Could not instantiate outputs."); - } - } - - public void setOutEdgesInOrder(List<Tuple2<String, String>> outEdgeList) { - - config.setBytes(EDGES_IN_ORDER, SerializationUtils.serialize((Serializable) outEdgeList)); - } - - @SuppressWarnings("unchecked") - public List<Tuple2<String, String>> getOutEdgesInOrder(ClassLoader cl) { - try { - return (List<Tuple2<String, String>>) InstantiationUtil.readObjectFromConfig( - this.config, EDGES_IN_ORDER, cl); - } catch (Exception e) { - throw new RuntimeException("Could not instantiate outputs."); - } - } - - public void setInputIndex(int inputNumber, Integer inputTypeNumber) { - config.setInteger(INPUT_TYPE + inputNumber++, inputTypeNumber); - } - - public int getInputIndex(int inputNumber) { - return config.getInteger(INPUT_TYPE + inputNumber, 0); - } - - public void setOperatorStates(Map<String, OperatorState<?>> states) { - config.setBytes(OPERATOR_STATES, SerializationUtils.serialize((Serializable) states)); - } - - @SuppressWarnings("unchecked") - public Map<String, OperatorState<?>> getOperatorStates(ClassLoader cl) { - try { - return (Map<String, OperatorState<?>>) InstantiationUtil.readObjectFromConfig( - this.config, OPERATOR_STATES, cl); - } catch (Exception e) { - throw new RuntimeException("Could not load operator state"); - } - } - - public void setChainedOutputs(List<String> chainedOutputs) { - config.setBytes(CHAINED_OUTPUTS, - SerializationUtils.serialize((Serializable) chainedOutputs)); - } - - @SuppressWarnings("unchecked") - public List<String> getChainedOutputs(ClassLoader cl) { - try { - return (List<String>) InstantiationUtil.readObjectFromConfig(this.config, - CHAINED_OUTPUTS, cl); - } catch (Exception e) { - throw new RuntimeException("Could not instantiate chained outputs."); - } - } - - public void setTransitiveChainedTaskConfigs(Map<String, StreamConfig> chainedTaskConfigs) { - config.setBytes(CHAINED_TASK_CONFIG, - SerializationUtils.serialize((Serializable) chainedTaskConfigs)); - } - - @SuppressWarnings("unchecked") - public Map<String, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) { - try { - - Map<String, StreamConfig> confs = (Map<String, StreamConfig>) InstantiationUtil - .readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl); - - return confs == null ? 
new HashMap<String, StreamConfig>() : confs; - } catch (Exception e) { - throw new RuntimeException("Could not instantiate configuration."); - } - } - - public void setChainStart() { - config.setBoolean(IS_CHAINED_VERTEX, true); - } - - public boolean isChainStart() { - return config.getBoolean(IS_CHAINED_VERTEX, false); - } - - @Override - public String toString() { - - ClassLoader cl = getClass().getClassLoader(); - - StringBuilder builder = new StringBuilder(); - builder.append("\n======================="); - builder.append("Stream Config"); - builder.append("======================="); - builder.append("\nTask name: " + getTaskName()); - builder.append("\nNumber of non-chained inputs: " + getNumberOfInputs()); - builder.append("\nNumber of non-chained outputs: " + getNumberOfOutputs()); - builder.append("\nOutput names: " + getOutputs(cl)); - builder.append("\nPartitioning:"); - for (String outputname : getOutputs(cl)) { - builder.append("\n\t" + outputname + ": " - + getPartitioner(cl, outputname).getClass().getSimpleName()); - } - - builder.append("\nChained subtasks: " + getChainedOutputs(cl)); - - try { - builder.append("\nInvokable: " + getUserInvokable(cl).getClass().getSimpleName()); - } catch (Exception e) { - builder.append("\nInvokable: Missing"); - } - builder.append("\nBuffer timeout: " + getBufferTimeout()); - if (isChainStart() && getChainedOutputs(cl).size() > 0) { - builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n"); - builder.append(getTransitiveChainedTaskConfigs(cl)).toString(); - } - - return builder.toString(); - } -}
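StreamConfig above follows a single pattern throughout: each operator attribute (type serializers, the user invokable, partitioners, selected output names) is serialized into a byte array, stored in a Flink Configuration under a string key, and later read back with the user-code ClassLoader that every getter takes as an argument. The sketch below shows that round trip in isolation, assuming nothing from Flink: a plain HashMap stands in for Configuration, and JDK serialization stands in for SerializationUtils/InstantiationUtil; all names are illustrative.

    import java.io.*;
    import java.util.HashMap;
    import java.util.Map;

    /** Minimal sketch of StreamConfig's serialize-into-key/value-config pattern. */
    public class ConfigSketch {

        // Stands in for org.apache.flink.configuration.Configuration.
        private final Map<String, byte[]> config = new HashMap<String, byte[]>();

        /** Serialize any Serializable value under a string key (cf. setTypeSerializer). */
        public void setObject(String key, Serializable value) {
            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(bos);
                oos.writeObject(value);
                oos.close();
                config.put(key, bos.toByteArray());
            } catch (IOException e) {
                throw new RuntimeException("Cannot serialize value for key " + key, e);
            }
        }

        /** Deserialize with a caller-supplied ClassLoader, as the getters above do. */
        @SuppressWarnings("unchecked")
        public <T> T getObject(String key, final ClassLoader cl) {
            byte[] bytes = config.get(key);
            if (bytes == null) {
                return null; // caller picks a default, cf. getPartitioner() falling back to shuffle
            }
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
                    @Override
                    protected Class<?> resolveClass(ObjectStreamClass desc)
                            throws IOException, ClassNotFoundException {
                        // Resolve against the user-code class loader, not the system one.
                        return Class.forName(desc.getName(), false, cl);
                    }
                }) {
                return (T) ois.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new RuntimeException("Could not deserialize value for key " + key, e);
            }
        }
    }

The resolveClass override is the detail that motivates the ClassLoader parameter on StreamConfig's getters: at runtime the serialized user functions may reference classes that exist only in the job's user-code class loader.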
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java deleted file mode 100644 index b5e43af..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java +++ /dev/null @@ -1,620 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.compiler.plan.StreamingPlan; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.streaming.api.collector.OutputSelector; -import org.apache.flink.streaming.api.invokable.StreamInvokable; -import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable; -import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; -import org.apache.flink.streaming.api.streamvertex.CoStreamVertex; -import org.apache.flink.streaming.api.streamvertex.StreamIterationHead; -import org.apache.flink.streaming.api.streamvertex.StreamIterationTail; -import org.apache.flink.streaming.api.streamvertex.StreamVertex; -import org.apache.flink.streaming.partitioner.StreamPartitioner; -import org.apache.flink.streaming.state.OperatorState; -import org.apache.sling.commons.json.JSONArray; -import org.apache.sling.commons.json.JSONException; -import org.apache.sling.commons.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Object for building Apache Flink stream processing graphs - */ -public class StreamGraph extends StreamingPlan { - - private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class); - private final static String DEAFULT_JOB_NAME = "Flink Streaming Job"; - - protected boolean chaining = true; - private String jobName = DEAFULT_JOB_NAME; - - // Graph attributes - private Map<String, Integer> operatorParallelisms; - private Map<String, Long> bufferTimeouts; - private Map<String, List<String>> outEdgeLists; - 
private Map<String, List<Integer>> outEdgeTypes; - private Map<String, List<List<String>>> selectedNames; - private Map<String, List<String>> inEdgeLists; - private Map<String, List<StreamPartitioner<?>>> outputPartitioners; - private Map<String, String> operatorNames; - private Map<String, StreamInvokable<?, ?>> invokableObjects; - private Map<String, StreamRecordSerializer<?>> typeSerializersIn1; - private Map<String, StreamRecordSerializer<?>> typeSerializersIn2; - private Map<String, StreamRecordSerializer<?>> typeSerializersOut1; - private Map<String, StreamRecordSerializer<?>> typeSerializersOut2; - private Map<String, Class<? extends AbstractInvokable>> jobVertexClasses; - private Map<String, List<OutputSelector<?>>> outputSelectors; - private Map<String, Integer> iterationIds; - private Map<Integer, String> iterationIDtoHeadName; - private Map<Integer, String> iterationIDtoTailName; - private Map<String, Integer> iterationTailCount; - private Map<String, Long> iterationTimeouts; - private Map<String, Map<String, OperatorState<?>>> operatorStates; - private Map<String, InputFormat<String, ?>> inputFormatLists; - - private Set<String> sources; - - public StreamGraph() { - - initGraph(); - - if (LOG.isDebugEnabled()) { - LOG.debug("StreamGraph created"); - } - } - - public void initGraph() { - operatorParallelisms = new HashMap<String, Integer>(); - bufferTimeouts = new HashMap<String, Long>(); - outEdgeLists = new HashMap<String, List<String>>(); - outEdgeTypes = new HashMap<String, List<Integer>>(); - selectedNames = new HashMap<String, List<List<String>>>(); - inEdgeLists = new HashMap<String, List<String>>(); - outputPartitioners = new HashMap<String, List<StreamPartitioner<?>>>(); - operatorNames = new HashMap<String, String>(); - invokableObjects = new HashMap<String, StreamInvokable<?, ?>>(); - typeSerializersIn1 = new HashMap<String, StreamRecordSerializer<?>>(); - typeSerializersIn2 = new HashMap<String, StreamRecordSerializer<?>>(); - typeSerializersOut1 = new HashMap<String, StreamRecordSerializer<?>>(); - typeSerializersOut2 = new HashMap<String, StreamRecordSerializer<?>>(); - outputSelectors = new HashMap<String, List<OutputSelector<?>>>(); - jobVertexClasses = new HashMap<String, Class<? extends AbstractInvokable>>(); - iterationIds = new HashMap<String, Integer>(); - iterationIDtoHeadName = new HashMap<Integer, String>(); - iterationIDtoTailName = new HashMap<Integer, String>(); - iterationTailCount = new HashMap<String, Integer>(); - iterationTimeouts = new HashMap<String, Long>(); - operatorStates = new HashMap<String, Map<String, OperatorState<?>>>(); - inputFormatLists = new HashMap<String, InputFormat<String, ?>>(); - sources = new HashSet<String>(); - } - - /** - * Adds a vertex to the streaming graph with the given parameters - * - * @param vertexName - * Name of the vertex - * @param invokableObject - * User defined operator - * @param inTypeInfo - * Input type for serialization - * @param outTypeInfo - * Output type for serialization - * @param operatorName - * Operator type - * @param parallelism - * Number of parallel instances created - */ - public <IN, OUT> void addStreamVertex(String vertexName, - StreamInvokable<IN, OUT> invokableObject, TypeInformation<IN> inTypeInfo, - TypeInformation<OUT> outTypeInfo, String operatorName, int parallelism) { - - addVertex(vertexName, StreamVertex.class, invokableObject, operatorName, parallelism); - - StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? 
new StreamRecordSerializer<IN>( - inTypeInfo) : null; - StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? new StreamRecordSerializer<OUT>( - outTypeInfo) : null; - - addTypeSerializers(vertexName, inSerializer, null, outSerializer, null); - - if (LOG.isDebugEnabled()) { - LOG.debug("Vertex: {}", vertexName); - } - } - - public <IN, OUT> void addSourceVertex(String vertexName, - StreamInvokable<IN, OUT> invokableObject, TypeInformation<IN> inTypeInfo, - TypeInformation<OUT> outTypeInfo, String operatorName, int parallelism) { - addStreamVertex(vertexName, invokableObject, inTypeInfo, outTypeInfo, operatorName, - parallelism); - sources.add(vertexName); - } - - /** - * Adds a vertex for the iteration head to the {@link JobGraph}. The - * iterated values will be fed from this vertex back to the graph. - * - * @param vertexName - * Name of the vertex - * @param iterationHead - * Id of the iteration head - * @param iterationID - * ID of iteration for multiple iterations - * @param parallelism - * Number of parallel instances created - * @param waitTime - * Max wait time for next record - */ - public void addIterationHead(String vertexName, String iterationHead, Integer iterationID, - int parallelism, long waitTime) { - - addVertex(vertexName, StreamIterationHead.class, null, null, parallelism); - - chaining = false; - - iterationIds.put(vertexName, iterationID); - iterationIDtoHeadName.put(iterationID, vertexName); - - setSerializersFrom(iterationHead, vertexName); - - setEdge(vertexName, iterationHead, - outputPartitioners.get(inEdgeLists.get(iterationHead).get(0)).get(0), 0, - new ArrayList<String>()); - - iterationTimeouts.put(iterationIDtoHeadName.get(iterationID), waitTime); - - if (LOG.isDebugEnabled()) { - LOG.debug("ITERATION SOURCE: {}", vertexName); - } - - sources.add(vertexName); - } - - /** - * Adds a vertex for the iteration tail to the {@link JobGraph}. The values - * intended to be iterated will be sent to this sink from the iteration - * head. 
- * - * @param vertexName - * Name of the vertex - * @param iterationTail - * Id of the iteration tail - * @param iterationID - * ID of iteration for multiple iterations - * @param waitTime - * Max waiting time for next record - */ - public void addIterationTail(String vertexName, String iterationTail, Integer iterationID, - long waitTime) { - - if (bufferTimeouts.get(iterationTail) == 0) { - throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported."); - } - - addVertex(vertexName, StreamIterationTail.class, null, null, getParallelism(iterationTail)); - - iterationIds.put(vertexName, iterationID); - iterationIDtoTailName.put(iterationID, vertexName); - - setSerializersFrom(iterationTail, vertexName); - iterationTimeouts.put(iterationIDtoTailName.get(iterationID), waitTime); - - setParallelism(iterationIDtoHeadName.get(iterationID), getParallelism(iterationTail)); - setBufferTimeout(iterationIDtoHeadName.get(iterationID), bufferTimeouts.get(iterationTail)); - - if (LOG.isDebugEnabled()) { - LOG.debug("ITERATION SINK: {}", vertexName); - } - - } - - public <IN1, IN2, OUT> void addCoTask(String vertexName, - CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeInformation<IN1> in1TypeInfo, - TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, - String operatorName, int parallelism) { - - addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName, parallelism); - - addTypeSerializers(vertexName, new StreamRecordSerializer<IN1>(in1TypeInfo), - new StreamRecordSerializer<IN2>(in2TypeInfo), new StreamRecordSerializer<OUT>( - outTypeInfo), null); - - if (LOG.isDebugEnabled()) { - LOG.debug("CO-TASK: {}", vertexName); - } - } - - /** - * Sets vertex parameters in the JobGraph - * - * @param vertexName - * Name of the vertex - * @param vertexClass - * The class of the vertex - * @param invokableObject - * The user defined invokable object - * @param operatorName - * Type of the user defined operator - * @param parallelism - * Number of parallel instances created - */ - private void addVertex(String vertexName, Class<?
extends AbstractInvokable> vertexClass, - StreamInvokable<?, ?> invokableObject, String operatorName, int parallelism) { - - jobVertexClasses.put(vertexName, vertexClass); - setParallelism(vertexName, parallelism); - invokableObjects.put(vertexName, invokableObject); - operatorNames.put(vertexName, operatorName); - outEdgeLists.put(vertexName, new ArrayList<String>()); - outEdgeTypes.put(vertexName, new ArrayList<Integer>()); - selectedNames.put(vertexName, new ArrayList<List<String>>()); - outputSelectors.put(vertexName, new ArrayList<OutputSelector<?>>()); - inEdgeLists.put(vertexName, new ArrayList<String>()); - outputPartitioners.put(vertexName, new ArrayList<StreamPartitioner<?>>()); - iterationTailCount.put(vertexName, 0); - } - - /** - * Connects two vertices in the JobGraph using the selected partitioner - * settings - * - * @param upStreamVertexName - * Name of the upstream(output) vertex - * @param downStreamVertexName - * Name of the downstream(input) vertex - * @param partitionerObject - * Partitioner object - * @param typeNumber - * Number of the type (used at co-functions) - * @param outputNames - * User defined names of the out edge - */ - public void setEdge(String upStreamVertexName, String downStreamVertexName, - StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames) { - outEdgeLists.get(upStreamVertexName).add(downStreamVertexName); - outEdgeTypes.get(upStreamVertexName).add(typeNumber); - inEdgeLists.get(downStreamVertexName).add(upStreamVertexName); - outputPartitioners.get(upStreamVertexName).add(partitionerObject); - selectedNames.get(upStreamVertexName).add(outputNames); - } - - private void addTypeSerializers(String vertexName, StreamRecordSerializer<?> in1, - StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out1, - StreamRecordSerializer<?> out2) { - typeSerializersIn1.put(vertexName, in1); - typeSerializersIn2.put(vertexName, in2); - typeSerializersOut1.put(vertexName, out1); - typeSerializersOut2.put(vertexName, out2); - } - - /** - * Sets the number of parallel instances created for the given vertex. - * - * @param vertexName - * Name of the vertex - * @param parallelism - * Number of parallel instances created - */ - public void setParallelism(String vertexName, int parallelism) { - operatorParallelisms.put(vertexName, parallelism); - } - - public int getParallelism(String vertexName) { - return operatorParallelisms.get(vertexName); - } - - /** - * Sets the input format for the given vertex. 
- * - * @param vertexName - * Name of the vertex - * @param inputFormat - * Input format of the file source associated with the given - * vertex - */ - public void setInputFormat(String vertexName, InputFormat<String, ?> inputFormat) { - inputFormatLists.put(vertexName, inputFormat); - } - - public void setBufferTimeout(String vertexName, long bufferTimeout) { - this.bufferTimeouts.put(vertexName, bufferTimeout); - } - - public long getBufferTimeout(String vertexName) { - return this.bufferTimeouts.get(vertexName); - } - - public void addOperatorState(String vertexName, String stateName, OperatorState<?> state) { - Map<String, OperatorState<?>> states = operatorStates.get(vertexName); - if (states == null) { - states = new HashMap<String, OperatorState<?>>(); - states.put(stateName, state); - } else { - if (states.containsKey(stateName)) { - throw new RuntimeException("State has already been registered with this name: " - + stateName); - } else { - states.put(stateName, state); - } - } - operatorStates.put(vertexName, states); - } - - /** - * Sets a user defined {@link OutputSelector} for the given operator. Used - * for directed emits. - * - * @param vertexName - * Name of the vertex for which the output selector will be set - * @param outputSelector - * The user defined output selector. - */ - public <T> void setOutputSelector(String vertexName, OutputSelector<T> outputSelector) { - outputSelectors.get(vertexName).add(outputSelector); - - if (LOG.isDebugEnabled()) { - LOG.debug("OutputSelector set for {}", vertexName); - } - - } - - public <IN, OUT> void setInvokable(String vertexName, StreamInvokable<IN, OUT> invokableObject) { - invokableObjects.put(vertexName, invokableObject); - } - - public <OUT> void setOutType(String id, TypeInformation<OUT> outType) { - StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType); - typeSerializersOut1.put(id, serializer); - } - - public StreamInvokable<?, ?> getInvokable(String vertexName) { - return invokableObjects.get(vertexName); - } - - @SuppressWarnings("unchecked") - public <OUT> StreamRecordSerializer<OUT> getOutSerializer1(String vertexName) { - return (StreamRecordSerializer<OUT>) typeSerializersOut1.get(vertexName); - } - - @SuppressWarnings("unchecked") - public <OUT> StreamRecordSerializer<OUT> getOutSerializer2(String vertexName) { - return (StreamRecordSerializer<OUT>) typeSerializersOut2.get(vertexName); - } - - @SuppressWarnings("unchecked") - public <IN> StreamRecordSerializer<IN> getInSerializer1(String vertexName) { - return (StreamRecordSerializer<IN>) typeSerializersIn1.get(vertexName); - } - - @SuppressWarnings("unchecked") - public <IN> StreamRecordSerializer<IN> getInSerializer2(String vertexName) { - return (StreamRecordSerializer<IN>) typeSerializersIn2.get(vertexName); - } - - /** - * Copies the type serializers from one vertex to another, used with some - * sinks. - * - * @param from - * Name of the vertex to copy serializers from - * @param to - * Name of the vertex to copy serializers to - */ - public void setSerializersFrom(String from, String to) { - operatorNames.put(to, operatorNames.get(from)); - - typeSerializersIn1.put(to, typeSerializersOut1.get(from)); - typeSerializersIn2.put(to, typeSerializersOut2.get(from)); - typeSerializersOut1.put(to, typeSerializersOut1.get(from)); - typeSerializersOut2.put(to, typeSerializersOut2.get(from)); - } - - /** - * Gets the assembled {@link JobGraph} and adds a default name for it.
- */ - public JobGraph getJobGraph() { - return getJobGraph(jobName); - } - - /** - * Gets the assembled {@link JobGraph} and adds a user specified name for - * it. - * - * @param jobGraphName - * name of the jobGraph - */ - public JobGraph getJobGraph(String jobGraphName) { - - this.jobName = jobGraphName; - StreamingJobGraphGenerator optimizer = new StreamingJobGraphGenerator(this); - - return optimizer.createJobGraph(jobGraphName); - } - - public void setJobName(String jobName) { - this.jobName = jobName; - } - - public void setChaining(boolean chaining) { - this.chaining = chaining; - } - - public Collection<String> getSources() { - return sources; - } - - public List<String> getOutEdges(String vertexName) { - return outEdgeLists.get(vertexName); - } - - public List<String> getInEdges(String vertexName) { - return inEdgeLists.get(vertexName); - } - - public List<Integer> getOutEdgeTypes(String vertexName) { - - return outEdgeTypes.get(vertexName); - } - - public StreamPartitioner<?> getOutPartitioner(String upStreamVertex, String downStreamVertex) { - return outputPartitioners.get(upStreamVertex).get( - outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex)); - } - - public List<String> getSelectedNames(String upStreamVertex, String downStreamVertex) { - - return selectedNames.get(upStreamVertex).get( - outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex)); - } - - public Collection<Integer> getIterationIDs() { - return new HashSet<Integer>(iterationIds.values()); - } - - public String getIterationTail(int iterID) { - return iterationIDtoTailName.get(iterID); - } - - public String getIterationHead(int iterID) { - return iterationIDtoHeadName.get(iterID); - } - - public Class<? extends AbstractInvokable> getJobVertexClass(String vertexName) { - return jobVertexClasses.get(vertexName); - } - - public InputFormat<String, ?> getInputFormat(String vertexName) { - return inputFormatLists.get(vertexName); - } - - public List<OutputSelector<?>> getOutputSelector(String vertexName) { - return outputSelectors.get(vertexName); - } - - public Map<String, OperatorState<?>> getState(String vertexName) { - return operatorStates.get(vertexName); - } - - public Integer getIterationID(String vertexName) { - return iterationIds.get(vertexName); - } - - public long getIterationTimeout(String vertexName) { - return iterationTimeouts.get(vertexName); - } - - public String getOperatorName(String vertexName) { - return operatorNames.get(vertexName); - } - - @Override - public String getStreamingPlanAsJSON() { - - try { - JSONObject json = new JSONObject(); - JSONArray nodes = new JSONArray(); - - json.put("nodes", nodes); - - for (String id : operatorNames.keySet()) { - JSONObject node = new JSONObject(); - nodes.put(node); - - node.put("id", Integer.valueOf(id)); - node.put("type", getOperatorName(id)); - - if (sources.contains(id)) { - node.put("pact", "Data Source"); - } else { - node.put("pact", "Data Stream"); - } - - node.put("contents", getOperatorName(id) + " at " - + getInvokable(id).getUserFunction().getClass().getSimpleName()); - node.put("parallelism", getParallelism(id)); - - int numIn = getInEdges(id).size(); - if (numIn > 0) { - - JSONArray inputs = new JSONArray(); - node.put("predecessors", inputs); - - for (int i = 0; i < numIn; i++) { - - String inID = getInEdges(id).get(i); - - JSONObject input = new JSONObject(); - inputs.put(input); - - input.put("id", Integer.valueOf(inID)); - input.put("ship_strategy", getOutPartitioner(inID, id).getStrategy()); - if (i == 0) { - 
input.put("side", "first"); - } else if (i == 1) { - input.put("side", "second"); - } - } - } - - } - return json.toString(); - } catch (JSONException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("JSON plan creation failed: {}", e); - } - return ""; - } - - } - - @Override - public void dumpStreamingPlanAsJSON(File file) throws IOException { - PrintWriter pw = null; - try { - pw = new PrintWriter(new FileOutputStream(file), false); - pw.write(getStreamingPlanAsJSON()); - pw.flush(); - - } finally { - if (pw != null) { - pw.close(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java deleted file mode 100644 index 24ebf46..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java +++ /dev/null @@ -1,293 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.AbstractJobVertex; -import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.ScheduleMode; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; -import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.streaming.api.invokable.StreamInvokable; -import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy; -import org.apache.flink.streaming.api.streamvertex.StreamIterationHead; -import org.apache.flink.streaming.api.streamvertex.StreamIterationTail; -import org.apache.flink.streaming.partitioner.StreamPartitioner; -import org.apache.flink.streaming.partitioner.StreamPartitioner.PartitioningStrategy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class StreamingJobGraphGenerator { - - private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class); - - private StreamGraph streamGraph; - - private Map<String, AbstractJobVertex> streamVertices; - private JobGraph jobGraph; - private Collection<String> builtNodes; - - private Map<String, Map<String, StreamConfig>> chainedConfigs; - private Map<String, StreamConfig> vertexConfigs; - private Map<String, String> chainedNames; - - public StreamingJobGraphGenerator(StreamGraph streamGraph) { - this.streamGraph = streamGraph; - } - - private void init() { - this.streamVertices = new HashMap<String, AbstractJobVertex>(); - this.builtNodes = new HashSet<String>(); - this.chainedConfigs = new HashMap<String, Map<String, StreamConfig>>(); - this.vertexConfigs = new HashMap<String, StreamConfig>(); - this.chainedNames = new HashMap<String, String>(); - } - - public JobGraph createJobGraph(String jobName) { - jobGraph = new JobGraph(jobName); - // Turn lazy scheduling off - jobGraph.setScheduleMode(ScheduleMode.ALL); - - init(); - - for (String sourceName : streamGraph.getSources()) { - createChain(sourceName, sourceName); - } - - setSlotSharing(); - - return jobGraph; - } - - private List<Tuple2<String, String>> createChain(String startNode, String current) { - - if (!builtNodes.contains(startNode)) { - - List<Tuple2<String, String>> transitiveOutEdges = new ArrayList<Tuple2<String, String>>(); - List<String> chainableOutputs = new ArrayList<String>(); - List<String> nonChainableOutputs = new ArrayList<String>(); - - for (String outName : streamGraph.getOutEdges(current)) { - if (isChainable(current, outName)) { - chainableOutputs.add(outName); - } else { - nonChainableOutputs.add(outName); - } - } - - for (String chainable : chainableOutputs) { - transitiveOutEdges.addAll(createChain(startNode, chainable)); - } - - for (String nonChainable : nonChainableOutputs) { - transitiveOutEdges.add(new Tuple2<String, String>(current, nonChainable)); - createChain(nonChainable, nonChainable); - } - - chainedNames.put(current, createChainedName(current, chainableOutputs)); - - StreamConfig config = current.equals(startNode) ? 
createProcessingVertex(startNode) - : new StreamConfig(new Configuration()); - - setVertexConfig(current, config, chainableOutputs, nonChainableOutputs); - - if (current.equals(startNode)) { - - config.setChainStart(); - config.setOutEdgesInOrder(transitiveOutEdges); - - for (Tuple2<String, String> edge : transitiveOutEdges) { - connect(startNode, edge); - } - - config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNode)); - - } else { - - Map<String, StreamConfig> chainedConfs = chainedConfigs.get(startNode); - - if (chainedConfs == null) { - chainedConfigs.put(startNode, new HashMap<String, StreamConfig>()); - } - chainedConfigs.get(startNode).put(current, config); - } - - return transitiveOutEdges; - - } else { - return new ArrayList<Tuple2<String, String>>(); - } - } - - private String createChainedName(String vertexID, List<String> chainedOutputs) { - String vertexName = streamGraph.getOperatorName(vertexID); - if (chainedOutputs.size() > 1) { - List<String> outputChainedNames = new ArrayList<String>(); - for (String chainable : chainedOutputs) { - outputChainedNames.add(chainedNames.get(chainable)); - } - return vertexName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")"; - } else if (chainedOutputs.size() == 1) { - return vertexName + " -> " + chainedNames.get(chainedOutputs.get(0)); - } else { - return vertexName; - } - - } - - private StreamConfig createProcessingVertex(String vertexName) { - - AbstractJobVertex vertex = new AbstractJobVertex(chainedNames.get(vertexName)); - - vertex.setInvokableClass(streamGraph.getJobVertexClass(vertexName)); - if (streamGraph.getParallelism(vertexName) > 0) { - vertex.setParallelism(streamGraph.getParallelism(vertexName)); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Parallelism set: {} for {}", streamGraph.getParallelism(vertexName), - vertexName); - } - - if (streamGraph.getInputFormat(vertexName) != null) { - vertex.setInputSplitSource(streamGraph.getInputFormat(vertexName)); - } - - streamVertices.put(vertexName, vertex); - builtNodes.add(vertexName); - jobGraph.addVertex(vertex); - - return new StreamConfig(vertex.getConfiguration()); - } - - private void setVertexConfig(String vertexName, StreamConfig config, - List<String> chainableOutputs, List<String> nonChainableOutputs) { - - config.setVertexName(vertexName); - config.setBufferTimeout(streamGraph.getBufferTimeout(vertexName)); - - config.setTypeSerializerIn1(streamGraph.getInSerializer1(vertexName)); - config.setTypeSerializerIn2(streamGraph.getInSerializer2(vertexName)); - config.setTypeSerializerOut1(streamGraph.getOutSerializer1(vertexName)); - config.setTypeSerializerOut2(streamGraph.getOutSerializer2(vertexName)); - - config.setUserInvokable(streamGraph.getInvokable(vertexName)); - config.setOutputSelectors(streamGraph.getOutputSelector(vertexName)); - config.setOperatorStates(streamGraph.getState(vertexName)); - - config.setNumberOfOutputs(nonChainableOutputs.size()); - config.setOutputs(nonChainableOutputs); - config.setChainedOutputs(chainableOutputs); - - Class<? 
extends AbstractInvokable> vertexClass = streamGraph.getJobVertexClass(vertexName); - - if (vertexClass.equals(StreamIterationHead.class) - || vertexClass.equals(StreamIterationTail.class)) { - config.setIterationId(streamGraph.getIterationID(vertexName)); - config.setIterationWaitTime(streamGraph.getIterationTimeout(vertexName)); - } - - List<String> allOutputs = new ArrayList<String>(chainableOutputs); - allOutputs.addAll(nonChainableOutputs); - - for (String output : allOutputs) { - config.setSelectedNames(output, streamGraph.getSelectedNames(vertexName, output)); - } - - vertexConfigs.put(vertexName, config); - } - - private <T> void connect(String headOfChain, Tuple2<String, String> edge) { - - String upStreamVertexName = edge.f0; - String downStreamVertexName = edge.f1; - - int outputIndex = streamGraph.getOutEdges(upStreamVertexName).indexOf(downStreamVertexName); - - AbstractJobVertex headVertex = streamVertices.get(headOfChain); - AbstractJobVertex downStreamVertex = streamVertices.get(downStreamVertexName); - - StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration()); - StreamConfig upStreamConfig = new StreamConfig(headVertex.getConfiguration()); - - List<Integer> outEdgeIndexList = streamGraph.getOutEdgeTypes(upStreamVertexName); - int numOfInputs = downStreamConfig.getNumberOfInputs(); - - downStreamConfig.setInputIndex(numOfInputs++, outEdgeIndexList.get(outputIndex)); - downStreamConfig.setNumberOfInputs(numOfInputs); - - StreamPartitioner<?> partitioner = streamGraph.getOutPartitioner(upStreamVertexName, - downStreamVertexName); - - upStreamConfig.setPartitioner(downStreamVertexName, partitioner); - - if (partitioner.getStrategy() == PartitioningStrategy.FORWARD) { - downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE); - } else { - downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.ALL_TO_ALL); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(), - headOfChain, downStreamVertexName); - } - } - - private boolean isChainable(String vertexName, String outName) { - - StreamInvokable<?, ?> headInvokable = streamGraph.getInvokable(vertexName); - StreamInvokable<?, ?> outInvokable = streamGraph.getInvokable(outName); - - return streamGraph.getInEdges(outName).size() == 1 - && outInvokable != null - && outInvokable.getChainingStrategy() == ChainingStrategy.ALWAYS - && (headInvokable.getChainingStrategy() == ChainingStrategy.HEAD || headInvokable - .getChainingStrategy() == ChainingStrategy.ALWAYS) - && streamGraph.getOutPartitioner(vertexName, outName).getStrategy() == PartitioningStrategy.FORWARD - && streamGraph.getParallelism(vertexName) == streamGraph.getParallelism(outName) - && streamGraph.chaining; - } - - private void setSlotSharing() { - SlotSharingGroup shareGroup = new SlotSharingGroup(); - - for (AbstractJobVertex vertex : streamVertices.values()) { - vertex.setSlotSharingGroup(shareGroup); - } - - for (Integer iterID : streamGraph.getIterationIDs()) { - CoLocationGroup ccg = new CoLocationGroup(); - AbstractJobVertex tail = streamVertices.get(streamGraph.getIterationTail(iterID)); - AbstractJobVertex head = streamVertices.get(streamGraph.getIterationHead(iterID)); - - ccg.addVertex(head); - ccg.addVertex(tail); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java 
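isChainable above is the heart of the chaining logic in this generator: a downstream operator is merged into its upstream neighbour's chain (and therefore into the same task) only when every one of its conditions holds at once. The following standalone restatement of that predicate uses local enums as stand-ins for Flink's ChainingStrategy and PartitioningStrategy; it is a paraphrase for illustration, not the original code.

    /**
     * Standalone restatement of the chaining test in StreamingJobGraphGenerator.isChainable.
     * The enums are local stand-ins for Flink's ChainingStrategy and PartitioningStrategy.
     */
    public class ChainRule {

        enum ChainingStrategy { ALWAYS, HEAD, NEVER }
        enum PartitioningStrategy { FORWARD, SHUFFLE, BROADCAST }

        /** A downstream operator may be chained onto its upstream neighbour only if... */
        static boolean chainable(int downstreamInEdges,          // fan-in of the downstream node
                                 ChainingStrategy head,          // strategy of the upstream operator
                                 ChainingStrategy out,           // strategy of the downstream operator
                                 PartitioningStrategy partition, // partitioner on the connecting edge
                                 int headParallelism,
                                 int outParallelism,
                                 boolean chainingEnabled) {      // global flag, cf. StreamGraph.chaining
            return downstreamInEdges == 1                        // ...it has exactly one input edge,
                    && out == ChainingStrategy.ALWAYS            // ...it always allows chaining,
                    && (head == ChainingStrategy.HEAD
                        || head == ChainingStrategy.ALWAYS)      // ...the upstream side permits it,
                    && partition == PartitioningStrategy.FORWARD // ...no redistribution in between,
                    && headParallelism == outParallelism         // ...parallelism matches,
                    && chainingEnabled;                          // ...and chaining is on globally.
        }
    }

Note also that StreamGraph.addIterationHead sets chaining to false, so in this version the presence of an iteration disables chaining for the whole job.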
---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java deleted file mode 100644 index 1281bf0..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.collector; - -import java.util.LinkedList; -import java.util.List; - -import org.apache.flink.util.Collector; - -public class CollectorWrapper<OUT> implements Collector<OUT> { - - private List<Collector<OUT>> outputs; - - public CollectorWrapper() { - this.outputs = new LinkedList<Collector<OUT>>(); - } - - @SuppressWarnings("unchecked") - public void addCollector(Collector<?> output) { - outputs.add((Collector<OUT>) output); - } - - @Override - public void collect(OUT record) { - for(Collector<OUT> output: outputs){ - output.collect(record); - } - } - - @Override - public void close() { - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java deleted file mode 100755 index 4681cd3..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.collector; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A StreamCollector that uses user defined output names and a user defined - * output selector to make directed emits. - * - * @param <OUT> - * Type of the Tuple collected. - */ -public class DirectedCollectorWrapper<OUT> extends CollectorWrapper<OUT> { - - private static final Logger LOG = LoggerFactory.getLogger(DirectedCollectorWrapper.class); - - List<OutputSelector<OUT>> outputSelectors; - - protected Map<String, List<Collector<OUT>>> outputMap; - - private List<Collector<OUT>> selectAllOutputs; - private Set<Collector<OUT>> emitted; - - /** - * Creates a new DirectedStreamCollector - * - * @param outputSelector - * User defined {@link OutputSelector} - */ - public DirectedCollectorWrapper(List<OutputSelector<OUT>> outputSelectors) { - this.outputSelectors = outputSelectors; - this.emitted = new HashSet<Collector<OUT>>(); - this.selectAllOutputs = new LinkedList<Collector<OUT>>(); - this.outputMap = new HashMap<String, List<Collector<OUT>>>(); - - } - - @Override - public void addCollector(Collector<?> output) { - addCollector(output, new ArrayList<String>()); - } - - @SuppressWarnings("unchecked") - public void addCollector(Collector<?> output, List<String> selectedNames) { - - if (selectedNames.isEmpty()) { - selectAllOutputs.add((Collector<OUT>) output); - } else { - for (String selectedName : selectedNames) { - - if (!outputMap.containsKey(selectedName)) { - outputMap.put(selectedName, new LinkedList<Collector<OUT>>()); - outputMap.get(selectedName).add((Collector<OUT>) output); - } else { - if (!outputMap.get(selectedName).contains(output)) { - outputMap.get(selectedName).add((Collector<OUT>) output); - } - } - - } - } - } - - @Override - public void collect(OUT record) { - emitted.clear(); - - for (Collector<OUT> output : selectAllOutputs) { - output.collect(record); - emitted.add(output); - } - - for (OutputSelector<OUT> outputSelector : outputSelectors) { - Iterable<String> outputNames = outputSelector.select(record); - - for (String outputName : outputNames) { - List<Collector<OUT>> outputList = outputMap.get(outputName); - if (outputList == null) { - if (LOG.isErrorEnabled()) { - String format = String.format( - "Cannot emit because no output is selected with the name: %s", - outputName); - LOG.error(format); - - } - } else { - for (Collector<OUT> output : outputList) { - if (!emitted.contains(output)) { - output.collect(record); - emitted.add(output); - } - } - - } - - } - } - - } - - @Override - public void close() { - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java deleted file mode 100644 index 6dbcff4..0000000 --- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.collector; - -import java.io.Serializable; - -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.datastream.SplitDataStream; - -/** - * Interface for defining an OutputSelector for a {@link SplitDataStream} using - * the {@link SingleOutputStreamOperator#split} call. Every output object of a - * {@link SplitDataStream} will run through this operator to select outputs. - * - * @param <OUT> - * Type parameter of the split values. - */ -public interface OutputSelector<OUT> extends Serializable { - /** - * Method for selecting output names for the emitted objects when using the - * {@link SingleOutputStreamOperator#split} method. The values will be - * emitted only to output names which are contained in the returned - * iterable. - * - * @param value - * Output object for which the output selection should be made. - */ - public Iterable<String> select(OUT value); -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java deleted file mode 100644 index 4551c5a..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.io.StreamRecordWriter;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamOutput<OUT> implements Collector<OUT> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(StreamOutput.class);
-
-    private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
-    private SerializationDelegate<StreamRecord<OUT>> serializationDelegate;
-    private StreamRecord<OUT> streamRecord;
-    private int channelID;
-
-    public StreamOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
-            int channelID, SerializationDelegate<StreamRecord<OUT>> serializationDelegate) {
-
-        this.serializationDelegate = serializationDelegate;
-
-        if (serializationDelegate != null) {
-            this.streamRecord = serializationDelegate.getInstance();
-        } else {
-            throw new RuntimeException("Serializer cannot be null");
-        }
-        this.channelID = channelID;
-        this.output = output;
-    }
-
-    public RecordWriter<SerializationDelegate<StreamRecord<OUT>>> getRecordWriter() {
-        return output;
-    }
-
-    @Override
-    public void collect(OUT record) {
-        streamRecord.setObject(record);
-        streamRecord.newId(channelID);
-        serializationDelegate.setInstance(streamRecord);
-
-        try {
-            output.emit(serializationDelegate);
-        } catch (Exception e) {
-            if (LOG.isErrorEnabled()) {
-                LOG.error("Emit failed due to: {}", StringUtils.stringifyException(e));
-            }
-        }
-    }
-
-    @Override
-    public void close() {
-        if (output instanceof StreamRecordWriter) {
-            ((StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>) output).close();
-        } else {
-            try {
-                output.flush();
-            } catch (IOException e) {
-                LOG.error("Flush on close failed due to: {}", StringUtils.stringifyException(e));
-            }
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
deleted file mode 100755
index a6eade8..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ /dev/null
@@ -1,419 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.StreamGraph;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.function.co.CoWindowFunction;
-import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
-import org.apache.flink.streaming.api.function.co.RichCoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
-import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-/**
- * The ConnectedDataStream represents a stream for two different data types. It
- * can be used to apply transformations like {@link CoMapFunction} on two
- * {@link DataStream}s.
- *
- * @param <IN1>
- *            Type of the first input data stream.
- * @param <IN2>
- *            Type of the second input data stream.
- */
-public class ConnectedDataStream<IN1, IN2> {
-
-    protected StreamExecutionEnvironment environment;
-    protected StreamGraph jobGraphBuilder;
-    protected DataStream<IN1> dataStream1;
-    protected DataStream<IN2> dataStream2;
-
-    protected boolean isGrouped;
-    protected KeySelector<IN1, ?> keySelector1;
-    protected KeySelector<IN2, ?> keySelector2;
-
-    protected ConnectedDataStream(DataStream<IN1> input1, DataStream<IN2> input2) {
-        this.jobGraphBuilder = input1.streamGraph;
-        this.environment = input1.environment;
-        this.dataStream1 = input1.copy();
-        this.dataStream2 = input2.copy();
-
-        if ((input1 instanceof GroupedDataStream) && (input2 instanceof GroupedDataStream)) {
-            this.isGrouped = true;
-            this.keySelector1 = ((GroupedDataStream<IN1>) input1).keySelector;
-            this.keySelector2 = ((GroupedDataStream<IN2>) input2).keySelector;
-        } else {
-            this.isGrouped = false;
-            this.keySelector1 = null;
-            this.keySelector2 = null;
-        }
-    }
-
-    protected ConnectedDataStream(ConnectedDataStream<IN1, IN2> coDataStream) {
-        this.jobGraphBuilder = coDataStream.jobGraphBuilder;
-        this.environment = coDataStream.environment;
-        this.dataStream1 = coDataStream.getFirst();
-        this.dataStream2 = coDataStream.getSecond();
-        this.isGrouped = coDataStream.isGrouped;
-        this.keySelector1 = coDataStream.keySelector1;
-        this.keySelector2 = coDataStream.keySelector2;
-    }
-
-    public <F> F clean(F f) {
-        if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
-            ClosureCleaner.clean(f, true);
-        }
-        ClosureCleaner.ensureSerializable(f);
-        return f;
-    }
-
-    public StreamExecutionEnvironment getExecutionEnvironment() {
-        return environment;
-    }
-
-    /**
-     * Returns the first {@link DataStream}.
-     *
-     * @return The first DataStream.
-     */
-    public DataStream<IN1> getFirst() {
-        return dataStream1.copy();
-    }
-
-    /**
-     * Returns the second {@link DataStream}.
-     *
-     * @return The second DataStream.
-     */
-    public DataStream<IN2> getSecond() {
-        return dataStream2.copy();
-    }
-
-    /**
-     * Gets the type of the first input.
-     *
-     * @return The type of the first input.
-     */
-    public TypeInformation<IN1> getInputType1() {
-        return dataStream1.getType();
-    }
-
-    /**
-     * Gets the type of the second input.
-     *
-     * @return The type of the second input.
-     */
-    public TypeInformation<IN2> getInputType2() {
-        return dataStream2.getType();
-    }
-
-    /**
-     * GroupBy operation for connected data streams. Groups the elements of
-     * input1 and input2 according to keyPosition1 and keyPosition2. Used for
-     * applying functions on grouped data streams, for example
-     * {@link ConnectedDataStream#reduce}.
-     *
-     * @param keyPosition1
-     *            The field used to compute the hashcode of the elements in the
-     *            first input stream.
-     * @param keyPosition2
-     *            The field used to compute the hashcode of the elements in the
-     *            second input stream.
-     * @return The transformed {@link ConnectedDataStream}
-     */
-    public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
-        return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(keyPosition1),
-                dataStream2.groupBy(keyPosition2));
-    }
-
-    /**
-     * GroupBy operation for connected data streams. Groups the elements of
-     * input1 and input2 according to keyPositions1 and keyPositions2. Used for
-     * applying functions on grouped data streams, for example
-     * {@link ConnectedDataStream#reduce}.
-     *
-     * @param keyPositions1
-     *            The fields used to group the first input stream.
-     * @param keyPositions2
-     *            The fields used to group the second input stream.
-     * @return The transformed {@link ConnectedDataStream}
-     */
-    public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] keyPositions2) {
-        return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(keyPositions1),
-                dataStream2.groupBy(keyPositions2));
-    }
-
-    /**
-     * GroupBy operation for connected data streams using key expressions.
-     * Groups the elements of input1 and input2 according to field1 and field2.
-     * A field expression is either the name of a public field or a getter
-     * method with parentheses of the {@link DataStream}'s underlying type. A
-     * dot can be used to drill down into objects, as in
-     * {@code "field1.getInnerField2()" }.
-     *
-     * @param field1
-     *            The grouping expression for the first input
-     * @param field2
-     *            The grouping expression for the second input
-     * @return The grouped {@link ConnectedDataStream}
-     */
-    public ConnectedDataStream<IN1, IN2> groupBy(String field1, String field2) {
-        return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(field1),
-                dataStream2.groupBy(field2));
-    }
-
-    /**
-     * GroupBy operation for connected data streams using key expressions.
-     * Groups the elements of input1 and input2 according to fields1 and
-     * fields2. A field expression is either the name of a public field or a
-     * getter method with parentheses of the {@link DataStream}'s underlying
-     * type. A dot can be used to drill down into objects, as in
-     * {@code "field1.getInnerField2()" }.
-     *
-     * @param fields1
-     *            The grouping expressions for the first input
-     * @param fields2
-     *            The grouping expressions for the second input
-     * @return The grouped {@link ConnectedDataStream}
-     */
-    public ConnectedDataStream<IN1, IN2> groupBy(String[] fields1, String[] fields2) {
-        return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(fields1),
-                dataStream2.groupBy(fields2));
-    }
-
-    /**
-     * GroupBy operation for connected data streams. Groups the elements of
-     * input1 and input2 using keySelector1 and keySelector2. Used for applying
-     * functions on grouped data streams, for example
-     * {@link ConnectedDataStream#reduce}.
-     *
-     * @param keySelector1
-     *            The {@link KeySelector} used for grouping the first input
-     * @param keySelector2
-     *            The {@link KeySelector} used for grouping the second input
-     * @return The transformed {@link ConnectedDataStream}
-     */
-    public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> keySelector1,
-            KeySelector<IN2, ?> keySelector2) {
-        return new ConnectedDataStream<IN1, IN2>(dataStream1.groupBy(keySelector1),
-                dataStream2.groupBy(keySelector2));
-    }
-
-    /**
-     * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
-     * the output to a common type. The transformation calls
-     * {@link CoMapFunction#map1} for each element of the first input and
-     * {@link CoMapFunction#map2} for each element of the second input. Each
-     * CoMapFunction call returns exactly one element. The user can also extend
-     * {@link RichCoMapFunction} to gain access to other features provided by
-     * the {@link RichFunction} interface.
-     *
-     * @param coMapper
-     *            The CoMapFunction used to jointly transform the two input
-     *            DataStreams
-     * @return The transformed {@link DataStream}
-     */
-    public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
-        TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoMapFunction.class,
-                coMapper.getClass(), 2, null, null);
-
-        return addCoFunction("Co-Map", outTypeInfo, new CoMapInvokable<IN1, IN2, OUT>(
-                clean(coMapper)));
-    }
-
-    /**
-     * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
-     * maps the output to a common type. The transformation calls
-     * {@link CoFlatMapFunction#flatMap1} for each element of the first input
-     * and {@link CoFlatMapFunction#flatMap2} for each element of the second
-     * input. Each CoFlatMapFunction call can return any number of elements,
-     * including none. The user can also extend {@link RichFlatMapFunction} to
-     * gain access to other features provided by the {@link RichFunction}
-     * interface.
-     *
-     * @param coFlatMapper
-     *            The CoFlatMapFunction used to jointly transform the two input
-     *            DataStreams
-     * @return The transformed {@link DataStream}
-     */
-    public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
-            CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
-        TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoFlatMapFunction.class,
-                coFlatMapper.getClass(), 2, null, null);
-
-        return addCoFunction("Co-Flat Map", outTypeInfo, new CoFlatMapInvokable<IN1, IN2, OUT>(
-                clean(coFlatMapper)));
-    }
-
-    /**
-     * Applies a reduce transformation on a {@link ConnectedDataStream} and
-     * maps the outputs to a common type. If the {@link ConnectedDataStream} is
-     * batched or windowed then the reduce transformation is applied on every
-     * sliding batch/window of the data stream. If the connected data stream is
-     * grouped then the reducer is applied on every group of elements sharing
-     * the same key. This type of reduce is much faster than reduceGroup since
-     * the reduce function can be applied incrementally. The user can also
-     * extend the {@link RichCoReduceFunction} to gain access to other features
-     * provided by the {@link RichFunction} interface.
-     *
-     * @param coReducer
-     *            The {@link CoReduceFunction} that will be called for every
-     *            element of the inputs.
-     * @return The transformed {@link DataStream}.
-     */
-    public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
-        TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoReduceFunction.class,
-                coReducer.getClass(), 2, null, null);
-
-        return addCoFunction("Co-Reduce", outTypeInfo, getReduceInvokable(clean(coReducer)));
-    }
-
-    /**
-     * Applies a CoWindow transformation on the connected DataStreams. The
-     * transformation calls the {@link CoWindowFunction#coWindow} method for
-     * time aligned windows of the two data streams. System time is used by
-     * default to compute windows.
-     *
-     * @param coWindowFunction
-     *            The {@link CoWindowFunction} that will be applied for the
-     *            time windows.
-     * @param windowSize
-     *            Size of the windows that will be aligned for both streams, in
-     *            milliseconds.
-     * @param slideInterval
-     *            After every function call the windows will be slid by this
-     *            interval.
-     *
-     * @return The transformed {@link DataStream}.
-     */
-    @SuppressWarnings("unchecked")
-    public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(
-            CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize, long slideInterval) {
-        return windowReduce(coWindowFunction, windowSize, slideInterval,
-                (TimestampWrapper<IN1>) SystemTimestamp.getWrapper(),
-                (TimestampWrapper<IN2>) SystemTimestamp.getWrapper());
-    }
-
-    /**
-     * Applies a CoWindow transformation on the connected DataStreams. The
-     * transformation calls the {@link CoWindowFunction#coWindow} method for
-     * time aligned windows of the two data streams. The user can implement
-     * their own time stamps or use the system time by default.
-     *
-     * @param coWindowFunction
-     *            The {@link CoWindowFunction} that will be applied for the
-     *            time windows.
-     * @param windowSize
-     *            Size of the windows that will be aligned for both streams. If
-     *            system time is used, it is in milliseconds. User defined time
-     *            stamps are assumed to be monotonically increasing.
-     * @param slideInterval
-     *            After every function call the windows will be slid by this
-     *            interval.
-     * @param timestamp1
-     *            User defined time stamps for the first input.
-     * @param timestamp2
-     *            User defined time stamps for the second input.
-     * @return The transformed {@link DataStream}.
-     */
-    public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(
-            CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize, long slideInterval,
-            TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> timestamp2) {
-
-        if (windowSize < 1) {
-            throw new IllegalArgumentException("Window size must be positive");
-        }
-        if (slideInterval < 1) {
-            throw new IllegalArgumentException("Slide interval must be positive");
-        }
-
-        TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoWindowFunction.class,
-                coWindowFunction.getClass(), 2, null, null);
-
-        return addCoFunction("Co-Window", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
-                clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
-    }
-
-    protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
-            CoReduceFunction<IN1, IN2, OUT> coReducer) {
-        CoReduceInvokable<IN1, IN2, OUT> invokable;
-        if (isGrouped) {
-            invokable = new CoGroupedReduceInvokable<IN1, IN2, OUT>(clean(coReducer), keySelector1,
-                    keySelector2);
-        } else {
-            invokable = new CoReduceInvokable<IN1, IN2, OUT>(clean(coReducer));
-        }
-        return invokable;
-    }
-
-    public <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(
-            CoWindowFunction<IN1, IN2, OUT> coWindowFunction, TypeInformation<OUT> outTypeInfo,
-            long windowSize, long slideInterval, TimestampWrapper<IN1> timestamp1,
-            TimestampWrapper<IN2> timestamp2) {
-
-        if (windowSize < 1) {
-            throw new IllegalArgumentException("Window size must be positive");
-        }
-        if (slideInterval < 1) {
-            throw new IllegalArgumentException("Slide interval must be positive");
-        }
-
-        return addCoFunction("Co-Window", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
-                clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
-    }
-
-    public <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
-            TypeInformation<OUT> outTypeInfo, CoInvokable<IN1, IN2, OUT> functionInvokable) {
-
-        @SuppressWarnings({ "unchecked", "rawtypes" })
-        SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
-                environment, functionName, outTypeInfo, functionInvokable);
-
-        dataStream1.streamGraph.addCoTask(returnStream.getId(), functionInvokable, getInputType1(),
-                getInputType2(), outTypeInfo, functionName, environment.getDegreeOfParallelism());
-
-        dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
-        dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);
-
-        return returnStream;
-    }
-
-    protected ConnectedDataStream<IN1, IN2> copy() {
-        return new ConnectedDataStream<IN1, IN2>(this);
-    }
-}
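Taken together, the deleted classes above implement the split/select and connect paths of the old flink-streaming API: an OutputSelector names the outputs a record should go to, DirectedCollectorWrapper fans the record out to the collectors registered under those names, and ConnectedDataStream wires two streams into a single co-operator. A minimal driver sketch against this pre-refactor API (the fromElements/print/execute plumbing and the SplitDataStream select(String) signature are assumptions for illustration, not part of this commit):

// Illustrative sketch only -- assumes the pre-refactor flink-streaming API shown in this diff.
import java.util.Collections;

import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoMapFunction;

public class SplitConnectSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Every record runs through the OutputSelector; DirectedCollectorWrapper
        // then emits it to each collector registered under the returned names.
        SplitDataStream<Integer> split = env.fromElements(1, 2, 3, 4, 5)
                .split(new OutputSelector<Integer>() {
                    @Override
                    public Iterable<String> select(Integer value) {
                        return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
                    }
                });

        DataStream<Integer> evens = split.select("even");
        DataStream<Integer> odds = split.select("odd");

        // ConnectedDataStream pairs two (possibly differently typed) streams;
        // map1/map2 each handle one input side and share the output type.
        evens.connect(odds).map(new CoMapFunction<Integer, Integer, String>() {
            @Override
            public String map1(Integer even) {
                return "even: " + even;
            }

            @Override
            public String map2(Integer odd) {
                return "odd: " + odd;
            }
        }).print();

        env.execute();
    }
}

Note that a record whose selected name was never registered does not fail the job: as seen in DirectedCollectorWrapper#collect above, it is dropped with an error log entry.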