Re: Python API not working

2018-01-04 Thread Yassine MARZOUGUI
Hi all,

Any ideas on this?


2017-12-15 15:10 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Hi Ufuk,
>
> Thanks for your response. Unfortunately, specifying `streaming` or `batch`
> doesn't work; it looks like the mode should be either "plan" or "operator",
> and then the program expects other inputs from stdin (id, port, etc.).
>
> 2017-12-15 14:23 GMT+01:00 Ufuk Celebi <u...@apache.org>:
>
>> Hey Yassine,
>>
>> let me include Chesnay (cc'd) who worked on the Python API.
>>
>> I'm not familiar with the API and what it expects, but try entering
>> `streaming` or `batch` for the mode. Chesnay probably has the details.
>>
>> – Ufuk
>>
>>
>> On Fri, Dec 15, 2017 at 11:05 AM, Yassine MARZOUGUI
>> <y.marzou...@mindlytix.com> wrote:
>> > Hi All,
>> >
>> > I'm trying to use Flink with the Python API, and started with the
>> > wordcount example from the documentation. I'm using Flink 1.4 and Python 2.7.
>> > When running env.execute(local=True), the command doesn't execute and
>> keeps
>> > waiting for input. If I hit enter again I get the following error from
>> > Environment.py : ValueError("Invalid mode specified: " + mode)
>> > Looking at the source code, it looks like there are a bunch of
>> > sys.stdin.readline().rstrip('\n') where an input is expected from the
>> user.
>> > Any idea how to run the job? Thank you.
>> >
>> > Best,
>> > Yassine
>> >
>>
>
>


Re: Python API not working

2017-12-15 Thread Yassine MARZOUGUI
Hi Ufuk,

Thanks for your response. Unfortunately, specifying `streaming` or `batch`
doesn't work; it looks like the mode should be either "plan" or "operator",
and then the program expects other inputs from stdin (id, port, etc.).

2017-12-15 14:23 GMT+01:00 Ufuk Celebi <u...@apache.org>:

> Hey Yassine,
>
> let me include Chesnay (cc'd) who worked on the Python API.
>
> I'm not familiar with the API and what it expects, but try entering
> `streaming` or `batch` for the mode. Chesnay probably has the details.
>
> – Ufuk
>
>
> On Fri, Dec 15, 2017 at 11:05 AM, Yassine MARZOUGUI
> <y.marzou...@mindlytix.com> wrote:
> > Hi All,
> >
> > I'm trying to use Flink with the Python API, and started with the
> > wordcount example from the documentation. I'm using Flink 1.4 and Python 2.7.
> > When running env.execute(local=True), the command doesn't execute and
> keeps
> > waiting for input. If I hit enter again I get the following error from
> > Environment.py : ValueError("Invalid mode specified: " + mode)
> > Looking at the source code, it looks like there are a bunch of
> > sys.stdin.readline().rstrip('\n') where an input is expected from the
> user.
> > Any idea how to run the job? Thank you.
> >
> > Best,
> > Yassine
> >
>


Python API not working

2017-12-15 Thread Yassine MARZOUGUI
Hi All,

I'm trying to use Flink with the Python API, and started with the wordcount
example from the documentation. I'm using Flink 1.4 and Python 2.7.
When running env.execute(local=True), the command doesn't execute and keeps
waiting for input. If I hit enter again I get the following error from
Environment.py: ValueError("Invalid mode specified: " + mode)
Looking at the source code, it looks like there are a bunch of
sys.stdin.readline().rstrip('\n') calls where input is expected from the user.
Any idea how to run the job? Thank you.

Best,
Yassine


Re: notNext() and next(negation) not yielding same output in Flink CEP

2017-07-24 Thread Yassine MARZOUGUI
Hi Dawid,

Thanks a lot for the explanation, it's all clear now.

Best,
Yassine

2017-07-23 13:11 GMT+02:00 Dawid Wysakowicz <wysakowicz.da...@gmail.com>:

> Hi Yassine,
>
> First of all, notNext(A) is not equal to next(not A). notNext should be
> considered a “stop condition”: if an event matching the A
> condition occurs, the current partial match is discarded. next(not A), on
> the other hand, accepts every event that does not match the A condition.
>
> So let’s analyze a sequence of events like “b c a1 a2 a3 d”. For the first
> version with next(not A) the output will be “c a1 a2 a3 d” which is what
> you expect, I think. In the other version with notNext(A) a partial match
> “c a1” will be discarded after “a2” as the notNext says that after the A’s
> there should be no A.
>
> I hope this helps understanding how notNext works.
>
> Regards,
> Dawid
>
> > On 22 Jul 2017, at 20:32, Yassine MARZOUGUI <y.marzou...@mindlytix.com>
> wrote:
> >
> > Hi all,
> >
> > I would like to match the maximal consecutive sequences of events of
> type A in a stream.
> > I'm using the following :
> > Pattern.begin("start").where(event is not A)
> > .next("middle").where(event is A).oneOrMore().consecutive()
> > .next("not").where(event is not A)
> > This gives the output I want. However, if I use
> > notNext("not").where(event is A) instead of next("not").where(event is not A),
> > the middle patterns contain only sequences of single elements of type A.
> > My understanding is that notNext() in this case is equivalent to
> > next(negation), so why is the output different?
> >
> > Thank you in advance.
> >
> > Best,
> > Yassine
>
>


notNext() and next(negation) not yielding same output in Flink CEP

2017-07-22 Thread Yassine MARZOUGUI
Hi all,

I would like to match the maximal consecutive sequences of events of type A
in a stream.
I'm using the following :

Pattern.begin("start").where(event is not A)

.next("middle").where(event is A).oneOrMore().consecutive()

.next("not").where(event is not A)

This gives the output I want. However, if I use
notNext("not").where(event is A) instead of next("not").where(event is
not A), the middle patterns contain only sequences of single elements
of type A.
My understanding is that notNext() in this case is equivalent to
next(negation), so why is the output different?

Thank you in advance.

Best,
Yassine
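
For illustration, a minimal sketch of the two variants in the Flink CEP Java API
(assuming Flink 1.3 and an Event POJO with a getType() method; the conditions are
placeholders, not the actual job code):

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

// "Event" is an assumed POJO with a getType() method.
SimpleCondition<Event> isA = new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event e) { return "A".equals(e.getType()); }
};
SimpleCondition<Event> isNotA = new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event e) { return !"A".equals(e.getType()); }
};

// Variant 1: an explicit next("not") step that must match a non-A event.
Pattern<Event, ?> withNext = Pattern.<Event>begin("start").where(isNotA)
        .next("middle").where(isA).oneOrMore().consecutive()
        .next("not").where(isNotA);

// Variant 2: notNext("not") acts as a stop condition: a further A event
// discards the partial match instead of simply not being matched.
Pattern<Event, ?> withNotNext = Pattern.<Event>begin("start").where(isNotA)
        .next("middle").where(isA).oneOrMore().consecutive()
        .notNext("not").where(isA);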


Re: Behaviour of the BucketingSink when checkpoints fail

2017-04-28 Thread Yassine MARZOUGUI
Hi Aljoscha,

Thank you for your response. I guess then I will manually rename the
pending files. Does this however mean that the BucketingSink is not
exactly-once as it is described in the docs, since in this case (failure of
the job and failure of checkpoints) there will be duplicates? Or am I
missing something in the notion of exactly-once guarantees?

Best,
Yassine

2017-04-28 15:47 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi,
> Yes, your analysis is correct. The pending files are not recognised as
> such because they were never in any checkpointed state that could be
> restored. I’m afraid it’s not possible to build the sink state just from
> the files existing in the output folder. The reason we have state in the
> first place is so that we can figure out what each of the files in the
> output folder are.
>
> Maybe you could manually move the pending files that you know are correct
> to “final”?
>
> Best,
> Aljoscha
>
> On 28. Apr 2017, at 11:22, Yassine MARZOUGUI <y.marzou...@mindlytix.com>
> wrote:
>
> Hi all,
>
> I have a failed job containing a BucketingSink. The last successful
> checkpoint was before the source started emitting data. The following
> checkpoints all failed due to the long timeout, as I mentioned here:
> http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/Checkpoints-very-slow-with-high-backpressure-td12762.html.
>
> The TaskManager then failed. Upon recovery, the pending files did not
> move to the finished state.
>
> Is that because the sink was not able to checkpoint the list of pending
> files?
> Is it possible to build the sink state just from the output folder and the
> suffixes of the files?
>
> Thanks,
> Yassine
>
>
>


Behaviour of the BucketingSink when checkpoints fail

2017-04-28 Thread Yassine MARZOUGUI
Hi all,

I have a failed job containing a BucketingSink. The last successful
checkpoint was before the source started emitting data. The following
checkpoints all failed due to the long timeout, as I mentioned here:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoints-very-slow-with-high-backpressure-td12762.html.

The TaskManager then failed. Upon recovery, the pending files did not
move to the finished state.

Is that because the sink was not able to checkpoint the list of pending
files?
Is it possible to build the sink state just from the output folder and the
suffixes of the files?

Thanks,
Yassine


Re: Checkpoints very slow with high backpressure

2017-04-24 Thread Yassine MARZOUGUI
Hi Ufuk,

The ProcessFunction receives elements and buffers them into a MapState, and
periodically (for example every x seconds) registers processing-time timers
(according to some rules which it gets from a connected rule stream). When
a timer fires, I pop the next element from the state, request an external server,
and collect the response.
The requests to the external server should happen periodically and not
continuously; that's why I control them using timers and buffer the elements
in the RocksDB state.
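
A rough sketch of that pattern, for illustration only (String elements;
callExternalServer() is a hypothetical placeholder for the blocking request,
and the timer bookkeeping is simplified):

import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// must run on a keyed stream so that MapState is available
public class BufferedRequester extends ProcessFunction<String, String> {

    private static final long INTERVAL_MS = 10_000L; // the "every x seconds" period

    private transient MapState<Long, String> buffer; // buffered elements, keyed by arrival time

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("buffer", Long.class, String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        buffer.put(now, value);
        // schedule a firing; in the real job the rules come from the connected rule stream
        ctx.timerService().registerProcessingTimeTimer(now + INTERVAL_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Iterator<Map.Entry<Long, String>> it = buffer.iterator();
        if (it.hasNext()) {
            Map.Entry<Long, String> next = it.next();
            out.collect(callExternalServer(next.getValue())); // hypothetical blocking request
            buffer.remove(next.getKey());
        }
    }

    private String callExternalServer(String payload) {
        return payload; // placeholder only
    }
}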

2017-04-24 13:48 GMT+02:00 Ufuk Celebi <u...@apache.org>:

> @Yassine: no, there is no way to disable the back pressure mechanism. Do
> you have more details about the last two operators? What do you mean by
> "the process function is slow on purpose"?
>
> @Rune: with 1.3 Flink will configure the internal buffers in a way that
> not too much data is buffered in the internal buffers (
> https://issues.apache.org/jira/browse/FLINK-4545). You could try the
> current master and check whether it improves the checkpointing behaviour
> under back pressure. Out of curiosity, are you using the async I/O API for
> the communication with the external REST service (https://ci.apache.org/
> projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html)?
>
> – Ufuk
>
>
> On Mon, Apr 24, 2017 at 11:08 AM, Rune Skou Larsen <r...@trifork.com>
> wrote:
>
>> Sorry I can't help you, but we're also experiencing slow checkpointing
>> when having backpressure from the sink.
>>
>> I tried HDFS, S3, and RocksDB state backends, but to no avail -
>> checkpointing always times out with backpressure.
>>
>> Can we somehow reduce Flink's internal buffer sizes, so checkpointing
>> with backpressure becomes faster?
>>
>> - Rune
>>
>> ---
>>
>> Our current setup (improvement suggestions welcome!):
>>
>> Flink 1.2.0,  yarn@AWS EMR, 1 master + 3 slaves, m4.xlarge
>>
>> program_parallelism: 12
>> taskmanagers: 6
>> slotsPerTaskManager: 4
>> taskmanager_heap_mb: 4096
>> jobmanager_heap_mb: 1024
>>
>> Basic program structure:
>>
>> 1) read batch from Kinesis
>>
>> 2) Split batch and shuffle using custom partitioner (consistent hashing).
>>
>> 3) enrich using external REST service
>>
>> 4) Write to database (This step is the bottleneck)
>> On 24-04-2017 09:32, Yassine MARZOUGUI wrote:
>>
>> I'm sorry guys if you received multiple instances of this mail; I kept
>> trying to send it yesterday, but it looks like the mailing list was stuck and
>> didn't dispatch it until now. Sorry for the disturbance.
>> On Apr 23, 2017 20:53, "Yassine MARZOUGUI" <y.marzou...@mindlytix.com>
>> wrote:
>>>
>>> Hi all,
>>> I have a Streaming pipeline as follows:
>>> 1 - read a folder continuously from HDFS
>>> 2 - filter duplicates (using keyBy(x->x) and keeping a state per key
>>> indicating whether it is seen)
>>> 3 - schedule some future actions on the stream using ProcessFunction and
>>> processing time timers (elements are kept in a MapState)
>>> 4- write results back to HDFS using a BucketingSink.
>>> I am using RocksdbStateBackend, and Flink 1.3-SNAPSHOT (Commit: 9fb074c).
>>> Currently the source contains just a single file of 1GB, so that's the
>>> maximum state that the job might hold. I noticed that the backpressure on
>>> operators #1 and #2 is High, and the split reader has only read 60 MB
>>> out of the 1GB source file. I suspect this is because the
>>> ProcessFunction is slow (on purpose). However, it looks like this affected the
>>> checkpoints, which are failing after the timeout (which is set to 2 hours);
>>> see attached screenshot.
>>> In the job manager logs I keep getting warnings :
>>>
>>> 2017-04-23 19:32:38,827 WARN  
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received 
>>> late message for now expired checkpoint attempt 8 from 
>>> 210769a077c67841d980776d8caece0a of job 6c7e44d205d738fc8a6cb4da181d2d86.
>>>
>>> Is the high backpressure the cause for the checkpoints being too slow?
>>> If yes, is there a way to disable the backpressure mechanism, since the
>>> records will be buffered in the RocksDB state anyway, which is backed by
>>> the disk?
>>> Thank you.
>>> Best,
>>> Yassine
>>>
>> --
>>
>> Venlig hilsen/Best regards Rune Skou Larsen
>>
>> Trifork Public A/S Dyssen 1 · DK-8200 Aarhus N · Denmark
>> Phone +45 3160 2497 Skype: rsltrifork Twitter: RuneSkouLarsen
>>
>
>


Fwd: Checkpoints very slow with high backpressure

2017-04-24 Thread Yassine MARZOUGUI
-- Forwarded message --
From: "Yassine MARZOUGUI" <y.marzou...@mindlytix.com>
Date: Apr 23, 2017 20:53
Subject: Checkpoints very slow with high backpressure
To: <user@flink.apache.org>
Cc:

Hi all,

I have a Streaming pipeline as follows:
1 - read a folder continuously from HDFS
2 - filter duplicates (using keyBy(x->x) and keeping a state per key
indicating whether it is seen)
3 - schedule some future actions on the stream using ProcessFunction and
processing time timers (elements are kept in a MapState)
4- write results back to HDFS using a BucketingSink.

I am using RocksdbStateBackend, and Flink 1.3-SNAPSHOT (Commit: 9fb074c).

Currently the source contains just a single file of 1GB, so that's the maximum
state that the job might hold. I noticed that the backpressure on
operators #1 and #2 is High, and the split reader has only read 60 MB out
of the 1GB source file. I suspect this is because the ProcessFunction is
slow (on purpose). However, it looks like this affected the checkpoints, which
are failing after the timeout (which is set to 2 hours); see attached
screenshot.


In the job manager logs I keep getting warnings :

2017-04-23 19:32:38,827 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
Received late message for now expired checkpoint attempt 8 from
210769a077c67841d980776d8caece0a of job
6c7e44d205d738fc8a6cb4da181d2d86.

Is the high backpressure the cause for the checkpoints being too slow? If
yes, is there a way to disable the backpressure mechanism, since the records
will be buffered in the RocksDB state anyway, which is backed by the disk?

Thank you.

Best,
Yassine


Re: Checkpoints very slow with high backpressure

2017-04-24 Thread Yassine MARZOUGUI
I'm sorry guys if you received multiple instances of this mail; I kept
trying to send it yesterday, but it looks like the mailing list was stuck and
didn't dispatch it until now. Sorry for the disturbance.

On Apr 23, 2017 20:53, "Yassine MARZOUGUI" <y.marzou...@mindlytix.com>
wrote:

> Hi all,
>
> I have a Streaming pipeline as follows:
> 1 - read a folder continuously from HDFS
> 2 - filter duplicates (using keyBy(x->x) and keeping a state per key
> indicating whether it is seen)
> 3 - schedule some future actions on the stream using ProcessFunction and
> processing time timers (elements are kept in a MapState)
> 4- write results back to HDFS using a BucketingSink.
>
> I am using RocksdbStateBackend, and Flink 1.3-SNAPSHOT (Commit: 9fb074c).
>
> Currently the source contains just a single file of 1GB, so that's the maximum
> state that the job might hold. I noticed that the backpressure on
> operators #1 and #2 is High, and the split reader has only read 60 MB out
> of the 1GB source file. I suspect this is because the ProcessFunction is
> slow (on purpose). However, it looks like this affected the checkpoints, which
> are failing after the timeout (which is set to 2 hours); see attached
> screenshot.
>
>
> In the job manager logs I keep getting warnings :
>
> 2017-04-23 19:32:38,827 WARN  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
> message for now expired checkpoint attempt 8 from 
> 210769a077c67841d980776d8caece0a of job 6c7e44d205d738fc8a6cb4da181d2d86.
>
> Is the high backpressure the cause for the checkpoints being too slow? If
> yes, is there a way to disable the backpressure mechanism, since the records
> will be buffered in the RocksDB state anyway, which is backed by the disk?
>
> Thank you.
>
> Best,
> Yassine
>
>


Checkpoints very slow with high backpressure

2017-04-24 Thread Yassine MARZOUGUI
Hi all,

I have a streaming pipeline as follows:
1 - read a folder continuously from HDFS
2 - filter duplicates (using keyBy(x->x) and keeping a state per key
indicating whether it is seen)
3 - schedule some future actions on the stream using ProcessFunction and
processing time timers (elements are kept in a MapState)
4- write results back to HDFS using a BucketingSink.

I am using RocksdbStateBackend, and Flink 1.3-SNAPSHOT (Commit: 9fb074c).

Currently the source contains just a single file of 1GB, so that's the maximum
state that the job might hold. I noticed that the backpressure on
operators #1 and #2 is High, and the split reader has only read 60 MB out
of the 1GB source file. I suspect this is because the ProcessFunction is
slow (on purpose). However, it looks like this affected the checkpoints, which
are failing after the timeout (which is set to 2 hours); see attached
screenshot.

In the job manager logs I keep getting warnings :

2017-04-23 19:32:38,827 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received
late message for now expired checkpoint attempt 8 from
210769a077c67841d980776d8caece0a of job 6c7e44d205d738fc8a6cb4da181d2d86.

Is the high backpressure the cause for the checkpoints being too slow? If
yes, is there a way to disable the backpressure mechanism, since the records
will be buffered in the RocksDB state anyway, which is backed by the disk?

Any help is appreciated. Thank you.

Best,
Yassine
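
For step 2 of the pipeline above, a minimal sketch of the duplicate filter
(illustration only, with String lines and one Boolean flag per key):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class DeduplicateFilter extends RichFlatMapFunction<String, String> {

    private transient ValueState<Boolean> seen; // one flag per key, i.e. per distinct line

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        if (seen.value() == null) { // first occurrence of this key
            seen.update(true);
            out.collect(value);
        }
    }
}

// usage: lines.keyBy(x -> x).flatMap(new DeduplicateFilter())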


Re: Flink Checkpoint runs slow for low load stream

2017-04-24 Thread Yassine MARZOUGUI
Hi all,

I am experiencing a similar problem but with HDFS as a source instead of
Kafka. I have a streaming pipeline as follows:
1 - read a folder continuously from HDFS
2 - filter duplicates (using keyBy(x->x) and keeping a state per key
indicating whether it is seen)
3 - schedule some future actions on the stream using ProcessFunction and
processing time timers (elements are kept in a MapState)
4- write results back to HDFS using a BucketingSink.

I am using RocksdbStateBackend, and Flink 1.3-SNAPSHOT (Commit: 9fb074c).

Currently the source contains just a single file of 1GB, so that's the maximum
state that the job might hold. I noticed that the backpressure on
operators #1 and #2 is High, and the split reader has only read 60 MB out
of the 1GB source file. I suspect this is because the ProcessFunction is
slow (on purpose). However, it looks like this affected the checkpoints, which
are failing after the timeout (which is set to 2 hours).

In the job manager logs I keep getting warnings :

2017-04-23 19:32:38,827 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Received late message for now expired checkpoint attempt 8 from
210769a077c67841d980776d8caece0a of job 6c7e44d205d738fc8a6cb4da181d2d86.

Is the high backpressure the cause for the checkpoints being too slow? If
yes, is there a way to disable the backpressure mechanism, since the records
will be buffered in the RocksDB state anyway, which is backed by the disk?

Any help is appreciated. Thank you.

Best,
Yassine

On Jan 5, 2017 12:25, "Chakravarthy varaga" 
wrote:

> BRILLIANT !!!
>
> Checkpoint times are consistent with 1.1.4...
>
> Thanks for your formidable support !
>
> Best Regards
> CVP
>
> On Wed, Jan 4, 2017 at 5:33 PM, Fabian Hueske  wrote:
>
>> Hi CVP,
>>
>> we recently release Flink 1.1.4, i.e., the next bugfix release of the
>> 1.1.x series with major robustness improvements [1].
>> You might want to give 1.1.4 a try as well.
>>
>> Best, Fabian
>>
>> [1] http://flink.apache.org/news/2016/12/21/release-1.1.4.html
>>
>> 2017-01-04 16:51 GMT+01:00 Chakravarthy varaga 
>> :
>>
>>> Hi Stephan, All,
>>>
>>>  I just got a chance to try whether 1.1.3 fixes slow checkpointing on the FS
>>> backend. It seemed to have been fixed. Thanks for the fix.
>>>
>>>  While testing this with varying checkpoint intervals, there seem
>>> to be spikes of slow checkpoints every 30/40 seconds for an interval of 15
>>> secs. The checkpoint time lasts for about 300 ms as opposed to 10/20 ms.
>>>  Basically 15 secs seems to be the nominal value so far; anything
>>> below this interval triggers the spikes too often. For us, living with a 15 sec
>>> recovery is do-able, and we eventually catch up on recovery!
>>>
>>> Best Regards
>>> CVP
>>>
>>> On Tue, Oct 4, 2016 at 6:20 PM, Chakravarthy varaga <
>>> chakravarth...@gmail.com> wrote:
>>>
 Thanks for your prompt response Stephan.

 I'd wait for Flink 1.1.3 !!!

 Best Regards
 Varaga

 On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen  wrote:

> The plan to release 1.1.3 is asap ;-)
>
> Waiting for last backported patched to get in, then release testing
> and release.
>
> If you want to test it today, you would need to manually build the
> release-1.1 branch.
>
> Best,
> Stephan
>
>
> On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga <
> chakravarth...@gmail.com> wrote:
>
>> Hi Gordon,
>>
>>  Do I need to clone and build release-1.1 branch to test this?
>>  I currently use flinlk 1.1.2 runtime. When is the plan to
>> release it in 1.1.3?
>>
>> Best Regards
>> Varaga
>>
>> On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <
>> tzuli...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> Helping out here: this is the PR for async Kafka offset committing -
>>> https://github.com/apache/flink/pull/2574.
>>> It has already been merged into the master and release-1.1 branches,
>>> so you can try out the changes now if you’d like.
>>> The change should also be included in the 1.1.3 release, which the
>>> Flink community is discussing to release soon.
>>>
>>> Will definitely be helpful if you can provide feedback afterwards!
>>>
>>> Best Regards,
>>> Gordon
>>>
>>>
>>> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (
>>> chakravarth...@gmail.com) wrote:
>>>
>>> Hi Stephan,
>>>
>>> Is the Async kafka offset commit released in 1.3.1?
>>>
>>> Varaga
>>>
>>> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <
>>> chakravarth...@gmail.com> wrote:
>>>
 Hi Stephan,

  That should be great. Let me know once the fix is done and the
 snapshot version to use, I'll check and revert then.
  Can you also share the JIRA that 

Checkpoints very slow with high backpressure

2017-04-24 Thread Yassine MARZOUGUI
Re-sending as it looks like the previous mail wasn't correctly sent

---

Hi all,

I have a streaming pipeline as follows:
1 - read a folder continuously from HDFS
2 - filter duplicates (using keyBy(x->x) and keeping a state per key
indicating whether it is seen)
3 - schedule some future actions on the stream using ProcessFunction and
processing time timers (elements are kept in a MapState)
4- write results back to HDFS using a BucketingSink.

I am using RocksdbStateBackend, and Flink 1.3-SNAPSHOT (Commit: 9fb074c).

Currently the source contains just a single file of 1GB, so that's the maximum
state that the job might hold. I noticed that the backpressure on
operators #1 and #2 is High, and the split reader has only read 60 MB out
of the 1GB source file. I suspect this is because the ProcessFunction is
slow (on purpose). However, it looks like this affected the checkpoints, which
are failing after the timeout (which is set to 2 hours).

In the job manager logs I keep getting warnings :

2017-04-23 19:32:38,827 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late
message for now expired checkpoint attempt 8 from
210769a077c67841d980776d8caece0a of job 6c7e44d205d738fc8a6cb4da181d2d86.

Is the high backpressure the cause for the checkpoints being too slow? If
yes, is there a way to disable the backpressure mechanism, since the records
will be buffered in the RocksDB state anyway, which is backed by the disk?

Any help is appreciated. Thank you.

Best,
Yassine


Checkpoints very slow with high backpressure

2017-04-24 Thread Yassine MARZOUGUI
Hi all,

I have a Streaming pipeline as follows:
1 - read a folder continuously from HDFS
2 - filter duplicates (using keyBy(x->x) and keeping a state per key
indicating whether it is seen)
3 - schedule some future actions on the stream using ProcessFunction and
processing time timers (elements are kept in a MapState)
4- write results back to HDFS using a BucketingSink.

I am using RocksdbStateBackend, and Flink 1.3-SNAPSHOT (Commit: 9fb074c).

Currently the source contains just a single file of 1GB, so that's the maximum
state that the job might hold. I noticed that the backpressure on
operators #1 and #2 is High, and the split reader has only read 60 MB out
of the 1GB source file. I suspect this is because the ProcessFunction is
slow (on purpose). However, it looks like this affected the checkpoints, which
are failing after the timeout (which is set to 2 hours); see attached
screenshot.


In the job manager logs I keep getting warnings :

2017-04-23 19:32:38,827 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
Received late message for now expired checkpoint attempt 8 from
210769a077c67841d980776d8caece0a of job
6c7e44d205d738fc8a6cb4da181d2d86.

Is the high backpressure the cause for the checkpoints being too slow? If
yes, is there a way to disable the backpressure mechanism, since the records
will be buffered in the RocksDB state anyway, which is backed by the disk?

Thank you.

Best,
Yassine


Re: Windows emit results at the end of the stream

2017-03-23 Thread Yassine MARZOUGUI
Hi Sonex,

When using readTextFile(...) with event time, only one watermark with the
value Long.MAX_VALUE is sent at the end of the stream, which explains why
the windows are stored until the whole file is processed. In order to have
periodic watermarks, you need to process the file continuously, as follows:

TextInputFormat inputFormat = new TextInputFormat(new Path("file/to/read.txt"));
env.readFile(inputFormat, "file/to/read.txt",
             FileProcessingMode.PROCESS_CONTINUOUSLY, 1L,
             TypeInformation.of(String.class))
   .map(...)

Hope this helps.

Best,
Yassine

2017-03-23 9:47 GMT+01:00 Sonex :

> Hi everyone,
>
> I am using a simple window computation on a stream with event time. The
> code
> looks like this:
>
> streamData.readTextFile(...)
> .map(...)
> .assignAscendingTimestamps(_.timestamp)
> .keyBy(_.id)
> .timeWindow(Time.seconds(3600),Time.seconds(3600))
> .apply(new MyWindowFunction)
> .map(...)
>
> By monitoring the memory usage and the flink web dashboard, I noticed that
> flink applies the window function until the entire stream finishes (thus
> storing all aggregations in memory) and then continues to the map
> transformation. What I would expect is emission of window results to the
> map
> transformation as soon as results of the window are ready.
>
> Can anyone explain this behavior?
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Windows-emit-
> results-at-the-end-of-the-stream-tp12337.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


ProcessingTimeTimer in ProcessFunction after a savepoint

2017-03-17 Thread Yassine MARZOUGUI
Hi all,

How does the processing time timer behave when a job is taken down with a
savepoint and then restarted after the timer was supposed to fire? Will the
timer fire at restart because it was missed during the savepoint?

I'm wondering because I would like to schedule periodic timers in the
future (in processing time) at which a state is read and emitted, but I'm
afraid the timer will never fire if it is due while the job is down,
and therefore the state will never be emitted.

Best,
Yassine
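
For context, this is the kind of periodic scheduling meant above, as a minimal
sketch (String in/out; the emission logic is a placeholder, and whether a missed
timer fires after restore is exactly the open question):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class PeriodicStateEmitter extends ProcessFunction<String, String> {

    private static final long PERIOD_MS = 60_000L; // hypothetical one-minute period

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // ... update some keyed state here ...
        // schedule a firing in the future (real code would avoid re-registering on every element)
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + PERIOD_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        out.collect("state snapshot at " + timestamp); // placeholder for reading and emitting the state
        // re-register so the emission keeps happening periodically
        ctx.timerService().registerProcessingTimeTimer(timestamp + PERIOD_MS);
    }
}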


Re: Appropriate State to use to buffer events in ProcessFunction

2017-03-16 Thread Yassine MARZOUGUI
Hi Xiaogang,

Indeed, the MapState is what I was looking for in order to have efficient
sorted state, as it would facilitate many use cases like this one, or
joining streams, etc. I searched a bit and found your contribution
<https://github.com/apache/flink/pull/3336> of MapState for the next 1.3
release, I'll see how it works for me.
Thank you for pointing this out, very helpful!

Best,
Yassine

2017-03-16 18:50 GMT+01:00 SHI Xiaogang <shixiaoga...@gmail.com>:

> Hi Yassine,
>
> If I understand correctly, you need sorted state, which
> unfortunately is not supported in Flink right now.
> We have some ideas to provide such sorted states to facilitate the
> development of user applications. But it is still under discussion due to
> the concerns on back compatibility.
>
> Currently, I think we can work around the problem with MapStates in
> RocksDB statebackends.
> In RocksDB statebackends, each entry in MapState corresponds to an entry
> in RocksDB. The key of a RocksDB entry is formatted as "
> keyGroup#key#keyLen#namespace#namespaceLen#mapKey"
>
> The entries in RocksDB are sorted in the lexicographical order. In the
> cases where the map keys are typed Timestamp/Long, the entries in the
> MapState will be iterated as the same order in a sorted map. Thus, you can
> find all the events whose timestamps are smaller than the given one.
>
> The solution is quite tricky because it does not work when Heap
> statebackends are used. But given that the state may grow up to ~100GB,
> RocksDB statebackends are strongly recommended.
>
> May the information helps you.
>
> Regards,
> Xiaogang
>
> 2017-03-09 23:19 GMT+08:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
>> Hi Timo,
>>
>> I thought about the ListState but quickly discarded it as it keeps the
>> insertion order and not the event order. After a second thought, I think I will
>> reconsider it since my events are occasionally out-of-order. Didn't know
>> that Flink CEP operators 'next' and 'within', can handle event time, so I
>> think I will give it a try! Thank you!
>>
>> Best,
>> Yassine
>>
>> 2017-03-08 9:55 GMT+01:00 Timo Walther <twal...@apache.org>:
>>
>>> Hi Yassine,
>>>
>>> have you thought about using a ListState? As far as I know, it keeps at
>>> least the insertion order. You could sort it once your trigger event has
>>> arrived.
>>> If you use a RocksDB as state backend, 100+ GB of state should not be a
>>> problem. Have you thought about using Flink's CEP library? It might fit to
>>> your needs without implementing a custom process function.
>>>
>>> I hope that helps.
>>>
>>> Timo
>>>
>>>
>>> On 07/03/17 at 19:23, Yassine MARZOUGUI wrote:
>>>
>>> Hi all,
>>>>
>>>> I want to label events in a stream based on a condition on some future
>>>> events.
>>>> For example my stream contains events of type A and B, and I would
>>>> like to assign a label 1 to an event E of type A if an event of type B
>>>> happens within a duration x of E. I am using event time and my events can
>>>> be out of order.
>>>> For this I'm using ProcessFunction which looks suitable for my use
>>>> case. In order to handle out of order events, I'm keeping events of type A
>>>> in a state and once an event of type B is received, I fire an event time
>>>> timer in which I loop through events of type A in the state having a
>>>> timestamps < timer.timestamp, label them and remove them from the state.
>>>> Currently the state is simply a value state containing a
>>>> TreeMap<Timestamp, EventA>. I'm keeping events sorted in order to
>>>> effectively get events older than the timer timestamp.
>>>> I wonder If that's the appropriate data structure to use in the value
>>>> state to buffer events and be able to handle out of orderness, or if there
>>>> is a more effective implementation, especially that the state may grow to
>>>> reach ~100 GB sometimes?
>>>>
>>>> Any insight is appreciated.
>>>>
>>>> Thanks,
>>>> Yassine
>>>>
>>>>
>>>>
>>>>
>>>
>>
>


Re: Late Events with BoundedOutOfOrdernessTimestampExtractor and allowed lateness

2017-03-15 Thread Yassine MARZOUGUI
Hi Nico,

You might check Fabian's answer on a similar question I posted previously
on the mailing list; it can be helpful:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/BoundedOutOfOrdernessTimestampExtractor-and-allowedlateness-td9583.html

Best,
Yassine

On Mar 15, 2017 18:58, "Nico"  wrote:

> Hi,
>
> I struggle a bit to understand the difference between
> BoundedOutOfOrdernessTimestampExtractor and the allowed lateness function
> of a window...
>
> As I understand it, when I use BoundedOutOfOrdernessTimestampExtractor
> the watermark is lagging behind the real event time of the stream with 
> maxOutOfOrderness.
> So for the window trigger late events won't be late as the watermark is
> lagging.
>
> In Contrast, allowed lateness will wait after the watermark passed. So
> using allowed lateness a window could fire twice - after the watermark and
> the arrival of late events.
>
> Is that the only difference? What would happen if I use both? Do I have a
> double delay?
>
> Does a best practice exist for when to use BoundedOutOfOrdernessTimestampExtractor
> or allowed lateness?
>
> Best regards,
> Nico
>
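
For illustration, a minimal sketch combining the two mechanisms discussed above
(assuming an existing DataStream of Tuple2 whose first field carries the event
timestamp; the final aggregation is just a placeholder):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// "events" is assumed to be an existing DataStream<Tuple2<Long, String>>
DataStream<Tuple2<Long, String>> result = events
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, String>>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Tuple2<Long, String> element) {
                        return element.f0; // event time carried in the first field
                    }
                })
        .keyBy(1)                          // key by the second field
        .timeWindow(Time.minutes(1))
        .allowedLateness(Time.minutes(5))  // windows may fire again for events arriving after the watermark
        .max(0);                           // placeholder aggregation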


Re: Question about processElement(...) and onTimer(...) in ProcessFunction

2017-03-15 Thread Yassine MARZOUGUI
Thank you Fabian for you answer.

Best,
Yassine

On Mar 14, 2017 09:31, "Fabian Hueske" <fhue...@gmail.com> wrote:

> Hi Yassine,
>
> as far as I know, the processElement() and onTimer() methods are not
> concurrently called.
> This is definitely true for event-time timers (they are triggered by
> watermarks which are internally handled as records) and I'm pretty sure
> that the behavior is the same for processing time timers.
> @Kostas (in CC) please correct me if I'm wrong here.
>
> Best, Fabian
>
> 2017-03-14 8:04 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
>> Hi all,
>>
>> In ProcessFunction, does processElement() still get called on incoming
>> elements when onTimer() is called, or are elements buffered until onTimer()
>> returns?
>> I am wondering because both processElement() and onTimer() can access and
>> manipulate the state, so if for example state.clear() is called at the end
>> of onTimer() is there a risk that a state update by processElement() that
>> happened during the onTimer() call get cleared? Thank you in advance.
>>
>> Best,
>> Yassine
>>
>
>


Question about processElement(...) and onTimer(...) in ProcessFunction

2017-03-14 Thread Yassine MARZOUGUI
Hi all,

In ProcessFunction, does processElement() still get called on incoming
elements when onTimer() is called, or are elements buffered until onTimer()
returns?
I am wondering because both processElement() and onTimer() can access and
manipulate the state, so if for example state.clear() is called at the end
of onTimer() is there a risk that a state update by processElement() that
happened during the onTimer() call get cleared? Thank you in advance.

Best,
Yassine


Re: Appropriate State to use to buffer events in ProcessFunction

2017-03-09 Thread Yassine MARZOUGUI
Hi Timo,

I thought about the ListState but quickly discarded it as it keeps the
insertion order and not the event order. After a second thought, I think I will
reconsider it since my events are occasionally out-of-order. Didn't know
that Flink CEP operators 'next' and 'within', can handle event time, so I
think I will give it a try! Thank you!

Best,
Yassine

2017-03-08 9:55 GMT+01:00 Timo Walther <twal...@apache.org>:

> Hi Yassine,
>
> have you thought about using a ListState? As far as I know, it keeps at
> least the insertion order. You could sort it once your trigger event has
> arrived.
> If you use a RocksDB as state backend, 100+ GB of state should not be a
> problem. Have you thought about using Flink's CEP library? It might fit to
> your needs without implementing a custom process function.
>
> I hope that helps.
>
> Timo
>
>
> On 07/03/17 at 19:23, Yassine MARZOUGUI wrote:
>
> Hi all,
>>
>> I want to label events in a stream based on a condition on some future
>> events.
>> For example my stream contains events of type A and B, and I would
>> like to assign a label 1 to an event E of type A if an event of type B
>> happens within a duration x of E. I am using event time and my events can
>> be out of order.
>> For this I'm using ProcessFunction which looks suitable for my use case.
>> In order to handle out of order events, I'm keeping events of type A in a
>> state and once an event of type B is received, I fire an event time timer
>> in which I loop through events of type A in the state having a timestamps <
>> timer.timestamp, label them and remove them from the state.
>> Currently the state is simply a value state containing a
>> TreeMap<Timestamp, EventA>. I'm keeping events sorted in order to
>> effectively get events older than the timer timestamp.
>> I wonder If that's the appropriate data structure to use in the value
>> state to buffer events and be able to handle out of orderness, or if there
>> is a more effective implementation, especially that the state may grow to
>> reach ~100 GB sometimes?
>>
>> Any insight is appreciated.
>>
>> Thanks,
>> Yassine
>>
>>
>>
>>
>


Appropriate State to use to buffer events in ProcessFunction

2017-03-07 Thread Yassine MARZOUGUI
Hi all,

I want to label events in a stream based on a condition on some future
events.
For example my stream contains events of type A and B, and I would like
to assign a label 1 to an event E of type A if an event of type B happens
within a duration x of E. I am using event time and my events can be out of
order.
For this I'm using ProcessFunction which looks suitable for my use case. In
order to handle out of order events, I'm keeping events of type A in a
state and once an event of type B is received, I fire an event time timer
in which I loop through events of type A in the state having a timestamps <
timer.timestamp, label them and remove them from the state.
Currently the state is simply a value state containing a TreeMap<Timestamp, EventA>.
I'm keeping events sorted in order to effectively get events older
than the timer timestamp.
I wonder If that's the appropriate data structure to use in the value state
to buffer events and be able to handle out of orderness, or if there is a
more effective implementation, especially that the state may grow to reach
~100 GB sometimes?

Any insight is appreciated.

Thanks,
Yassine
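
A rough sketch of that buffering (illustration only: the input is a Tuple2 of
<timestamp, type> standing in for the real events, the duration-x check is
omitted, and it must run on a keyed stream):

import java.util.TreeMap;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class LabelOnFutureEvent extends ProcessFunction<Tuple2<Long, String>, String> {

    private transient ValueState<TreeMap<Long, String>> bufferedA; // A events sorted by timestamp

    @Override
    public void open(Configuration parameters) {
        bufferedA = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "bufferedA", TypeInformation.of(new TypeHint<TreeMap<Long, String>>() {})));
    }

    @Override
    public void processElement(Tuple2<Long, String> event, Context ctx, Collector<String> out)
            throws Exception {
        TreeMap<Long, String> buffer = bufferedA.value();
        if (buffer == null) {
            buffer = new TreeMap<>();
        }
        if ("A".equals(event.f1)) {
            buffer.put(event.f0, event.f1);
            bufferedA.update(buffer);
        } else if ("B".equals(event.f1)) {
            // fire once the watermark passes this B event's timestamp
            ctx.timerService().registerEventTimeTimer(event.f0);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        TreeMap<Long, String> buffer = bufferedA.value();
        if (buffer == null) {
            return;
        }
        // all buffered A events with a timestamp <= the timer's timestamp get the label
        for (Long ts : buffer.headMap(timestamp, true).keySet()) {
            out.collect("A@" + ts + " -> label 1");
        }
        buffer.headMap(timestamp, true).clear();
        bufferedA.update(buffer);
    }
}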


Re: AsyncIO/QueryableStateClient hanging with high parallelism

2017-03-06 Thread Yassine MARZOUGUI
I think I found the reason for what happened. The way I used the
QueryableStateClient is that I wrapped the scala.concurrent.Future in a
FlinkFuture and then called FlinkFuture.thenAccept. It turns out
thenAccept doesn't throw exceptions, and when an exception happens
(which likely happened once I increased the parallelism) the job simply
doesn't finish. I solved the problem by using resultFuture.get(), which
raised the appropriate exceptions when they happened and failed the job.

Best,
Yassine

2017-03-06 15:53 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Hi all,
>
> I set up a job with a simple queryable state sink and tried to query it from
> another job using the new Async I/O API. Everything worked as expected,
> except when I tried to increase the parallelism of the querying job it
> hung.
> As you can see in the attached image, when the parallelism is 5 (or even <5)
> the job finishes within 5 seconds, but when it is >5 it hangs. Any idea of
> what might be causing this behaviour? Thank you.
>
> Best,
> Yassine
>


AsyncIO/QueryableStateClient hanging with high parallelism

2017-03-06 Thread Yassine MARZOUGUI
Hi all,

I set up a job with a simple queryable state sink and tried to query it from
another job using the new Async I/O API. Everything worked as expected,
except when I tried to increase the parallelism of the querying job it
hung.
As you can see in the attached image, when the parallelism is 5 (or even <5) the
job finishes within 5 seconds, but when it is >5 it hangs. Any idea of what
might be causing this behaviour? Thank you.

Best,
Yassine


OutOfMemory error (Direct buffer memory) while allocating the TaskManager off-heap memory

2017-03-03 Thread Yassine MARZOUGUI
Hi all,

I tried starting a local Flink 1.2.0 cluster using start-local.sh, with the
following settings for the taskmanager memory:

taskmanager.heap.mb: 16384
taskmanager.memory.off-heap: true
taskmanager.memory.preallocate: true

That throws an OOM error:
Caused by: java.lang.Exception: OutOfMemory error (Direct buffer memory)
while allocating the TaskManager off-heap memory (39017161219 bytes). Try
increasing the maximum direct memory (-XX:MaxDirectMemorySize)

However, if I add an absolute taskmanager.memory.size:
taskmanager.memory.size: 15360
the cluster starts successfully.

My understanding is that if taskmanager.memory.size is unspecified then it
should be equal to 0.7 * taskmanager.heap.mb. So I don't understand why it
throws an exception, yet works if it's larger than that fraction.

Any help is appreciated.

Best,
Yassine


Re: Http Requests from Flink

2017-03-02 Thread Yassine MARZOUGUI
Hi Ulf,

I've done HTTP requests in Flink using the OkHttp
library. I found it practical and easy to use.
Here is how I used it to make a POST request for each incoming element in
the stream and ouput the response:

DataStream<String> stream = ...

stream.map(new RichMapFunction<String, String>() {

    OkHttpClient client;

    @Override
    public void open(Configuration config) throws IOException {
        client = new OkHttpClient();
    }

    @Override
    public String map(String in) throws Exception {
        okhttp3.Request request = new okhttp3.Request.Builder()
                .url("http://localhost:8080")
                .post(RequestBody.create(
                        MediaType.parse("text/plain; charset=utf-8"), in))
                .build();
        Response response = client.newCall(request).execute();
        if (response.code() != 200) {
            throw new Exception("Failed request");
        }
        String result = response.body().string();
        return result;
    }
})

I hope this helps.

Best,
Yassine


2017-03-02 14:17 GMT+01:00 Alex De Castro :

> Hi Ulf,
>
> I’ve had a similar problem before, but from a sink perspective: I had to
> create an HTTP sink for a Kafka REST API. I’ve used scalaj-http
> (https://github.com/scalaj/scalaj-http), which is a wrapper for the
> corresponding Java lib.
>
>
>
> For example:
>
>
> class HttpSink extends SinkFunction[Message] {
>   private val secretkey = new GetToken().token
>
>   def sendMessage(message: Message): String =
>     Http("http://XXX.XXX.XX.XXX:5000/api/message") // <-- GLOBAL var
>       .header("Content-Type", "application/json")
>       .header("Authorization", s"Bearer $secretkey")
>       .postData(message.data).asString.body
>
>   @throws[Exception]
>   override def invoke(message: Message): Unit = {
>     log.info(sendMessage(message))
>   }
> }
>
>
>
> I imagine for an HTTP source, you could send a request to the REST API
> periodically and convert the micro-batches into a stream. I’d love to know
> about other alternatives.
>
>
>
> Cheers,
>
> Alex
>
> From: Ulf Thomas
> Reply-To: "user@flink.apache.org"
> Date: Thursday, March 2, 2017 at 12:58 PM
> To: "user@flink.apache.org"
> Subject: Http Requests from Flink
>
>
>
> Hello,
>
>
>
> I've been trying to perform HTTP requests from a Flink Program but I
> wasn't successful :-(.
>
>
>
> Does anybody here has done this before and can point me to an working
> library?
>
>
>
> I've attached a small demo project in case someone wants to try to solve
> this.
>
>
>
> Best,
>
>
>
> --
>
> --
> Ulf Thomas
> Software Developer
> relayr
>


Re: Flink requesting external web service with rate limited requests

2017-02-28 Thread Yassine MARZOUGUI
Hi Fabian,

I have a related question regarding throttling at the source: if there is a
sleep in the source, as in ContinuousFileMonitoringFunction.java:

while (isRunning) {
    synchronized (checkpointLock) {
        monitorDirAndForwardSplits(fileSystem, context);
    }
    Thread.sleep(interval);
}

Does it also block checkpoints?
Thanks.

Best,
Yassine

2017-02-28 10:39 GMT+01:00 Fabian Hueske :

> Hi Giuliano,
>
> Flink 1.2 introduced the AsyncFunction which asynchronously sends requests
> to external systems (k-v-stores, web services, etc.).
> You can limit the number of concurrent requests, but AFAIK you cannot
> specify a limit of requests per minute.
> Maybe you can configure the function such that it works for your use case.
>
> Alternatively, you can take it as a blueprint for a custom operator
> because it handles watermarks and checkpoints correctly.
>
> I am not aware of a built-in mechanism to throttle a stream. You can do it
> manually and simply sleep() in a MapFunction but that will also block
> checkpoints.
>
> Best, Fabian
>
> 2017-02-28 3:19 GMT+01:00 Giuliano Caliari :
>
>> Hello,
>>
>> I have an interesting problem that I'm having a hard time modeling on
>> Flink,
>> I'm not sure if it's the right tool for the job.
>>
>> I have a stream of messages in Kafka that I need to group and send them to
>> an external web service but I have some concerns that need to be
>> addressed:
>>
>> 1. Rate Limited requests => Only tens of requests per minute. If the limit
>> is exceeded the system has to stop making requests for a few minutes.
>> 2. Crash handling => I'm using savepoints
>>
>> My first (naive) solution was to implement on a Sink function but the
>> requests may take a long time to return (up to minutes) so blocking the
>> thread will interfere with the savepoint mechanism (see  here
>> > nabble.com/Rate-limit-processing-td11174.html>
>> ). Because of this implementing the limit on the sink and relying on
>> backpressure to slow down the flow will get in the way of savepointing.
>> I'm
>> not sure how big of a problem this will be but on my tests I'm reading
>> thousands of messages before the backpressure mechanism starts and
>> savepointing is taking around 20 minutes.
>>
>> My second implementation was sleeping on the Fetcher for the Kafka
>> Consumer
>> but the web service request times have a huge variance, so I ended up implementing a
>> communication channel between the sink and the source - an object with
>> mutable state. Not great.
>>
>> So my question is if there is a nice way to limit the flow of messages on
>> the system according to the rate given by a sink function? Is there any
>> other way I could make this work on Flink?
>>
>> Thank you
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-maili
>> ng-list-archive.2336050.n4.nabble.com/Flink-requesting-
>> external-web-service-with-rate-limited-requests-tp11952.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>


RE: Aggregation problem.

2017-02-18 Thread Yassine MARZOUGUI
Hi,

I think this is an expected output and not necessarily a bug. To get the
element having the maximum value, maxBy() should be used instead of max().

See this answer for more details :
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Wrong-and-non-consistent-behavior-of-max-tp484p488.html

Best,
Yassine
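
A small, self-contained illustration of the difference (Java DataSet API, using
the data from the question quoted below):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple5;

public class MaxVsMaxBy {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple5<Integer, String, Double, Integer, Double>> ds = env.fromElements(
                Tuple5.of(0, "Auto", 0.4, 1, 5.8317538999854194E-5),
                Tuple5.of(0, "Computer", 0.2, 1, 4.8828125E-5),
                Tuple5.of(0, "Sports", 0.4, 2, 1.7495261699956258E-4),
                Tuple5.of(1, "Auto", 0.4, 1, 1.7495261699956258E-4),
                Tuple5.of(1, "Computer", 0.2, 1, 4.8828125E-5),
                Tuple5.of(1, "Sports", 0.4, 1, 5.8317538999854194E-5));

        // max(4): only field 4 is guaranteed to hold the group maximum; other fields are arbitrary
        ds.groupBy(0).max(4).print();

        // maxBy(4): returns the whole tuple that carries the maximum of field 4
        ds.groupBy(0).maxBy(4).print();
    }
}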

On Feb 18, 2017 12:28, "Kürşat Kurt"  wrote:

Ok, i have opened the issue with the test case.

Thanks.



https://issues.apache.org/jira/browse/FLINK-5840





*From:* Fabian Hueske [mailto:fhue...@gmail.com]
*Sent:* Saturday, February 18, 2017 3:33 AM
*To:* user@flink.apache.org
*Subject:* Re: Aggregation problem.



Hi,

this looks like a bug to me.

Can you open a JIRA and maybe a small testcase to reproduce the issue?

Thank you,

Fabian



2017-02-18 1:06 GMT+01:00 Kürşat Kurt :

Hi;



I have a Dataset like this:



(0,Auto,0.4,1,5.8317538999854194E-5)
(0,Computer,0.2,1,4.8828125E-5)
(0,Sports,0.4,2,1.7495261699956258E-4)
(1,Auto,0.4,1,1.7495261699956258E-4)
(1,Computer,0.2,1,4.8828125E-5)
(1,Sports,0.4,1,5.8317538999854194E-5)

This code, ds.groupBy(0).max(4).print(), prints:

(0,Sports,0.4,1,1.7495261699956258E-4)
(1,Sports,0.4,1,1.7495261699956258E-4)

...but I am expecting

(0,Sports,0.4,2,1.7495261699956258E-4)
(1,Auto,0.4,1,1.7495261699956258E-4)



What is wrong with this code?


Re: JavaDoc 404

2017-02-14 Thread Yassine MARZOUGUI
Hi Robert,

Thanks for reporting back!
The docs look much better now.

Cheers, Yassine


On Feb 14, 2017 14:30, "Robert Metzger" <rmetz...@apache.org> wrote:

HI Yassine,

I've now fixed the javadocs build. It has already been rebuilt for the 1.2
javadocs: https://ci.apache.org/projects/flink/flink-docs-
release-1.2/api/java/org/apache/flink/streaming/connectors/fs/bucketing/
BucketingSink.html
The build for master should be done in 30 minutes.


On Wed, Feb 8, 2017 at 10:49 AM, Yassine MARZOUGUI <
y.marzou...@mindlytix.com> wrote:

> Thanks Robert and Ufuk for the update.
>
> 2017-02-07 18:43 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>
>> I've filed a JIRA for the issue: https://issues.apache.o
>> rg/jira/browse/FLINK-5736
>>
>> On Tue, Feb 7, 2017 at 5:00 PM, Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> Yes, I'll try to fix it asap. Sorry for the inconvenience.
>>>
>>> On Mon, Feb 6, 2017 at 4:43 PM, Ufuk Celebi <u...@apache.org> wrote:
>>>
>>>> Thanks for reporting this. I think Robert (cc'd) is working on fixing
>>>> this, correct?
>>>>
>>>> On Sat, Feb 4, 2017 at 12:12 PM, Yassine MARZOUGUI
>>>> <y.marzou...@mindlytix.com> wrote:
>>>> > Hi,
>>>> >
>>>> > The JavaDoc link of BucketingSink in this page[1] yields to a 404
>>>> error. I
>>>> > couldn't find the correct url.
>>>> > The broken link :
>>>> > https://ci.apache.org/projects/flink/flink-docs-master/api/j
>>>> ava/org/apache/flink/streaming/connectors/fs/bucketing/Bucke
>>>> tingSink.html
>>>> >
>>>> > Other pages in the JavaDoc, like this one[2], seem lacking formatting,
>>>> > because
>>>> > https://ci.apache.org/projects/flink/flink-docs-master/api/j
>>>> ava/stylesheet.css
>>>> > and
>>>> > https://ci.apache.org/projects/flink/flink-docs-master/api/j
>>>> ava/script.js
>>>> > are not found (404).
>>>> >
>>>> > Best,
>>>> > Yassine
>>>> >
>>>> >
>>>> > [1] :
>>>> > https://ci.apache.org/projects/flink/flink-docs-release-1.2/
>>>> dev/connectors/filesystem_sink.html
>>>> > [2] :
>>>> > https://ci.apache.org/projects/flink/flink-docs-master/api/j
>>>> ava/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.html
>>>>
>>>
>>>
>>
>


Re: Flink 1.2 Maven dependency

2017-02-09 Thread Yassine MARZOUGUI
Hi,

I could find the dependency here:
https://search.maven.org/#artifactdetails%7Corg.apache.flink%7Cflink-core%7C1.2.0%7Cjar
I wonder why it still doesn't show in
http://mvnrepository.com/artifact/org.apache.flink/flink-core.

The dependency version for Flink 1.2 is 1.2.0.


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.2.0</version>
</dependency>


Best,
Yassine


On Feb 9, 2017 20:39, "Dominik Safaric"  wrote:

Hi,

I’ve been trying to use the Flink 1.2 Maven dependency, but unfortunately I
was not able to retrieve it.

In addition, I cannot find the 1.2 version on the repository
website either (e.g. Flink core http://mvnrepository.com/artifact/org.apache.flink/
flink-core).

Could someone explain why there isn’t a Maven dependency available yet?

Thanks,
Dominik


Re: JavaDoc 404

2017-02-08 Thread Yassine MARZOUGUI
Thanks Robert and Ufuk for the update.

2017-02-07 18:43 GMT+01:00 Robert Metzger <rmetz...@apache.org>:

> I've filed a JIRA for the issue: https://issues.apache.
> org/jira/browse/FLINK-5736
>
> On Tue, Feb 7, 2017 at 5:00 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Yes, I'll try to fix it asap. Sorry for the inconvenience.
>>
>> On Mon, Feb 6, 2017 at 4:43 PM, Ufuk Celebi <u...@apache.org> wrote:
>>
>>> Thanks for reporting this. I think Robert (cc'd) is working on fixing
>>> this, correct?
>>>
>>> On Sat, Feb 4, 2017 at 12:12 PM, Yassine MARZOUGUI
>>> <y.marzou...@mindlytix.com> wrote:
>>> > Hi,
>>> >
>>> > The JavaDoc link of BucketingSink in this page[1] yields to a 404
>>> error. I
>>> > couldn't find the correct url.
>>> > The broken link :
>>> > https://ci.apache.org/projects/flink/flink-docs-master/api/j
>>> ava/org/apache/flink/streaming/connectors/fs/bucketing/
>>> BucketingSink.html
>>> >
>>> > Other pages in the JavaDoc, like this one[2], seem lacking formatting,
>>> > because
>>> > https://ci.apache.org/projects/flink/flink-docs-master/api/j
>>> ava/stylesheet.css
>>> > and
>>> > https://ci.apache.org/projects/flink/flink-docs-master/api/j
>>> ava/script.js
>>> > are not found (404).
>>> >
>>> > Best,
>>> > Yassine
>>> >
>>> >
>>> > [1] :
>>> > https://ci.apache.org/projects/flink/flink-docs-release-1.2/
>>> dev/connectors/filesystem_sink.html
>>> > [2] :
>>> > https://ci.apache.org/projects/flink/flink-docs-master/api/j
>>> ava/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.html
>>>
>>
>>
>


JavaDoc 404

2017-02-04 Thread Yassine MARZOUGUI
Hi,

The JavaDoc link of BucketingSink in this page[1] yields to a 404 error. I
couldn't find the correct url.
The broken link : https://ci.apache.org/projects/flink/flink-docs-
master/api/java/org/apache/flink/streaming/connectors/fs/
bucketing/BucketingSink.html

Other pages in the JavaDoc, like this one[2], seem lacking formatting,
because https://ci.apache.org/projects/flink/flink-docs-
master/api/java/stylesheet.css and https://ci.apache.org/
projects/flink/flink-docs-master/api/java/script.js are not found (404).

Best,
Yassine


[1] : https://ci.apache.org/projects/flink/flink-docs-
release-1.2/dev/connectors/filesystem_sink.html
[2] : https://ci.apache.org/projects/flink/flink-docs-
master/api/java/org/apache/flink/streaming/api/functions/
sink/RichSinkFunction.html


Re: Externalized Checkpoints vs Periodic Checkpoints

2017-02-02 Thread Yassine MARZOUGUI
Thank you Till for the clarification, that was helpful.

Best,
Yassine

2017-02-02 15:31 GMT+01:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Yassine,
>
> a periodic checkpoint is checkpoint which will be triggered periodically
> by Flink. The checkpoint itself can have multiple properties and one of
> them is whether the checkpoint is externalized or not.
>
> An externalized checkpoint is a checkpoint for which Flink writes the meta
> information into a target directory. In contrast to that, for a
> non-externalized checkpoint Flink will store the checkpoint meta
> information only in memory. The former has the advantage that you don't
> lose the checkpoints if you shutdown your cluster. They behave similar to
> savepoints and in fact savepoints are externalized checkpoints with some
> more properties.
>
> At the moment, Flink's checkpoint coordinator only retains the last
> successfully completed checkpoint. This means that whenever a new
> checkpoint completes then the last completed checkpoint will be discarded.
> This also applies to externalized checkpoints.
>
> Cheers,
> Till
>
> On Wed, Feb 1, 2017 at 2:03 PM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
>
>> Hi all,
>>
>> Could someone clarify the difference between externalized checkpoints[1]
>> and regular periodic checkpoints[2]?
>> Moreover, I have a question regarding the retention of checkpoints: For
>> regular checkpoints, does the last checkpoint discard the previous ones? If
>> yes, is that the case too for the externalized checkpoints? Thank you.
>>
>> Best,
>> Yassine
>>
>> [1] : https://ci.apache.org/projects/flink/flink-docs-release-1.2/
>> setup/checkpoints.html
>> [2] : https://ci.apache.org/projects/flink/flink-docs-release-1.
>> 2/dev/stream/checkpointing.html
>>
>
>
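
For reference, a minimal sketch of enabling externalized periodic checkpoints in
Flink 1.2, matching the explanation quoted above (assuming state.checkpoints.dir
is set in flink-conf.yaml, e.g. to an HDFS path; the job itself is omitted):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExternalizedCheckpointsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // periodic checkpoints every 60 seconds
        env.enableCheckpointing(60_000L);

        // keep the checkpoint metadata outside the JobManager so it survives a cluster shutdown
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // ... define sources, transformations and sinks here ...
        env.execute("externalized-checkpoints-example");
    }
}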


Externalized Checkpoints vs Periodic Checkpoints

2017-02-01 Thread Yassine MARZOUGUI
Hi all,

Could someone clarify the difference between externalized checkpoints[1]
and regular periodic checkpoints[2]?
Moreover, I have a question regarding the retention of checkpoints: For
regular checkpoints, does the last checkpoint discard the previous ones? If
yes, is that the case too for the externalized checkpoints? Thank you.

Best,
Yassine

[1] :
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/checkpoints.html
[2] :
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/checkpointing.html


Re: Rate-limit processing

2017-01-20 Thread Yassine MARZOUGUI
Hi,

You might find this similar thread from the mailing list archive helpful :
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/throttled-stream-td6138.html
.

Best,
Yassine

2017-01-20 10:53 GMT+01:00 Florian König :

> Hi,
>
> i need to limit the rate of processing in a Flink stream application.
> Specifically, the number of items processed in a .map() operation has to
> stay under a certain maximum per second.
>
> At the moment, I have another .map() operation before the actual
> processing, which just sleeps for a certain time (e.g., 250ms for a limit
> of 4 requests / sec) and returns the item unchanged:
>
> …
>
> public T map(final T value) throws Exception {
> Thread.sleep(delay);
> return value;
> }
>
> …
>
> This works as expected, but is a rather crude approach. Checkpointing the
> job takes a very long time: minutes for a state of a few kB, which for
> other jobs is done in a few milliseconds. I assume that letting the whole
> thread sleep for most of the time interferes with the checkpointing - not
> good!
>
> Would using a different synchronization mechanism (e.g.,
> https://google.github.io/guava/releases/19.0/api/docs/
> index.html?com/google/common/util/concurrent/RateLimiter.html) help to
> make checkpointing work better?
>
> Or, preferably, is there a mechanism inside Flink that I can use to
> accomplish the desired rate limiting? I haven’t found anything in the docs.
>
> Cheers,
> Florian
>
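
A sketch of the Guava-based alternative mentioned above (RateLimiter instead of
Thread.sleep; the operator name and the rate are illustrative, and whether it
actually plays nicer with checkpointing than sleeping still needs to be
measured, since acquire() also blocks the task thread):

import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Throttling mapper: blocks only until the next permit is available instead of
// sleeping a fixed delay per record, so short bursts are smoothed out.
public class ThrottlingMapper<T> extends RichMapFunction<T, T> {

    private final double permitsPerSecond;
    private transient RateLimiter rateLimiter;

    public ThrottlingMapper(double permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
    }

    @Override
    public void open(Configuration parameters) {
        // one limiter per parallel instance of the operator
        rateLimiter = RateLimiter.create(permitsPerSecond);
    }

    @Override
    public T map(T value) {
        rateLimiter.acquire();   // blocks until the rate allows the next record
        return value;
    }
}

Note that with parallelism greater than one, each parallel instance gets its own
limiter, so the effective global rate is parallelism times permitsPerSecond.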


Re: Continuous File monitoring not reading nested files

2017-01-09 Thread Yassine MARZOUGUI
Hi,

I found the root cause of the problem: the listEligibleFiles method in
ContinuousFileMonitoringFunction scans only the topmost files and ignores
nested files. After fixing that I was able to get the expected output. I
created a Jira issue: https://issues.apache.org/jira/browse/FLINK-5432.
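
The shape of the fix is to make the enumeration recursive instead of stopping at
the top level; roughly like the sketch below (written against Flink's
org.apache.flink.core.fs API, not the exact patch attached to FLINK-5432):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class RecursiveFileLister {

    /** Recursively collects all files under 'path', keyed by their path. */
    public static Map<Path, FileStatus> listEligibleFiles(FileSystem fs, Path path)
            throws IOException {
        Map<Path, FileStatus> files = new HashMap<>();
        FileStatus[] statuses = fs.listStatus(path);
        if (statuses == null) {
            return files;
        }
        for (FileStatus status : statuses) {
            if (status.isDir()) {
                // descend into sub-directories instead of skipping them
                files.putAll(listEligibleFiles(fs, status.getPath()));
            } else {
                files.put(status.getPath(), status);
            }
        }
        return files;
    }
}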

@Kostas, If you haven't already started working on a fix for this, I would
happily contribute a fix for it if you like.

Best,
Yassine

2017-01-09 17:23 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Hi Kostas,
>
> I debugged the code and the nestedFileEnumeration parameter was always
> true during the execution. I noticed however that in the following loop in
> ContinuousFileMonitoringFunction, for some reason, the fileStatus was
> null for files in nested folders, and non null for files directly under the
> parent path, so no splits were forwarded in the case of nested folders.
>
> for(int var5 = 0; var5 < var4; ++var5) {
> FileInputSplit split = var3[var5];
> FileStatus fileStatus = (FileStatus)eligibleFiles.get(
> split.getPath());
> if(fileStatus != null) {
> Long modTime = Long.valueOf(fileStatus.
> getModificationTime());
> Object splitsToForward = (List)splitsByModTime.get(
> modTime);
> if(splitsToForward == null) {
> splitsToForward = new ArrayList();
> splitsByModTime.put(modTime, splitsToForward);
> }
>
> ((List)splitsToForward).add(new
> TimestampedFileInputSplit(modTime.longValue(), split.getSplitNumber(),
> split.getPath(), split.getStart(), split.getLength(),
> split.getHostnames()));
> }
> }
>
> Thanks,
> Yassine
>
>
> 2017-01-09 15:04 GMT+01:00 Kostas Kloudas <k.klou...@data-artisans.com>:
>
>> Hi Yassine,
>>
>> I suspect that the problem is in the way the input format (and not the
>> reader) scans nested files,
>> but could you see if in the code that is executed by the tasks, the
>> nestedFileEnumeration parameter is still true?
>>
>> I am asking in order to pin down if the problem is in the way we ship the
>> code to the tasks or in reading the
>> nested files.
>>
>> Thanks,
>> Kostas
>>
>> On Jan 9, 2017, at 12:56 PM, Yassine MARZOUGUI <y.marzou...@mindlytix.com>
>> wrote:
>>
>> Hi,
>>
>> Any updates on this issue? Thank you.
>>
>> Best,
>> Yassine
>>
>>
>> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>>
>> +kostas, who probably has the most experience with this by now. Do you
>> have an idea what might be going on?
>>
>> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <y.marzou...@mindlytix.com>
>> wrote:
>>
>>> Looks like this is not specific to the continuous file monitoring, I'm
>>> having the same issue (files in nested directories are not read) when using:
>>>
>>> env.readFile(fileInputFormat, "hdfs:///shared/mydir", 
>>> FileProcessingMode.PROCESS_ONCE,
>>> -1L)
>>>
>>> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>
>>> :
>>>
>>> Hi all,
>>>
>>> I'm using the following code to continuously process files from a
>>> directory "mydir".
>>>
>>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.get
>>> ExecutionEnvironment();
>>>
>>> FileInputFormat fileInputFormat = new TextInputFormat(new Path("
>>> hdfs:///shared/mydir"));
>>> fileInputFormat.setNestedFileEnumeration(true);
>>>
>>> env.readFile(fileInputFormat,
>>> "hdfs:///shared/mydir",
>>> FileProcessingMode.PROCESS_CONTINUOUSLY, 1L)
>>> .print();
>>>
>>> env.execute();
>>>
>>> If I add directory under mydir, say "2016-12-16", and then add a file "
>>> *2016-12-16/file.txt"*, its contents are not printed. If I add the same
>>> file directly under "*mydir"*,  its contents are correctly printed.
>>> After that the logs will show the following :
>>>
>>> 10:55:44,928 DEBUG org.apache.flink.streaming.api
>>> .functions.source.ContinuousFileMonitoringFunction  - Ignoring
>>> hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time=
>>> 1481882041587 and global mod time= 1481882126122
>>> 10:55:44,928 DEBUG org.apache.flink.streaming.api
>>> .functions.source.ContinuousFileMonitoringFunction  - Ignoring
>>> hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time=
>>> 1481881788704 and global mod time= 1481882126122
>>>
>>> Looks like the ContinuousFileMonitoringFunction  considered it already
>>> read 2016-12-16 as a file and then excludes it, but its contents were not
>>> processed. Any Idea why this happens?
>>> Thank you.
>>>
>>> Best,
>>> Yassine
>>>
>>>
>>>
>>
>>
>


Re: Continuous File monitoring not reading nested files

2017-01-09 Thread Yassine MARZOUGUI
Hi Kostas,

I debugged the code and the nestedFileEnumeration parameter was always true
during the execution. I noticed, however, that in the following loop
in ContinuousFileMonitoringFunction, for some reason, the fileStatus was
null for files in nested folders, and non-null for files directly under the
parent path, so no splits were forwarded in the case of nested folders.

for(int var5 = 0; var5 < var4; ++var5) {
FileInputSplit split = var3[var5];
FileStatus fileStatus =
(FileStatus)eligibleFiles.get(split.getPath());
if(fileStatus != null) {
Long modTime =
Long.valueOf(fileStatus.getModificationTime());
Object splitsToForward =
(List)splitsByModTime.get(modTime);
if(splitsToForward == null) {
splitsToForward = new ArrayList();
splitsByModTime.put(modTime, splitsToForward);
}

((List)splitsToForward).add(new
TimestampedFileInputSplit(modTime.longValue(), split.getSplitNumber(),
split.getPath(), split.getStart(), split.getLength(),
split.getHostnames()));
}
}

Thanks,
Yassine


2017-01-09 15:04 GMT+01:00 Kostas Kloudas <k.klou...@data-artisans.com>:

> Hi Yassine,
>
> I suspect that the problem is in the way the input format (and not the
> reader) scans nested files,
> but could you see if in the code that is executed by the tasks, the
> nestedFileEnumeration parameter is still true?
>
> I am asking in order to pin down if the problem is in the way we ship the
> code to the tasks or in reading the
> nested files.
>
> Thanks,
> Kostas
>
> On Jan 9, 2017, at 12:56 PM, Yassine MARZOUGUI <y.marzou...@mindlytix.com>
> wrote:
>
> Hi,
>
> Any updates on this issue? Thank you.
>
> Best,
> Yassine
>
>
> On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>
> +kostas, who probably has the most experience with this by now. Do you
> have an idea what might be going on?
>
> On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <y.marzou...@mindlytix.com>
> wrote:
>
>> Looks like this is not specific to the continuous file monitoring, I'm
>> having the same issue (files in nested directories are not read) when using:
>>
>> env.readFile(fileInputFormat, "hdfs:///shared/mydir", 
>> FileProcessingMode.PROCESS_ONCE,
>> -1L)
>>
>> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>
>> Hi all,
>>
>> I'm using the following code to continuously process files from a
>> directory "mydir".
>>
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.get
>> ExecutionEnvironment();
>>
>> FileInputFormat fileInputFormat = new TextInputFormat(new Path("
>> hdfs:///shared/mydir"));
>> fileInputFormat.setNestedFileEnumeration(true);
>>
>> env.readFile(fileInputFormat,
>> "hdfs:///shared/mydir",
>> FileProcessingMode.PROCESS_CONTINUOUSLY, 1L)
>> .print();
>>
>> env.execute();
>>
>> If I add directory under mydir, say "2016-12-16", and then add a file "
>> *2016-12-16/file.txt"*, its contents are not printed. If I add the same
>> file directly under "*mydir"*,  its contents are correctly printed.
>> After that the logs will show the following :
>>
>> 10:55:44,928 DEBUG org.apache.flink.streaming.api
>> .functions.source.ContinuousFileMonitoringFunction  - Ignoring
>> hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time=
>> 1481882041587 and global mod time= 1481882126122
>> 10:55:44,928 DEBUG org.apache.flink.streaming.api
>> .functions.source.ContinuousFileMonitoringFunction  - Ignoring
>> hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time= 1481881788704
>> and global mod time= 1481882126122
>>
>> Looks like the ContinuousFileMonitoringFunction  considered it already
>> read 2016-12-16 as a file and then excludes it, but its contents were not
>> processed. Any Idea why this happens?
>> Thank you.
>>
>> Best,
>> Yassine
>>
>>
>>
>
>


Re: Continuous File monitoring not reading nested files

2017-01-09 Thread Yassine MARZOUGUI
Hi,

Any updates on this issue? Thank you.

Best,
Yassine


On Dec 20, 2016 6:15 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:

+kostas, who probably has the most experience with this by now. Do you have
an idea what might be going on?

On Fri, 16 Dec 2016 at 15:45 Yassine MARZOUGUI <y.marzou...@mindlytix.com>
wrote:

> Looks like this is not specific to the continuous file monitoring, I'm
> having the same issue (files in nested directories are not read) when using:
>
> env.readFile(fileInputFormat, "hdfs:///shared/mydir", 
> FileProcessingMode.PROCESS_ONCE,
> -1L)
>
> 2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
> Hi all,
>
> I'm using the following code to continuously process files from a
> directory "mydir".
>
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.
> getExecutionEnvironment();
>
> FileInputFormat fileInputFormat = new TextInputFormat(new
> Path("hdfs:///shared/mydir"));
> fileInputFormat.setNestedFileEnumeration(true);
>
> env.readFile(fileInputFormat,
> "hdfs:///shared/mydir",
> FileProcessingMode.PROCESS_CONTINUOUSLY, 1L)
> .print();
>
> env.execute();
>
> If I add directory under mydir, say "2016-12-16", and then add a file "
> *2016-12-16/file.txt"*, its contents are not printed. If I add the same
> file directly under "*mydir"*,  its contents are correctly printed. After
> that the logs will show the following :
>
> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.
> ContinuousFileMonitoringFunction  - Ignoring 
> hdfs://mlxbackoffice/shared/mydir/2016-12-16,
> with mod time= 1481882041587 and global mod time= 1481882126122
> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.
> ContinuousFileMonitoringFunction  - Ignoring 
> hdfs://mlxbackoffice/shared/mydir/file.txt,
> with mod time= 1481881788704 and global mod time= 1481882126122
>
> Looks like the ContinuousFileMonitoringFunction  considered it already
> read 2016-12-16 as a file and then excludes it, but its contents were not
> processed. Any Idea why this happens?
> Thank you.
>
> Best,
> Yassine
>
>
>


Triggering a savepoint failed the job

2017-01-04 Thread Yassine MARZOUGUI
Hi all,

I tried to trigger a savepoint for a streaming job, and both the savepoint and
the job failed.

The job failed with the following exception:

java.lang.RuntimeException: Error while triggering checkpoint for
IterationSource-7 (1/1)
at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1026)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorIdentifier(StreamTask.java:767)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.access$500(StreamTask.java:115)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.createStreamFactory(StreamTask.java:986)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:956)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:583)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:551)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:511)
at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1019)
... 5 more


And the savepoint failed with the following exception:

Using address /127.0.0.1:6123 to connect to JobManager.
Triggering savepoint for job 153310c4a836a92ce69151757c6b73f1.
Waiting for response...


 The program finished with the following exception:

java.lang.Exception: Failed to complete savepoint
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:793)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:782)
at
org.apache.flink.runtime.concurrent.impl.FlinkFuture$6.recover(FlinkFuture.java:263)
at akka.dispatch.Recover.internal(Future.scala:267)
at akka.dispatch.japi$RecoverBridge.apply(Future.scala:183)
at akka.dispatch.japi$RecoverBridge.apply(Future.scala:181)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Failure.recover(Try.scala:185)
at
scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at
scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at
akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Checkpoint failed: Checkpoint Coordinator
is shutting down
at
org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:338)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.shutdown(CheckpointCoordinator.java:245)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.postRunCleanup(ExecutionGraph.java:1065)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.jobVertexInFinalState(ExecutionGraph.java:1034)
at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.subtaskInFinalState(ExecutionJobVertex.java:435)
at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.vertexCancelled(ExecutionJobVertex.java:407)
at
org.apache.flink.runtime.executiongraph.ExecutionVertex.executionCanceled(ExecutionVertex.java:593)
at
org.apache.flink.runtime.executiongraph.Execution.cancelingComplete(Execution.java:729)
at

Re: Continuous File monitoring not reading nested files

2016-12-16 Thread Yassine MARZOUGUI
Looks like this is not specific to the continuous file monitoring, I'm
having the same issue (files in nested directories are not read) when using:

env.readFile(fileInputFormat, "hdfs:///shared/mydir",
FileProcessingMode.PROCESS_ONCE,
-1L)

2016-12-16 11:12 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Hi all,
>
> I'm using the following code to continuously process files from a
> directory "mydir".
>
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.
> getExecutionEnvironment();
>
> FileInputFormat fileInputFormat = new TextInputFormat(new
> Path("hdfs:///shared/mydir"));
> fileInputFormat.setNestedFileEnumeration(true);
>
> env.readFile(fileInputFormat,
> "hdfs:///shared/mydir",
> FileProcessingMode.PROCESS_CONTINUOUSLY, 1L)
> .print();
>
> env.execute();
>
> If I add directory under mydir, say "2016-12-16", and then add a file "
> *2016-12-16/file.txt"*, its contents are not printed. If I add the same
> file directly under "*mydir"*,  its contents are correctly printed. After
> that the logs will show the following :
>
> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.
> ContinuousFileMonitoringFunction  - Ignoring 
> hdfs://mlxbackoffice/shared/mydir/2016-12-16,
> with mod time= 1481882041587 and global mod time= 1481882126122
> 10:55:44,928 DEBUG org.apache.flink.streaming.api.functions.source.
> ContinuousFileMonitoringFunction  - Ignoring 
> hdfs://mlxbackoffice/shared/mydir/file.txt,
> with mod time= 1481881788704 and global mod time= 1481882126122
>
> Looks like the ContinuousFileMonitoringFunction  considered it already
> read 2016-12-16 as a file and then excludes it, but its contents were not
> processed. Any Idea why this happens?
> Thank you.
>
> Best,
> Yassine
>


Continuous File monitoring not reading nested files

2016-12-16 Thread Yassine MARZOUGUI
Hi all,

I'm using the following code to continuously process files from a directory
"mydir".

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

FileInputFormat fileInputFormat = new TextInputFormat(new
Path("hdfs:///shared/mydir"));
fileInputFormat.setNestedFileEnumeration(true);

env.readFile(fileInputFormat,
"hdfs:///shared/mydir",
FileProcessingMode.PROCESS_CONTINUOUSLY, 1L)
.print();

env.execute();

If I add a directory under mydir, say "2016-12-16", and then add a file
"2016-12-16/file.txt", its contents are not printed. If I add the same
file directly under "mydir", its contents are correctly printed. After
that the logs show the following:

10:55:44,928 DEBUG
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
 - Ignoring hdfs://mlxbackoffice/shared/mydir/2016-12-16, with mod time=
1481882041587 and global mod time= 1481882126122
10:55:44,928 DEBUG
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
 - Ignoring hdfs://mlxbackoffice/shared/mydir/file.txt, with mod time=
1481881788704 and global mod time= 1481882126122

It looks like the ContinuousFileMonitoringFunction considered it had already
read 2016-12-16 as a file and then excluded it, but its contents were not
processed. Any idea why this happens?
Thank you.

Best,
Yassine


Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

2016-12-14 Thread Yassine MARZOUGUI
Hi Aljoscha,

Thanks a lot for the explanation. Using readFile with PROCESS_CONTINUOUSLY
solves it. Two more questions though:

1. Is it possible to gracefully stop the job once it has read the input
once?
2. Does the watermark extraction period depend on the watch interval, or
should any watch interval (except -1L) work the same way?

In my case the input is indeed finite and static, but contains hundreds of
GBs, which made the window state grow quickly beyond the memory capacity,
and the fact that the window contents were fired periodically helped
keep it small.

Best,
Yassine
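
Spelled out, the working variant looks roughly like the sketch below (the path,
watch interval and comma-separated timestamp parsing are placeholders; the
keyBy/session-window part of the original job would follow where print() is):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

public class ContinuousReadWithWatermarks {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        String path = "hdfs:///path/to/input";              // placeholder path
        TextInputFormat format = new TextInputFormat(new Path(path));

        // PROCESS_CONTINUOUSLY keeps the monitoring source alive, so watermarks
        // keep being extracted and event-time windows fire while the job runs,
        // not only at the end of the input.
        DataStream<String> lines =
                env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L);

        lines.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
                    @Override
                    public long extractAscendingTimestamp(String line) {
                        // assumes a comma-separated file whose first column is the timestamp
                        return Long.parseLong(line.split(",")[0]);
                    }
                })
                .print();   // keyBy / session window / apply would go here as before

        env.execute("continuous read with watermarks");
    }
}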

2016-12-14 10:38 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi Yassine,
> for a bit more detailed explanation: We internally changed how the timer
> system works, this timer system is also used to periodically extract
> watermarks. Due to this change, in your case we don't extract watermarks
> anymore.
>
> Internally, your call resolves to something like this:
>
> Env.readFile(FileInputFormat inputFormat, String
> filePath, FileProcessingMode watchType, long interval)
>
> with the FileProcessingMode being set to PROCESS_ONCE.
>
> To get back the old behaviour you can call this method directly with
> PROCESS_CONTINUOUSLY. This will keep the pipeline running and will also
> ensure that watermarks keep being extracted.
>
> In your case, it is not strictly wrong to emit only one large watermark in
> the end because your processing is finite. I admit that the change from
> Flink 1.1 seems a bit strange but this should only occur in toy examples
> where the data is finite.
>
> Does that help?
>
> Cheers,
> Aljoscha
>
> On Tue, 13 Dec 2016 at 18:17 Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi Yassine,
>> I managed to reproduce the problem. The cause is that we recently changed
>> how the timer service is being cleaned up and now the watermark timers are
>> not firing anymore.
>>
>> I'll keep you posted and hope to find a solution fast.
>>
>> Cheers,
>> Aljoscha
>>
>> On Sun, 11 Dec 2016 at 22:10 Yassine MARZOUGUI <y.marzou...@mindlytix.com>
>> wrote:
>>
>> Hi Aljoscha,
>>
>> Please excuse me for the late response; I've been busy for the whole
>> previous week.
>> I used the custom watermark debugger (with 1.1, I changed 
>> super.processWatermark(mark)
>> to super.output.emitWatermark(mark)), surprisingly with 1.2, only one
>> watermark is printed at the end of the stream with the value WM: Watermark
>> @ 9223372036854775807 (Long.MAX_VALUE), whereas with 1.1, watermarks are
>> printed periodically. I am using the following revision of 1.2-SNAPSHOT:
>> https://github.com/apache/flink/tree/4e336c692b74f218ba09844a46f495
>> 34e3a210e9.
>>
>> I uploaded the dataset I'm using as an input here :
>> https://drive.google.com/file/d/0BzERCAJnxXocNGpMTGMzX09id1U/
>> view?usp=sharing ,the first column corresponds to the timestamp.
>>
>> You can find the code below. Thanks you for your help.
>>
>> import com.opencsv.CSVParser;
>> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>> import org.apache.flink.api.java.tuple.Tuple;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.tuple.Tuple3;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.
>> StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.timestamps.
>> AscendingTimestampExtractor;
>> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
>> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
>> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import org.apache.flink.streaming.api.windowing.assigners.
>> EventTimeSessionWindows;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>> import org.apache.flink.util.Collector;
>> import java.util.*;
>>
>> /**
>>  * Created by ymarzougui on 11/1/2016.
>>  */
>> public class SortedSessionsAssigner {
>> public static void main(String[] args) throws Exception {
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.
>> getExecutionEnvironment();
>> env.setParallelism(1);

Re: Incremental aggregations - Example not working

2016-12-12 Thread Yassine MARZOUGUI
Yes, it was supposed to work. I looked into this, and as Chesnay said, this
is a bug in the fold function. I opened an issue in JIRA:
https://issues.apache.org/jira/browse/FLINK-5320, and will fix it very
soon; thank you for reporting it.
In the meantime you can work around the problem by specifying the
TypeInformation along with the fold function as follows: fold(ACC,
FoldFunction, WindowFunction, foldAccumulatorType, resultType). In the
example, the foldAccumulatorType is new TupleTypeInfo<Tuple3<String, Long,
Integer>>(), and the resultType is also new TupleTypeInfo<Tuple3<String,
Long, Integer>>().

Best,
Yassine
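
For concreteness, the workaround looks roughly like the sketch below (a made-up
pipeline with a Tuple3<String, Long, Integer> accumulator; the point is only the
two explicit TypeInformation arguments at the end of the fold(...) call):

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class FoldWithExplicitTypes {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> input = env.fromElements(
                Tuple2.of("a", 1L), Tuple2.of("a", 2L), Tuple2.of("b", 3L));

        // accumulator type: (key, last timestamp, count)
        TupleTypeInfo<Tuple3<String, Long, Integer>> accType = new TupleTypeInfo<>(
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.LONG_TYPE_INFO,
                BasicTypeInfo.INT_TYPE_INFO);

        input.keyBy(0)
             .timeWindow(Time.seconds(5))
             // the explicit type infos work around FLINK-5320, where the types
             // cannot be extracted from the fold/window function pair
             .fold(Tuple3.of("", 0L, 0),
                   new FoldFunction<Tuple2<String, Long>, Tuple3<String, Long, Integer>>() {
                       @Override
                       public Tuple3<String, Long, Integer> fold(
                               Tuple3<String, Long, Integer> acc, Tuple2<String, Long> value) {
                           return Tuple3.of(value.f0, value.f1, acc.f2 + 1);
                       }
                   },
                   new WindowFunction<Tuple3<String, Long, Integer>,
                                      Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {
                       @Override
                       public void apply(Tuple key, TimeWindow window,
                                         Iterable<Tuple3<String, Long, Integer>> folded,
                                         Collector<Tuple3<String, Long, Integer>> out) {
                           out.collect(folded.iterator().next());
                       }
                   },
                   accType,   // foldAccumulatorType
                   accType)   // resultType (the same here, since the window function just forwards)
             .print();

        env.execute();
    }
}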

2016-12-12 16:38 GMT+01:00 Matt <dromitl...@gmail.com>:

> I'm using 1.2-SNAPSHOT, should it work in that version?
>
> On Mon, Dec 12, 2016 at 12:18 PM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
>
>> Hi Matt,
>>
>> What version of Flink are you using?
>> The incremental agregation with fold(ACC, FoldFunction, WindowFunction)
>> in a new change that will be part of Flink 1.2, for Flink 1.1 the correct
>> way to perform incrementation aggregations is : apply(ACC, FoldFunction,
>> WindowFunction) (see the docs for 1.1 [1])
>>
>> [1] : https://ci.apache.org/projects/flink/flink-docs-release-1.
>> 1/apis/streaming/windows.html#windowfunction-with-incremental-aggregation
>>
>> Best,
>> Yassine
>>
>> 2016-12-12 15:37 GMT+01:00 Chesnay Schepler <ches...@apache.org>:
>>
>>> Hello Matt,
>>>
>>> This looks like a bug in the fold() function to me.
>>>
>>> I'm adding Timo to the discussion, he can probably shed some light on
>>> this.
>>>
>>> Regards,
>>> Chesnay
>>>
>>>
>>> On 12.12.2016 15:13, Matt wrote:
>>>
>>> In case this is important, if I remove the WindowFunction, and only use
>>> the FoldFunction it works fine.
>>>
>>> I don't see what is wrong...
>>>
>>> On Mon, Dec 12, 2016 at 10:53 AM, Matt <dromitl...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm following the documentation [1] of window functions with
>>>> incremental aggregations, but I'm getting an "input mismatch" error.
>>>>
>>>> The code [2] is almost identical to the one in the documentation, at
>>>> the bottom you can find the exact error.
>>>>
>>>> What am I missing? Can you provide a working example of a fold function
>>>> with both a FoldFunction and a WindowFunction?
>>>>
>>>> Regards,
>>>> Matt
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/w
>>>> indows.html#windowfunction-with-incremental-aggregation
>>>>
>>>> [2] https://gist.github.com/cc7ed5570e4ce30c3a482ab835e3983d
>>>>
>>>
>>>
>>>
>>
>


Re: Incremental aggregations - Example not working

2016-12-12 Thread Yassine MARZOUGUI
Hi Matt,

What version of Flink are you using?
Incremental aggregation with fold(ACC, FoldFunction, WindowFunction) is
a new change that will be part of Flink 1.2; for Flink 1.1 the correct way
to perform incremental aggregations is apply(ACC, FoldFunction,
WindowFunction) (see the docs for 1.1 [1]).

[1] :
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#windowfunction-with-incremental-aggregation

Best,
Yassine

2016-12-12 15:37 GMT+01:00 Chesnay Schepler :

> Hello Matt,
>
> This looks like a bug in the fold() function to me.
>
> I'm adding Timo to the discussion, he can probably shed some light on this.
>
> Regards,
> Chesnay
>
>
> On 12.12.2016 15:13, Matt wrote:
>
> In case this is important, if I remove the WindowFunction, and only use
> the FoldFunction it works fine.
>
> I don't see what is wrong...
>
> On Mon, Dec 12, 2016 at 10:53 AM, Matt  wrote:
>
>> Hi,
>>
>> I'm following the documentation [1] of window functions with incremental
>> aggregations, but I'm getting an "input mismatch" error.
>>
>> The code [2] is almost identical to the one in the documentation, at the
>> bottom you can find the exact error.
>>
>> What am I missing? Can you provide a working example of a fold function
>> with both a FoldFunction and a WindowFunction?
>>
>> Regards,
>> Matt
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/
>> windows.html#windowfunction-with-incremental-aggregation
>>
>> [2] https://gist.github.com/cc7ed5570e4ce30c3a482ab835e3983d
>>
>
>
>


Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

2016-12-11 Thread Yassine MARZOUGUI
er");

}

public static class WatermarkDebugger<T>
extends AbstractStreamOperator<T> implements
OneInputStreamOperator<T, T> {
private static final long serialVersionUID = 1L;

@Override
public void processElement(StreamRecord<T> element) throws
Exception {
System.out.println("ELEMENT: " + element);
output.collect(element);
}

@Override
public void processWatermark(Watermark mark) throws Exception {
// 1.2-snapshot
super.processWatermark(mark);
// 1.1-snapshot
//super.output.emitWatermark(mark);
System.out.println("WM: " + mark);
}
}

}

Best,
Yassine

2016-12-06 5:57 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi,
> could you please try adding this custom watermark debugger to see what's
> going on with the element timestamps and watermarks:
>
> public static class WatermarkDebugger<T>
> extends AbstractStreamOperator<T> implements
> OneInputStreamOperator<T, T> {
> private static final long serialVersionUID = 1L;
>
> @Override
> public void processElement(StreamRecord<T> element) throws Exception {
> System.out.println("ELEMENT: " + element);
> output.collect(element);
> }
>
> @Override
> public void processWatermark(Watermark mark) throws Exception {
> super.processWatermark(mark);
> System.out.println("WM: " + mark);
> }
> }
>
> you can use it like this:
> input.transform("WatermarkDebugger", input.getType(), new
> WatermarkDebugger<Tuple2<String, Integer>>());
>
> That should give us something to work with.
>
> Cheers,
> Aljoscha
>
> On Mon, 5 Dec 2016 at 18:54 Robert Metzger <rmetz...@apache.org> wrote:
>
> I'll add Aljoscha and Kostas Kloudas to the conversation. They have the
> best overview over the changes to the window operator between 1.1. and 1.2.
>
> On Mon, Dec 5, 2016 at 11:33 AM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
>
> I forgot to mention : the watermark extractor is the one included in Flink
> API.
>
> 2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
> Hi robert,
>
> Yes, I am using the same code, just swithcing the version in pom.xml to
> 1.2-SNAPSHOT and the cluster binaries to the compiled lastest master (at
> the time of the question)). Here is the watermark assignment :
>
> .assignTimestampsAndWatermarks(new 
> AscendingTimestampExtractor<Tuple3<Long,String,String>>()
> {
> @Override
> public long extractAscendingTimestamp(Tuple3<Long,String,String>
> tuple3) {
> return tuple3.f0;
> }
> })
>
> Best,
> Yassine
>
> 2016-12-05 11:24 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>
> Hi Yassine,
> are you sure your watermark extractor is the same between the two
> versions. It sounds a bit like the watermarks for the 1.2 code are not
> generated correctly.
>
> Regards,
> Robert
>
>
> On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
>
> Hi all,
>
> With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the windows
> boundaries are detected, but with 1.2-SNAPDHOT the state keeps increasing
> in memory and the windows results are not emitted until the whole stream is
> processed. Is this a temporary behaviour due to the developments in
> 1.2-SNAPSHOT, or a bug?
>
> I am using a code similar to the follwoing:
>
> env.setParallelism(1);
>
> DataStream sessions = env
> .readTextFile()
> .flatMap()
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
> .keyBy(1)
> .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
> .apply().setParallelism(32)
>
> sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
> sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
>
> Best,
> Yassine
>
>
>
>
>
>


Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

2016-12-05 Thread Yassine MARZOUGUI
I forgot to mention: the watermark extractor is the one included in the Flink
API.

2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Hi robert,
>
> Yes, I am using the same code, just swithcing the version in pom.xml to
> 1.2-SNAPSHOT and the cluster binaries to the compiled lastest master (at
> the time of the question)). Here is the watermark assignment :
>
> .assignTimestampsAndWatermarks(new 
> AscendingTimestampExtractor<Tuple3<Long,String,String>>()
> {
> @Override
> public long extractAscendingTimestamp(Tuple3<Long,String,String>
> tuple3) {
> return tuple3.f0;
> }
> })
>
> Best,
> Yassine
>
> 2016-12-05 11:24 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>
>> Hi Yassine,
>> are you sure your watermark extractor is the same between the two
>> versions. It sounds a bit like the watermarks for the 1.2 code are not
>> generated correctly.
>>
>> Regards,
>> Robert
>>
>>
>> On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <
>> y.marzou...@mindlytix.com> wrote:
>>
>>> Hi all,
>>>
>>> With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the windows
>>> boundaries are detected, but with 1.2-SNAPDHOT the state keeps increasing
>>> in memory and the windows results are not emitted until the whole stream is
>>> processed. Is this a temporary behaviour due to the developments in
>>> 1.2-SNAPSHOT, or a bug?
>>>
>>> I am using a code similar to the follwoing:
>>>
>>> env.setParallelism(1);
>>>
>>> DataStream sessions = env
>>> .readTextFile()
>>> .flatMap()
>>> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
>>> .keyBy(1)
>>> .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>>> .apply().setParallelism(32)
>>>
>>> sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
>>> sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
>>>
>>> Best,
>>> Yassine
>>>
>>
>>
>


Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

2016-12-05 Thread Yassine MARZOUGUI
Hi Robert,

Yes, I am using the same code, just switching the version in pom.xml to
1.2-SNAPSHOT and the cluster binaries to the compiled latest master (at
the time of the question). Here is the watermark assignment:

.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor<Tuple3<Long,String,String>>() {
@Override
public long extractAscendingTimestamp(Tuple3<Long,String,String>
tuple3) {
return tuple3.f0;
}
})

Best,
Yassine

2016-12-05 11:24 GMT+01:00 Robert Metzger <rmetz...@apache.org>:

> Hi Yassine,
> are you sure your watermark extractor is the same between the two
> versions. It sounds a bit like the watermarks for the 1.2 code are not
> generated correctly.
>
> Regards,
> Robert
>
>
> On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
>
>> Hi all,
>>
>> With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the windows
>> boundaries are detected, but with 1.2-SNAPDHOT the state keeps increasing
>> in memory and the windows results are not emitted until the whole stream is
>> processed. Is this a temporary behaviour due to the developments in
>> 1.2-SNAPSHOT, or a bug?
>>
>> I am using a code similar to the follwoing:
>>
>> env.setParallelism(1);
>>
>> DataStream sessions = env
>> .readTextFile()
>> .flatMap()
>> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
>> .keyBy(1)
>> .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>> .apply().setParallelism(32)
>>
>> sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
>> sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
>>
>> Best,
>> Yassine
>>
>
>


In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing until the whole stream is processed

2016-12-03 Thread Yassine MARZOUGUI
Hi all,

With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the window
boundaries are detected, but with 1.2-SNAPSHOT the state keeps increasing
in memory and the window results are not emitted until the whole stream is
processed. Is this a temporary behaviour due to the developments in
1.2-SNAPSHOT, or a bug?

I am using code similar to the following:

env.setParallelism(1);

DataStream sessions = env
.readTextFile()
.flatMap()
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
.keyBy(1)
.window(EventTimeSessionWindows.withGap(Time.minutes(5)))
.apply().setParallelism(32)

sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();

Best,
Yassine


Re: Sink not switched to "RUNNING" even though a task slot is available

2016-11-23 Thread Yassine MARZOUGUI
Hi Robert,

I see, so the join needs to consume all data first and process it.
In my case, I couldn't wait long because the first join quickly generated a
lot of data that couldn't fit in memory or on disk. The solution was
to manually specify a JoinHint and broadcast the small dataset; that
way I was able to get output from the join as soon as data was received.

Thanks a lot for the clarification!

Best,
Yassine
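
For reference, forcing the broadcast of the small input looks roughly like this
(toy datasets; BROADCAST_HASH_SECOND builds the hash table from the second,
smaller input and streams the first one through it):

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class BroadcastJoinHintExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> large = env.fromElements(
                Tuple2.of(1, "a"), Tuple2.of(2, "b"), Tuple2.of(1, "c"));
        DataSet<Tuple2<Integer, String>> small = env.fromElements(
                Tuple2.of(1, "x"), Tuple2.of(2, "y"));

        // BROADCAST_HASH_SECOND ships the (small) second input to every node and
        // builds a hash table from it, so the join can emit results as records of
        // the large input arrive, instead of first repartitioning and
        // materializing both sides.
        large.join(small, JoinHint.BROADCAST_HASH_SECOND)
             .where(0)
             .equalTo(0)
             .print();
    }
}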

2016-11-23 17:35 GMT+01:00 Robert Metzger <rmetz...@apache.org>:

> Hi Yassine,
>
> you don't necessarily need to set the parallelism of the last two
> operators of 31, the sink with parallelism 1 will fit still into the slots.
> A task slot can, by default, hold an entire "slice" or parallel instance
> of a job.
>
> The reason why the sink stays in state CREATE in the beginning is because
> it didn't receive any data yet. In batch mode, an operator is switching to
> RUNNING once it received the first record. In your case, there are some
> operations (reduce, join) before the sink that first need to consume all
> data and process it.
> If you wait long enough, you should see the sink to become active.
>
> Regards,
> Robert
>
>
>
> On Wed, Nov 23, 2016 at 2:03 PM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
>
>> Hi all,
>>
>> My batch job has the follwoing plan in the end (figure attached):
>>
>>
>> ​
>> I have a total of 32 task slots, and I have set the parallelism of the
>> last two operators before the sink to 31. The sink parallelism is 1. The
>> last operator before the sink is a MapOperator, so it doesn't need to
>> buffer all elements before emitting the output.
>>
>> Looking at the dashboard, I see that one task slot is available, but the
>> sink subtask stays in the state "CREATED".
>>
>> Any explanation for this behaviour? Thank you.
>>
>> Best,
>> Yassine
>>
>
>


Sink not switched to "RUNNING" even though a task slot is available

2016-11-23 Thread Yassine MARZOUGUI
Hi all,

My batch job has the following plan at the end (figure attached):

I have a total of 32 task slots, and I have set the parallelism of the last
two operators before the sink to 31. The sink parallelism is 1. The last
operator before the sink is a MapOperator, so it doesn't need to buffer all
elements before emitting the output.

Looking at the dashboard, I see that one task slot is available, but the
sink subtask stays in the state "CREATED".

Any explanation for this behaviour? Thank you.

Best,
Yassine


Re: Csv to windows?

2016-11-07 Thread Yassine MARZOUGUI
Hi Felix,

As I see in kddcup.newtestdata_small_unlabeled_index,
the first field of connectionRecords (splits[0]) is unique for each
record, so when you apply keyBy(0), it will logically partition your
stream by that field and each partition will contain only one element. So
the countWindow(2) actually never fires, because it never reaches 2
elements.

Could you please go into more detail about what the expected output is? Then
we might be able to figure out the proper way to achieve it.

Best,
Yassine
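
To illustrate the point with a toy example (made-up data, unrelated to the KDD
file): a count window only fires once a single key has accumulated the required
number of elements, so the key has to repeat across records:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CountWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // field 0 repeats across records, so keyBy(0) groups several elements
        // under the same key and countWindow(2) can actually fire
        DataStream<Tuple2<String, Integer>> records = env.fromElements(
                Tuple2.of("tcp", 1), Tuple2.of("tcp", 2),
                Tuple2.of("udp", 3), Tuple2.of("udp", 4));

        records.keyBy(0)
               .countWindow(2)
               .sum(1)      // fires as soon as a key has accumulated 2 elements
               .print();

        env.execute();
    }
}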

2016-11-07 19:18 GMT+01:00 Felix Neutatz :

> Hi Till,
>
> the mapper solution makes sense :)
>
> Unfortunately, in my case it was not a typo in the path. I checked and saw
> that the records are read.
>
> You can find the whole program here: https://github.com/
> FelixNeutatz/CluStream/blob/master/flink-java-project/src/
> main/java/org/apache/flink/clustream/StreamingJobIndex.java
>
> I am happy for any ideas.
>
> Best regards,
> Felix
>
> 2016-11-07 16:15 GMT+01:00 Till Rohrmann :
>
>> Hi Felix,
>>
>> I'm not sure whether grouping/keyBy by processing time makes semantically
>> any sense. This can be anything depending on the execution order.
>> Therefore, there is not build in mechanism to group by processing time. But
>> you can always write a mapper which assigns the current processing time to
>> the stream record and use this field for grouping.
>>
>> Concerning your second problem, could you check the path of the file? At
>> the moment Flink fails silently if the path is not valid. It might be that
>> you have a simple typo in the path. I've opened an issue to fix this issue
>> [1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-5027
>>
>> Cheers,
>> Till
>>
>>
>>
>>
>>
>> On Sun, Nov 6, 2016 at 12:36 PM, Felix Neutatz 
>> wrote:
>>
>>> Hi everybody,
>>>
>>> I finally reached streaming territory. For a student project I want to
>>> implement CluStream for Flink. I guess this is especially interesting to
>>> try queryable state :)
>>>
>>> But I have problems at the first steps. My input data is a csv file of
>>> records. For the start I just want to window this csv. I don't want to use 
>>> AllWindows
>>> because it's not parallelizable.
>>>
>>> So my first question is: Can I window by processing time, like this:
>>>
>>> connectionRecordsT.keyBy(processing_time).timeWindow(Time.milliseconds(1000L))
>>>
>>> I didn't find a way, so I added in the csv an index column and tried to use 
>>> a countWindow:
>>>
>>> DataStreamSource source = env.readTextFile(file.getPath());
>>>
>>> DataStream> connectionRecords = source.map(new 
>>> MapToVector()).setParallelism(4);
>>>
>>> connectionRecords.keyBy(0).countWindow(10).apply (
>>>new WindowFunction, Tuple1, Tuple, 
>>> GlobalWindow>() {
>>>   public void apply (Tuple tuple,
>>>  GlobalWindow window,
>>>  Iterable> values,
>>>  Collector out) throws Exception {
>>>  int sum = 0;
>>>  Iterator iterator = values.iterator();
>>>  while (iterator.hasNext () ) {
>>> Tuple2 t = (Tuple2)iterator.next();
>>> sum += 1;
>>>  }
>>>  out.collect (new Tuple1(new Integer(sum)));
>>>   }
>>> }).writeAsCsv("text");
>>>
>>> To check whether everything works I just count the elements per window and 
>>> write them into a csv file.
>>>
>>> Flink generates the files but all are empty. Can you tell me, what I did 
>>> wrong?
>>>
>>> Best regards,
>>>
>>> Felix
>>>
>>>
>>
>


Re: Questions about FoldFunction and WindowFunction

2016-11-02 Thread Yassine MARZOUGUI
Yes, with pleasure. Could you please assign it temporarily to me? (I am not
very familiar with the internal components of Flink and might take some time
before contributing the code; if by the time you are ready to work on it I
am not yet done, you can reassign it to yourself.)

2016-11-02 14:07 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:

> Would you be interested in contributing a fix for that? Otherwise I'll
> probably fix work on that in the coming weeks.
>
> On Wed, 2 Nov 2016 at 13:38 Yassine MARZOUGUI <y.marzou...@mindlytix.com>
> wrote:
>
>> Thank you Aljoscha for your quick response.
>>
>> Best,
>> Yassine
>>
>> 2016-11-02 12:30 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>>
>> Hi Yassine,
>>
>> regarding 1. The close() method of the RichFoldFunction will only be
>> called at the very end of your streaming job, so in practise it will never
>> be called. This is there because of batch jobs, where you have an actual
>> end in your processing.
>>
>> regarding 2. I'm afraid you came across a bug: https://issues.apache.
>> org/jira/browse/FLINK-3869. We can't change this right now because we
>> cannot break API instability but right at the end of this issue I'm
>> proposing a different solution that we'll hopefully get in for the next
>> release.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 2 Nov 2016 at 10:42 Yassine MARZOUGUI <y.marzou...@mindlytix.com>
>> wrote:
>>
>> Hi all,
>>
>> I have a couple questions about FoldFunction and WindowFunction:
>>
>> 1. When using a RichFoldFunction after a window as in
>> keyedStream.window().fold(new RichFoldFunction()), is the close() method
>> called after each window or after all the windows for that key are fired?
>>
>> 2. When applying a FoldFunction to a window followed by a WindowFunction
>> via apply
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/WindowedStream.html#apply-R-org.apache.flink.api.common.functions.FoldFunction-org.apache.flink.streaming.api.functions.windowing.WindowFunction->(R
>>  initialValue,
>> FoldFunction
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/FoldFunction.html>
>> > <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/WindowedStream.html>
>> ,R> foldFunction, WindowFunction
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.html>
>> <R,R,K
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/WindowedStream.html>
>> ,W
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/WindowedStream.html>
>> > function), why should the output of the WindowFunction be of the same
>> type as the input? It would be practical to have a different output type
>> sometimes, for example one would fold tuples in the FoldFunction and then
>> process the (only) aggregated tuple in the Window function and emit an
>> Integer.
>>
>> Best,
>> Yassine
>>
>>
>>


Re: Questions about FoldFunction and WindowFunction

2016-11-02 Thread Yassine MARZOUGUI
Thank you Aljoscha for your quick response.

Best,
Yassine

2016-11-02 12:30 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi Yassine,
>
> regarding 1. The close() method of the RichFoldFunction will only be
> called at the very end of your streaming job, so in practise it will never
> be called. This is there because of batch jobs, where you have an actual
> end in your processing.
>
> regarding 2. I'm afraid you came across a bug: https://issues.apache.
> org/jira/browse/FLINK-3869. We can't change this right now because we
> cannot break API instability but right at the end of this issue I'm
> proposing a different solution that we'll hopefully get in for the next
> release.
>
> Cheers,
> Aljoscha
>
> On Wed, 2 Nov 2016 at 10:42 Yassine MARZOUGUI <y.marzou...@mindlytix.com>
> wrote:
>
>> Hi all,
>>
>> I have a couple questions about FoldFunction and WindowFunction:
>>
>> 1. When using a RichFoldFunction after a window as in
>> keyedStream.window().fold(new RichFoldFunction()), is the close() method
>> called after each window or after all the windows for that key are fired?
>>
>> 2. When applying a FoldFunction to a window followed by a WindowFunction
>> via apply
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/WindowedStream.html#apply-R-org.apache.flink.api.common.functions.FoldFunction-org.apache.flink.streaming.api.functions.windowing.WindowFunction->(R
>>  initialValue,
>> FoldFunction
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/FoldFunction.html>
>> > <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/WindowedStream.html>
>> ,R> foldFunction, WindowFunction
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.html>
>> <R,R,K
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/WindowedStream.html>
>> ,W
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/WindowedStream.html>
>> > function), why should the output of the WindowFunction be of the same
>> type as the input? It would be practical to have a different output type
>> sometimes, for example one would fold tuples in the FoldFunction and then
>> process the (only) aggregated tuple in the Window function and emit an
>> Integer.
>>
>> Best,
>> Yassine
>>
>


Questions about FoldFunction and WindowFunction

2016-11-02 Thread Yassine MARZOUGUI
Hi all,

I have a couple questions about FoldFunction and WindowFunction:

1. When using a RichFoldFunction after a window as in
keyedStream.window().fold(new
RichFoldFunction()), is the close() method called after each window or
after all the windows for that key are fired?

2. When applying a FoldFunction to a window followed by a WindowFunction
via apply(R initialValue, FoldFunction<T,R> foldFunction,
WindowFunction<R,R,K,W> function), why should the output of the WindowFunction be of the same
type as the input? It would be practical to have a different output type
sometimes, for example one would fold tuples in the FoldFunction and then
process the (only) aggregated tuple in the Window function and emit an
Integer.

Best,
Yassine


Re: Bug: Plan generation for Unions picked a ship strategy between binary plan operators.

2016-10-25 Thread Yassine MARZOUGUI
Hi Fabian,

I commented on the issue and attached the program reproducing the bug, but
I couldn't find how to re-open it (I think maybe I don't have enough
permissions?).

Best,
Yassine


2016-10-25 12:49 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Yassine,
>
> I thought I had fixed that bug a few weeks a ago, but apparently the fix
> did not catch all cases.
> Can you please reopen FLINK-2662 and post the program to reproduce the bug
> there?
>
> Thanks,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-2662
>
> 2016-10-25 12:33 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
>> Hi all,
>>
>> My job fails with the folowing exception : CompilerException: Bug: Plan
>> generation for Unions picked a ship strategy between binary plan operators.
>> The exception happens when adding partitionByRange(1).sortPartition(1,
>> Order.DESCENDING) to the union of datasets.
>>
>> I made a smaller version that reproduces the bug :
>>
>> import org.apache.flink.api.common.functions.GroupReduceFunction;
>> import org.apache.flink.api.common.operators.Order;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.util.Collector;
>> import java.util.Iterator;
>>
>> public class BugReproduce {
>> public static void main(String[] args) throws Exception {
>> final ExecutionEnvironment env = ExecutionEnvironment.getExecut
>> ionEnvironment();
>> DataSet wc1 = env.fromElements(new WC("first",1), new
>> WC("second",2),new WC("first",1),new WC("first",1),new WC("second",2));
>> DataSet wc2 = env.fromElements(new WC("third",1), new
>> WC("forth",2),new WC("forth",1),new WC("forth",1),new WC("third",2));
>> DataSet wc3 = env.fromElements(new WC("fifth",1), new
>> WC("fifth",2),new WC("fifth",1),new WC("fifth",1),new WC("fifth",2));
>>
>> DataSet<Tuple2<String,Integer>> aggregatedwc1 = aggregateWC(wc1);
>> DataSet<Tuple2<String,Integer>> aggregatedwc2 = aggregateWC(wc2);
>> DataSet<Tuple2<String,Integer>> aggregatedwc3 = aggregateWC(wc3);
>> DataSet<Tuple2<String,Integer>> all =
>> aggregatedwc1.union(aggregatedwc2).union(aggregatedwc3);
>> all.partitionByRange(1).sortPartition(1,
>> Order.DESCENDING).print();
>>
>> }
>>
>> public static DataSet<Tuple2<String,Integer>>
>> aggregateWC(DataSet input){
>> return input.groupBy("word").reduceGroup(new
>> GroupReduceFunction<WC, Tuple2<String, Integer>>() {
>> @Override
>> public void reduce(Iterable iterable,
>> Collector<Tuple2<String, Integer>> collector) throws Exception {
>> Integer count = 0;
>> Iterator iterator = iterable.iterator();
>> if (iterator.hasNext()) {
>> String word= iterator.next().word;
>> while (iterator.hasNext()) {
>> iterator.next();
>> count += 1;
>> }
>> collector.collect(Tuple2.of(word,count));
>> }
>> }
>> });
>> }
>>
>> public static class WC {
>> public String word;
>> public int count;
>>
>> public WC() {
>> }
>>
>> public WC(String word, int count) {
>> this.word = word;
>> this.count = count;
>> }
>>
>> public String getWord() {
>> return word;
>> }
>>
>> public void setWord(String word) {
>> this.word = word;
>> }
>>
>> public int getCount() {
>> return count;
>> }
>>
>> public void setCount(int count) {
>> this.count = count;
>> }
>> }
>> }
>>
>> Here is the exception stacktrace:
>>
>> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
>> Bug: Plan generation for Unions picked a ship strategy between binary plan
>> operators.
>> at org.apache.flink.optimizer.traversals.BinaryUnionReplac

Bug: Plan generation for Unions picked a ship strategy between binary plan operators.

2016-10-25 Thread Yassine MARZOUGUI
Hi all,

My job fails with the following exception: CompilerException: Bug: Plan
generation for Unions picked a ship strategy between binary plan operators.
The exception happens when adding partitionByRange(1).sortPartition(1,
Order.DESCENDING) to the union of datasets.

I made a smaller version that reproduces the bug :

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.util.Iterator;

public class BugReproduce {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
DataSet<WC> wc1 = env.fromElements(new WC("first",1), new
WC("second",2),new WC("first",1),new WC("first",1),new WC("second",2));
DataSet<WC> wc2 = env.fromElements(new WC("third",1), new
WC("forth",2),new WC("forth",1),new WC("forth",1),new WC("third",2));
DataSet<WC> wc3 = env.fromElements(new WC("fifth",1), new
WC("fifth",2),new WC("fifth",1),new WC("fifth",1),new WC("fifth",2));

DataSet<Tuple2<String,Integer>> aggregatedwc1 = aggregateWC(wc1);
DataSet<Tuple2<String,Integer>> aggregatedwc2 = aggregateWC(wc2);
DataSet<Tuple2<String,Integer>> aggregatedwc3 = aggregateWC(wc3);
DataSet<Tuple2<String,Integer>> all =
aggregatedwc1.union(aggregatedwc2).union(aggregatedwc3);
all.partitionByRange(1).sortPartition(1, Order.DESCENDING).print();

}

public static DataSet<Tuple2<String,Integer>> aggregateWC(DataSet<WC>
input){
return input.groupBy("word").reduceGroup(new
GroupReduceFunction<WC, Tuple2<String, Integer>>() {
@Override
public void reduce(Iterable<WC> iterable,
Collector<Tuple2<String, Integer>> collector) throws Exception {
Integer count = 0;
Iterator<WC> iterator = iterable.iterator();
if (iterator.hasNext()) {
String word= iterator.next().word;
while (iterator.hasNext()) {
iterator.next();
count += 1;
}
collector.collect(Tuple2.of(word,count));
}
}
});
}

public static class WC {
public String word;
public int count;

public WC() {
}

public WC(String word, int count) {
this.word = word;
this.count = count;
}

public String getWord() {
return word;
}

public void setWord(String word) {
this.word = word;
}

public int getCount() {
return count;
}

public void setCount(int count) {
this.count = count;
}
}
}

Here is the exception stacktrace:

Exception in thread "main" org.apache.flink.optimizer.CompilerException:
Bug: Plan generation for Unions picked a ship strategy between binary plan
operators.
at
org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
at
org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
at
org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
at
org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
at
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at
org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at
org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:516)
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:185)
at
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
at org.myorg.prod.BugReproduce.main(BugReproduce.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

I'm using Flink v1.1.3. Any help is appreciated. Thank you.

Best,
Yassine


Re: NoClassDefFoundError on cluster with httpclient 4.5.2

2016-10-21 Thread Yassine MARZOUGUI
Hi Till,

The httpclient jar is included in the job jar. Looking at a similar issue
FLINK-4587 <https://issues.apache.org/jira/browse/FLINK-4587>, It turns out
the problem is with maven shade plugin, since I'm building Flink from
sources with maven 3.3.x.
I was able to solve the problem by rebuilding "flink-dist" as suggested by
Stephan in the comments.

Best,
Yassine

2016-10-20 11:05 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Yassine,
>
> can you check whether the httpclient jar is contained in your job jar
> which you submit to the cluster?
>
> Cheers,
> Till
>
> On Wed, Oct 19, 2016 at 6:41 PM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
>
>> Hi all,
>>
>> I'm using httpclient with the following dependency:
>>
>> <dependency>
>> <groupId>org.apache.httpcomponents</groupId>
>> <artifactId>httpclient</artifactId>
>> <version>4.5.2</version>
>> </dependency>
>>
>> On local mode, the program works correctly, but when executed on the
>> cluster, I get the following exception:
>>
>> java.lang.Exception: The user defined 'open(Configuration)' method in
>> class org.myorg.quickstart.Frequencies$2 caused an exception: Could not
>> initialize class org.apache.http.conn.ssl.SSLConnectionSocketFactory
>> at org.apache.flink.runtime.operators.BatchTask.openUserCode(
>> BatchTask.java:1337)
>> at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDr
>> iver.openTask(ChainedFlatMapDriver.java:47)
>> at org.apache.flink.runtime.operators.BatchTask.openChainedTask
>> s(BatchTask.java:1377)
>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(
>> DataSourceTask.java:124)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>> at java.lang.Thread.run(Unknown Source)
>> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
>> org.apache.http.conn.ssl.SSLConnectionSocketFactory
>> at org.apache.http.impl.client.HttpClientBuilder.build(HttpClie
>> ntBuilder.java:966)
>> at org.myorg.quickstart.Frequencies$2.open(Frequencies.java:82)
>> at org.apache.flink.api.common.functions.util.FunctionUtils.ope
>> nFunction(FunctionUtils.java:38)
>> at org.apache.flink.runtime.operators.BatchTask.openUserCode(
>> BatchTask.java:1335)
>> ... 5 more
>>
>> I'm using Flink 1.1.3. Any idea how to solve the problem? Thank you.
>>
>> Best,
>> Yassine
>>
>
>


NoClassDefFoundError on cluster with httpclient 4.5.2

2016-10-19 Thread Yassine MARZOUGUI
Hi all,

I'm using httpclient with the following dependency:

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>

On local mode, the program works correctly, but when executed on the
cluster, I get the following exception:

java.lang.Exception: The user defined 'open(Configuration)' method in class
org.myorg.quickstart.Frequencies$2 caused an exception: Could not
initialize class org.apache.http.conn.ssl.SSLConnectionSocketFactory
at
org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1337)
at
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47)
at
org.apache.flink.runtime.operators.BatchTask.openChainedTasks(BatchTask.java:1377)
at
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:124)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class
org.apache.http.conn.ssl.SSLConnectionSocketFactory
at
org.apache.http.impl.client.HttpClientBuilder.build(HttpClientBuilder.java:966)
at org.myorg.quickstart.Frequencies$2.open(Frequencies.java:82)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at
org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1335)
... 5 more

I'm using Flink 1.1.3. Any idea how to solve the problem? Thank you.

Best,
Yassine


Re: BoundedOutOfOrdernessTimestampExtractor and allowedlateness

2016-10-17 Thread Yassine MARZOUGUI
Hi Fabian,

Thank you very much for the great answer and example, I appreciate it!
It is all clear now.

Best,
Yassine

2016-10-17 16:29 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> I have to extend my answer:
>
> The behavior allowedLateness that I described applies only if the window
> trigger calls FIRE when the window is evaluated (this is the default
> behavior of most triggers).
>
> In case the trigger calls FIRE_AND_PURGE, the state of the window is
> purged when the function is evaluated and late events are processed alone,
> i.e., in my example <12:09, G> would be processed without [A, B, C, D].
> When the allowed lateness is passed, all window state is purged regardless
> of the trigger.
>
> Best, Fabian
>
> 2016-10-17 16:24 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Yassine,
>>
>> the difference is the following:
>>
>> 1) The BoundedOutOfOrdernessTimestampExtractor is a built-in timestamp
>> extractor and watermark assigner.
>> A timestamp extractor tells Flink when an event happened, i.e., it
>> extracts a timestamp from the event. A watermark assigner tells Flink what
>> the current logical time is.
>> The BoundedOutOfOrdernessTimestampExtractor works as follows: When Flink
>> asks what the current time is, it returns the latest observed timestamp
>> minus the a configurable bound. This is the safety margin for late data.
>>  A record whose timestamp is lower than the last watermark is considered
>> to be late.
>>
>> 2) The allowedLateness parameter of time windows tells Flink how long to
>> keep state around after the window was evaluated.
>> If data arrives after the evaluation and before the allowedLateness has
>> passed, the window function is applied again and an update is sent out.
>>
>> Let's look at an example.
>> Assume you have a BOOTE with a 2 minute bound and a 10 minute tumbling
>> window that starts at 12:00 and ends at 12:10:
>>
>> If you have the following data:
>>
>> 12:01, A
>> 12:04, B
>> WM, 12:02 // 12:04 - 2 minutes
>> 12:02, C
>> 12:08, D
>> 12:14, E
>> WM, 12:12
>> 12:16, F
>> WM, 12:14 // 12:16 - 2 minutes
>> 12:09, G
>>
>> == no allowed lateness
>> The window operator forwards the logical time to 12:12 when it receives
>> <WM, 12:12> and evaluates the window which contains [A, B, C, D] at this
>> time and finally purges its state. <12:09, G> is later ignored.
>>
>> == allowed lateness of 3 minutes
>> The window operator evaluates the window when <WM, 12:12> is received,
>> but its state is not purged yet. The state is purged when <WM, 12:14> is
>> received (window fire time 12:10 + 3mins allowed lateness). <12:09, G> is
>> again ignored.
>>
>> == allowed lateness of 5 minutes
>> The window operator evaluates the window when <WM, 12:12> is received,
>> but its state is not purged yet. When <12:09, G> is received, the window is
>> again evaluated but this time with [A, B, C, D, G] and an update is sent
>> out. The state is purged when a watermark of >=12:15 is received.
>>
>> So, watermarks tell the Flink what time it is and allowed lateness tells
>> the system when state should be discarded and all later arriving data be
>> ignored.
>> These issues are related but not exactly the same thing. For instance you
>> can counter late data by increasing the bound or the lateness parameter.
>> Increasing the watermark bound will yield higher latencies as windows are
>> evaluated later.
>> Configuring allowedLateness will allow for earlier results, but you have
>> to cope with the updates downstream.
>>
>> Please let me know, if you have questions.
>>
>> Best, Fabian
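
A minimal sketch of the setup described in the example above, using a 2-minute
out-of-orderness bound, a 10-minute tumbling event-time window and 3 minutes of
allowed lateness; the Tuple3 input stream and its field layout are assumed
placeholders, not code from this thread:

// events = (epoch-millis timestamp, key, value); the source is omitted
DataStream<Tuple3<Long, String, Integer>> events = ...;

events
    .assignTimestampsAndWatermarks(
        // watermark = highest seen timestamp minus 2 minutes
        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Long, String, Integer>>(Time.minutes(2)) {
            @Override
            public long extractTimestamp(Tuple3<Long, String, Integer> e) {
                return e.f0;
            }
        })
    .keyBy(1)                            // key on the String field
    .timeWindow(Time.minutes(10))        // e.g. the 12:00 - 12:10 window above
    .allowedLateness(Time.minutes(3))    // window state kept until the watermark passes 12:13
    .sum(2)                              // a late element within the lateness re-emits an updated sum
    .print();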
>>
>>
>>
>>
>>
>>
>>
>>
>> 2016-10-17 11:52 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>
>>> Hi,
>>>
>>> I'm a bit confused about how Flink deals with late elements after the
>>> introduction of allowedLateness to windows. What is the difference between
>>> using a BoundedOutOfOrdernessTimestampExtractor(Time.seconds(X)) and
>>> allowedLateness(Time.seconds(X))? What if one is used and the other is
>>> not? And what if a different lateness is used in each one? Could you please
>>> clarify it on the basis of a simple example? Thank you.
>>>
>>> Best,
>>> Yassine
>>>
>>
>>
>


BoundedOutOfOrdernessTimestampExtractor and allowedlateness

2016-10-17 Thread Yassine MARZOUGUI
Hi,

I'm a bit confused about how Flink deals with late elements after the
introduction of allowedLateness to windows. What is the difference between
using a BoundedOutOfOrdernessTimestampExtractor(Time.seconds(X)) and
allowedLateness(Time.seconds(X))? What if one is used and the other is not?
And what if a different lateness is used in each one? Could you please
clarify it on the basis of a simple example? Thank you.

Best,
Yassine


Re: "java.net.SocketException: No buffer space available (maximum connections reached?)" when reading from HDFS

2016-10-17 Thread Yassine MARZOUGUI
That solved my problem. Thank you!

Best,
Yassine

2016-10-16 19:18 GMT+02:00 Stephan Ewen <se...@apache.org>:

> Hi!
>
> Looks to me that this is the following problem: The Decompression Streams
> did not properly forward the "close()" calls.
>
> It is in the latest 1.2-SNAPSHOT, but did not make it into version 1.1.3.
> The fix is in that pull request: https://github.com/apache/flink/pull/2581
>
> I have pushed the fix into the latest 1.1-SNAPSHOT branch.
>
> If you get the code via "git clone -b release-1.1
> https://github.com/apache/flink.git" you will get the code that is the
> same as the 1.1.3 release, plus the patch to this problem.
>
> Greetings,
> Stephan
>
>
> On Sat, Oct 15, 2016 at 10:11 PM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
>
>> Hi all,
>>
>> I'm reading a large number of small files from HDFS in batch mode (about
>> 20 directories, each directory contains about 3000 files, using
>> recursive.file.enumeration=true), and each time, at about 200 GB of
>> received data, my job fails with the following exception:
>>
>> java.io.IOException: Error opening the Input Split
>> hdfs:///filepath/filename.csv.gz [0,-1]: Could not obtain block:
>> BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313
>> file=/filepath/filename.csv.gz
>> at org.apache.flink.api.common.io.FileInputFormat.open(FileInpu
>> tFormat.java:693)
>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(Del
>> imitedInputFormat.java:424)
>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(Del
>> imitedInputFormat.java:47)
>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(
>> DataSourceTask.java:140)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> at java.lang.Thread.run(Unknown Source)
>> Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not
>> obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313
>> file=/filepath/filename.csv.gz
>> at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInpu
>> tStream.java:984)
>> at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputSt
>> ream.java:642)
>> at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSIn
>> putStream.java:882)
>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.
>> java:934)
>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.
>> java:735)
>> at java.io.FilterInputStream.read(Unknown Source)
>> at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(
>> HadoopDataInputStream.java:59)
>> at java.util.zip.CheckedInputStream.read(Unknown Source)
>> at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
>> at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
>> at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
>> at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>> at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>> at org.apache.flink.api.common.io.compression.GzipInflaterInput
>> StreamFactory.create(GzipInflaterInputStreamFactory.java:44)
>> at org.apache.flink.api.common.io.compression.GzipInflaterInput
>> StreamFactory.create(GzipInflaterInputStreamFactory.java:31)
>> at org.apache.flink.api.common.io.FileInputFormat.decorateInput
>> Stream(FileInputFormat.java:717)
>> at org.apache.flink.api.common.io.FileInputFormat.open(FileInpu
>> tFormat.java:689)
>> ... 5 more
>>
>> I checked the file each time and it exists and is healthy. Looking at the
>> taskmanager logs, I found the following exceptions which suggests it is
>> running out of connections:
>>
>> 2016-10-15 18:20:27,034 WARN  org.apache.hadoop.hdfs.BlockReaderFactory
>> - I/O error constructing remote block reader.
>> java.net.SocketException: No buffer space available (maximum connections
>> reached?): connect
>> at sun.nio.ch.Net.connect0(Native Method)
>> at sun.nio.ch.Net.connect(Unknown Source)
>> at sun.nio.ch.Net.connect(Unknown Source)
>> at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
>> at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWi
>> thTimeout.java:192)
>> at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
>> at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
>> at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockR
>> 

"java.net.SocketException: No buffer space available (maximum connections reached?)" when reading from HDFS

2016-10-15 Thread Yassine MARZOUGUI
Hi all,

I'm reading a large number of small files from HDFS in batch mode (about 20
directories, each directory contains about 3000 files, using
recursive.file.enumeration=true), and each time, at about 200 GB of
received data, my job fails with the following exception:

java.io.IOException: Error opening the Input Split
hdfs:///filepath/filename.csv.gz [0,-1]: Could not obtain block:
BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313
file=/filepath/filename.csv.gz
at
org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:693)
at
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
at
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
at
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain
block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313
file=/filepath/filename.csv.gz
at
org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:984)
at
org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:642)
at
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
at
org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at
org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
at java.io.FilterInputStream.read(Unknown Source)
at
org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
at java.util.zip.CheckedInputStream.read(Unknown Source)
at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
at java.util.zip.GZIPInputStream.<init>(Unknown Source)
at java.util.zip.GZIPInputStream.<init>(Unknown Source)
at
org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
at
org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
at
org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
at
org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
... 5 more

I checked the file each time and it exists and is healthy. Looking at the
taskmanager logs, I found the following exceptions which suggests it is
running out of connections:

2016-10-15 18:20:27,034 WARN  org.apache.hadoop.hdfs.BlockReaderFactory
- I/O error constructing remote block reader.
java.net.SocketException: No buffer space available (maximum connections
reached?): connect
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Unknown Source)
at sun.nio.ch.Net.connect(Unknown Source)
at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
at
org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
at
org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
at
org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
at
org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
at
org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
at
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
at java.io.FilterInputStream.read(Unknown Source)
at
org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
at java.util.zip.CheckedInputStream.read(Unknown Source)
at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
at java.util.zip.GZIPInputStream.<init>(Unknown Source)
at java.util.zip.GZIPInputStream.<init>(Unknown Source)
at
org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
at
org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
at
org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
at
org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
at
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
at 

Re: Handling decompression exceptions

2016-10-11 Thread Yassine MARZOUGUI
Thank you Fabian and Flavio for your help.

Best,
Yassine

2016-10-11 14:02 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> I posted a workaround for that at https://github.com/okkam-it/
> flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/
> datasourcemanager/importers/Csv2RowExample.java
>
> On 11 Oct 2016 1:57 p.m., "Fabian Hueske" <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> Flink's String parser does not support escaped quotes. Your data contains
>> a double double quote (""). The parser identifies this as the end of the
>> string field.
>> As a workaround, you can read the file as a regular text file, line by
>> line and do the parsing in a MapFunction.
>>
>> Best, Fabian
>>
>> 2016-10-11 13:37 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>
>>> Forgot to add parseQuotedStrings('"'). After adding it I'm getting the
>>> same exception with the second code too.
>>>
>>> 2016-10-11 13:29 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>
>>> :
>>>
>>>> Hi Fabian,
>>>>
>>>> I tried to debug the code, and it turns out a line in my csv data is
>>>> causing the ArrayIndexOutOfBoundsException, here is the exception
>>>> stacktrace:
>>>>
>>>> java.lang.ArrayIndexOutOfBoundsException: -1
>>>> at org.apache.flink.types.parser.StringParser.parseField(String
>>>> Parser.java:49)
>>>> at org.apache.flink.types.parser.StringParser.parseField(String
>>>> Parser.java:28)
>>>> at org.apache.flink.types.parser.FieldParser.resetErrorStateAnd
>>>> Parse(FieldParser.java:98)
>>>> at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRe
>>>> cord(GenericCsvInputFormat.java:395)
>>>> at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvIn
>>>> putFormat.java:110)
>>>> at org.apache.flink.api.common.io.DelimitedInputFormat.nextReco
>>>> rd(DelimitedInputFormat.java:470)
>>>> at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvIn
>>>> putFormat.java:78)
>>>> at org.myorg.quickstart.MyCsvInputFormat.nextRecord(MyCsvInputF
>>>> ormat.java:106)
>>>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(Dat
>>>> aSourceTask.java:162)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> And here is a sample CSV:
>>>>
>>>> timestamp,url,id
>>>> 2016-08-31 12:08:11.223,"https://www.toyota.fr/hybrid-innovation/infogr
>>>> aphie.jsontcgcc, ce)_13h00 /""=/-3h00 %=) 1",000
>>>>
>>>> Using my code, I get the previous exception when parsing the sample
>>>> CSV. If I use the following code, I get an incorrect result : (2016-08-31
>>>> 12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of (2016-08-31
>>>> 12:08:11.223, 000)
>>>>
>>>> DataSet<Tuple2<String, String>> withReadCSV = 
>>>> env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
>>>> .ignoreFirstLine()
>>>> .fieldDelimiter(",")
>>>> .includeFields("101")
>>>> .ignoreInvalidLines()
>>>> .types(String.class, String.class);
>>>> withReadCSV.writeAsText("C:\\Users\\yassine\\Desktop\\withreadcsv.txt", 
>>>> FileSystem.WriteMode.OVERWRITE).setParallelism(1);
>>>>
>>>>
>>>> Is it a bug in Flink or is my data not compliant with the csv standards?
>>>>
>>>> Thanks,
>>>> Yassine
>>>>
>>>>
>>>> 2016-10-11 11:21 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>
>>>>> Hi Yassine,
>>>>>
>>>>> I ran your code without problems and got the correct result.
>>>>> Can you provide the Stacktrace of the Exception?
>>>>>
>>>>> Thanks, Fabian
>>>>>
>>>>> 2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <
>>>>> y.marzou...@mindlytix.com>:
>>>>>
>>>>>> Thank you Fabian and Stephan for the suggestions.
>>>>>> I couldn't override "readLine()" because it's final, so went with
>>>>>> Fabian's solution, but I'm struggling with csv field masks.
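
A minimal sketch of the workaround Fabian describes above (read the file as
plain text and do the parsing in a user function), assuming the timestamp,url,id
layout of the sample and keeping only the first and third columns; the
hand-rolled quote handling and java.util usage below are illustrative, not Flink API:

DataSet<Tuple2<String, String>> parsed = env.readTextFile("hdfs:///shared/file.csv")
    .flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, String>> out) {
            if (line.startsWith("timestamp,")) {
                return;                          // skip the header line (approximates ignoreFirstLine)
            }
            List<String> fields = new ArrayList<>();
            StringBuilder current = new StringBuilder();
            boolean inQuotes = false;
            for (int i = 0; i < line.length(); i++) {
                char c = line.charAt(i);
                if (c == '"') {
                    if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        current.append('"');     // "" is an escaped quote inside a quoted field
                        i++;
                    } else {
                        inQuotes = !inQuotes;    // opening or closing quote
                    }
                } else if (c == ',' && !inQuotes) {
                    fields.add(current.toString());
                    current.setLength(0);
                } else {
                    current.append(c);
                }
            }
            fields.add(current.toString());
            if (fields.size() == 3) {
                out.collect(Tuple2.of(fields.get(0), fields.get(2)));
            }
            // anything else is treated as an invalid line and silently dropped
        }
    });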

Re: Handling decompression exceptions

2016-10-11 Thread Yassine MARZOUGUI
Forgot to add parseQuotedStrings('"'). After adding it I'm getting the same
exception with the second code too.

2016-10-11 13:29 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Hi Fabian,
>
> I tried to debug the code, and it turns out a line in my csv data is
> causing the ArrayIndexOutOfBoundsException, here is the exception
> stacktrace:
>
> java.lang.ArrayIndexOutOfBoundsException: -1
> at org.apache.flink.types.parser.StringParser.parseField(
> StringParser.java:49)
> at org.apache.flink.types.parser.StringParser.parseField(
> StringParser.java:28)
> at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(
> FieldParser.java:98)
> at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(
> GenericCsvInputFormat.java:395)
> at org.apache.flink.api.java.io.CsvInputFormat.readRecord(
> CsvInputFormat.java:110)
> at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(
> DelimitedInputFormat.java:470)
> at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(
> CsvInputFormat.java:78)
> at org.myorg.quickstart.MyCsvInputFormat.nextRecord(
> MyCsvInputFormat.java:106)
> at org.apache.flink.runtime.operators.DataSourceTask.
> invoke(DataSourceTask.java:162)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>
> And here is a sample CSV:
>
> timestamp,url,id
> 2016-08-31 12:08:11.223,"https://www.toyota.fr/hybrid-innovation/
> infographie.jsontcgcc, ce)_13h00 /""=/-3h00 %=) 1",000
>
> Using my code, I get the previous exception when parsing the sample CSV.
> If I use the following code, I get an incorrect result : (2016-08-31
> 12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of (2016-08-31
> 12:08:11.223, 000)
>
> DataSet<Tuple2<String, String>> withReadCSV = 
> env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
> .ignoreFirstLine()
> .fieldDelimiter(",")
> .includeFields("101")
> .ignoreInvalidLines()
> .types(String.class, String.class);
> withReadCSV.writeAsText("C:\\Users\\yassine\\Desktop\\withreadcsv.txt", 
> FileSystem.WriteMode.OVERWRITE).setParallelism(1);
>
>
> Is it a bug in Flink or is my data not compliant with the csv standards?
>
> Thanks,
> Yassine
>
>
> 2016-10-11 11:21 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Yassine,
>>
>> I ran your code without problems and got the correct result.
>> Can you provide the Stacktrace of the Exception?
>>
>> Thanks, Fabian
>>
>> 2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>
>>> Thank you Fabian and Stephan for the suggestions.
>>> I couldn't override "readLine()" because it's final, so went with
>>> Fabian's solution, but I'm struggling with csv field masks. Any help is
>>> appreciated.
>>> I created an Input Format which is basically TupleCsvInputFormat for
>>> which I overrode the nextRecord() method to catch the exceptions. But I'm
>>> having a *java.lang.ArrayIndexOutOfBoundsException*  when I add a
>>> boolean[]{true, false, true} field mask . If I add a int[]{1,0,1} field
>>> mask, the job succeeds but outputs the first and second columns. Here is my
>>> code:
>>>
>>> TupleTypeInfo<Tuple2<String, String>> typeInfo =
>>> TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
>>> Path histPath = new Path("hdfs:///shared/file.csv");
>>>
>>> CsvInputFormat <Tuple2<String, String>> myInputFormt = new
>>> MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
>>> myInputFormt.enableQuotedStringParsing('"');
>>> myInputFormt.setSkipFirstLineAsHeader(true);
>>> myInputFormt.setLenient(true);
>>>
>>> DataSet<Tuple2<String, String>> test = env.createInput(myInputFormt,t
>>> ypeInfo).withParameters(parameters);
>>> test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);
>>>
>>> and here is the  custom input format:
>>>
>>> public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
>>> private static final long serialVersionUID = 1L;
>>> private TupleSerializerBase<OUT> tupleSerializer;
>>> public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT>
>>> tupleTypeInfo) {
>>> this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
>>> tupleTypeInfo);
>>> }
>>> p

Re: Handling decompression exceptions

2016-10-11 Thread Yassine MARZOUGUI
Hi Fabian,

I tried to debug the code, and it turns out a line in my csv data is
causing the ArrayIndexOutOfBoundsException, here is the exception
stacktrace:

java.lang.ArrayIndexOutOfBoundsException: -1
at
org.apache.flink.types.parser.StringParser.parseField(StringParser.java:49)
at
org.apache.flink.types.parser.StringParser.parseField(StringParser.java:28)
at
org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:98)
at
org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:395)
at
org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:110)
at
org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:470)
at
org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
at
org.myorg.quickstart.MyCsvInputFormat.nextRecord(MyCsvInputFormat.java:106)
at
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)

And here is a sample CSV:

timestamp,url,id
2016-08-31 12:08:11.223,"https://www.toyota.fr/hybrid-innovation/infographie.jsontcgcc, ce)_13h00 /""=/-3h00 %=) 1",000

Using my code, I get the previous exception when parsing the sample CSV. If
I use the following code, I get an incorrect result : (2016-08-31
12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of (2016-08-31
12:08:11.223, 000)

DataSet<Tuple2<String, String>> withReadCSV =
env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
.ignoreFirstLine()
.fieldDelimiter(",")
.includeFields("101")
.ignoreInvalidLines()
.types(String.class, String.class);
withReadCSV.writeAsText("C:\\Users\\yassine\\Desktop\\withreadcsv.txt",
FileSystem.WriteMode.OVERWRITE).setParallelism(1);


Is it a bug in Flink or is my data not compliant with the csv standards?

Thanks,
Yassine


2016-10-11 11:21 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Yassine,
>
> I ran your code without problems and got the correct result.
> Can you provide the Stacktrace of the Exception?
>
> Thanks, Fabian
>
> 2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
>> Thank you Fabian and Stephan for the suggestions.
>> I couldn't override "readLine()" because it's final, so went with
>> Fabian's solution, but I'm struggling with csv field masks. Any help is
>> appreciated.
>> I created an Input Format which is basically TupleCsvInputFormat for
>> which I overrode the nextRecord() method to catch the exceptions. But I'm
>> having a *java.lang.ArrayIndexOutOfBoundsException*  when I add a
>> boolean[]{true, false, true} field mask . If I add a int[]{1,0,1} field
>> mask, the job succeeds but outputs the first and second columns. Here is my
>> code:
>>
>> TupleTypeInfo<Tuple2<String, String>> typeInfo =
>> TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
>> Path histPath = new Path("hdfs:///shared/file.csv");
>>
>> CsvInputFormat <Tuple2<String, String>> myInputFormt = new
>> MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
>> myInputFormt.enableQuotedStringParsing('"');
>> myInputFormt.setSkipFirstLineAsHeader(true);
>> myInputFormt.setLenient(true);
>>
>> DataSet<Tuple2<String, String>> test = env.createInput(myInputFormt,t
>> ypeInfo).withParameters(parameters);
>> test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);
>>
>> and here is the  custom input format:
>>
>> public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
>> private static final long serialVersionUID = 1L;
>> private TupleSerializerBase<OUT> tupleSerializer;
>> public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT>
>> tupleTypeInfo) {
>> this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
>> tupleTypeInfo);
>> }
>> public MyCsvInputFormat(Path filePath, String lineDelimiter, String
>> fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
>> this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo,
>> createDefaultMask(tupleTypeInfo.getArity()));
>> }
>> public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT>
>> tupleTypeInfo, int[] includedFieldsMask) {
>> this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
>> tupleTypeInfo, includedFieldsMask);
>> }
>> public MyCsvInputFormat(Path filePath, String lineDelimiter, String
>> fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInf

Re: Handling decompression exceptions

2016-10-10 Thread Yassine MARZOUGUI
r would also catch and ignore the EOFException.
>>
>> If you do that, you would not be able to use the env.readCsvFile()
>> shortcut but would need to create an instance of your own InputFormat and
>> add it with
>> env.readFile(yourIF).
>>
>> Hope this helps,
>> Fabian
>>
>> 2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>
>>> Hi all,
>>>
>>> I am reading a large number of GZip compressed csv files, nested in a
>>> HDFS directory:
>>>
>>> Configuration parameters = new Configuration();
>>> parameters.setBoolean("recursive.file.enumeration", true);
>>> DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///share
>>> d/logs/")
>>> .ignoreFirstLine()
>>> .fieldDelimiter("|")
>>> .includeFields("011000")
>>> .types(String.class, Long.class)
>>> .withParameters(parameters);
>>>
>>> My job is failing with the following exception:
>>>
>>> 2016-10-04 17:19:59,933 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager- Status of 
>>> job 66fce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
>>>
>>> java.io.EOFException: Unexpected end of ZLIB input stream
>>>
>>> at java.util.zip.InflaterInputStream.fill(Unknown Source)
>>>
>>> at java.util.zip.InflaterInputStream.read(Unknown Source)
>>>
>>> at java.util.zip.GZIPInputStream.read(Unknown Source)
>>>
>>> at 
>>> org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
>>>
>>> at 
>>> org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
>>>
>>> at 
>>> org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
>>>
>>> at 
>>> org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
>>>
>>> at 
>>> org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>>>
>>> at 
>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>>>
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>>>
>>> at java.lang.Thread.run(Unknown Source)
>>>
>>> I think it is due to some improperly compressed files, how can I handle and
>>> ignore such exceptions? Thanks.
>>>
>>>
>>> Best,
>>> Yassine
>>>
>>>
>>
>
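
As a rough sketch of the custom input format route discussed above, the idea is
to catch the EOFException around nextRecord() and give up on the affected split
instead of failing the whole job; the class name, the TupleCsvInputFormat base
class and the flag handling are assumptions for illustration, not code from this thread:

public class LenientCsvInputFormat<OUT> extends TupleCsvInputFormat<OUT> {

    private boolean splitBroken = false;

    public LenientCsvInputFormat(Path filePath, TupleTypeInfo<OUT> typeInfo) {
        super(filePath, typeInfo);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        splitBroken = false;        // reset the flag for every new split
        super.open(split);
    }

    @Override
    public boolean reachedEnd() {
        // report a broken split as exhausted so the task moves on to the next split
        return splitBroken || super.reachedEnd();
    }

    @Override
    public OUT nextRecord(OUT reuse) throws IOException {
        try {
            return super.nextRecord(reuse);
        } catch (EOFException e) {
            // truncated or badly compressed file: skip the rest of this split
            splitBroken = true;
            return null;
        }
    }
}

// usage, with the types assumed from the original job:
// TupleTypeInfo<Tuple2<String, Long>> typeInfo =
//         TupleTypeInfo.getBasicTupleTypeInfo(String.class, Long.class);
// env.createInput(new LenientCsvInputFormat<>(new Path("hdfs:///shared/logs/"), typeInfo), typeInfo);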


Re: Simple batch job hangs if run twice

2016-09-23 Thread Yassine MARZOUGUI
Hi Fabian,

Not sure if this answers your question, here is the stack I got when
debugging the combine and datasource operators when the job got stuck:

"DataSource (at main(BatchTest.java:28)
(org.apache.flink.api.java.io.TupleCsvInputFormat)) (1/8)"
at java.lang.Object.wait(Object.java)
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:93)
at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
at
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:163)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)

"Combine (GroupReduce at first(DataSet.java:573)) (1/8)"
at java.lang.Object.wait(Object.java)
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:93)
at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
at
org.apache.flink.api.java.functions.FirstReducer.reduce(FirstReducer.java:41)
at
org.apache.flink.api.java.functions.FirstReducer.combine(FirstReducer.java:52)
at
org.apache.flink.runtime.operators.AllGroupReduceDriver.run(AllGroupReduceDriver.java:152)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)

Best,
Yassine


2016-09-23 11:28 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Hi Fabian,
>
> Is it different from the output I already sent? (see attached file). If
> yes, how can I obtain the stacktrace of the job programmatically? Thanks.
>
> Best,
> Yassine
>
> 2016-09-23 10:55 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Yassine, can you share a stacktrace of the job when it got stuck?
>>
>> Thanks, Fabian
>>
>> 2016-09-22 14:03 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>
>>> The input splits are correctly assigned. I noticed that whenever the job
>>> is stuck, that is because the task *Combine (GroupReduce at
>>> first(DataSet.java:573)) *keeps RUNNING and never switches to FINISHED.
>>> I tried to debug the program at the *first(100), *but I couldn't do
>>> much. I attached the full DEBUG output.
>>>
>>> 2016-09-22 12:10 GMT+02:00 Robert Metzger <rmetz...@apache.org>:
>>>
>>>> Can you try running with DEBUG logging level?
>>>> Then you should see if input splits are assigned.
>>>> Also, you could try to use a debugger to see what's going on.
>>>>
>>>> On Mon, Sep 19, 2016 at 2:04 PM, Yassine MARZOUGUI <
>>>> y.marzou...@mindlytix.com> wrote:
>>>>
>>>>> Hi Chensey,
>>>>>
>>>>> I am running Flink 1.1.2, and using NetBeans 8.1.
>>>>> I made a screencast reproducing the problem here:
>>>>> http://recordit.co/P53OnFokN4 <http://recordit.co/VRBpBlb51A>.
>>>>>
>>>>> Best,
>>>>> Yassine
>>>>>
>>>>>
>>>>> 2016-09-19 10:04 GMT+02:00 Chesnay Schepler <ches...@apache.org>:
>>>>>
>>>>>> No, I can't recall that i had this happen to me.
>>>>>>
>>>>>> I would enable logging and try again, as well as checking whether the
>>>>>> second job is actually running through the WebInterface.
>>>>>>
>>>>>> If you tell me your NetBeans version i can try to reproduce it.
>>>>>>
>>>>>> Also, which version of Flink are you using?
>>>>>>
>>>>>>
>>>>>> On 19.09.2016 07:45, Aljoscha Krettek wrote:
>>>>>>
>>>>>> Hmm, this sound like it could be IDE/Windows specific, unfortunately
>>>>>> I don't have access to a windows machine. I'll loop in Chesnay how is 
>>>>>> using
>>>>>> windows.
>>>>>>
>>>>>> Chesnay, do you maybe have an idea what could be the problem? Have
>>

Re: Simple batch job hangs if run twice

2016-09-17 Thread Yassine MARZOUGUI
Hi Aljoscha,

Thanks for your response. By the first time I mean I hit run from the IDE
(I am using Netbeans on Windows) the first time after building the program.
If I then stop it and run it again (without rebuilding), it is stuck in the
state RUNNING. Sometimes I have to rebuild it, or close the IDE to be able
to get an output. The behaviour is random, maybe it's related to the IDE or
the OS and not necessarily Flink itself.

On Sep 17, 2016 15:16, "Aljoscha Krettek" <aljos...@apache.org> wrote:

> Hi,
> when is the "first time". It seems you have tried this repeatedly so what
> differentiates a "first time" from the other times? Are you closing your
> IDE in-between or do you mean running the job a second time within the same
> program?
>
> Cheers,
> Aljoscha
>
> On Fri, 9 Sep 2016 at 16:40 Yassine MARZOUGUI <y.marzou...@mindlytix.com>
> wrote:
>
>> Hi all,
>>
>> When I run the following batch job inside the IDE for the first time, it
>> outputs results and switches to FINISHED, but when I run it again it is
>> stuck in the state RUNNING. The csv file size is 160 MB. What could be the
>> reason for this behaviour?
>>
>> public class BatchJob {
>>
>> public static void main(String[] args) throws Exception {
>> final ExecutionEnvironment env = ExecutionEnvironment.
>> getExecutionEnvironment();
>>
>> env.readCsvFile("dump.csv")
>> .ignoreFirstLine()
>> .fieldDelimiter(";")
>> .includeFields("111000")
>> .types(String.class, String.class, String.class)
>> .first(100)
>> .print();
>>
>> }
>> }
>>
>> Best,
>> Yassine
>>
>


Re: Handle deserialization error

2016-08-26 Thread Yassine Marzougui
Hi Jack,

As Robert Metzger mentioned in a previous thread, there's an ongoing
discussion about the issue in this JIRA:
https://issues.apache.org/jira/browse/FLINK-3679.

A possible workaround is to use a SimpleStringSchema in the Kafka source,
and chain it with a flatMap operator where you can use your custom
deserializer and handle deserialization errors.

Best,
Yassine
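
A minimal sketch of that workaround, assuming a hypothetical Event POJO, a
Jackson ObjectMapper for the parsing and an existing kafkaProps Properties
object; none of these names come from the original thread:

DataStream<Event> events = env
    .addSource(new FlinkKafkaConsumer09<>("events", new SimpleStringSchema(), kafkaProps))
    .flatMap(new FlatMapFunction<String, Event>() {
        private transient ObjectMapper mapper;

        @Override
        public void flatMap(String json, Collector<Event> out) {
            if (mapper == null) {
                mapper = new ObjectMapper();   // created lazily on each parallel instance
            }
            try {
                out.collect(mapper.readValue(json, Event.class));
            } catch (Exception e) {
                // malformed JSON: log and drop the record instead of failing the job
            }
        }
    });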

On Aug 27, 2016 02:37, "Jack Huang"  wrote:

> Hi all,
>
> I have a custom deserializer which I pass to a Kafka source to transform
> JSON string to Scala case class.
>
> val events = env.addSource(new FlinkKafkaConsumer09[Event]("events", new 
> JsonSerde(classOf[Event], new Event), kafkaProp))
>
> ​
>
> There are times when the JSON message is malformed, in which case I want to
> catch the exception, log some error message, and go on to the next message
> without producing an event to the downstream. It doesn't seem like the 
> DeserializationSchema
> interface allows such behavior. How could I achieve this?
>
> Thanks,
> Jack
>


Re: how to get rid of duplicate rows group by in DataStream

2016-08-24 Thread Yassine Marzougui
Sorry, I mistyped the code, it should be
timeWindow(Time.minutes(10)) instead
of window(Time.minutes(10)).
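
Putting the correction together with the snippet quoted below, a compact sketch
of the windowed count using the operators from this thread:

DataStream<Tuple2<String, Long>> gridWithDensity = pointsWithGridCoordinates
        .map(new AddCountAppender())          // appends a count field to each point
        .keyBy(2)                             // key on the String field, as in the snippet below
        .timeWindow(Time.minutes(10))         // one aggregate per key every 10 minutes
        .reduce(new GridPointsCount())        // sums the counts within the window
        .map(new RetrieveGridWithCount());
gridWithDensity.print();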

On Wed, Aug 24, 2016 at 9:29 PM, Yassine Marzougui <yassmar...@gmail.com>
wrote:

> Hi subash,
>
> A stream is infinite, hence it has no notion of "final" count. To get
> distinct counts you need to define a period (= a window [1] ) over which
> you count elements and emit a result, by adding a window operator before the
> reduce.
> For example the following will emit distinct counts every 10 minutes over
> the last 10 minutes period:
>
> *stream.keyby(2)*
> *  .window(Time.minutes(10))*
> *  .reduce(new GridPointsCount())*
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> master/apis/streaming/windows.html
>
>
> On Wed, Aug 24, 2016 at 6:14 PM, subash basnet <yasub...@gmail.com> wrote:
>
>> Hello Kostas,
>>
>> Sorry for late reply. But I couldn't understand how to apply split in
>> datastream, such as in below to get the distinct output stream element with
>> the count after applying group by and reduce.
>>
>> DataStream<Tuple2<String, Long>> gridWithDensity =
>> pointsWithGridCoordinates.map(new AddCountAppender())
>> .keyBy(2).reduce(*new GridPointsCount()*).map(new
>> RetrieveGridWithCount());
>> gridWithDensity.print();
>>
>> Current Output:
>>   Required Output:
>> (0,1)
>>  (0,3)
>> (0,2)
>>  (0,4)
>> (0,1)
>> (0,2)
>> (0,3)
>> (0,3)
>> (0,4)
>>
>> public static final class GridPointsCount implements
>> ReduceFunction<Tuple4<Point, Grid, String, Long>> {
>> @Override
>> public Tuple4<Point, Grid, String, Long> reduce(Tuple4<Point, Grid,
>> String, Long> val1,
>> Tuple4<Point, Grid, String, Long> val2) {
>> return new Tuple4<Point, Grid, String, Long>(val1.f0, val1.f1, val1.f2,
>> val1.f3 + val2.f3);
>> }
>> }
>>
>>
>> Regards,
>> Subash Basnet
>>
>> On Mon, Aug 22, 2016 at 6:34 PM, Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>>
>>> Hi Subash,
>>>
>>> You should also split your elements in windows.
>>> If not, Flink emits an element for each incoming record.
>>> That is why you have:
>>>
>>> (1,1)
>>> (1,2)
>>> (1,3)
>>>
>>> …
>>>
>>> Kostas
>>>
>>> > On Aug 22, 2016, at 5:58 PM, subash basnet <yasub...@gmail.com> wrote:
>>> >
>>> > Hello all,
>>> >
>>> > I grouped by the input based on it's id to count the number of
>>> elements in each group.
>>> > DataStream<Tuple2<String, Long>> gridWithCount;
>>> > Upon printing the above datastream it shows with duplicate rows:
>>> > Output:
>>> > (1, 1)
>>> > (1,2)
>>> > (2,1)
>>> > (1,3)
>>> > (2,2)...
>>> >
>>> > Whereas I wanted the distinct rows with final count:
>>> > Needed Output:
>>> > (1,3)
>>> > (2,2)..
>>> >
>>> > What could be the way to achieve this.
>>> >
>>> >
>>> > Regards,
>>> > Subash Basnet
>>>
>>>
>>>
>>
>


Re: how to get rid of duplicate rows group by in DataStream

2016-08-24 Thread Yassine Marzougui
Hi subash,

A stream is infinite, hence it has no notion of "final" count. To get
distinct counts you need to define a period (= a window [1] ) over which
you count elements and emit a result, by adding a window operator before the
reduce.
For example the following will emit distinct counts every 10 minutes over
the last 10 minutes period:

*stream.keyby(2)*
*  .window(Time.minutes(10))*
*  .reduce(new GridPointsCount())*

[1]
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html


On Wed, Aug 24, 2016 at 6:14 PM, subash basnet  wrote:

> Hello Kostas,
>
> Sorry for late reply. But I couldn't understand how to apply split in
> datastream, such as in below to get the distinct output stream element with
> the count after applying group by and reduce.
>
> DataStream<Tuple2<String, Long>> gridWithDensity =
> pointsWithGridCoordinates.map(new AddCountAppender())
> .keyBy(2).reduce(*new GridPointsCount()*).map(new
> RetrieveGridWithCount());
> gridWithDensity.print();
>
> Current Output:
>   Required Output:
> (0,1)
>  (0,3)
> (0,2)
>  (0,4)
> (0,1)
> (0,2)
> (0,3)
> (0,3)
> (0,4)
>
> public static final class GridPointsCount implements
> ReduceFunction<Tuple4<Point, Grid, String, Long>> {
> @Override
> public Tuple4<Point, Grid, String, Long> reduce(Tuple4<Point, Grid, String, Long> val1,
> Tuple4<Point, Grid, String, Long> val2) {
> return new Tuple4<Point, Grid, String, Long>(val1.f0, val1.f1, val1.f2,
> val1.f3 + val2.f3);
> }
> }
>
>
> Regards,
> Subash Basnet
>
> On Mon, Aug 22, 2016 at 6:34 PM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>>
>> Hi Subash,
>>
>> You should also split your elements in windows.
>> If not, Flink emits an element for each incoming record.
>> That is why you have:
>>
>> (1,1)
>> (1,2)
>> (1,3)
>>
>> …
>>
>> Kostas
>>
>> > On Aug 22, 2016, at 5:58 PM, subash basnet  wrote:
>> >
>> > Hello all,
>> >
>> > I grouped by the input based on it's id to count the number of elements
>> in each group.
>> > DataStream<Tuple2<String, Long>> gridWithCount;
>> > Upon printing the above datastream it shows with duplicate rows:
>> > Output:
>> > (1, 1)
>> > (1,2)
>> > (2,1)
>> > (1,3)
>> > (2,2)...
>> >
>> > Whereas I wanted the distinct rows with final count:
>> > Needed Output:
>> > (1,3)
>> > (2,2)..
>> >
>> > What could be the way to achieve this.
>> >
>> >
>> > Regards,
>> > Subash Basnet
>>
>>
>>
>


FLINK-4329 fix version

2016-08-23 Thread Yassine Marzougui
Hi all,

The fix version of FLINK-4329
 in JIRA is set to 1.1.1,
but 1.1.1 is already released. Should I expect it to be fixed in the next
release? and will a patch be available meanwhile? Thanks.

Yassine


Re: Kafka topic with an empty parition : parallelism issue and Timestamp monotony violated

2016-08-15 Thread Yassine Marzougui
I think I also figured out the reason of the behavior I described when one
Kafka partition is empty.
According to the JavaDocs
<https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#DataStream-org.apache.flink.streaming.api.environment.StreamExecutionEnvironment-org.apache.flink.streaming.api.transformations.StreamTransformation->,
the datastream partitioning is set to *forward* by default, i.e. each map
sub-task will receive data from exactly one source sub-task. For one of the
stream partitions (corresponding to the empty Kafka partition) resulting
from the map operator, the watermark does not advance, which makes the
window operator wait forever.
Now if the map and source operators have a different parallelism, Flink
uses rebalance partitioning to redistribute the stream as pointed out in this
mailing list thread
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Forward-Partitioning-same-Parallelism-1-1-communication-tp2373p2382.html>,
therefore the watermark advances for all the stream partitions output from
the map operator.
Some of the details regarding the partitioning were mentioned in the 0.9
docs
<https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/streaming_guide.html#partitioning>,
but unfortunately they aren't in the  1.x docs
<https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#physical-partitioning>
.
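
A small sketch of the wiring change this describes; with matching parallelism
the source-to-map connection is forward, so the map subtask behind the empty
Kafka partition never gets data, while an explicit rebalance() spreads records
round-robin over all map subtasks (the mapper name is a placeholder):

DataStream<Request> mapped = env
        .addSource(myFlinkKafkaConsumer09)   // 2 Kafka partitions, one of them possibly empty
        .rebalance()                         // round-robin instead of the default forward connection
        .map(new RequestMapper());           // assumed mapper; timestamps/watermarks are assigned after this

Note that rebalancing breaks the per-partition order, so a bounded-out-of-orderness
extractor is a safer choice downstream than an ascending one.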

On Mon, Aug 15, 2016 at 12:38 PM, Yassine Marzougui <yassmar...@gmail.com>
wrote:

> Hi all,
>
> I have a Kafka topic with two partitions, messages within each partition
> are ordered in ascending timestamps.
>
> The following code works correctly (I'm running this on my local machine,
> the default parallelism is the number of cores=8):
>
> stream = env.addSource(myFlinkKafkaConsumer09)
> stream.map(mapper)
>   .assignTimestampsAndWatermarks(ascendingTimestampExtractor)
>   .keyby(0)
>   .timeWindow(Time.minutes(10))
>   .reduce(reducer)
>   .print()
>
> But if I explicitly set 
> env.addSource(myFlinkKafkaConsumer09).setparallelism(n),
> where n > (number of partitions = 2) and n !=8, I get a bunch of "Timestamp
> monotony violated" warnings. My understanding is that only 2 sources will
> be mapped to the topic partitions and since messages are ordered within
> each partition, timestamps assignment should happen correctly regardless of
> the parallelsim as long as it is >= 2.
> *Question 1 *: What is the explanation of this?
>
>
> Now I add an other empty partition to the topic. The job doesn't give any
> output anymore and that's expected since it keeps waiting forever for the
> empty partition's watermark. What I don't understand though, is a
> strange behavior when set the parallelism explicitly at the source :
> *Question 2 *: Why am I able to get an output if I explicitly set
> env.addSource(myFlinkKafkaConsumer09).setparallelism(n), shouldn't the
> empty partition argument apply here too? And why is that output seen only
> when n != 8 ?
>
> Best,
> Yassine
>


Re: Kafka topic with an empty parition : parallelism issue and Timestamp monotony violated

2016-08-15 Thread Yassine Marzougui
I think I figured out the explanation of the first part. Looks like the
stream gets distributed and merged between the source and the map operator
because their parallelisms are different, and therefore the messages
resulting from the map operator become out of order. The "Timestamp
monotony violated" warnings disappeared when I set the source and the map
operator to the same parallelism.
I found out about operator chaining and I tried to chain the source and map
operators (as in here: https://ci.apache.org/projects/flink/flink-docs-
release-1.0/concepts/concepts.html#tasks--operator-chains) in order to have
the same parallelism, but I didn't succeed. Isn't doing env.addSource().
setParallelism(n).startNewChain().map(...).disableChaining() equivalent to
setting source and map parallelism to the same value?
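
On the chaining question: startNewChain() and disableChaining() only influence
whether operators are fused into the same task, they do not change an operator's
parallelism, so keeping the forward connection still needs an explicit
setParallelism on each operator. A short sketch, reusing the names from this
thread and assuming 2 Kafka partitions:

DataStream<Request> withTimestamps = env
        .addSource(myFlinkKafkaConsumer09).setParallelism(2)   // one subtask per Kafka partition
        .map(mapper).setParallelism(2)                         // same parallelism -> forward, order preserved
        .assignTimestampsAndWatermarks(ascendingTimestampExtractor).setParallelism(2);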



On Mon, Aug 15, 2016 at 12:38 PM, Yassine Marzougui <yassmar...@gmail.com>
wrote:

> Hi all,
>
> I have a Kafka topic with two partitions, messages within each partition
> are ordered in ascending timestamps.
>
> The following code works correctly (I'm running this on my local machine,
> the default parallelism is the number of cores=8):
>
> stream = env.addSource(myFlinkKafkaConsumer09)
> stream.map(mapper)
>   .assignTimestampsAndWatermarks(ascendingTimestampExtractor)
>   .keyby(0)
>   .timeWindow(Time.minutes(10))
>   .reduce(reducer)
>   .print()
>
> But if I explicitly set 
> env.addSource(myFlinkKafkaConsumer09).setparallelism(n),
> where n > (number of partitions = 2) and n !=8, I get a bunch of "Timestamp
> monotony violated" warnings. My understanding is that only 2 sources will
> be mapped to the topic partitions and since messages are ordered within
> each partition, timestamps assignment should happen correctly regardless of
> the parallelsim as long as it is >= 2.
> *Question 1 *: What is the explanation of this?
>
>
> Now I add an other empty partition to the topic. The job doesn't give any
> output anymore and that's expected since it keeps waiting forever for the
> empty partition's watermark. What I don't understand though, is a
> strange behavior when set the parallelism explicitly at the source :
> *Question 2 *: Why am I able to get an output if I explicitly set
> env.addSource(myFlinkKafkaConsumer09).setparallelism(n), shouldn't the
> empty partition argument apply here too? And why is that output seen only
> when n != 8 ?
>
> Best,
> Yassine
>


Kafka topic with an empty parition : parallelism issue and Timestamp monotony violated

2016-08-15 Thread Yassine Marzougui
Hi all,

I have a Kafka topic with two partitions, messages within each partition
are ordered in ascending timestamps.

The following code works correctly (I'm running this on my local machine,
the default parallelism is the number of cores=8):

stream = env.addSource(myFlinkKafkaConsumer09)
stream.map(mapper)
  .assignTimestampsAndWatermarks(ascendingTimestampExtractor)
  .keyby(0)
  .timeWindow(Time.minutes(10))
  .reduce(reducer)
  .print()

But if I explicitly set
env.addSource(myFlinkKafkaConsumer09).setparallelism(n),
where n > (number of partitions = 2) and n !=8, I get a bunch of "Timestamp
monotony violated" warnings. My understanding is that only 2 sources will
be mapped to the topic partitions and since messages are ordered within
each partition, timestamps assignment should happen correctly regardless of
the parallelsim as long as it is >= 2.
*Question 1 *: What is the explanation of this?


Now I add an other empty partition to the topic. The job doesn't give any
output anymore and that's expected since it keeps waiting forever for the
empty partition's watermark. What I don't understand though, is a
strange behavior when set the parallelism explicitly at the source :
*Question 2 *: Why am I able to get an output if I explicitly set
env.addSource(myFlinkKafkaConsumer09).setparallelism(n),
shouldn't the empty partition argument apply here too? And why is that
output seen only when n != 8 ?

Best,
Yassine


Re: No output when using event time with multiple Kafka partitions

2016-08-15 Thread Yassine Marzougui
Hi Aljoscha,

Sorry for the late response, I was busy and couldn't make time to work on
this again until now.
Indeed, it turns out only one of the partitions is not receiving elements.
The reason is that the producer will stick to a partition for
topic.metadata.refresh.interval.ms (defaults to 10 mins) time before
picking another partition at random. So I reduced the
topic.metadata.refresh.interval.ms, and I was able to get an output as soon
as the messages are produced.
I still have some questions about an unclear behavior regarding the
parallelism and watermarks assignment when one partition is empty, which I
will ask in a new mailing thread.
Thanks a lot for your help!

Best,
Yassine
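
For reference, the producer-side setting mentioned above belongs to the old
(0.8-style) Kafka producer; a rough sketch of lowering it, with the broker
address, topic name and String payload as placeholders:

Properties producerProps = new Properties();
producerProps.put("metadata.broker.list", "localhost:9092");
producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
// default is 600000 ms (10 minutes): with un-keyed messages the old producer keeps
// writing to one randomly picked partition until the next metadata refresh
producerProps.put("topic.metadata.refresh.interval.ms", "1000");

Producer<String, String> producer = new Producer<>(new ProducerConfig(producerProps));
producer.send(new KeyedMessage<>("events", line));   // no key, so a partition is picked at random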

On Fri, Jul 29, 2016 at 12:43 PM, Aljoscha Krettek 
wrote:

> Hi,
> when running in local mode the default parallelism is always the number of
> (possibly virtual) CPU cores. The parallelism of the sink is set before it
> gets a chance to find out how many Kafka partitions there are. I think the
> reason for the behavior you're observing is that only one of your two
> partitions ever receives elements and that thus the watermark does not
> advance for that partition. Could that be the case?
>
> Cheers,
> Aljoscha
>
> On Wed, 27 Jul 2016 at 14:58 Yassin Marzouki  wrote:
>
>> I just tried playing with the source parallelism setting, and I got a very
>> strange result:
>>
>> If I specify the source parallelism using env.addSource(kafka).setParallelism(N),
>> results are printed correctly for any number N except for N=4. I guess
>> that's related to the number of task slots since I have 4 CPU cores, but
>> what is the explanation of that?
>> So I suppose that if I don't specify the source parallelism, it is set
>> automatically to 4. Isn't it supposed to be set to the number of topic
>> patitions (= 2) by default?
>>
>>
>> On Wed, Jul 27, 2016 at 2:33 PM, Yassin Marzouki 
>> wrote:
>>
>>> Hi Kostas,
>>>
>>> When I remove the window and the apply() and put print() after
>>> assignTimestampsAndWatermarks, the messages are printed correctly:
>>>
>>> 2> Request{ts=2015-01-01, 06:15:34:000}
>>> 2> Request{ts=2015-01-02, 16:38:10:000}
>>> 2> Request{ts=2015-01-02, 18:58:41:000}
>>> 2> Request{ts=2015-01-02, 19:10:00:000}
>>> 2> Request{ts=2015-01-02, 23:36:51:000}
>>> 2> Request{ts=2015-01-03, 17:38:47:000}
>>> ...
>>>
>>> But strangely, only one task is used. If I set the source parallelism to 1
>>> using env.addSource(kafka).setParallelism(1) (the window and the
>>> apply() still removed), results are printed using all available slots
>>> (number of CPU cores):
>>>
>>> 4> Request{ts=2015-01-01, 06:15:34:000}
>>> 4> Request{ts=2015-01-02, 16:38:10:000}
>>> 2> Request{ts=2015-01-02, 19:10:00:000}
>>> 4> Request{ts=2015-01-02, 23:36:51:000}
>>> 1> Request{ts=2015-01-02, 18:58:41:000}
>>> 2> Request{ts=2015-01-03, 17:38:47:000}
>>> 3> Request{ts=2015-01-03, 17:56:42:000}
>>> ...
>>>
>>> Now if I keep the window and apply() without specifying the source
>>> parallelism, no messages are printed (only regular Kafka consumer and Flink
>>> logs), and if the source parallelism is set to 1, messages are printed
>>> correctly:
>>>
>>> 1> Window: TimeWindow{start=142007040, end=142015680}
>>> 2> Request{ts=2015-01-01, 06:15:34:000}
>>> 1> Request{ts=2015-01-02, 16:38:10:000}
>>> 4> Request{ts=2015-01-02, 19:10:00:000}
>>> 3> Window: TimeWindow{start=142015680, end=142024320}
>>> 3> Request{ts=2015-01-02, 18:58:41:000}
>>> 2> Request{ts=2015-01-02, 23:36:51:000}
>>> 3> Window: TimeWindow{start=142041600, end=142050240}
>>> 2> Request{ts=2015-01-03, 17:38:47:000}
>>> 4> Window: TimeWindow{start=142024320, end=142032960}
>>> 1> Request{ts=2015-01-03, 17:56:42:000}
>>> 1> Request{ts=2015-01-05, 17:13:45:000}
>>> 4> Request{ts=2015-01-05, 01:25:55:000}
>>> 2> Request{ts=2015-01-05, 14:27:45:000}
>>> ...
>>>
>>> On Wed, Jul 27, 2016 at 1:41 PM, Kostas Kloudas <
>>> k.klou...@data-artisans.com> wrote:
>>>
 Hi Yassine,

 Could you just remove the window and the apply, and  just put a print()
 after the:

 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
 @Override
 public long extractAscendingTimestamp(Request req) {
 return req.ts;
 }
 })


 This at least will tell us if reading from Kafka works as expected.

 Kostas

 On Jul 25, 2016, at 3:39 PM, Yassin Marzouki 
 wrote:

 Hi everyone,

 I am reading messages from a Kafka topic with 2 partitions and using
 event time. This is my code:

 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
 @Override
 public long extractAscendingTimestamp(Request req) {
 return req.ts;
 }
 })
 .windowAll(TumblingEventTimeWindows.of(Time.days(1)))
 .apply((TimeWindow window, Iterable 

Re: Window not emitting output after upgrade to Flink 1.1.1

2016-08-12 Thread Yassine Marzougui
Hi Kostas,

Yes, that's the case. I will revert back to 1.0.3 until the bug is fixed.
Thank you.

Best,
Yassine

On Fri, Aug 12, 2016 at 10:34 AM, Kostas Kloudas <
k.klou...@data-artisans.com> wrote:

> Hi Yassine,
>
> Are you reading from a file and using ingestion time?
> If yes, then the problem can be related to this:
>
> https://issues.apache.org/jira/browse/FLINK-4329
>
> Is this the case?
>
> Best,
> Kostas
>
> On Aug 12, 2016, at 10:30 AM, Yassine Marzougui <yassmar...@gmail.com>
> wrote:
>
> Hi all,
>
> The following code works under Flink 1.0.3, but under 1.1.1 it just
> switches to FINISHED and doesn't output any result.
>
> stream.map(new RichMapFunction<String, Request>() {
>
> private ObjectMapper objectMapper;
>
> @Override
> public void open(Configuration parameters) {
> objectMapper = new ObjectMapper();
> }
>
> @Override
> public Request map(String value) throws Exception {
> return objectMapper.readValue(value, Request.class);
> }
>
> })
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
> @Override
> public long extractAscendingTimestamp(Request req) {
> return req.ts;
> }
> })
> .map((Request req) -> new Tuple3<String, String, Integer>(req.userId,
> req.location, 1))
> .keyBy(0)
> .timeWindow(Time.minutes(10))
> .apply(
> (Tuple3<String, String, Integer> x, Tuple3<String, String,
> Integer> y) -> y,
> (Tuple key, TimeWindow w, Iterable<Tuple3<String, String,
> Integer>> itrbl, Collector<Tuple2<String, Integer>> clctr) -> {
> Tuple3<String, String, Integer> res =
> itrbl.iterator().next();
> clctr.collect(new Tuple2<>(res.f1, res.f2));
> })
> .print();
>
> The problem is with the window operator because I could print results
> before it.
>
> Best,
> Yassine
>
>
>


Window not emitting output after upgrade to Flink 1.1.1

2016-08-12 Thread Yassine Marzougui
Hi all,

The following code works under Flink 1.0.3, but under 1.1.1 it just
switches to FINISHED and doesn't output any result.

stream.map(new RichMapFunction<String, Request>() {

    private ObjectMapper objectMapper;

    @Override
    public void open(Configuration parameters) {
        objectMapper = new ObjectMapper();
    }

    @Override
    public Request map(String value) throws Exception {
        return objectMapper.readValue(value, Request.class);
    }

})
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
    @Override
    public long extractAscendingTimestamp(Request req) {
        return req.ts;
    }
})
.map((Request req) -> new Tuple3<String, String, Integer>(req.userId, req.location, 1))
.keyBy(0)
.timeWindow(Time.minutes(10))
.apply(
    (Tuple3<String, String, Integer> x, Tuple3<String, String, Integer> y) -> y,
    (Tuple key, TimeWindow w, Iterable<Tuple3<String, String, Integer>> itrbl,
            Collector<Tuple2<String, Integer>> clctr) -> {
        Tuple3<String, String, Integer> res = itrbl.iterator().next();
        clctr.collect(new Tuple2<>(res.f1, res.f2));
    })
.print();

The problem is with the window operator because I could print results
before it.

Best,
Yassine