Hi Chesnay,

Thanks for the reply. While your solution ultimately does use multiple partitions, from what I can tell the underlying processing is still sequential. Imagine a stream where barriers are quite rare, say a million values are followed by a barrier. Then these million values all end up at the same partition and are added up sequentially, and while they are being processed, the other partitions are waiting for their turn.

A truly parallel solution would partition the million values, process each partition in parallel to get the partial sums, and on each barrier aggregate the partial sums into a total sum.
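The scheme described above can be illustrated with a minimal sketch in plain Java. It is not a Flink job and not a proposed Flink API: the class name ParallelBarrierSum, the method sumsAtBarriers, and the round-robin distribution are assumptions made for illustration, and a thread pool stands in for parallel subtasks. Values seen between two barriers are spread across partitions; on each barrier the partial sums are computed in parallel and then combined into the running total.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelBarrierSum {

    // Same shape as the types in the original question.
    public interface DataItem {}

    public static class Value implements DataItem {
        private final int val;
        public Value(int val) { this.val = val; }
        public int getVal() { return val; }
    }

    public static class Barrier implements DataItem {}

    /** Returns the running total observed at each Barrier (hypothetical helper). */
    public static List<Integer> sumsAtBarriers(List<DataItem> items, int parallelism)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Integer> output = new ArrayList<>();
            List<List<Integer>> partitions = newPartitions(parallelism);
            int total = 0; // sum of all values since the beginning of the stream
            int next = 0;  // round-robin index (an assumption, any split works)
            for (DataItem item : items) {
                if (item instanceof Value) {
                    partitions.get(next).add(((Value) item).getVal());
                    next = (next + 1) % parallelism;
                } else {
                    // Barrier: compute one partial sum per partition in parallel.
                    List<Callable<Integer>> tasks = new ArrayList<>();
                    for (List<Integer> p : partitions) {
                        tasks.add(() -> {
                            int s = 0;
                            for (int v : p) {
                                s += v;
                            }
                            return s;
                        });
                    }
                    // Aggregate the partial sums into the total and emit it.
                    for (Future<Integer> f : pool.invokeAll(tasks)) {
                        total += f.get();
                    }
                    output.add(total);
                    partitions = newPartitions(parallelism);
                }
            }
            return output;
        } finally {
            pool.shutdown();
        }
    }

    private static List<List<Integer>> newPartitions(int n) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            partitions.add(new ArrayList<>());
        }
        return partitions;
    }

    public static void main(String[] args) throws Exception {
        List<DataItem> items = Arrays.asList(
                new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());
        System.out.println(sumsAtBarriers(items, 4)); // prints [1, 3]
    }
}
```

Note that this sketch buffers the values of a segment and sums them only when the barrier arrives. In a streaming setting each partition would instead keep a running partial sum as values arrive, and the barrier would have to reach every partition before the partial sums are combined.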
Filip

On Tue, Oct 8, 2019 at 9:09 AM Chesnay Schepler <ches...@apache.org> wrote:

> In other words, you need a way to partition the stream such that a series
> of items followed by a barrier are never interrupted.
>
> I'm wondering whether you could just apply DataStream#partitionCustom to
> your source:
>
> public static class BarrierPartitioner implements Partitioner<DataItem> {
>
>     private int currentPartition = 0;
>
>     @Override
>     public int partition(DataItem key, int numPartitions) {
>         if (key instanceof Barrier) {
>             int partitionToReturn = currentPartition;
>             currentPartition = (currentPartition + 1) % numPartitions;
>             return partitionToReturn;
>         } else {
>             return currentPartition;
>         }
>     }
> }
>
> DataStream<DataItem> stream = ...;
> DataStream<DataItem> partitionedStream =
>     stream.partitionCustom(new BarrierPartitioner(), item -> item);
>
>
> On 08/10/2019 14:55, Filip Niksic wrote:
>
> Hi Yun,
>
> The behavior with increased parallelism should be the same as with no
> parallelism. In other words, for the input from the previous email, the
> output should always be 1, 3, regardless of parallelism. Operationally, the
> partial sums maintained in each subtask should somehow be aggregated before
> they are output.
>
> To answer the second question, I know that watermarks provide the same
> functionality. Is there some way to convert the input with explicit
> punctuation into one with watermarks? I see there is an interface called
> AssignerWithPunctuatedWatermarks, maybe that's the solution. But I'm not
> sure how this assigner would be used. For example, it could maintain the
> number of previously seen Barriers and assign this number as a watermark to
> each Value, but then this number becomes the state that needs to be shared
> between multiple substreams. Or perhaps the Barriers can somehow be
> duplicated and sent to each substream? Alternatively, is there some notion
> of event-based windows that would be triggered by specific user-defined
> elements in the stream?
> In such a mechanism perhaps the watermarks would be
> used internally, but they would not be explicitly exposed to the user?
>
> Best regards,
>
> Filip
>
>
>
> On Tue, Oct 8, 2019 at 2:19 AM Yun Gao <yungao...@aliyun.com> wrote:
>
>>
>> Hi Filip,
>>
>> I have one question on the problem: what is the expected
>> behavior when the parallelism of the FlatMapFunction is increased to more
>> than 1? Should each subtask maintain the partial sum of all values
>> received and, whenever the barrier is received, just output the
>> partial sum of the received values?
>>
>> Another question is that I think in Flink the watermark
>> mechanism has provided the functionality similar to punctuation, therefore
>> is it possible to implement the same logic with the Flink Window directly?
>>
>> Best,
>> Yun
>>
>> ------------------------------------------------------------------
>> From:Filip Niksic <fnik...@seas.upenn.edu>
>> Send Time:2019 Oct. 8 (Tue.) 08:56
>> To:user <user@flink.apache.org>
>> Subject:[QUESTION] How to parallelize with explicit punctuation in Flink?
>>
>> Hi all,
>>
>> What would be a natural way to implement a parallel version of the
>> following Flink program?
>>
>> Suppose I have a stream of items of type DataItem with two concrete
>> implementations of DataItem: Value and Barrier. Let’s say that Value holds
>> an integer value, and Barrier acts as explicit punctuation.
>>
>> public interface DataItem {}
>>
>> public class Value implements DataItem {
>>     private final int val;
>>
>>     public Value(int val) { this.val = val; }
>>
>>     public int getVal() { return val; }
>> }
>>
>> public class Barrier implements DataItem {}
>>
>> The program should maintain a sum of values seen since the beginning of
>> the stream. On each Barrier, the program should output the sum seen so far.
>>
>> An obvious way to implement this would be with a FlatMapFunction,
>> maintaining the sum as state and emitting it on each Barrier.
>>
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> DataStream<DataItem> stream = env.fromElements(DataItem.class,
>>     new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());
>>
>> stream.flatMap(new FlatMapFunction<DataItem, Integer>() {
>>     private int sum = 0;
>>
>>     @Override
>>     public void flatMap(DataItem dataItem, Collector<Integer> collector)
>>             throws Exception {
>>         if (dataItem instanceof Value) {
>>             sum += ((Value) dataItem).getVal();
>>         } else {
>>             collector.collect(sum);
>>         }
>>     }
>> }).setParallelism(1).print().setParallelism(1);
>>
>> env.execute();
>>
>> // We should see 1 followed by 3 as output
>>
>> However, such an operator cannot be parallelized, since the order of
>> Values and Barriers matters. That’s why I need to set parallelism to 1
>> above. Is there a way to rewrite this to exploit parallelism?
>>
>> (Another reason to set parallelism to 1 above is that I’m assuming there
>> is a single instance of the FlatMapFunction. A proper implementation would
>> take more care in using state. Feel free to comment on that as well.)
>>
>>
>> Best regards,
>>
>>
>> Filip Niksic