Hi Chesnay,

Thanks for the reply. While your solution ultimately does use multiple
partitions, from what I can tell the underlying processing is still
sequential. Imagine a stream where barriers are quite rare, say a million
values are followed by a barrier. Then these million values all end up at
the same partition and are added up sequentially, and while they are being
processed, the other partitions are waiting for their turn. A truly
parallel solution would partition the million values, process each
partition in parallel to get the partial sums, and on each barrier
aggregate the partial sums into a total sum.
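
To make the dataflow I have in mind concrete, here is a minimal sketch in
plain Java rather than Flink. The names BarrierPartialSums, sumInParallel and
run are hypothetical, and buffering the values between barriers is only a
simplification for illustration; a real streaming job would keep running
partial sums in each subtask instead. It assumes the DataItem, Value and
Barrier types from my original email, redeclared here so the example stands
alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

class BarrierPartialSums {

    interface DataItem {}

    static final class Value implements DataItem {
        private final int val;
        Value(int val) { this.val = val; }
        int getVal() { return val; }
    }

    static final class Barrier implements DataItem {}

    // Hypothetical helper: splits the values into numPartitions slices,
    // computes one partial sum per slice (possibly in parallel), and then
    // combines the partial sums.
    static int sumInParallel(List<Integer> values, int numPartitions) {
        return IntStream.range(0, numPartitions)
                .parallel()
                .map(p -> {
                    int partial = 0;
                    // Partition p takes elements p, p + numPartitions, ...
                    for (int i = p; i < values.size(); i += numPartitions) {
                        partial += values.get(i);
                    }
                    return partial;
                })
                .sum();
    }

    // Emits the sum of all values seen so far each time a Barrier arrives.
    static List<Integer> run(List<DataItem> items, int numPartitions) {
        List<Integer> output = new ArrayList<>();
        List<Integer> sinceLastBarrier = new ArrayList<>();
        int total = 0;
        for (DataItem item : items) {
            if (item instanceof Value) {
                sinceLastBarrier.add(((Value) item).getVal());
            } else {
                // On a barrier: aggregate the partial sums into the total.
                total += sumInParallel(sinceLastBarrier, numPartitions);
                sinceLastBarrier.clear();
                output.add(total);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<DataItem> items = Arrays.asList(
                new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());
        System.out.println(run(items, 4)); // prints [1, 3]
    }
}
```

The output is the same for any number of partitions, because addition is
associative and commutative; that is the property I would like to exploit.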

Filip



On Tue, Oct 8, 2019 at 9:09 AM Chesnay Schepler <ches...@apache.org> wrote:

> In other words, you need a way to partition the stream such that a series
> of items followed by a barrier are never interrupted.
>
> I'm wondering whether you could just apply DataStream#partitionCustom to
> your source:
>
> public static class BarrierPartitioner implements Partitioner<DataItem> {
>
>    private int currentPartition = 0;
>
>    @Override
>    public int partition(DataItem key, int numPartitions) {
>       if (key instanceof Barrier) {
>          int partitionToReturn = currentPartition;
>          currentPartition = (currentPartition + 1) % numPartitions;
>          return partitionToReturn;
>       } else {
>          return currentPartition;
>       }
>    }
> }
>
> DataStream<DataItem> stream = ...;
> DataStream<DataItem> partitionedStream =
>       stream.partitionCustom(new BarrierPartitioner(), item -> item);
>
>
> On 08/10/2019 14:55, Filip Niksic wrote:
>
> Hi Yun,
>
> The behavior with increased parallelism should be the same as with no
> parallelism. In other words, for the input from the previous email, the
> output should always be 1, 3, regardless of parallelism. Operationally, the
> partial sums maintained in each subtask should somehow be aggregated before
> they are output.
>
> To answer the second question, I know that watermarks provide the same
> functionality. Is there some way to convert the input with explicit
> punctuation into one with watermarks? I see there is an interface called
> AssignerWithPunctuatedWatermarks, maybe that's the solution. But I'm not
> sure how this assigner would be used. For example, it could maintain the
> number of previously seen Barriers and assign this number as a watermark to
> each Value, but then this number becomes the state that needs to be shared
> between multiple substreams. Or perhaps the Barriers can somehow be
> duplicated and sent to each substream? Alternatively, is there some notion
> of event-based windows that would be triggered by specific user-defined
> elements in the stream? In such a mechanism perhaps the watermarks would be
> used internally, but they would not be explicitly exposed to the user?
>
> Best regards,
>
> Filip
>
>
>
> On Tue, Oct 8, 2019 at 2:19 AM Yun Gao <yungao...@aliyun.com> wrote:
>
>>
>>       Hi Filip,
>>
>>            I have one question on the problem: what is the expected
>> behavior when the parallelism of the FlatMapFunction is increased to more
>> than 1? Should each subtask maintain the partial sum of all values it has
>> received and, whenever it receives a barrier, just output that partial
>> sum?
>>
>>           Another question: I think the watermark mechanism in Flink
>> provides functionality similar to punctuation, so is it possible to
>> implement the same logic with Flink windows directly?
>>
>>     Best,
>>     Yun
>>
>> ------------------------------------------------------------------
>> From:Filip Niksic <fnik...@seas.upenn.edu>
>> Send Time:2019 Oct. 8 (Tue.) 08:56
>> To:user <user@flink.apache.org>
>> Subject:[QUESTION] How to parallelize with explicit punctuation in Flink?
>>
>> Hi all,
>>
>> What would be a natural way to implement a parallel version of the
>> following Flink program?
>>
>> Suppose I have a stream of items of type DataItem with two concrete
>> implementations of DataItem: Value and Barrier. Let’s say that Value holds
>> an integer value, and Barrier acts as explicit punctuation.
>>
>> public interface DataItem {}
>>
>> public class Value implements DataItem {
>>    private final int val;
>>
>>    public Value(int val) { this.val = val; }
>>
>>    public int getVal() { return val; }
>> }
>>
>> public class Barrier implements DataItem {}
>>
>> The program should maintain a sum of values seen since the beginning of
>> the stream. On each Barrier, the program should output the sum seen so far.
>>
>> An obvious way to implement this would be with a FlatMapFunction,
>> maintaining the sum as state and emitting it on each Barrier.
>>
>> StreamExecutionEnvironment env =
>>        StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> DataStream<DataItem> stream = env.fromElements(DataItem.class,
>>        new Value(1), new Barrier(), new Value(3), new Value(-1), new Barrier());
>>
>> stream.flatMap(new FlatMapFunction<DataItem, Integer>() {
>>    private int sum = 0;
>>
>>    @Override
>>    public void flatMap(DataItem dataItem, Collector<Integer> collector)
>>            throws Exception {
>>        if (dataItem instanceof Value) {
>>            sum += ((Value) dataItem).getVal();
>>        } else {
>>            collector.collect(sum);
>>        }
>>    }
>> }).setParallelism(1).print().setParallelism(1);
>>
>> env.execute();
>>
>> // We should see 1 followed by 3 as output
>>
>> However, such an operator cannot be parallelized, since the order of
>> Values and Barriers matters. That’s why I need to set parallelism to 1
>> above. Is there a way to rewrite this to exploit parallelism?
>>
>> (Another reason to set parallelism to 1 above is that I’m assuming there
>> is a single instance of the FlatMapFunction. A proper implementation would
>> take more care in using state. Feel free to comment on that as well.)
>>
>>
>> Best regards,
>>
>>
>> Filip Niksic
>>
>>
>>
>
