Re: Strange behaviour of the flatMap Collector

2016-08-11 Thread Yassin Marzouki
Indeed, using the same parallelism corrected the output. Thank you!

On Thu, Aug 11, 2016 at 2:34 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> The source runs parallel (n tasks), but the sink has a parallelism of 1.
> The sink hence has to merge the parallel streams from the source, which
> happens based on arrival speed of the streams, i.e., it's not deterministic.
> That's why you see the lines being mixed.
>
> Try running source and sink with the same parallelism, then no merge of
> streams needs to happen. You'll see then that per output file, the lines
> are correct.
>
> Stephan
>
>
> On Thu, Aug 11, 2016 at 2:29 PM, Yassin Marzouki <yassmar...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> When I use out.collect() twice inside a flatMap, the output is sometimes
>> randomly interleaved. Take this example:
>>
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>> env.generateSequence(1, 10)
>> .flatMap((Long t, Collector<String> out) -> {
>> out.collect("line1");
>> out.collect("line2");
>> })
>> .writeAsText("test", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
>> env.execute("Test");
>>
>> I expect the output to be
>> line1
>> line2
>> line1
>> line2
>> ...
>>
>> But some resulting lines (18 out of 20) were:
>> line2
>> line2
>> and the same for line1.
>>
>> What could be the reason for this?
>>
>> Best,
>> Yassine
>>
>
>
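Stephan's explanation can be reproduced without Flink at all. The sketch below (plain Java, hypothetical names, not Flink classes) models n "source subtasks" as threads feeding one parallelism-1 "sink" list: the global interleaving depends on thread scheduling, but each producer's own records still arrive in order — exactly the guarantee Stephan describes per output file.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Illustration only: two "source subtasks" emit line1/line2 pairs into one
// shared "sink". The merge order is non-deterministic, but each producer's
// sub-sequence is preserved.
public class MergeDemo {

    static List<String> run(int producers, int elementsEach) throws InterruptedException {
        List<String> sink = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(producers);
        for (int p = 0; p < producers; p++) {
            final int id = p;
            new Thread(() -> {
                for (int i = 0; i < elementsEach; i++) {
                    // like out.collect("line1"); out.collect("line2");
                    sink.add(id + ":line1");
                    sink.add(id + ":line2");
                }
                done.countDown();
            }).start();
        }
        done.await();
        return sink;
    }

    // Per producer, the filtered sub-sequence is still line1,line2,line1,...
    static boolean perProducerOrdered(List<String> sink, int id) {
        List<String> mine = new ArrayList<>();
        for (String s : sink) if (s.startsWith(id + ":")) mine.add(s);
        for (int i = 0; i < mine.size(); i++) {
            String expected = id + ":" + (i % 2 == 0 ? "line1" : "line2");
            if (!mine.get(i).equals(expected)) return false;
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> sink = run(2, 1000);
        if (sink.size() != 4000) throw new AssertionError();
        if (!perProducerOrdered(sink, 0)) throw new AssertionError();
        if (!perProducerOrdered(sink, 1)) throw new AssertionError();
    }
}
```

Running source and sink at the same parallelism removes the merge step entirely, which is why the interleaving disappears.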


Strange behaviour of the flatMap Collector

2016-08-11 Thread Yassin Marzouki
Hi all,

When I use out.collect() twice inside a flatMap, the output is sometimes
randomly interleaved. Take this example:

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment();
env.generateSequence(1, 10)
.flatMap((Long t, Collector<String> out) -> {
out.collect("line1");
out.collect("line2");
})
.writeAsText("test", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
env.execute("Test");

I expect the output to be
line1
line2
line1
line2
...

But some resulting lines (18 out of 20) were:
line2
line2
and the same for line1.

What could be the reason for this?

Best,
Yassine


Re: No output when using event time with multiple Kafka partitions

2016-07-27 Thread Yassin Marzouki
I just tried playing with the source parallelism setting, and I got a very
strange result:

If I specify the source parallelism
using env.addSource(kafka).setParallelism(N), results are printed correctly
for any number N except N=4. I guess that's related to the number of
task slots, since I have 4 CPU cores, but what is the explanation for that?
So I suppose that if I don't specify the source parallelism, it is set
automatically to 4. Isn't it supposed to be set to the number of topic
partitions (= 2) by default?


On Wed, Jul 27, 2016 at 2:33 PM, Yassin Marzouki <yassmar...@gmail.com>
wrote:

> Hi Kostas,
>
> When I remove the window and the apply() and put print() after 
> assignTimestampsAndWatermarks,
> the messages are printed correctly:
>
> 2> Request{ts=2015-01-01, 06:15:34:000}
> 2> Request{ts=2015-01-02, 16:38:10:000}
> 2> Request{ts=2015-01-02, 18:58:41:000}
> 2> Request{ts=2015-01-02, 19:10:00:000}
> 2> Request{ts=2015-01-02, 23:36:51:000}
> 2> Request{ts=2015-01-03, 17:38:47:000}
> ...
>
> But strangely, only one task is used. If I set the source parallelism to 1
> using env.addSource(kafka).setParallelism(1) (the window and the apply()
> still removed), results are printed using all available slots (number of
> CPU cores):
>
> 4> Request{ts=2015-01-01, 06:15:34:000}
> 4> Request{ts=2015-01-02, 16:38:10:000}
> 2> Request{ts=2015-01-02, 19:10:00:000}
> 4> Request{ts=2015-01-02, 23:36:51:000}
> 1> Request{ts=2015-01-02, 18:58:41:000}
> 2> Request{ts=2015-01-03, 17:38:47:000}
> 3> Request{ts=2015-01-03, 17:56:42:000}
> ...
>
> Now if I keep the window and apply() without specifying the source
> parallelism, no messages are printed (only regular kafka consumer and flink
> logs), and if the source parallelism is set to 1, messages are printed
> correctly:
>
> 1> Window: TimeWindow{start=142007040, end=142015680}
> 2> Request{ts=2015-01-01, 06:15:34:000}
> 1> Request{ts=2015-01-02, 16:38:10:000}
> 4> Request{ts=2015-01-02, 19:10:00:000}
> 3> Window: TimeWindow{start=142015680, end=142024320}
> 3> Request{ts=2015-01-02, 18:58:41:000}
> 2> Request{ts=2015-01-02, 23:36:51:000}
> 3> Window: TimeWindow{start=142041600, end=142050240}
> 2> Request{ts=2015-01-03, 17:38:47:000}
> 4> Window: TimeWindow{start=142024320, end=142032960}
> 1> Request{ts=2015-01-03, 17:56:42:000}
> 1> Request{ts=2015-01-05, 17:13:45:000}
> 4> Request{ts=2015-01-05, 01:25:55:000}
> 2> Request{ts=2015-01-05, 14:27:45:000}
> ...
>
> On Wed, Jul 27, 2016 at 1:41 PM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi Yassine,
>>
>> Could you just remove the window and the apply, and just put a print()
>> after the:
>>
>> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
>> @Override
>> public long extractAscendingTimestamp(Request req) {
>> return req.ts;
>> }
>> })
>>
>>
>> This at least will tell us if reading from Kafka works as expected.
>>
>> Kostas
>>
>> On Jul 25, 2016, at 3:39 PM, Yassin Marzouki <yassmar...@gmail.com>
>> wrote:
>>
>> Hi everyone,
>>
>> I am reading messages from a Kafka topic with 2 partitions and using
>> event time. This is my code:
>>
>> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
>> @Override
>> public long extractAscendingTimestamp(Request req) {
>> return req.ts;
>> }
>> })
>> .windowAll(TumblingEventTimeWindows.of(Time.days(1)))
>> .apply((TimeWindow window, Iterable<Request> iterable, Collector<String>
>> collector) -> {
>> collector.collect("Window: " + window.toString());
>> for (Request req : iterable) {
>> collector.collect(req.toString());
>> }
>> })
>> .print()
>>
>> I could get an output only when setting the kafka source parallelism to
>> 1. I guess that is because messages from multiple partitions arrive
>> out-of-order to the timestamp extractor according to this thread
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-td4782.html#a4804>,
>> correct?
>> So I replaced the AscendingTimestampExtractor with a
>> BoundedOutOfOrdernessGenerator as in the documentation example
>> <https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamps_watermarks.html#tab_java_3>
>>  (with
>> a higher delay) in order to handle out-of-order events, but I still can't
>> get any output. Why is that?
>>
>> Best,
>> Yassine
>>
>>
>>
>
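The symptoms above are consistent with how Flink combines watermarks: an operator's event-time clock is the minimum over the watermarks of its input channels, so a source subtask that owns no Kafka partition (here: 4 subtasks, only 2 partitions) emits nothing and pins event time at its initial value, and event-time windows never fire. It also explains the default: local operator parallelism defaults to the environment parallelism (number of CPU cores), not the partition count. A minimal sketch of the min-combination rule (assumed, plain Java, not Flink's actual classes):

```java
// Sketch: a downstream operator's watermark is the minimum of its inputs'
// watermarks; one idle input stalls event time at Long.MIN_VALUE.
public class WatermarkDemo {

    static long combined(long[] inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) min = Math.min(min, w);
        return min;
    }

    public static void main(String[] args) {
        // 4 source subtasks, only 2 Kafka partitions: two subtasks stay idle,
        // so the combined watermark never advances.
        long[] fourSubtasks = {1_420_070_400_000L, 1_420_156_800_000L,
                               Long.MIN_VALUE, Long.MIN_VALUE};
        // Parallelism 1: the single subtask reads both partitions and advances.
        long[] oneSubtask = {1_420_070_400_000L};

        if (combined(fourSubtasks) != Long.MIN_VALUE) throw new AssertionError();
        if (combined(oneSubtask) != 1_420_070_400_000L) throw new AssertionError();
    }
}
```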


Re: No output when using event time with multiple Kafka partitions

2016-07-27 Thread Yassin Marzouki
Hi Kostas,

When I remove the window and the apply() and put print() after
assignTimestampsAndWatermarks,
the messages are printed correctly:

2> Request{ts=2015-01-01, 06:15:34:000}
2> Request{ts=2015-01-02, 16:38:10:000}
2> Request{ts=2015-01-02, 18:58:41:000}
2> Request{ts=2015-01-02, 19:10:00:000}
2> Request{ts=2015-01-02, 23:36:51:000}
2> Request{ts=2015-01-03, 17:38:47:000}
...

But strangely, only one task is used. If I set the source parallelism to 1
using env.addSource(kafka).setParallelism(1) (the window and the apply()
still removed), results are printed using all available slots (number of
CPU cores):

4> Request{ts=2015-01-01, 06:15:34:000}
4> Request{ts=2015-01-02, 16:38:10:000}
2> Request{ts=2015-01-02, 19:10:00:000}
4> Request{ts=2015-01-02, 23:36:51:000}
1> Request{ts=2015-01-02, 18:58:41:000}
2> Request{ts=2015-01-03, 17:38:47:000}
3> Request{ts=2015-01-03, 17:56:42:000}
...

Now if I keep the window and apply() without specifying the source
parallelism, no messages are printed (only regular kafka consumer and flink
logs), and if the source parallelism is set to 1, messages are printed
correctly:

1> Window: TimeWindow{start=142007040, end=142015680}
2> Request{ts=2015-01-01, 06:15:34:000}
1> Request{ts=2015-01-02, 16:38:10:000}
4> Request{ts=2015-01-02, 19:10:00:000}
3> Window: TimeWindow{start=142015680, end=142024320}
3> Request{ts=2015-01-02, 18:58:41:000}
2> Request{ts=2015-01-02, 23:36:51:000}
3> Window: TimeWindow{start=142041600, end=142050240}
2> Request{ts=2015-01-03, 17:38:47:000}
4> Window: TimeWindow{start=142024320, end=142032960}
1> Request{ts=2015-01-03, 17:56:42:000}
1> Request{ts=2015-01-05, 17:13:45:000}
4> Request{ts=2015-01-05, 01:25:55:000}
2> Request{ts=2015-01-05, 14:27:45:000}
...

On Wed, Jul 27, 2016 at 1:41 PM, Kostas Kloudas <k.klou...@data-artisans.com
> wrote:

> Hi Yassine,
>
> Could you just remove the window and the apply, and just put a print()
> after the:
>
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
> @Override
> public long extractAscendingTimestamp(Request req) {
> return req.ts;
> }
> })
>
>
> This at least will tell us if reading from Kafka works as expected.
>
> Kostas
>
> On Jul 25, 2016, at 3:39 PM, Yassin Marzouki <yassmar...@gmail.com> wrote:
>
> Hi everyone,
>
> I am reading messages from a Kafka topic with 2 partitions and using event
> time. This is my code:
>
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
> @Override
> public long extractAscendingTimestamp(Request req) {
> return req.ts;
> }
> })
> .windowAll(TumblingEventTimeWindows.of(Time.days(1)))
> .apply((TimeWindow window, Iterable<Request> iterable, Collector<String>
> collector) -> {
> collector.collect("Window: " + window.toString());
> for (Request req : iterable) {
> collector.collect(req.toString());
> }
> })
> .print()
>
> I could get an output only when setting the kafka source parallelism to 1. I
> guess that is because messages from multiple partitions arrive out-of-order
> to the timestamp extractor according to this thread
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-td4782.html#a4804>,
> correct?
> So I replaced the AscendingTimestampExtractor with a
> BoundedOutOfOrdernessGenerator as in the documentation example
> <https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamps_watermarks.html#tab_java_3>
>  (with
> a higher delay) in order to handle out-of-order events, but I still can't
> get any output. Why is that?
>
> Best,
> Yassine
>
>
>


Re: If I chain two windows, what event-time would the second window have?

2016-07-27 Thread Yassin Marzouki
Hi Kostas,

Thank you very much for the explanation.

Best,
Yassine

On Wed, Jul 27, 2016 at 1:09 PM, Kostas Kloudas <k.klou...@data-artisans.com
> wrote:

> Hi Yassine,
>
> When the WindowFunction is applied to the content of a window, the
> timestamp of the resulting record
> is the window.maxTimestamp, which is the endOfWindow-1.
>
> You can imaging if you have a Tumbling window from 0 to 2000, the result
> will have a timestamp of 1999.
> Window boundaries are closed in the start and open at the end timestamp,
> or [start, end).
>
> If you want to play around, I would suggest checking out the tests in the
> WindowOperatorTest class.
>
> There you can do experiments and figure out how Flink's windowOperator
> works internally and what is the
> interplay between windowAssingers, triggers, and the windowOperator.
>
> Hope this helps,
> Kostas
>
> On Jul 27, 2016, at 8:41 AM, Yassin Marzouki <yassmar...@gmail.com> wrote:
>
> Hi all,
>
> Say I assign timestamps to a stream and then apply a transformation like
> this:
>
>
> stream.keyBy(0).timeWindow(Time.hours(5)).reduce(count).timeWindowAll(Time.days(1)).apply(transformation)
>
> Now, when the first window is applied, events are aggregated based on
> their timestamps, but I don't understand what timestamp will be assigned to
> the aggregated result of the reduce operation for the second window to
> process it. Could you please explain it? Thank you.
>
> Best,
> Yassine
>
>
>
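Kostas's rule above is small enough to check directly. The sketch below (plain Java, illustrative names, not Flink's classes) computes the timestamp a window result carries, `maxTimestamp() = end - 1` for a `[start, end)` window, and shows why a 5-hour window's result still lands inside the enclosing 1-day window of the chained `timeWindowAll`:

```java
// Sketch of the rule: a window covers [start, end) and its emitted
// result is timestamped with maxTimestamp() = end - 1.
public class WindowTimestampDemo {

    static long maxTimestamp(long start, long sizeMillis) {
        long end = start + sizeMillis;   // exclusive end boundary
        return end - 1;
    }

    public static void main(String[] args) {
        // Tumbling window [0, 2000) -> result timestamped 1999.
        if (maxTimestamp(0, 2000) != 1999) throw new AssertionError();

        // A 5h window's result timestamp falls inside the 1-day window
        // that contains it, so the second (chained) window can assign it.
        long fiveHours = 5 * 3600 * 1000L;
        long oneDay = 24 * 3600 * 1000L;
        long ts = maxTimestamp(0, fiveHours);
        if (!(ts >= 0 && ts < oneDay)) throw new AssertionError();
    }
}
```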


If I chain two windows, what event-time would the second window have?

2016-07-27 Thread Yassin Marzouki
Hi all,

Say I assign timestamps to a stream and then apply a transformation like
this:

stream.keyBy(0).timeWindow(Time.hours(5)).reduce(count).timeWindowAll(Time.days(1)).apply(transformation)

Now, when the first window is applied, events are aggregated based on their
timestamps, but I don't understand what timestamp will be assigned to the
aggregated result of the reduce operation for the second window to process
it. Could you please explain it? Thank you.

Best,
Yassine


No output when using event time with multiple Kafka partitions

2016-07-25 Thread Yassin Marzouki
Hi everyone,

I am reading messages from a Kafka topic with 2 partitions and using event
time. This is my code:

.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
@Override
public long extractAscendingTimestamp(Request req) {
return req.ts;
}
})
.windowAll(TumblingEventTimeWindows.of(Time.days(1)))
.apply((TimeWindow window, Iterable<Request> iterable, Collector<String>
collector) -> {
collector.collect("Window: " + window.toString());
for (Request req : iterable) {
collector.collect(req.toString());
}
})
.print()

I could get an output only when setting the kafka source parallelism to 1. I
guess that is because messages from multiple partitions arrive out-of-order
to the timestamp extractor according to this thread
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-td4782.html#a4804>,
correct?
So I replaced the AscendingTimestampExtractor with a
BoundedOutOfOrdernessGenerator as in the documentation example
<https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamps_watermarks.html#tab_java_3>
(with a higher delay) in order to handle out-of-order events, but I still can't
get any output. Why is that?

Best,
Yassine


Re: Variable not initialized in the open() method of RichMapFunction

2016-07-22 Thread Yassin Marzouki
Thank you Stephan and Kim, that solved the problem.
Just to make sure, is using a MapFunction as in the following code any
different? i.e. does it initialize the objectMapper for every element in
the stream?

.map(new MapFunction<String, Request>() {

private ObjectMapper objectMapper = new ObjectMapper();

@Override
 public Request map(String value) throws Exception {
 return objectMapper.readValue(value, Request.class);
}
})
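To the question above: a field initializer runs once per function instance, when the object is constructed (or deserialized on the worker), not once per element, so the ObjectMapper is not rebuilt for every record. A plain-Java sketch of that distinction (illustrative names, not Flink classes):

```java
// Sketch: a field initializer executes once per object, however many
// times map() is subsequently called on that object.
public class FieldInitDemo {

    static int constructions = 0;

    static class MyMapper {
        // analogous to: private ObjectMapper objectMapper = new ObjectMapper();
        final Object heavyResource = newResource();

        String map(String value) { return value.toUpperCase(); }
    }

    static Object newResource() {
        constructions++;
        return new Object();
    }

    public static void main(String[] args) {
        MyMapper m = new MyMapper();
        for (int i = 0; i < 1000; i++) m.map("x");   // 1000 elements
        if (constructions != 1) throw new AssertionError();
    }
}
```

The remaining difference in Flink is serialization: the function object is shipped to the workers, so a field initialized at construction must be serializable, whereas open() lets you build non-serializable resources on the worker itself.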

On Fri, Jul 22, 2016 at 5:20 PM, Dong iL, Kim <kim.s...@gmail.com> wrote:

> oops. stephan already answered.
> sorry. T^T
>
> On Sat, Jul 23, 2016 at 12:16 AM, Dong iL, Kim <kim.s...@gmail.com> wrote:
>
>> is open method signature right? or typo?
>>
>> void open(Configuration parameters) throws Exception;
>>
>> On Sat, Jul 23, 2016 at 12:09 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> I think you overrode the open method with the wrong signature. The right
>>> signature would be "open(Configuration cfg) {...}". You probably overlooked
>>> this because you missed the "@Override" annotation.
>>>
>>> On Fri, Jul 22, 2016 at 4:49 PM, Yassin Marzouki <yassmar...@gmail.com>
>>> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> I want to convert a stream of json strings to POJOs using Jackson, so I
>>>> did the following:
>>>>
>>>> .map(new RichMapFunction<String, Request>() {
>>>>
>>>> private ObjectMapper objectMapper;
>>>>
>>>> public void open() {
>>>> objectMapper = new ObjectMapper();
>>>> }
>>>>
>>>> @Override
>>>>  public Request map(String value) throws Exception {
>>>>  return objectMapper.readValue(value, Request.class);
>>>> }
>>>> })
>>>>
>>>> But this code gave me a NullPointerException because the objectMapper
>>>> was not initialized successfully.
>>>>
>>>> 1. Isn't the open() method supposed to be called before map() and
>>>> initialize objectMapper?
>>>> 2. I figured out that initializing objectMapper before the open()
>>>> method resolves the problem, and that it works also with a simple
>>>> MapFunction. In that case, is there an advantage for using a
>>>> RichMapFunction?
>>>>
>>>> Best,
>>>> Yassine
>>>>
>>>
>>>
>>
>>
>> --
>>
>
>
>
> --
>
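Stephan's diagnosis, `open()` without the `Configuration` parameter *overloads* rather than *overrides* the framework's lifecycle method, can be demonstrated in plain Java. The classes below are stand-ins for Flink's RichFunction/Configuration, not the real API:

```java
// Sketch of the bug: open() without parameters is a new overload the
// framework never calls, so fields stay null. @Override on the correct
// signature would have turned the mistake into a compile error.
public class OverrideDemo {

    static class Configuration {}

    static class RichFunction {
        void open(Configuration parameters) {}   // what the framework calls
    }

    static class WrongMapper extends RichFunction {
        String resource;
        // Missing (Configuration) parameter: an overload, never invoked.
        void open() { resource = "ready"; }
    }

    static class RightMapper extends RichFunction {
        String resource;
        @Override
        void open(Configuration parameters) { resource = "ready"; }
    }

    static String runLifecycle(RichFunction f) {
        f.open(new Configuration());             // framework-style invocation
        return (f instanceof WrongMapper) ? ((WrongMapper) f).resource
                                          : ((RightMapper) f).resource;
    }

    public static void main(String[] args) {
        if (runLifecycle(new WrongMapper()) != null) throw new AssertionError();
        if (!"ready".equals(runLifecycle(new RightMapper()))) throw new AssertionError();
    }
}
```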


Variable not initialized in the open() method of RichMapFunction

2016-07-22 Thread Yassin Marzouki
Hi everyone,

I want to convert a stream of json strings to POJOs using Jackson, so I did
the following:

.map(new RichMapFunction<String, Request>() {

private ObjectMapper objectMapper;

public void open() {
objectMapper = new ObjectMapper();
}

@Override
 public Request map(String value) throws Exception {
 return objectMapper.readValue(value, Request.class);
}
})

But this code gave me a NullPointerException because the objectMapper was
not initialized successfully.

1. Isn't the open() method supposed to be called before map() and
initialize objectMapper?
2. I figured out that initializing objectMapper before the open() method
resolves the problem, and that it works also with a simple MapFunction. In
that case, is there an advantage for using a RichMapFunction?

Best,
Yassine