[FLINK-8597] Add examples for Connected Streams with Broadcast State. This closes #5425.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dea41726 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dea41726 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dea41726 Branch: refs/heads/master Commit: dea4172604126fc40fc8fbf7f83a632733bd0dfb Parents: 9628dc8 Author: kkloudas <kklou...@gmail.com> Authored: Wed Feb 7 17:22:37 2018 +0100 Committer: kkloudas <kklou...@gmail.com> Committed: Fri Feb 9 18:14:56 2018 +0100 ---------------------------------------------------------------------- .../examples/broadcast/BroadcastExample.java | 159 +++++++++++++++++++ .../examples/broadcast/BroadcastExample.scala | 121 ++++++++++++++ 2 files changed, 280 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/dea41726/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/broadcast/BroadcastExample.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/broadcast/BroadcastExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/broadcast/BroadcastExample.java new file mode 100644 index 0000000..376d9d3 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/broadcast/BroadcastExample.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.broadcast; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFunction; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Example illustrating the use of {@link org.apache.flink.api.common.state.BroadcastState}. + */ +public class BroadcastExample { + + public static void main(String[] args) throws Exception { + + final List<Integer> input = new ArrayList<>(); + input.add(1); + input.add(2); + input.add(3); + input.add(4); + + final List<Tuple2<Integer, Integer>> keyedInput = new ArrayList<>(); + keyedInput.add(new Tuple2<>(1, 1)); + keyedInput.add(new Tuple2<>(1, 5)); + keyedInput.add(new Tuple2<>(2, 2)); + keyedInput.add(new Tuple2<>(2, 6)); + keyedInput.add(new Tuple2<>(3, 3)); + keyedInput.add(new Tuple2<>(3, 7)); + keyedInput.add(new Tuple2<>(4, 4)); + keyedInput.add(new Tuple2<>(4, 8)); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>( + "Broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO + ); + + KeyedStream<Tuple2<Integer, Integer>, Integer> elementStream = env.fromCollection(keyedInput) + .rebalance() // needed to increase the parallelism + .map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() { + private static final long serialVersionUID = 8710586935083422712L; + + @Override + public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) { + return value; + } + }) + .setParallelism(4) + .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() { + private static final long serialVersionUID = -1110876099102344900L; + + @Override + public Integer getKey(Tuple2<Integer, Integer> value) { + return value.f0; + } + }); + + BroadcastStream<Integer> broadcastStream = env.fromCollection(input) + .flatMap(new FlatMapFunction<Integer, Integer>() { + private static final long serialVersionUID = 6462244253439410814L; + + @Override + public void flatMap(Integer value, Collector<Integer> out) { + out.collect(value); + } + }) + .setParallelism(4) + .broadcast(mapStateDescriptor); + + DataStream<String> output = elementStream + .connect(broadcastStream) + .process( + new KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, Integer>, Integer, String>() { + + private static final long serialVersionUID = 8512350700250748742L; + + private final ValueStateDescriptor<String> valueState = + new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO); + + private final MapStateDescriptor<String, Integer> localMapStateDescriptor = + new MapStateDescriptor<>( + "Broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO + ); + + @Override + public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception { + + ctx.getBroadcastState(localMapStateDescriptor).put(String.valueOf(value), value); + + ctx.applyToKeyedState(valueState, new KeyedStateFunction<Integer, ValueState<String>>() { + @Override + public void process(Integer key, ValueState<String> state) throws Exception { + out.collect("Broadcast side task#" + getRuntimeContext().getIndexOfThisSubtask() + ": " + key + " " + state.value()); + } + }); + } + + @Override + public void processElement(Tuple2<Integer, Integer> value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception { + + String prev = getRuntimeContext().getState(valueState).value(); + + StringBuilder str = new StringBuilder(); + str + .append("Value=") + .append(value) + .append(" Broadcast State=["); + + for (Map.Entry<String, Integer> entry : ctx.getBroadcastState(localMapStateDescriptor).immutableEntries()) { + str + .append(entry.getKey()) + .append("->") + .append(entry.getValue()) + .append(" "); + } + + str.append("]"); + + getRuntimeContext().getState(valueState).update(str.toString()); + + out.collect("BEFORE: " + prev + " " + "AFTER: " + str); + } + }); + output.print(); + env.execute(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/dea41726/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/broadcast/BroadcastExample.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/broadcast/BroadcastExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/broadcast/BroadcastExample.scala new file mode 100644 index 0000000..fff3c39 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/broadcast/BroadcastExample.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.broadcast + +import org.apache.flink.api.scala._ +import org.apache.flink.api.common.state.{MapStateDescriptor, ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.runtime.state.KeyedStateFunction +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.util.Collector + +/** + * Example illustrating the use of [[org.apache.flink.api.common.state.BroadcastState]]. + */ +object BroadcastExample { + + def main(args: Array[String]): Unit = { + + val input = List(1, 2, 3, 4) + + val keyedInput = List[(Int, Int)]( + new Tuple2[Int, Int](1, 1), + new Tuple2[Int, Int](1, 5), + new Tuple2[Int, Int](2, 2), + new Tuple2[Int, Int](2, 6), + new Tuple2[Int, Int](3, 3), + new Tuple2[Int, Int](3, 7), + new Tuple2[Int, Int](4, 4), + new Tuple2[Int, Int](4, 8) + ) + + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val mapStateDescriptor = new MapStateDescriptor[String, Integer]( + "Broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO) + + val elementStream = env + .fromCollection(keyedInput) + .rebalance + .map(value => value) + .setParallelism(4) + .keyBy(value => value._1) + + val broadcastStream = env + .fromCollection(input) + .flatMap((value: Int, out: Collector[Int]) => out.collect(value)) + .setParallelism(4) + .broadcast(mapStateDescriptor) + + val output = elementStream + .connect(broadcastStream) + .process(new KeyedBroadcastProcessFunction[Int, (Int, Int), Int, String]() { + + val valueState = new ValueStateDescriptor[String]("any", BasicTypeInfo.STRING_TYPE_INFO) + + val mapStateDesc = new MapStateDescriptor[String, Integer]( + "Broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO) + + @throws[Exception] + override def processBroadcastElement( + value: Int, + ctx: KeyedBroadcastProcessFunction[Int, (Int, Int), Int, String]#KeyedContext, + out: Collector[String]) + : Unit = { + + ctx.getBroadcastState(mapStateDesc).put(value + "", value) + + ctx.applyToKeyedState(valueState, new KeyedStateFunction[Int, ValueState[String]] { + + override def process(key: Int, state: ValueState[String]): Unit = + out.collect("Broadcast side task#" + + getRuntimeContext.getIndexOfThisSubtask + ": " + key + " " + state.value) + }) + } + + @throws[Exception] + override def processElement( + value: (Int, Int), + ctx: KeyedBroadcastProcessFunction[Int, (Int, Int), Int, String]#KeyedReadOnlyContext, + out: Collector[String]) + : Unit = { + + val prev = getRuntimeContext.getState(valueState).value + + val str = new StringBuilder + str.append("Value=").append(value).append(" Broadcast State=[") + + import scala.collection.JavaConversions._ + for (entry <- ctx.getBroadcastState(mapStateDesc).immutableEntries()) { + str.append(entry.getKey).append("->").append(entry.getValue).append(" ") + } + str.append("]") + + getRuntimeContext.getState(valueState).update(str.toString) + + out.collect("BEFORE: " + prev + " " + "AFTER: " + str) + + } + }) + + output.print + env.execute + } +}