This is an automated email from the ASF dual-hosted git repository. karthikz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push: new ffae7af Joshfischer/eco stateful topology (#2851) ffae7af is described below commit ffae7af382984534b824358b4397733267d20eb5 Author: Josh Fischer <j...@joshfischer.io> AuthorDate: Fri Apr 6 00:04:43 2018 -0500 Joshfischer/eco stateful topology (#2851) * saving work. * adding examples for stateful topologies with ECO --- .../twitter/heron/examples/eco/RandomString.java | 51 +++++++++++++ .../heron/examples/eco/StatefulConsumerBolt.java | 66 +++++++++++++++++ .../heron/examples/eco/StatefulNumberSpout.java | 83 ++++++++++++++++++++++ .../heron/examples/eco/StatefulRandomIntSpout.java | 73 +++++++++++++++++++ .../heron/examples/eco/StatefulWindowSumBolt.java | 67 +++++++++++++++++ .../com/twitter/heron/examples/eco/WordSpout.java | 64 +++++++++++++++++ .../examples/eco/heron-stateful-windowing.yaml | 52 ++++++++++++++ .../examples/eco/heron-stateful-word-count.yaml | 38 ++++++++++ 8 files changed, 494 insertions(+) diff --git a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/RandomString.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/RandomString.java new file mode 100644 index 0000000..6ece43c --- /dev/null +++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/RandomString.java @@ -0,0 +1,51 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.examples.eco; + +import java.util.Random; + +public class RandomString { + private char[] symbols; + + private Random random = new Random(); + + private char[] buf; + + public RandomString(int length) { + // Construct the symbol set + StringBuilder tmp = new StringBuilder(); + for (char ch = '0'; ch <= '9'; ++ch) { + tmp.append(ch); + } + + for (char ch = 'a'; ch <= 'z'; ++ch) { + tmp.append(ch); + } + + symbols = tmp.toString().toCharArray(); + if (length < 1) { + throw new IllegalArgumentException("length < 1: " + length); + } + + buf = new char[length]; + } + + public String nextString() { + for (int idx = 0; idx < buf.length; ++idx) { + buf[idx] = symbols[random.nextInt(symbols.length)]; + } + + return new String(buf); + } +} diff --git a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulConsumerBolt.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulConsumerBolt.java new file mode 100644 index 0000000..bd5cbfe --- /dev/null +++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulConsumerBolt.java @@ -0,0 +1,66 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.examples.eco; + +import java.util.Map; + +import com.twitter.heron.api.bolt.BaseRichBolt; +import com.twitter.heron.api.bolt.OutputCollector; +import com.twitter.heron.api.state.State; +import com.twitter.heron.api.topology.IStatefulComponent; +import com.twitter.heron.api.topology.OutputFieldsDeclarer; +import com.twitter.heron.api.topology.TopologyContext; +import com.twitter.heron.api.tuple.Tuple; + + +public class StatefulConsumerBolt extends BaseRichBolt + implements IStatefulComponent<Integer, Integer> { + private static final long serialVersionUID = -5470591933906954522L; + + private OutputCollector collector; + private State<Integer, Integer> myState; + + @Override + public void initState(State<Integer, Integer> state) { + this.myState = state; + } + + @Override + public void preSave(String checkpointId) { + // Nothing really since we operate out of the system supplied state + } + + @SuppressWarnings("rawtypes") + public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { + collector = outputCollector; + } + + @Override + public void execute(Tuple tuple) { + int key = tuple.getInteger(0); + System.out.println("looking in state for: " + key); + if (myState.get(key) == null) { + System.out.println("did not find " + key + " in state: "); + myState.put(key, 1); + } else { + System.out.println("found in state: " + key); + Integer val = myState.get(key); + myState.put(key, ++val); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + } +} diff --git a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulNumberSpout.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulNumberSpout.java new file mode 100644 index 0000000..c3fb3db --- /dev/null +++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulNumberSpout.java @@ -0,0 +1,83 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.examples.eco; + +import java.util.Map; +import java.util.Random; +import java.util.logging.Logger; + +import com.twitter.heron.api.spout.BaseRichSpout; +import com.twitter.heron.api.spout.SpoutOutputCollector; +import com.twitter.heron.api.state.State; +import com.twitter.heron.api.topology.IStatefulComponent; +import com.twitter.heron.api.topology.OutputFieldsDeclarer; +import com.twitter.heron.api.topology.TopologyContext; +import com.twitter.heron.api.tuple.Fields; +import com.twitter.heron.api.tuple.Values; +import com.twitter.heron.api.utils.Utils; + +@SuppressWarnings("HiddenField") +public class StatefulNumberSpout extends BaseRichSpout + implements IStatefulComponent<String, Long> { + private static final Logger LOG = Logger.getLogger(StatefulNumberSpout.class.getName()); + private static final long serialVersionUID = 5454291010750852782L; + private SpoutOutputCollector collector; + private Random rand; + private long msgId; + private State<String, Long> state; + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("value", "ts", "msgid")); + } + + @Override + public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector + collector) { + this.collector = collector; + this.rand = new Random(); + } + + @Override + public void nextTuple() { + Utils.sleep(1000); + long val = msgId; + long randomNumber = System.currentTimeMillis() - (24 * 60 * 60 * 1000); + System.out.println("Emitting: " + val); + collector.emit(new Values(val, + randomNumber, msgId), msgId); + msgId++; + } + + @Override + public void ack(Object msgId) { + LOG.fine("Got ACK for msgId : " + msgId); + } + + @Override + public void fail(Object msgId) { + LOG.fine("Got FAIL for msgId : " + msgId); + } + + @Override + public void initState(State<String, Long> state) { + this.state = state; + this.msgId = this.state.getOrDefault("msgId", 0L); + } + + @Override + public void preSave(String checkpointId) { + this.state.put("msgId", msgId); + } +} diff --git a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulRandomIntSpout.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulRandomIntSpout.java new file mode 100644 index 0000000..e128840 --- /dev/null +++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulRandomIntSpout.java @@ -0,0 +1,73 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.examples.eco; + +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +import com.twitter.heron.api.spout.BaseRichSpout; +import com.twitter.heron.api.spout.SpoutOutputCollector; +import com.twitter.heron.api.state.State; +import com.twitter.heron.api.topology.IStatefulComponent; +import com.twitter.heron.api.topology.OutputFieldsDeclarer; +import com.twitter.heron.api.topology.TopologyContext; +import com.twitter.heron.api.tuple.Fields; +import com.twitter.heron.api.tuple.Values; + +import backtype.storm.utils.Utils; + +public class StatefulRandomIntSpout extends BaseRichSpout + implements IStatefulComponent<String, Integer> { + private SpoutOutputCollector spoutOutputCollector; + private State<String, Integer> count; + + public StatefulRandomIntSpout() { + } + + // Generates a random integer between 1 and 100 + private int randomInt() { + return ThreadLocalRandom.current().nextInt(1, 101); + } + + // These two methods are required to implement the IStatefulComponent interface + @Override + public void preSave(String checkpointId) { + System.out.println(String.format("Saving spout state at checkpoint %s", checkpointId)); + } + + @Override + public void initState(State<String, Integer> state) { + count = state; + } + + // These three methods are required to extend the BaseRichSpout abstract class + @Override + public void open(Map<String, Object> map, TopologyContext ctx, SpoutOutputCollector collector) { + spoutOutputCollector = collector; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("random-int")); + } + + @Override + public void nextTuple() { + Utils.sleep(2000); + int randomInt = randomInt(); + System.out.println("Emitting Value: " + randomInt); + spoutOutputCollector.emit(new Values(randomInt)); + } +} + diff --git a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulWindowSumBolt.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulWindowSumBolt.java new file mode 100644 index 0000000..8f721e4 --- /dev/null +++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/StatefulWindowSumBolt.java @@ -0,0 +1,67 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.examples.eco; + +import java.util.Map; + +import com.twitter.heron.api.bolt.BaseStatefulWindowedBolt; +import com.twitter.heron.api.bolt.OutputCollector; +import com.twitter.heron.api.state.State; +import com.twitter.heron.api.topology.OutputFieldsDeclarer; +import com.twitter.heron.api.topology.TopologyContext; +import com.twitter.heron.api.tuple.Fields; +import com.twitter.heron.api.tuple.Tuple; +import com.twitter.heron.api.tuple.Values; +import com.twitter.heron.api.windowing.TupleWindow; + +@SuppressWarnings("HiddenField") +public class StatefulWindowSumBolt extends BaseStatefulWindowedBolt<String, Long> { + private static final long serialVersionUID = -539382497249834244L; + private State<String, Long> state; + private long sum; + + private OutputCollector collector; + + @Override + public void prepare(Map<String, Object> topoConf, TopologyContext context, + OutputCollector collector) { + this.collector = collector; + } + + @Override + public void initState(State<String, Long> state) { + this.state = state; + sum = state.getOrDefault("sum", 0L); + } + + @Override + public void execute(TupleWindow inputWindow) { + for (Tuple tuple : inputWindow.get()) { + System.out.println("Adding to sum: " + tuple.getLongByField("value")); + sum += tuple.getLongByField("value"); + System.out.println("Sum is now: " + sum); + } + collector.emit(new Values(sum)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("sum")); + } + + @Override + public void preSave(String checkpointId) { + state.put("sum", sum); + } +} diff --git a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/WordSpout.java b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/WordSpout.java new file mode 100644 index 0000000..3f3b6f1 --- /dev/null +++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/WordSpout.java @@ -0,0 +1,64 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.examples.eco; + +import java.util.Map; +import java.util.Random; + +import com.twitter.heron.api.spout.BaseRichSpout; +import com.twitter.heron.api.spout.SpoutOutputCollector; +import com.twitter.heron.api.topology.OutputFieldsDeclarer; +import com.twitter.heron.api.topology.TopologyContext; +import com.twitter.heron.api.tuple.Fields; +import com.twitter.heron.api.tuple.Values; + +@SuppressWarnings("HiddenField") +public class WordSpout extends BaseRichSpout { + private static final long serialVersionUID = 4322775001819135036L; + + private static final int ARRAY_LENGTH = 128 * 1024; + private static final int WORD_LENGTH = 20; + + private final String[] words = new String[ARRAY_LENGTH]; + + private final Random rnd = new Random(31); + + private SpoutOutputCollector collector; + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("word")); + } + + @Override + @SuppressWarnings("rawtypes") + public void open(Map map, TopologyContext topologyContext, + SpoutOutputCollector spoutOutputCollector) { + System.out.println("open spout"); + RandomString randomString = new RandomString(WORD_LENGTH); + + for (int i = 0; i < ARRAY_LENGTH; i++) { + words[i] = randomString.nextString(); + } + + collector = spoutOutputCollector; + } + + @Override + public void nextTuple() { + System.out.println("next tuple"); + int nextInt = rnd.nextInt(ARRAY_LENGTH); + collector.emit(new Values(words[nextInt])); + } +} diff --git a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-windowing.yaml b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-windowing.yaml new file mode 100644 index 0000000..9c9d5cf --- /dev/null +++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-windowing.yaml @@ -0,0 +1,52 @@ +# Copyright 2017 Twitter. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http:#www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: "stateful-windowing" +type: "heron" + +config: + topology.workers: 1 + topology.reliability.mode: "EFFECTIVELY_ONCE" + +components: + + - id: "windowLength" + className: "com.twitter.heron.api.bolt.BaseWindowedBolt$Count" + constructorArgs: + - 5 + + - id: "slidingInterval" + className: "com.twitter.heron.api.bolt.BaseWindowedBolt$Count" + constructorArgs: + - 3 + +spouts: + - id: "integer-spout" + className: "com.twitter.heron.examples.eco.StatefulNumberSpout" + parallelism: 1 + +bolts: + - id: "stateful-window-sum-bolt" + className: "com.twitter.heron.examples.eco.StatefulWindowSumBolt" + configMethods: + - name: "withWindow" + args: [ref: "windowLength", ref: "slidingInterval"] + parallelism: 1 + + +streams: + - from: "integer-spout" + to: "stateful-window-sum-bolt" + grouping: + type: SHUFFLE \ No newline at end of file diff --git a/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-word-count.yaml b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-word-count.yaml new file mode 100644 index 0000000..e5838af --- /dev/null +++ b/eco-heron-examples/src/java/com/twitter/heron/examples/eco/heron-stateful-word-count.yaml @@ -0,0 +1,38 @@ +# Copyright 2017 Twitter. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http:#www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: "stateful-word-count" +type: "heron" + +config: + topology.workers: 1 + topology.reliability.mode: "EFFECTIVELY_ONCE" + + +spouts: + - id: "int-spout" + className: "com.twitter.heron.examples.eco.StatefulRandomIntSpout" + parallelism: 1 + +bolts: + - id: "stateful-consumer-bolt" + className: "com.twitter.heron.examples.eco.StatefulConsumerBolt" + parallelism: 1 + + +streams: + - from: "int-spout" + to: "stateful-consumer-bolt" + grouping: + type: SHUFFLE \ No newline at end of file -- To stop receiving notification emails like this one, please contact karth...@apache.org.