Here is the solution I currently have. It turned out to be more complicated
than I expected. It would be great if a more experienced Flink user could
comment and point out the shortcomings. And if you have other ideas for
achieving the same thing, let me know!

Let's start like in the original email, except now we set the time
characteristic to EventTime and parallelism to a constant named PARALLELISM.

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

final int PARALLELISM = 2;
env.setParallelism(PARALLELISM);

DataStream<DataItem> stream = env.fromElements(DataItem.class,
        new Value(1), new Barrier(), new Value(3), new Value(-1), new
Barrier());


The first step is to use a punctuation-based timestamp-and-watermark
assigner as follows. We keep track of the number of barriers in the stream.
We assign a timestamp n to the n-th barrier and all the values that
immediately precede it, and we emit a watermark with timestamp n on the
n-th barrier. This will allow us to define 1 millisecond tumbling windows
that precisely capture the values between two barriers.

DataStream<DataItem> timedStream =
        stream.assignTimestampsAndWatermarks(new
AssignerWithPunctuatedWatermarks<DataItem>() {
    private long barrierCount = 0;

    @Override
    public long extractTimestamp(DataItem item, long previousTimestamp) {
        return barrierCount;
    }

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(DataItem item, long
extractedTimestamp) {
        if (item instanceof Barrier) {
            barrierCount++;
            return new Watermark(extractedTimestamp);
        }
        return null;
    }
});


In the test input stream, the first value and barrier get a timestamp 0,
and the next two values and the final barrier get a timestamp 1. Two
watermarks with timestamps 0 and 1 are emitted.

To achieve parallelization, we partition the values by artificially
generated keys. A value's key is based on its position in the stream, so we
first wrap the values into a type that contains this information.

class ValueWithId {
    private final int val;
    private final long id;

    public ValueWithId(int val, long id) {
        this.val = val;
        this.id = id;
    }
    public int getVal() { return val; }
    public long getId() { return id; }
}


Here is the mapping. At the same time we can drop the barriers, since we no
longer need them. Note that we need to explicitly set the mapping
operator's parallelism to 1, since the operator is stateful.

DataStream<ValueWithId> wrappedStream =
        timedStream.flatMap(new FlatMapFunction<DataItem, ValueWithId>() {
    private long count = 0L;

    @Override
    public void flatMap(DataItem item, Collector<ValueWithId>
collector) throws Exception {
        if (item instanceof Value) {
            int val = ((Value) item).getVal();
            collector.collect(new ValueWithId(val, count++));
        }
    }
}).setParallelism(1);


Now we're ready to do the key-based partitioning. A value's key is its id
as assigned above modulo PARALLELISM. We follow the partitioning by
splitting the stream into 1 millisecond tumbling windows. Then we simply
aggregate the partial sums, first for each key separately (and importantly,
in parallel), and then for each window.

DataStream<Integer> partialSums = wrappedStream.keyBy(x -> x.getId() %
PARALLELISM)
        .timeWindow(Time.of(1L, TimeUnit.MILLISECONDS))
        .aggregate(new AggregateFunction<ValueWithId, Integer, Integer>() {
            @Override
            public Integer createAccumulator() { return 0; }

            @Override
            public Integer add(ValueWithId valueWithId, Integer acc) {
return acc + valueWithId.getVal(); }

            @Override
            public Integer getResult(Integer acc) { return acc; }

            @Override
            public Integer merge(Integer acc1, Integer acc2) { return
acc1 + acc2; }
        })
        .timeWindowAll(Time.of(1L, TimeUnit.MILLISECONDS))
        .reduce((x, y) -> x + y);


Finally, in the original problem I asked for cumulative sums since the
start of the stream, so we perform the last set of transformations to
achieve that.

DataStream<Integer> cumulativeSums = partialSums
        .windowAll(GlobalWindows.create())
        .trigger(CountTrigger.of(1))
        .reduce((x, y) -> x + y);
cumulativeSums.print().setParallelism(1);
env.execute();
// We should see 1 followed by 3 as output


I am not completely sure if my usage of state in the
timestamp-and-watermark assigner and the mapper is correct. Is it possible
for Flink to duplicate the assigner, move it around and somehow mess up the
timestamps? Likewise, is it possible for things to go wrong with the mapper?

Another concern I have is that my key-based partitions depend on the
constant PARALLELISM. Ideally, the program should be flexible about the
parallelism that happens to be available during runtime.

Finally, if anyone notices that I am in any part reinventing the wheel and
that Flink already has a feature implementing some of the above, or that
something can be done more elegantly, let me know!

Best regards,

Filip



On Tue, Oct 8, 2019 at 11:12 AM Filip Niksic <fnik...@seas.upenn.edu> wrote:

> Hi Chesnay,
>
> Thanks for the reply. While your solution ultimately does use multiple
> partitions, from what I can tell the underlying processing is still
> sequential. Imagine a stream where barriers are quite rare, say a million
> values is followed by a barrier. Then these million values all end up at
> the same partition and are added up sequentially, and while they are being
> processed, the other partitions are waiting for their turn. A truly
> parallel solution would partition the million values, process each
> partition in parallel to get the partial sums, and on each barrier
> aggregate the partial sums into a total sum.
>
> Filip
>
>
>
> On Tue, Oct 8, 2019 at 9:09 AM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> In other words, you need a way to partition the stream such that a series
>> of items followed by a barrier are never interrupted.
>>
>> I'm wondering whether you could just apply DataStream#partitionCustom to
>> your source:
>>
>> public static class BarrierPartitioner implements Partitioner<DataItem> {
>>
>>    private int currentPartition = 0;   @Override   public int 
>> partition(DataItem key, int numPartitions) {
>>       if (key instanceof Barrier) {
>>          int partitionToReturn = currentPartition;         currentPartition 
>> = (currentPartition + 1) % numPartitions;         return partitionToReturn;  
>>     } else {
>>          return currentPartition;      }
>>    }
>> }
>>
>> DataStream<DataItem> stream = ...;DataStream<DataItem> partitionedStream = 
>> stream.partitionCustom(new BarrierPartitioner(), item -> item);
>>
>>
>> On 08/10/2019 14:55, Filip Niksic wrote:
>>
>> Hi Yun,
>>
>> The behavior with increased parallelism should be the same as with no
>> parallelism. In other words, for the input from the previous email, the
>> output should always be 1, 3, regardless of parallelism. Operationally, the
>> partial sums maintained in each subtask should somehow be aggregated before
>> they are output.
>>
>> To answer the second question, I know that watermarks provide the same
>> functionality. Is there some way to convert the input with explicit
>> punctuation into one with watermarks? I see there is an interface called
>> AssignerWithPunctuatedWatermarks, maybe that's the solution. But I'm not
>> sure how this assigner would be used. For example, it could maintain the
>> number of previously seen Barriers and assign this number as a watermark to
>> each Value, but then this number becomes the state that needs to be shared
>> between multiple substreams. Or perhaps the Barriers can somehow be
>> duplicated and sent to each substream? Alternatively, is there some notion
>> of event-based windows that would be triggered by specific user-defined
>> elements in the stream? In such mechanism perhaps the watermarks would be
>> used internally, but they would not be explicitly exposed to the user?
>>
>> Best regards,
>>
>> Filip
>>
>>
>>
>> On Tue, Oct 8, 2019 at 2:19 AM Yun Gao <yungao...@aliyun.com> wrote:
>>
>>>
>>>       Hi Filip,
>>>
>>>            I have one question on the problem: what is the expected
>>> behavior when the  parallelism of the FlatMapFunction is increased to more
>>> than 1? Should each subtask maintains the partial sum of all values
>>> received, and whenever the barrier is received, then it just outputs the
>>> partial sum of the received value ?
>>>
>>>           Another question is that I think in Flink the watermark
>>> mechanism has provided the functionality similar to punctuation,  therefore
>>> is it possible to implement the same logic with the Flink Window directly?
>>>
>>>     Best,
>>>     Yun
>>>
>>> ------------------------------------------------------------------
>>> From:Filip Niksic <fnik...@seas.upenn.edu>
>>> Send Time:2019 Oct. 8 (Tue.) 08:56
>>> To:user <user@flink.apache.org>
>>> Subject:[QUESTION] How to parallelize with explicit punctuation in Flink?
>>>
>>> Hi all,
>>>
>>> What would be a natural way to implement a parallel version of the
>>> following Flink program?
>>>
>>> Suppose I have a stream of items of type DataItem with two concrete
>>> implementations of DataItem: Value and Barrier. Let’s say that Value holds
>>> an integer value, and Barrier acts as explicit punctuation.
>>>
>>> public interface DataItem {}
>>>
>>> public class Value implements DataItem {
>>>
>>>    private final int val;
>>>
>>>    public Value(int val) { this.val = val; }
>>>
>>>    public int getVal() { return val; }
>>>
>>> }
>>>
>>> public class Barrier implements DataItem {}
>>>
>>> The program should maintain a sum of values seen since the beginning of
>>> the stream. On each Barrier, the program should output the sum seen so far.
>>>
>>> An obvious way to implement this would be with a FlatMapFunction,
>>> maintaining the sum as state and emitting it on each Barrier.
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.
>>> getExecutionEnvironment();
>>>
>>> DataStream<DataItem> stream = env.fromElements(DataItem.class,
>>>
>>>        new Value(1), new Barrier(), new Value(3), new Value(-1), new
>>> Barrier());
>>>
>>> stream.flatMap(new FlatMapFunction<DataItem, Integer>() {
>>>
>>>    private int sum = 0;
>>>
>>>    @Override
>>>
>>>    public void flatMap(DataItem dataItem, Collector<Integer> collector) 
>>> throws
>>> Exception {
>>>
>>>        if (dataItem instanceof Value) {
>>>
>>>            sum += ((Value) dataItem).getVal();
>>>
>>>        } else {
>>>
>>>            collector.collect(sum);
>>>
>>>        }
>>>
>>>    }
>>>
>>> }).setParallelism(1).print().setParallelism(1);
>>>
>>> env.execute();
>>>
>>> // We should see 1 followed by 3 as output
>>>
>>> However, such an operator cannot be parallelized, since the order of
>>> Values and Barriers matters. That’s why I need to set parallelism to 1
>>> above. Is there a way to rewrite this to exploit parallelism?
>>>
>>> (Another reason to set parallelism to 1 above is that I’m assuming there
>>> is a single instance of the FlatMapFunction. A proper implementation would
>>> take more care in using state. Feel free to comment on that as well.)
>>>
>>>
>>> Best regards,
>>>
>>>
>>> Filip Niksic
>>>
>>>
>>>
>>

Reply via email to