Re: Flink batch processing fault tolerance

2017-02-16 Thread wangzhijiang999
Yes, it is really a critical problem for large batch jobs, because unexpected 
failures are common. We are already working on realizing the ideas 
described in FLIP-1 and hope to contribute them to Flink within a few months.
Best,
Zhijiang

From: Si-li Liu
Sent: Friday, February 17, 2017 11:22
To: user
Subject: Re: Flink batch processing fault tolerance
Hi, 
That is the reason why I gave up using Flink for my current project and went 
back to the traditional Hadoop framework. 
2017-02-17 10:56 GMT+08:00 Renjie Liu :
https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
This FLIP may help.
On Thu, Feb 16, 2017 at 7:34 PM Anton Solovev  wrote:
Hi Aljoscha,
Could you share your plans of resolving it?

Best,
Anton

From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: Thursday, February 16, 2017 2:48 PM
To: user@flink.apache.org
Subject: Re: Flink batch processing fault tolerance

Hi,
yes, this is indeed true. We had some plans for how to resolve this but they 
never materialised because of the focus on Stream Processing. We might unite 
the two in the future and then you will get fault-tolerant batch/stream 
processing in the same API.

Best,
Aljoscha

On Wed, 15 Feb 2017 at 09:28 Renjie Liu wrote:
Hi, all:

I'm reading Flink's docs and am curious about the fault tolerance of batch 
jobs. It seems that when one task execution fails, the whole job is 
restarted. Is that true? If so, isn't it impractical to deploy large Flink batch 
jobs?

-- 
Liu, Renjie
Software Engineer, MVAD

-- 
Liu, Renjie
Software Engineer, MVAD


-- 
Best regards


Sili Liu


Re: "Job submission to the JobManager timed out" on EMR YARN cluster with multiple jobs

2017-02-16 Thread Tzu-Li (Gordon) Tai
Hi Geoffrey,

Thanks for investigating and updating on this. Good to know that it is working!

Just to clarify, was your series of jobs submitted to a “yarn session + regular 
bin/flink run”, or “per job yarn cluster”?
I’m asking just to make sure of the limitations Robert mentioned.

Cheers,
Gordon


On February 17, 2017 at 3:37:27 AM, Geoffrey Mon (geof...@gmail.com) wrote:

Hi Robert,

Thanks for your reply. I've done some further testing and (hopefully) solved 
the issue; this turned out to be a red herring.  After discovering that the 
same issue manifested itself when testing on my local machine, I found that 
multiple jobs can be submitted from a main() function for both temporary and 
permanent Flink YARN clusters, and that the issue was not with Flink or with 
YARN, but with my job file.

In one part of my job, I need to fill in missing components of a vector with 
zeroes. I did this by combining the vector DataSet with another DataSet 
containing indexed zeroes using a union operation and an aggregation operation. 
In my problematic job, I used ExecutionEnvironment#fromElements to make a 
DataSet out of an ArrayList of Tuples containing an index and a zero. However, 
for input files with very large parameters, I needed to generate very large 
length DataSets of zeroes, and since I was using fromElements, the client 
needed to send the Flink runtime all of the elements with which to create the 
DataSet (lots and lots of zeroes). This caused the job to time out before 
execution, making me think that the job had not been properly received by the 
runtime.

I've replaced this with ExecutionEnvironment#generateSequence and a map 
function mapping each number of the generated sequence to a tuple with a zero. 
This has solved the issue and my job seems to be running fine for now.
(https://github.com/quinngroup/flink-r1dl/blame/9bb651597405fcea1fc4c7ba3e4229ca45305176/src/main/java/com/github/quinngroup/R1DL.java#L370)
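
The difference described above can be illustrated outside Flink. The sketch below is plain Java and does not use the Flink API: materializedZeroes builds every (index, 0.0) pair in client memory up front, which is roughly what handing a large list to fromElements implies, while generatedZeroes only fixes the bounds and derives each pair on demand, analogous to generateSequence followed by a map. The class and method names are hypothetical.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.LongStream;
import java.util.stream.Stream;

// Hypothetical stand-in; this is plain Java, not Flink's DataSet API.
class IndexedZeroes {

    // Eager: every (index, 0.0) pair exists in client memory before anything runs.
    // A list like this is what the client would have to ship along with the job.
    static List<Map.Entry<Long, Double>> materializedZeroes(long length) {
        List<Map.Entry<Long, Double>> zeroes = new ArrayList<>();
        for (long i = 0; i < length; i++) {
            zeroes.add(new SimpleImmutableEntry<Long, Double>(i, 0.0));
        }
        return zeroes;
    }

    // Lazy: only the bounds are fixed up front; pairs are produced when consumed.
    // This mirrors a generated sequence 0..length-1 mapped to (index, 0.0).
    static Stream<Map.Entry<Long, Double>> generatedZeroes(long length) {
        return LongStream.range(0, length)
                .<Map.Entry<Long, Double>>mapToObj(
                        i -> new SimpleImmutableEntry<Long, Double>(i, 0.0));
    }

    public static void main(String[] args) {
        System.out.println(materializedZeroes(3));
        // Describing a billion elements is cheap; only three are ever produced.
        generatedZeroes(1_000_000_000L).limit(3).forEach(System.out::println);
    }
}
```

In the lazy variant the size of the description does not grow with the number of zeroes, which is why the job submission itself stays small.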

Again, thank you very much for your help.

Sincerely,
Geoffrey

On Wed, Feb 15, 2017 at 7:43 AM Robert Metzger  wrote:
Hi Geoffrey,

I think the "per job yarn cluster" feature probably does not work for one 
main() function submitting multiple jobs.
If you have a yarn session + regular "flink run" it should work. 

On Mon, Feb 13, 2017 at 5:37 PM, Geoffrey Mon  wrote:
Just to clarify, is Flink designed to allow submitting multiple jobs from a 
single program class when using a YARN cluster? I wasn't sure based on the 
documentation.

Cheers,
Geoffrey


On Thu, Feb 9, 2017 at 6:34 PM Geoffrey Mon  wrote:
Hello all,

I'm running a Flink plan made up of multiple jobs. The source for my job can be 
found here if it would help in any way: 
https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
Each of the jobs (except for the first job) depends on files generated by the 
previous job; I'm running it on an AWS EMR cluster using YARN.

When I submit the plan file, the first job runs as planned. After it completes, 
the second job is submitted by the YARN client:


02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED 
02/09/2017 16:39:43 Job execution switched to status FINISHED.
2017-02-09 16:40:26,470 INFO  org.apache.flink.yarn.YarnClusterClient - Waiting until all TaskManagers have connected
Waiting until all TaskManagers have connected
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient - TaskManager status (5/5)
TaskManager status (5/5)
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient - All TaskManagers are connected
All TaskManagers are connected
2017-02-09 16:40:26,480 INFO  org.apache.flink.yarn.YarnClusterClient - Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@.ec2.internal:35598/user/jobmanager#68430682]

If the input file is small and the first job runs quickly (~1 minute works for 
me), then the second job runs fine. However, if the input file for my first job 
is large and the first job takes more than a minute or so to complete, Flink 
will not acknowledge receiving the next job; the web Flink console does not 
show any new jobs and Flink logs do not mention receiving any new jobs after 
the first job has completed. The YARN client's job submission times out after 
Flink does not respond:

Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)

Re: Flink batch processing fault tolerance

2017-02-16 Thread Si-li Liu
Hi,

That is the reason why I gave up using Flink for my current project and went
back to the traditional Hadoop framework.

2017-02-17 10:56 GMT+08:00 Renjie Liu :

> https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
> This FLIP may help.
>
> On Thu, Feb 16, 2017 at 7:34 PM Anton Solovev 
> wrote:
>
>> Hi Aljoscha,
>>
>> Could you share your plans of resolving it?
>>
>>
>>
>> Best,
>>
>> Anton
>>
>>
>>
>>
>>
>> *From:* Aljoscha Krettek [mailto:aljos...@apache.org]
>> *Sent:* Thursday, February 16, 2017 2:48 PM
>> *To:* user@flink.apache.org
>> *Subject:* Re: Flink batch processing fault tolerance
>>
>>
>>
>> Hi,
>>
>> yes, this is indeed true. We had some plans for how to resolve this but
>> they never materialised because of the focus on Stream Processing. We might
>> unite the two in the future and then you will get fault-tolerant
>> batch/stream processing in the same API.
>>
>>
>>
>> Best,
>>
>> Aljoscha
>>
>>
>>
>> On Wed, 15 Feb 2017 at 09:28 Renjie Liu  wrote:
>>
>> Hi, all:
>> I'm reading Flink's docs and am curious about the fault tolerance of batch
>> jobs. It seems that when one task execution fails, the whole job is
>> restarted. Is that true? If so, isn't it impractical to deploy large
>> Flink batch jobs?
>>
>> --
>>
>> Liu, Renjie
>>
>> Software Engineer, MVAD
>>
>> --
> Liu, Renjie
> Software Engineer, MVAD
>



-- 
Best regards

Sili Liu


Re: Flink batch processing fault tolerance

2017-02-16 Thread Renjie Liu
https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
This FLIP may help.

On Thu, Feb 16, 2017 at 7:34 PM Anton Solovev 
wrote:

> Hi Aljoscha,
>
> Could you share your plans of resolving it?
>
>
>
> Best,
>
> Anton
>
>
>
>
>
> *From:* Aljoscha Krettek [mailto:aljos...@apache.org]
> *Sent:* Thursday, February 16, 2017 2:48 PM
> *To:* user@flink.apache.org
> *Subject:* Re: Flink batch processing fault tolerance
>
>
>
> Hi,
>
> yes, this is indeed true. We had some plans for how to resolve this but
> they never materialised because of the focus on Stream Processing. We might
> unite the two in the future and then you will get fault-tolerant
> batch/stream processing in the same API.
>
>
>
> Best,
>
> Aljoscha
>
>
>
> On Wed, 15 Feb 2017 at 09:28 Renjie Liu  wrote:
>
> Hi, all:
> I'm reading Flink's docs and am curious about the fault tolerance of batch
> jobs. It seems that when one task execution fails, the whole job is
> restarted. Is that true? If so, isn't it impractical to deploy large
> Flink batch jobs?
>
> --
>
> Liu, Renjie
>
> Software Engineer, MVAD
>
> --
Liu, Renjie
Software Engineer, MVAD


Re: Reliable Distributed FS support (HCFS)

2017-02-16 Thread Vijay Srinivasaraghavan
Following up on my question regarding the requirements for the backing file 
system (HCFS). I would appreciate any input.
---
Regarding the file system abstraction support, we are planning to use a 
distributed file system that complies with the Hadoop Compatible File System 
(HCFS) standard in place of standard HDFS.
According to the documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/filesystems.html),
persistence guarantees are listed as one of the main requirements; to be 
precise, they cover both visibility and durability guarantees.
My questions are:
1) Is the file system expected to support atomic rename? 
I believe the checkpoint mechanism involves renaming files. Will there 
be an impact if atomic rename is not guaranteed by the underlying file 
system?
2) How does one certify Flink with HCFS (in place of standard HDFS) in terms of 
the scenarios/use cases that need to be tested? Is there any general guidance on 
this?
---
Regards
Vijay

On Wednesday, February 15, 2017 11:28 AM, Vijay Srinivasaraghavan 
 wrote:
 

Hello,
Regarding the file system abstraction support, we are planning to use a 
distributed file system that complies with the Hadoop Compatible File System 
(HCFS) standard in place of standard HDFS.
According to the documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/filesystems.html),
persistence guarantees are listed as one of the main requirements; to be 
precise, they cover both visibility and durability guarantees.
My questions are:
1) Is the file system expected to support atomic rename? 
I believe the checkpoint mechanism involves renaming files. Will there 
be an impact if atomic rename is not guaranteed by the underlying file 
system?
2) How does one certify Flink with HCFS (in place of standard HDFS) in terms of 
the scenarios/use cases that need to be tested? Is there any general guidance on 
this?
Thanks
Vijay

   

Re: "Job submission to the JobManager timed out" on EMR YARN cluster with multiple jobs

2017-02-16 Thread Geoffrey Mon
Hi Robert,

Thanks for your reply. I've done some further testing and (hopefully)
solved the issue; this turned out to be a red herring.  After discovering
that the same issue manifested itself when testing on my local machine, I
found that multiple jobs can be submitted from a main() function for both
temporary and permanent Flink YARN clusters, and that the issue was not
with Flink or with YARN, but with my job file.

In one part of my job, I need to fill in missing components of a vector
with zeroes. I did this by combining the vector DataSet with another
DataSet containing indexed zeroes using a union operation and an
aggregation operation. In my problematic job, I used
ExecutionEnvironment#fromElements to make a DataSet out of an ArrayList of
Tuples containing an index and a zero. However, for input files with very
large parameters, I needed to generate very large length DataSets of
zeroes, and since I was using fromElements, the client needed to send the
Flink runtime all of the elements with which to create the DataSet (lots
and lots of zeroes). This caused the job to time out before execution,
making me think that the job had not been properly received by the runtime.

I've replaced this with ExecutionEnvironment#generateSequence and a map
function mapping each number of the generated sequence to a tuple with a
zero. This has solved the issue and my job seems to be running fine for now.
(
https://github.com/quinngroup/flink-r1dl/blame/9bb651597405fcea1fc4c7ba3e4229ca45305176/src/main/java/com/github/quinngroup/R1DL.java#L370
)

Again, thank you very much for your help.

Sincerely,
Geoffrey

On Wed, Feb 15, 2017 at 7:43 AM Robert Metzger  wrote:

Hi Geoffrey,

I think the "per job yarn cluster" feature probably does not work for one
main() function submitting multiple jobs.
If you have a yarn session + regular "flink run" it should work.

On Mon, Feb 13, 2017 at 5:37 PM, Geoffrey Mon  wrote:

Just to clarify, is Flink designed to allow submitting multiple jobs from a
single program class when using a YARN cluster? I wasn't sure based on the
documentation.

Cheers,
Geoffrey


On Thu, Feb 9, 2017 at 6:34 PM Geoffrey Mon  wrote:

Hello all,

I'm running a Flink plan made up of multiple jobs. The source for my job
can be found here if it would help in any way:
https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
Each of the jobs (except for the first job) depends on files generated by
the previous job; I'm running it on an AWS EMR cluster using YARN.

When I submit the plan file, the first job runs as planned. After it
completes, the second job is submitted by the YARN client:


02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED
02/09/2017 16:39:43 Job execution switched to status FINISHED.
2017-02-09 16:40:26,470 INFO  org.apache.flink.yarn.YarnClusterClient - Waiting until all TaskManagers have connected
Waiting until all TaskManagers have connected
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient - TaskManager status (5/5)
TaskManager status (5/5)
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient - All TaskManagers are connected
All TaskManagers are connected
2017-02-09 16:40:26,480 INFO  org.apache.flink.yarn.YarnClusterClient - Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@.ec2.internal:35598/user/jobmanager#68430682]

If the input file is small and the first job runs quickly (~1 minute works
for me), then the second job runs fine. However, if the input file for my
first job is large and the first job takes more than a minute or so to
complete, Flink will not acknowledge receiving the next job; the web Flink
console does not show any new jobs and Flink logs do not mention receiving
any new jobs after the first job has completed. The YARN client's job
submission times out after Flink does not respond:

Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:239)
at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)

I have tried increasing akka.client.timeout to large values such as 1200s
(20 

Re: Akka 2.4

2017-02-16 Thread Ted Yu
Please see FLINK-3662

On Thu, Feb 16, 2017 at 9:01 AM, Dmitry Golubets 
wrote:

> Hi,
>
> Can I force Flink to use Akka 2.4 (recompile if needed)?
> Is it going to misbehave in a subtle way?
>
>
> Best regards,
> Dmitry
>


Akka 2.4

2017-02-16 Thread Dmitry Golubets
Hi,

Can I force Flink to use Akka 2.4 (recompile if needed)?
Is it going to misbehave in a subtle way?


Best regards,
Dmitry


Re: Reading compressed XML data

2017-02-16 Thread Sebastian Neef
Hi Robert,

sorry for the long delay.

> I wonder why the decompression with the XmlInputFormat doesn't work. Did
> you get any exception?

I didn't get any exception; it just seems to read nothing (or at least
doesn't match any opening/closing tags).

I dug a bit into the code and found out that Mahout's XmlInputFormat
[0] extends the TextInputFormat [1].  TextInputFormat then uses the
LineRecordReader [2] which handles the compressed data.

However, the Mahout XMLRecordReader [3] does not contain the compression
handling. So I tried to build a XmlRecordReader which tries to achieve
that [4]. I use it to split the Wikipedia dumps into pages with <page>
and </page> tags. [5]

It does work, but it sometimes misses some data, and I guess this is
because of the different splits. How do FileSplits work? Can a process
read beyond the FileSplit boundary or not?
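
For context on the split question: Hadoop-style record readers usually treat the split end as a limit on where a record may start, not on where reading must stop, so the last record of a split is completed by reading into the next split's range, and the next split skips ahead to the first record that starts inside it. The sketch below illustrates that convention in plain Java under simplifying assumptions: character offsets in a String stand in for byte offsets in an uncompressed file, and the class name, method name, and the tag names used in the demo are hypothetical. It is not the Mahout or Hadoop implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: character offsets stand in for byte offsets in a file.
class SplitBoundarySketch {

    // Returns the records owned by the split [start, end).
    // A record is owned by the split in which its start tag begins.
    static List<String> readSplit(String data, int start, int end,
                                  String startTag, String endTag) {
        List<String> records = new ArrayList<>();
        int pos = start;
        while (true) {
            int recordStart = data.indexOf(startTag, pos);
            // No further start tag, or it begins in a later split: stop here.
            if (recordStart < 0 || recordStart >= end) {
                break;
            }
            // The end tag is searched without regard to 'end', so the last
            // record of a split may be completed from the next split's range.
            int endTagAt = data.indexOf(endTag, recordStart + startTag.length());
            if (endTagAt < 0) {
                break; // truncated record at the end of the data
            }
            int recordEnd = endTagAt + endTag.length();
            records.add(data.substring(recordStart, recordEnd));
            pos = recordEnd;
        }
        return records;
    }

    public static void main(String[] args) {
        String data = "<page>a</page><page>bb</page><page>ccc</page>";
        // Cut the input at offset 20, in the middle of the second record.
        System.out.println(readSplit(data, 0, 20, "<page>", "</page>"));
        System.out.println(readSplit(data, 20, data.length(), "<page>", "</page>"));
    }
}
```

With this ownership rule every record is read by exactly one split. A reader that stops strictly at the split end, or that does not skip a record already begun in the previous split, would lose or duplicate records at the boundaries.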

I'm also a bit confused about why the Flink docs say that Bzip2 is not
splittable [6].
As far as I know, Hadoop (and Flink in compatibility mode) does support
splittable compressed data.

I would appreciate some input/ideas/help with this.

All the best,
Sebastian


[0]:
https://github.com/apache/mahout/blob/master/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java

[1]:
https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java

[2]:
https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java

[3]:
https://github.com/apache/mahout/blob/master/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java#L64

[4]:
http://paste.gehaxelt.in/?69af3c91480b6bfb#ze+G/X9b3yTHfu1QW70aJioDvXWKoFFOCnLND1ow0sU=

[5]:
http://paste.gehaxelt.in/?e869d1f1f9f6f1be#kXrNaWXTNqLiHEKL4a6rWVMxhbVcmpXu24jGqJcap1A=

[6]:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#read-compressed-files


Re: Streaming data from MongoDB using Flink

2017-02-16 Thread Tzu-Li (Gordon) Tai
Good to know!


On February 16, 2017 at 10:13:28 PM, Pedro Monteiro 
(pedro.mlmonte...@gmail.com) wrote:

Dear Gordon,

Thanks for your help, I think I am on the right track as of now.

On the other hand, I have another question: is it possible to add sources to 
environments that are already executing? In what I am currently developing, I 
need to add new sources as they arrive to my system.

I will wait to hear from you!

Cumprimentos,

Pedro Lima Monteiro

On 16 February 2017 at 11:29, Pedro Monteiro  wrote:
Thank you again for your prompt response.

I will give it a try and will come back to you.

Pedro Lima Monteiro

On 16 February 2017 at 10:20, Tzu-Li (Gordon) Tai  wrote:
I would recommend checking out the Flink RabbitMQ Source for examples:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java

For your case, you should extend the `RichSourceFunction` which provides 
additional access to override the `open()` life cycle method.
In that method, you instantiate your MongoDB client connection and  fetch the 
cursor. In the `run()` method, you should essentially have a while loop that 
polls the MongoDB cursor and emits the fetched documents using the 
`SourceContext`.

If you're also looking to implement a MongoDB source that works with Flink’s 
checkpointing for exactly-once, be sure to check out:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#stateful-source-functions

Cheers,
Gordon
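
The life cycle Gordon describes can be sketched without any Flink or MongoDB dependency. In the plain-Java sketch below, Emitter is a hypothetical stand-in for Flink's SourceContext, an Iterator stands in for the MongoDB cursor, and the three methods mirror open(), run() and cancel(). All names are assumptions made for illustration; a real source would extend RichSourceFunction and use the MongoDB client instead.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for Flink's SourceContext: receives emitted records.
interface Emitter<T> {
    void collect(T element);
}

// Hypothetical sketch of the open()/run()/cancel() life cycle.
// An Iterator stands in for the MongoDB cursor; no MongoDB or Flink classes are used.
class CursorPollingSource<T> {
    private final Supplier<Iterator<T>> cursorFactory;
    private Iterator<T> cursor;
    // volatile because cancel() is typically called from a different thread than run().
    private volatile boolean running = true;

    CursorPollingSource(Supplier<Iterator<T>> cursorFactory) {
        this.cursorFactory = cursorFactory;
    }

    // Corresponds to open(): create the client connection and fetch the cursor.
    void open() {
        cursor = cursorFactory.get();
    }

    // Corresponds to run(): poll the cursor and emit documents until cancelled.
    void run(Emitter<T> ctx) {
        while (running) {
            if (cursor.hasNext()) {
                ctx.collect(cursor.next());
            } else {
                try {
                    Thread.sleep(50); // nothing new yet; back off before polling again
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    // Corresponds to cancel(): make the loop in run() exit.
    void cancel() {
        running = false;
    }
}

class CursorPollingSourceDemo {
    public static void main(String[] args) {
        List<String> docs = Arrays.asList("doc-1", "doc-2", "doc-3");
        CursorPollingSource<String> source = new CursorPollingSource<>(docs::iterator);
        source.open();
        int[] seen = {0};
        source.run(doc -> {
            System.out.println(doc);
            if (++seen[0] == docs.size()) {
                source.cancel(); // stop once every document has been emitted
            }
        });
    }
}
```

The demo cancels from inside the emitter only so that it terminates; in a running job, cancel() would be invoked by the framework while run() keeps polling.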
On February 16, 2017 at 5:53:03 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) 
wrote:

Dear Tzu-Li,

Thank you so much for your prompt response.

Let's assume I have a variable, in Java, env which is my 
StreamExecutionEnvironment. When I go ahead and attempt to do:
env.addSource();
It requests an implementation of the SourceFunction interface:
env.addSource(new SourceFunction() {
            @Override
            public void run(SourceFunction.SourceContext ctx) throws Exception {
                // TO DO
            }

            @Override
            public void cancel() {
                // TO DO
            }
        });
And this is where I'm somewhat stuck. I do not understand how I should access 
my MongoDB cursor in any of these methods (I suppose the most suitable would 
be the "run" method) in a way that would allow me to return a new MongoDB 
document as it arrives in the database from another source.

Once again, thank you so much for your help.

I will wait to hear from you!

Cumprimentos,

Pedro Lima Monteiro

On 16 February 2017 at 09:43, Tzu-Li (Gordon) Tai  wrote:
Hi Pedro!

This is definitely possible, by simply writing a Flink `SourceFunction` that 
uses MongoDB clients to fetch the data.
It should be straightforward and works well with MongoDB’s cursor APIs.

Could you explain a bit which part in particular you were stuck with?

Cheers,
Gordon


On February 16, 2017 at 5:31:39 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) 
wrote:

Good morning,

I am trying to get data from MongoDB to be analysed in Flink.
I would like to know if it is possible to stream data from MongoDB into
Flink. I have looked into Flink's source function to add in the addSource
method of the StreamExecutionEnvironment but I had no luck.
Can anyone help me out?
Thanks.

Pedro Lima Monteiro





Re: Flink jdbc

2017-02-16 Thread Punit Tandel
Thanks for the info. At the moment I use flink-jdbc to write 
streaming data coming from Kafka, which I process and write 
to a Postgres or MySQL database configured on the cluster or sandbox. 
However, when writing integration tests I am using an in-memory H2 
database, which is acting somewhat strangely: I cannot see any error being 
thrown by the write record method, but at the same time nothing is written to 
the database. So it is a little hard to figure out what is going wrong here.


Thanks


On 02/16/2017 02:02 PM, Fabian Hueske wrote:

The JdbcOutputFormat was originally meant for batch jobs.
It should be possible to use it for streaming jobs as well; however, 
you should be aware that it is not integrated with Flink's checkpointing 
mechanism.

So, you might have duplicate data in case of failures.

I also don't know if or how well it works with H2.

Best, Fabian

2017-02-16 11:06 GMT+01:00 Punit Tandel:


Yes, I have been following the tutorials, and reading from H2 and
writing to H2 works fine. The problem is that writing data coming from
Kafka to the H2 engine does not seem to work, and I
can't see any error thrown while writing into the in-memory H2
database, so I can't say what the error is or why the data is
not inserted.

I have been trying to find the cause and looking for logs while
Flink processes the operations, but couldn't find any error being
thrown at the time of writing data. Is there anywhere I can check for logs?

Thanks


On 02/16/2017 01:10 AM, Ted Yu wrote:

See the tutorial at the beginning of:


flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java

Looks like plugging in "org.h2.Driver" should do.

On Wed, Feb 15, 2017 at 4:59 PM, Punit Tandel wrote:

Hi All

Does flink jdbc support writing the data into H2 Database?

Thanks
Punit









Re: Streaming data from MongoDB using Flink

2017-02-16 Thread Pedro Monteiro
Dear Gordon,

Thanks for your help, I think I am on the right track as of now.

On the other hand, I have another question: is it possible to add sources
to environments that are already executing? In what I am currently
developing, I need to add new sources as they arrive to my system.

I will wait to hear from you!

Cumprimentos,

*Pedro Lima Monteiro*

On 16 February 2017 at 11:29, Pedro Monteiro 
wrote:

> Thank you again for your prompt response.
>
> I will give it a try and will come back to you.
>
> *Pedro Lima Monteiro*
>
> On 16 February 2017 at 10:20, Tzu-Li (Gordon) Tai 
> wrote:
>
>> I would recommend checking out the Flink RabbitMQ Source for examples:
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
>>
>> For your case, you should extend the `RichSourceFunction` which provides
>> additional access to override the `open()` life cycle method.
>> In that method, you instantiate your MongoDB client connection and  fetch
>> the cursor. In the `run()` method, you should essentially have a while loop
>> that polls the MongoDB cursor and emits the fetched documents using the
>> `SourceContext`.
>>
>> If you're also looking to implement a MongoDB source that works with
>> Flink’s checkpointing for exactly-once, be sure to check out:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#stateful-source-functions
>>
>> Cheers,
>> Gordon
>>
>> On February 16, 2017 at 5:53:03 PM, Pedro Monteiro (
>> pedro.mlmonte...@gmail.com) wrote:
>>
>> Dear Tzu-Li,
>>
>> Thank you so much for your prompt response.
>>
>> Let's assume I have a variable, in Java, env which is my
>> StreamExecutionEnvironment. When I go ahead and attempt to do:
>>
>>> env.addSource();
>>
>> It requests an implementation of the SourceFunction interface:
>>
>>> env.addSource(new SourceFunction() {
>>>     @Override
>>>     public void run(SourceFunction.SourceContext ctx) throws Exception {
>>>         // TO DO
>>>     }
>>>
>>>     @Override
>>>     public void cancel() {
>>>         // TO DO
>>>     }
>>> });
>>
>> And this is where I'm somewhat stuck. I do not understand how I should
>> access my MongoDB cursor in any of these methods (I suppose the most
>> suitable would be the "run" method) in a way that would allow me to return a
>> new MongoDB document as it arrives in the database from another source.
>>
>> Once again, thank you so much for your help.
>>
>> I will wait to hear from you!
>>
>> Cumprimentos,
>>
>> *Pedro Lima Monteiro*
>>
>> On 16 February 2017 at 09:43, Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi Pedro!
>>>
>>> This is definitely possible, by simply writing a Flink `SourceFunction`
>>> that uses MongoDB clients to fetch the data.
>>> It should be straightforward and works well with MongoDB’s cursor APIs.
>>>
>>> Could you explain a bit which part in particular you were stuck with?
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On February 16, 2017 at 5:31:39 PM, Pedro Monteiro (
>>> pedro.mlmonte...@gmail.com) wrote:
>>>
>>> Good morning,
>>>
>>> I am trying to get data from MongoDB to be analysed in Flink.
>>> I would like to know if it is possible to stream data from MongoDB into
>>> Flink. I have looked into Flink's source function to add in the addSource
>>> method of the StreamExecutionEnvironment but I had no luck.
>>> Can anyone help me out?
>>> Thanks.
>>>
>>> *Pedro Lima Monteiro*
>>>
>>>
>>
>


Re: Flink jdbc

2017-02-16 Thread Fabian Hueske
The JdbcOutputFormat was originally meant for batch jobs.
It should be possible to use it for streaming jobs as well; however, you
should be aware that it is not integrated with Flink's checkpointing
mechanism.
So, you might have duplicate data in case of failures.

I also don't know if or how well it works with H2.

Best, Fabian

2017-02-16 11:06 GMT+01:00 Punit Tandel :

> Yes, I have been following the tutorials, and reading from H2 and writing
> to H2 works fine. The problem is that writing data coming from Kafka
> to the H2 engine does not seem to work, and I can't see any error thrown
> while writing into the in-memory H2 database, so I can't say what the error
> is or why the data is not inserted.
>
> I have been trying to find the cause and looking for logs while Flink
> processes the operations, but couldn't find any error being thrown at the
> time of writing data. Is there anywhere I can check for logs?
>
> Thanks
>
> On 02/16/2017 01:10 AM, Ted Yu wrote:
>
> See the tutorial at the beginning of:
>
> flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
>
> Looks like plugging in "org.h2.Driver" should do.
>
> On Wed, Feb 15, 2017 at 4:59 PM, Punit Tandel 
> wrote:
>
>> Hi All
>>
>> Does flink jdbc support writing the data into H2 Database?
>>
>> Thanks
>> Punit
>>
>>
>
>


Re: Resource under-utilization when using RocksDb state backend [SOLVED]

2017-02-16 Thread vinay patil
Hi Cliff,

It would be really helpful if you could share your RocksDB configuration.

I am also running on c3.4xlarge EC2 instances backed by SSDs.

I tried the FLASH_SSD_OPTIMIZED option, which works great, but somehow
the pipeline stops partway through and the overall processing time increases.

I tried to set different values as mentioned in this video, but somehow I
am not getting it right; the TMs get killed after some time.


Regards,
Vinay Patil

On Thu, Dec 8, 2016 at 10:19 PM, Cliff Resnick [via Apache Flink User
Mailing List archive.]  wrote:

> It turns out that most of the time in RocksDBFoldingState was spent on
> serialization/deserialization. RocksDB read/write was performing well. By
> moving from Kryo to custom serialization we were able to increase
> throughput dramatically. Load is now where it should be.
>
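
The thread does not show the serializer that replaced Kryo. As a rough illustration of what hand-written serialization can look like, the sketch below writes a key shaped like the one in the original post (one string, two ints) with DataOutputStream, storing only the field values in a fixed order. WindowKey and its field names are hypothetical, and this is plain Java rather than Flink code; in Flink the equivalent logic would sit in a custom TypeSerializer, which is not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical key type: one string and two ints.
final class WindowKey {
    final String name;
    final int first;
    final int second;

    WindowKey(String name, int first, int second) {
        this.name = name;
        this.first = first;
        this.second = second;
    }

    // Writes the three fields in a fixed order, with no type information.
    byte[] toBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(name);
            out.writeInt(first);
            out.writeInt(second);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads the fields back in the same order they were written.
    static WindowKey fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            String name = in.readUTF();
            int first = in.readInt();
            int second = in.readInt();
            return new WindowKey(name, first, second);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class WindowKeyDemo {
    public static void main(String[] args) {
        byte[] bytes = new WindowKey("ab", 7, -1).toBytes();
        WindowKey copy = WindowKey.fromBytes(bytes);
        System.out.println(bytes.length + " bytes: " + copy.name + ", " + copy.first + ", " + copy.second);
    }
}
```

Because the reader knows the layout in advance, nothing has to be looked up or discovered per record, which is the general reason a fixed format can be cheaper than a generic serializer on a hot path such as state access.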
> On Mon, Dec 5, 2016 at 1:15 PM, Robert Metzger <[hidden email]> wrote:
>
>> Another Flink user using RocksDB with large state on SSDs recently posted
>> this video on optimizing the performance of RocksDB on SSDs:
>> https://www.youtube.com/watch?v=pvUqbIeoPzM
>> That could be relevant for you.
>>
>> For how long did you look at iotop? It could be that the IO access
>> happens in bursts, depending on how data is cached.
>>
>> I'll also add Stefan Richter to the conversation; he may have some more
>> ideas about what we can do here.
>>
>>
>> On Mon, Dec 5, 2016 at 6:19 PM, Cliff Resnick <[hidden email]> wrote:
>>
>>> Hi Robert,
>>>
>>> We're following 1.2-SNAPSHOT,  using event time. I have tried "iotop"
>>> and I see usually less than 1 % IO. The most I've seen was a quick flash
>>> here or there of something substantial (e.g. 19%, 52%) then back to
>>> nothing. I also assumed we were disk-bound, but to use your metaphor I'm
>>> having trouble finding any smoke. However, I'm not very experienced in
>>> sussing out IO issues so perhaps there is something else I'm missing.
>>>
>>> I'll keep investigating. If I continue to come up empty then I guess my
>>> next steps may be to stage some independent tests directly against RocksDb.
>>>
>>> -Cliff
>>>
>>>
>>> On Mon, Dec 5, 2016 at 5:52 AM, Robert Metzger <[hidden email]> wrote:
>>>
>>>> Hi Cliff,
>>>>
>>>> which Flink version are you using?
>>>> Are you using Eventtime or processing time windows?
>>>>
>>>> I suspect that your disks are "burning" (= your job is IO bound). Can
>>>> you check with a tool like "iotop" how much disk IO Flink is producing?
>>>> Then, I would set this number in relation with the theoretical maximum
>>>> of your SSD's (a good rough estimate is to use dd for that).
>>>>
>>>> If you find that your disk bandwidth is saturated by Flink, you could
>>>> look into tuning the RocksDB settings so that it uses more memory for
>>>> caching.
>>>>
>>>> Regards,
>>>> Robert
>>>>
>>>>
>>>> On Fri, Dec 2, 2016 at 11:34 PM, Cliff Resnick <[hidden email]> wrote:

> In tests comparing RocksDb to fs state backend we observe much lower
> throughput, around 10x slower. While the lowered throughput is expected,
> what's perplexing is that machine load is also very low with RocksDb,
> typically falling to  < 25% CPU and negligible IO wait (around 0.1%). Our
> test instances are EC2 c3.xlarge which are 4 virtual CPUs and 7.5G RAM,
> each running a single TaskManager in YARN, with 6.5G allocated memory per
> TaskManager. The instances also have 2x40G attached SSDs which we have
> mapped to `taskmanager.tmp.dir`.
>
> With FS state and 4 slots per TM, we will easily max out with a load
> average around 5 or 6, so we actually need to throttle down the slots
> to 3. With RocksDb using the Flink SSD configured options we see a load
> average at around 1. Also, load (and actual) throughput remain more or
> less constant no matter how many slots we use. The weak load is spread
> over all CPUs.
>
> Here is a sample top:
>
> Cpu0  : 20.5%us,  0.0%sy,  0.0%ni, 79.5%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
> Cpu1  : 18.5%us,  0.0%sy,  0.0%ni, 81.5%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
> Cpu2  : 11.6%us,  0.7%sy,  0.0%ni, 87.0%id,  0.7%wa,  0.0%hi,  0.0%si,  0.0%st
> Cpu3  : 12.5%us,  0.3%sy,  0.0%ni, 86.8%id,  0.0%wa,  0.0%hi,  0.3%si,  0.0%st
>
> Our pipeline uses tumbling windows, each with a ValueState keyed to a
> 3-tuple of one string and two ints. Each ValueState comprises a small set
> of tuples of around 5-7 fields each. The WindowFunction simply diffs against
> the set and updates state if there is a diff.
>
> Any ideas as to what the bottleneck is here? Any suggestions welcomed!
>
> -Cliff
>
>
>
>
>
>
>
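The fix reported at the top of this thread (moving from Kryo to custom serialization) can be illustrated with a minimal plain-Java sketch. It is not Cliff's actual serializer and does not use Flink's serializer interfaces; the `WindowKey` class, its field names, and `WindowKeyCodec` are hypothetical stand-ins for the "one string and two ints" key described above. The point it shows is that a hand-written encoding writes only the field values, with no reflection or per-record class metadata.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical key type: one string and two ints. */
final class WindowKey {
    final String name;
    final int a;
    final int b;

    WindowKey(String name, int a, int b) {
        this.name = name;
        this.a = a;
        this.b = b;
    }
}

/** Hand-written binary encoding for WindowKey (illustrative only). */
final class WindowKeyCodec {
    static byte[] serialize(WindowKey key) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(key.name); // 2-byte length prefix + modified UTF-8 bytes
            out.writeInt(key.a);    // 4 bytes
            out.writeInt(key.b);    // 4 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static WindowKey deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            // Fields must be read back in exactly the order they were written.
            String name = in.readUTF();
            int a = in.readInt();
            int b = in.readInt();
            return new WindowKey(name, a, b);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a real job the same read/write logic would presumably live inside a serializer registered with Flink; how that was wired up is not stated in the thread.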

Re: Clarification: use of AllWindowedStream.apply() function

2017-02-16 Thread nsengupta
Thanks, Aljoscha for the clarification.

I understand that instead of using a flatMap() in the way I am using, I am
better off using :
* a fold (init, fold_func, window_func) first and then
* map to a different type of my choice, inside the window_func,
parameterised above

I hope I am correct. If so, you don't need to spend time to comment;
☺otherwise, please give a hint.

-- Nirmalya

-

On Thu, Feb 16, 2017 at 4:12 PM, Aljoscha Krettek [via Apache Flink User
Mailing List archive.]  wrote:

> Hi,
> you would indeed use apply(), or better fold(<init>,
> <fold function>, <window function>) to map the result of folding your
> window to some other data type. If you will, a WindowFunction allows
> "mapping" the result of your windowing to a different type.
>
> Best,
> Aljoscha
>
> On Wed, 15 Feb 2017 at 06:14 nsengupta <[hidden email]
> > wrote:
>
>> I have gone through this  post
>> > n4.nabble.com/WindowedStream-operation-questions-td6006.html>
>> , where Aljoscha explains that /mapping/ on WindowedStream is /not/
>> allowed.
>>
>> So, I think I haven't asked the question properly. Here is (hopefully) a
>> better and easier version:
>>
>> 1. I begin with records of type RawMITSIMTuple.
>> 2. When I group them using a Window, I get an
>> AllWindowedStream[RawMITSIMTuple].
>> 3. I /fold/ the tuples obtained in the Window, which gives me a
>> DataStream[Vector[RawMITSIMTuple]].
>> 4. What I need is a DataStream[PositionReport]. So, I need to flatMap the
>> output of the previous step, where I first get hold of each of the
>> RawMITSIMTuple and map that to PositionReport.
>>
>> val positionReportStream = this
>>   .readRawMITSIMTuplesInjected(envDefault, args(0))
>>   .assignAscendingTimestamps(e => e.timeOfReport)
>>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
>>   .fold(Vector[RawMITSIMTuple]())((collectorBin, rawRecord) => {
>>     collectorBin :+ rawRecord
>>   })
>>   .flatMap(r => r.map(e => this.preparePositionReport(e)))
>>
>> This gives me what I want, but I feel this is verbose and inefficient.
>> Am I thinking correctly? If so, what is a better idiom to use in such cases?
>>
>> -- Nirmalya
>>
>>
>>
>>
>>
>
>
>



-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."





Cartesian product over windows

2017-02-16 Thread Ioannis Kontopoulos
Hello everyone,

Given a stream of events (each event has a timestamp and a key), I want to
create all possible combinations of the keys in a window (sliding, event
time) and then process those combinations in parallel.

For example, if the stream contains events with keys 1,2,3,4 in a given
window, then the possible combinations are:

1-2
1-3
1-4
2-3
2-4
3-4

and if the parallelism is set to 2, I want to have events with these keys
(one column per parallel instance):

1-2    2-3
1-3    2-4
1-4    3-4

You can see that there is some replication. So when I use the apply method
on a window it will have the keys separated like the example above.

Is there a way to do that?
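The split described above can be sketched in plain Java, independent of Flink: enumerate all unordered key pairs of one window, then cut the list into contiguous chunks, one per parallel instance. The class and method names (`KeyPairs`, `allPairs`, `partition`) are hypothetical, and the contiguous-chunk assignment is only one possible choice that happens to reproduce the two columns in the example.

```java
import java.util.ArrayList;
import java.util.List;

final class KeyPairs {
    /** All unordered pairs (first position < second position) of the given keys. */
    static List<int[]> allPairs(List<Integer> keys) {
        List<int[]> pairs = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            for (int j = i + 1; j < keys.size(); j++) {
                pairs.add(new int[] {keys.get(i), keys.get(j)});
            }
        }
        return pairs;
    }

    /** Splits the pairs into `parallelism` contiguous chunks of near-equal size. */
    static List<List<int[]>> partition(List<int[]> pairs, int parallelism) {
        List<List<int[]>> partitions = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            partitions.add(new ArrayList<>());
        }
        for (int idx = 0; idx < pairs.size(); idx++) {
            // Chunk index of pair idx; long arithmetic avoids int overflow.
            int target = (int) ((long) idx * parallelism / pairs.size());
            partitions.get(target).add(pairs.get(idx));
        }
        return partitions;
    }
}
```

With keys 1,2,3,4 and parallelism 2, the first chunk holds 1-2, 1-3, 1-4 and the second holds 2-3, 2-4, 3-4. The replication mentioned above follows from this: an event with key 2 is needed by both chunks. Inside a Flink job, one option might be to emit each event once per chunk that needs it, tagged with the chunk index, and key the stream by that index; that wiring is an assumption and is not shown here.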


Re: Checkpointing with RocksDB as statebackend

2017-02-16 Thread vinay patil
I think it is more related to RocksDB. I am not very familiar with RocksDB
either, but I am reading the tuning guide to understand the important values
that can be set.

Regards,
Vinay Patil

On Thu, Feb 16, 2017 at 5:48 PM, Stefan Richter [via Apache Flink User
Mailing List archive.]  wrote:

> What kind of problem are we talking about? S3 related or RocksDB related?
> I am not aware of problems with RocksDB per se. I think seeing logs for
> this would be very helpful.
>
> Am 16.02.2017 um 11:56 schrieb Aljoscha Krettek <[hidden email]
> >:
>
> [hidden email]  and 
> [hidden
> email]  could this
> be the same problem that you recently saw when working with other people?
>
> On Wed, 15 Feb 2017 at 17:23 Vinay Patil <[hidden email]
> > wrote:
>
>> Hi Guys,
>>
>> Can anyone please help me with this issue
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Feb 15, 2017 at 6:17 PM, Vinay Patil <[hidden email]
>> > wrote:
>>
>> Hi Ted,
>>
>> I have 3 boxes in my pipeline: the 1st and 2nd boxes contain a source and
>> an S3 sink, and the 3rd box is a window operator followed by chained
>> operators and an S3 sink.
>>
>> So in the details link section I can see that the S3 sink is taking time
>> for the acknowledgement and it is not even going to the window operator
>> chain.
>>
>> But as shown in the snapshot, checkpoint id 19 did not get any
>> acknowledgement. Not sure what is causing the issue.
>>
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Feb 15, 2017 at 5:51 PM, Ted Yu [via Apache Flink User Mailing
>> List archive.] <[hidden email]
>> > wrote:
>>
>> What did the More Details link say ?
>>
>> Thanks
>>
>> > On Feb 15, 2017, at 3:11 AM, vinay patil <[hidden email]
>> > wrote:
>> >
>> > Hi,
>> >
>> > I have set the checkpointing interval to 6 secs and the minimum pause
>> > between checkpoints to 5 secs. While testing the pipeline I observed that
>> > some checkpoints take a long time. As you can see in the attached
>> > snapshot, checkpoint id 19 took the maximum time before it failed,
>> > although it had not received any acknowledgements. During these
>> > 10 minutes the entire pipeline made no progress and no data was
>> > processed. (For example: in 13 minutes 20M records were processed, and
>> > when the checkpoint took time there was no progress for the next
>> > 10 minutes.)
>> >
>> > I have even tried setting the max checkpoint timeout to 3 min, but in
>> > that case as well multiple checkpoints failed.
>> >
>> > I have set RocksDB FLASH_SSD_OPTION.
>> > What could be the issue?
>> >
>> > P.S. I am writing to 3 S3 sinks
>> >
>> > checkpointing_issue.PNG
>> > > n4.nabble.com/file/n11640/checkpointing_issue.PNG>
>> >
>> >
>> >
>>
>>
>>
>>
>
>

Re: Checkpointing with RocksDB as statebackend

2017-02-16 Thread Stefan Richter
What kind of problem are we talking about? S3 related or RocksDB related? I am
not aware of problems with RocksDB per se. I think seeing logs for this would
be very helpful.

> Am 16.02.2017 um 11:56 schrieb Aljoscha Krettek :
> 
> +Stefan Richter and +Stephan Ewen, could this be the same problem that you
> recently saw when working with other people?
> 
> On Wed, 15 Feb 2017 at 17:23 Vinay Patil  > wrote:
> Hi Guys,
> 
> Can anyone please help me with this issue
> 
> Regards,
> Vinay Patil
> 
> On Wed, Feb 15, 2017 at 6:17 PM, Vinay Patil  > wrote:
> Hi Ted,
> 
> I have 3 boxes in my pipeline: the 1st and 2nd boxes contain a source and
> an S3 sink, and the 3rd box is a window operator followed by chained
> operators and an S3 sink.
>
> So in the details link section I can see that the S3 sink is taking time for
> the acknowledgement and it is not even going to the window operator chain.
>
> But as shown in the snapshot, checkpoint id 19 did not get any
> acknowledgement. Not sure what is causing the issue.
> 
> Regards,
> Vinay Patil
> 
> On Wed, Feb 15, 2017 at 5:51 PM, Ted Yu [via Apache Flink User Mailing List 
> archive.]  > wrote:
> What did the More Details link say ? 
> 
> Thanks 
> 
> > On Feb 15, 2017, at 3:11 AM, vinay patil <[hidden email] 
> > > wrote: 
> > 
> > Hi, 
> > 
> > I have set the checkpointing interval to 6 secs and the minimum pause
> > between checkpoints to 5 secs. While testing the pipeline I observed that
> > some checkpoints take a long time. As you can see in the attached
> > snapshot, checkpoint id 19 took the maximum time before it failed,
> > although it had not received any acknowledgements. During these
> > 10 minutes the entire pipeline made no progress and no data was
> > processed. (For example: in 13 minutes 20M records were processed, and
> > when the checkpoint took time there was no progress for the next
> > 10 minutes.)
> >
> > I have even tried setting the max checkpoint timeout to 3 min, but in
> > that case as well multiple checkpoints failed.
> >
> > I have set RocksDB FLASH_SSD_OPTION.
> > What could be the issue?
> >
> > P.S. I am writing to 3 S3 sinks
> > 
> > checkpointing_issue.PNG 
> >  >  
> > >
> >
> > 
> > 
> > 
> 
> 


Re: Checkpointing with RocksDB as statebackend

2017-02-16 Thread vinay patil
Hi Aljoscha,

Which problem are you referring to?

I am seeing unexpected stalls that last for a long time.

Also, one thing I have observed with the FLASH_SSD_OPTIMIZED option is that
it uses more physical memory and does not flush the data to storage.

I am trying to figure out the best possible RocksDB values for my
configuration. I am currently running the job on c3.4xlarge EC2 instances.

Regards,
Vinay Patil

On Thu, Feb 16, 2017 at 4:22 PM, Aljoscha Krettek [via Apache Flink User
Mailing List archive.]  wrote:

> [hidden email]  and 
> [hidden
> email]  could this
> be the same problem that you recently saw when working with other people?
>
> On Wed, 15 Feb 2017 at 17:23 Vinay Patil <[hidden email]
> > wrote:
>
>> Hi Guys,
>>
>> Can anyone please help me with this issue
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Feb 15, 2017 at 6:17 PM, Vinay Patil <[hidden email]
>> > wrote:
>>
>> Hi Ted,
>>
>> I have 3 boxes in my pipeline: the 1st and 2nd boxes contain a source and
>> an S3 sink, and the 3rd box is a window operator followed by chained
>> operators and an S3 sink.
>>
>> So in the details link section I can see that the S3 sink is taking time
>> for the acknowledgement and it is not even going to the window operator
>> chain.
>>
>> But as shown in the snapshot, checkpoint id 19 did not get any
>> acknowledgement. Not sure what is causing the issue.
>>
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Feb 15, 2017 at 5:51 PM, Ted Yu [via Apache Flink User Mailing
>> List archive.] <[hidden email]
>> > wrote:
>>
>> What did the More Details link say ?
>>
>> Thanks
>>
>> > On Feb 15, 2017, at 3:11 AM, vinay patil <[hidden email]
>> > wrote:
>> >
>> > Hi,
>> >
>> > I have set the checkpointing interval to 6 secs and the minimum pause
>> > between checkpoints to 5 secs. While testing the pipeline I observed that
>> > some checkpoints take a long time. As you can see in the attached
>> > snapshot, checkpoint id 19 took the maximum time before it failed,
>> > although it had not received any acknowledgements. During these
>> > 10 minutes the entire pipeline made no progress and no data was
>> > processed. (For example: in 13 minutes 20M records were processed, and
>> > when the checkpoint took time there was no progress for the next
>> > 10 minutes.)
>> >
>> > I have even tried setting the max checkpoint timeout to 3 min, but in
>> > that case as well multiple checkpoints failed.
>> >
>> > I have set RocksDB FLASH_SSD_OPTION.
>> > What could be the issue?
>> >
>> > P.S. I am writing to 3 S3 sinks
>> >
>> > checkpointing_issue.PNG
>> > > n4.nabble.com/file/n11640/checkpointing_issue.PNG>
>> >
>> >
>> >
>>
>>
>>
>>
>

RE: Flink batch processing fault tolerance

2017-02-16 Thread Anton Solovev
Hi Aljoscha,
Could you share your plans for resolving it?

Best,
Anton


From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: Thursday, February 16, 2017 2:48 PM
To: user@flink.apache.org
Subject: Re: Flink batch processing fault tolerance

Hi,
yes, this is indeed true. We had some plans for how to resolve this but they 
never materialised because of the focus on Stream Processing. We might unite 
the two in the future and then you will get fault-tolerant batch/stream 
processing in the same API.

Best,
Aljoscha

On Wed, 15 Feb 2017 at 09:28 Renjie Liu wrote:
Hi, all:
I'm reading Flink's docs and am curious about the fault tolerance of batch
jobs. It seems that when one task execution fails, the whole job will be
restarted. Is that true? If so, isn't it impractical to deploy large Flink
batch jobs?
--
Liu, Renjie
Software Engineer, MVAD


Re: Streaming data from MongoDB using Flink

2017-02-16 Thread Pedro Monteiro
Thank you again for your prompt response.

I will give it a try and will come back to you.

*Pedro Lima Monteiro*

On 16 February 2017 at 10:20, Tzu-Li (Gordon) Tai 
wrote:

> I would recommend checking out the Flink RabbitMQ Source for examples:
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
>
> For your case, you should extend the `RichSourceFunction` which provides
> additional access to override the `open()` life cycle method.
> In that method, you instantiate your MongoDB client connection and fetch
> the cursor. In the `run()` method, you should essentially have a while loop
> that polls the MongoDB cursor and emits the fetched documents using the
> `SourceContext`.
>
> If you're also looking to implement a MongoDB source that works with Flink’s
> checkpointing for exactly-once, be sure to check out:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#stateful-source-functions
>
> Cheers,
> Gordon
>
> On February 16, 2017 at 5:53:03 PM, Pedro Monteiro (
> pedro.mlmonte...@gmail.com) wrote:
>
> Dear Tzu-Li,
>
> Thank you so much for your prompt response.
>
> Let's assume I have a variable, in Java, env which is my
> StreamExecutionEnvironment. When I go ahead and attempt to do:
>
>> env.addSource();
>
> It requests an implementation of a Source Function interface:
>
>> env.addSource(new SourceFunction() {
>>     @Override
>>     public void run(SourceFunction.SourceContext ctx) throws Exception {
>>         // TO DO
>>     }
>>
>>     @Override
>>     public void cancel() {
>>         // TO DO
>>     }
>> });
>
> And this is where I'm somehow stuck. I do not understand how I should
> access my MongoDB cursor in any of these methods (I suppose the most
> adequate would be the "run" method) in a way that would allow me to return
> a new MongoDB document as it arrives in the database from another source.
>
> Once again, thank you so much for your help.
>
> I will wait to hear from you!​
>
> Cumprimentos,
>
> *Pedro Lima Monteiro*
>
> On 16 February 2017 at 09:43, Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Pedro!
>>
>> This is definitely possible, by simply writing a Flink `SourceFunction`
>> that uses MongoDB clients to fetch the data.
>> It should be straightforward and works well with MongoDB’s cursor APIs.
>>
>> Could you explain a bit which part in particular you were stuck with?
>>
>> Cheers,
>> Gordon
>>
>>
>> On February 16, 2017 at 5:31:39 PM, Pedro Monteiro (
>> pedro.mlmonte...@gmail.com) wrote:
>>
>> Good morning,
>>
>> I am trying to get data from MongoDB to be analysed in Flink.
>> I would like to know if it is possible to stream data from MongoDB into
>> Flink. I have looked into Flink's source function to add in the addSource
>> method of the StreamExecutionEnvironment but I had no luck.
>> Can anyone help me out?
>> Thanks.
>>
>> *Pedro Lima Monteiro*
>>
>>
>
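The life cycle Gordon describes in this thread (acquire the cursor in `open()`, poll it in a loop in `run()`, stop the loop from `cancel()`) can be modelled in plain Java without Flink or a MongoDB driver. This is only a sketch under stated assumptions: `PollingSource` is a hypothetical class, `Iterator<T>` stands in for a MongoDB cursor, and `Consumer<T>` stands in for Flink's `SourceContext`. A real source would extend `RichSourceFunction` as suggested above.

```java
import java.util.Iterator;
import java.util.function.Consumer;

/** Plain-Java model of the open() / run() / cancel() pattern (not Flink code). */
final class PollingSource<T> {
    private final Iterable<T> collection;    // stand-in for the MongoDB collection
    private Iterator<T> cursor;              // stand-in for the MongoDB cursor
    private volatile boolean running = true; // cancel() may be called from another thread

    PollingSource(Iterable<T> collection) {
        this.collection = collection;
    }

    /** Corresponds to open(): set up the connection and fetch the cursor once. */
    void open() {
        cursor = collection.iterator();
    }

    /** Corresponds to run(): emit documents until cancelled or the cursor ends. */
    void run(Consumer<T> emit) {
        while (running && cursor.hasNext()) {
            emit.accept(cursor.next());
        }
    }

    /** Corresponds to cancel(): ask the loop in run() to stop. */
    void cancel() {
        running = false;
    }
}
```

The `volatile` flag is the important design point: `cancel()` is typically invoked from a different thread than the one executing `run()`, so the loop needs to observe the change. Unlike this model, a source that should pick up documents as they arrive would keep waiting on the cursor instead of returning when it is exhausted.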


Re: Log4J

2017-02-16 Thread Robert Metzger
I've also (successfully) tried running Flink with log4j2 to connect it to
Graylog2. If I remember correctly, the biggest problem was "injecting" the
log4j2 properties file into the classpath (when running Flink on YARN).

Maybe you need to put the file into the lib/ folder, so that it is shipped
to all the nodes, and then loaded from the classpath (there is a special
name in the log4j2 documentation. If you use that name, it'll be loaded
from the classloader).

If you are running in standalone mode, you can just modify the scripts to
point the JVMs to the right config file.

On Thu, Feb 16, 2017 at 11:54 AM, Stephan Ewen  wrote:

> Hi!
>
> The bundled log4j version (1.x) does not support that.
>
> But you can replace the logging jars with those of a different framework
> (like log4j 2.x), which supports changing the configuration without
> stopping the application.
> You don't need to rebuild flink, simply replace two jars in the "lib"
> folder (and update the config file, because log4j 2.x has a different
> config format).
>
> This guide shows how to swap log4j 1.x for logback, and you should be able
> to swap in log4j 2.x in the exact same way.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/best_practices.html#use-logback-when-running-flink-on-a-cluster
>
>
> On Thu, Feb 16, 2017 at 5:20 AM, Chet Masterson wrote:
>
>> Is there a way to reload a log4j.properties file without stopping and
>> starting the job server?
>>
>
>


Re: Checkpointing with RocksDB as statebackend

2017-02-16 Thread Aljoscha Krettek
+Stefan Richter and +Stephan Ewen, could this be the same problem that you
recently saw when working with other people?

On Wed, 15 Feb 2017 at 17:23 Vinay Patil  wrote:

> Hi Guys,
>
> Can anyone please help me with this issue
>
> Regards,
> Vinay Patil
>
> On Wed, Feb 15, 2017 at 6:17 PM, Vinay Patil 
> wrote:
>
> Hi Ted,
>
> I have 3 boxes in my pipeline: the 1st and 2nd boxes contain a source and
> an S3 sink, and the 3rd box is a window operator followed by chained
> operators and an S3 sink.
>
> So in the details link section I can see that the S3 sink is taking time
> for the acknowledgement and it is not even going to the window operator
> chain.
>
> But as shown in the snapshot, checkpoint id 19 did not get any
> acknowledgement. Not sure what is causing the issue.
>
>
> Regards,
> Vinay Patil
>
> On Wed, Feb 15, 2017 at 5:51 PM, Ted Yu [via Apache Flink User Mailing
> List archive.]  wrote:
>
> What did the More Details link say ?
>
> Thanks
>
> > On Feb 15, 2017, at 3:11 AM, vinay patil <[hidden email]
> > wrote:
> >
> > Hi,
> >
> > I have set the checkpointing interval to 6 secs and the minimum pause
> > between checkpoints to 5 secs. While testing the pipeline I observed that
> > some checkpoints take a long time. As you can see in the attached
> > snapshot, checkpoint id 19 took the maximum time before it failed,
> > although it had not received any acknowledgements. During these
> > 10 minutes the entire pipeline made no progress and no data was
> > processed. (For example: in 13 minutes 20M records were processed, and
> > when the checkpoint took time there was no progress for the next
> > 10 minutes.)
> >
> > I have even tried setting the max checkpoint timeout to 3 min, but in
> > that case as well multiple checkpoints failed.
> >
> > I have set RocksDB FLASH_SSD_OPTION.
> > What could be the issue?
> >
> > P.S. I am writing to 3 S3 sinks
> >
> > checkpointing_issue.PNG
> > <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11640/checkpointing_issue.PNG>
>
> >
> >
> >
>
>
>


Re: Log4J

2017-02-16 Thread Stephan Ewen
Hi!

The bundled log4j version (1.x) does not support that.

But you can replace the logging jars with those of a different framework
(like log4j 2.x), which supports changing the configuration without
stopping the application.
You don't need to rebuild flink, simply replace two jars in the "lib"
folder (and update the config file, because log4j 2.x has a different
config format).

This guide shows how to swap log4j 1.x for logback, and you should be able
to swap in log4j 2.x in the exact same way.

https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/best_practices.html#use-logback-when-running-flink-on-a-cluster


On Thu, Feb 16, 2017 at 5:20 AM, Chet Masterson 
wrote:

> Is there a way to reload a log4j.properties file without stopping and
> starting the job server?
>


Re: Flink batch processing fault tolerance

2017-02-16 Thread Aljoscha Krettek
Hi,
yes, this is indeed true. We had some plans for how to resolve this but
they never materialised because of the focus on Stream Processing. We might
unite the two in the future and then you will get fault-tolerant
batch/stream processing in the same API.

Best,
Aljoscha

On Wed, 15 Feb 2017 at 09:28 Renjie Liu  wrote:

> Hi, all:
> I'm reading Flink's docs and am curious about the fault tolerance of batch
> jobs. It seems that when one task execution fails, the whole job will be
> restarted. Is that true? If so, isn't it impractical to deploy large
> Flink batch jobs?
> --
> Liu, Renjie
> Software Engineer, MVAD
>


Re: Clarification: use of AllWindowedStream.apply() function

2017-02-16 Thread Aljoscha Krettek
Hi,
you would indeed use apply(), or better fold(<init>,
<fold function>, <window function>) to map the result of folding your
window to some other data type. If you will, a WindowFunction allows
"mapping" the result of your windowing to a different type.

Best,
Aljoscha

On Wed, 15 Feb 2017 at 06:14 nsengupta  wrote:

> I have gone through this  post
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WindowedStream-operation-questions-td6006.html
> >
> , where Aljoscha explains that /mapping/ on WindowedStream is /not/
> allowed.
>
> So, I think I haven't asked the question properly. Here is (hopefully) a
> better and easier version:
>
> 1. I begin with records of type RawMITSIMTuple.
> 2. When I group them using a Window, I get an
> AllWindowedStream[RawMITSIMTuple].
> 3. I /fold/ the tuples obtained in the Window, which gives me a
> DataStream[Vector[RawMITSIMTuple]].
> 4. What I need is a DataStream[PositionReport]. So, I need to flatMap the
> output of the previous step, where I first get hold of each of the
> RawMITSIMTuple and map that to PositionReport.
>
> val positionReportStream = this
>   .readRawMITSIMTuplesInjected(envDefault, args(0))
>   .assignAscendingTimestamps(e => e.timeOfReport)
>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
>   .fold(Vector[RawMITSIMTuple]())((collectorBin, rawRecord) => {
>     collectorBin :+ rawRecord
>   })
>   .flatMap(r => r.map(e => this.preparePositionReport(e)))
>
> This gives me what I want, but I feel this is verbose and inefficient. Am I
> thinking correctly? If so, what is a better idiom to use in such cases?
>
> -- Nirmalya
>
>
>
>
>
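The idea in this thread, folding the elements of a window into an accumulator and then letting a window function map that accumulator to a different type, can be modelled in plain Java. This is a sketch of the concept only and does not call any Flink API; `WindowFold` and `foldThenApply` are hypothetical names, and the list argument stands in for the elements of one window.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

final class WindowFold {
    /**
     * Folds all elements of one window into an accumulator, then applies a
     * window function that converts the accumulator to the output type.
     */
    static <T, ACC, R> R foldThenApply(
            List<T> windowElements,
            ACC init,
            BiFunction<ACC, T, ACC> foldFunction,
            Function<ACC, R> windowFunction) {
        ACC acc = init;
        for (T element : windowElements) {
            acc = foldFunction.apply(acc, element);
        }
        // The type change (ACC -> R) happens here, in the same step as the fold,
        // so no separate downstream map/flatMap is needed for it.
        return windowFunction.apply(acc);
    }
}
```

For instance, `WindowFold.foldThenApply(Arrays.asList(1, 2, 3), 0, (acc, x) -> acc + x, acc -> "sum=" + acc)` folds the integers into a sum and converts the result to a `String` in one call. In the pipeline quoted above, the accumulator would play the role of the `Vector[RawMITSIMTuple]` and the window function would produce the `PositionReport` values.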


Re: How important is 'registerType'?

2017-02-16 Thread Aljoscha Krettek
Hi,
are you changing anything on your job between performing the savepoint and
restoring the savepoint? Flink upgrade, Job upgrade, changing Kryo version,
changing order in which you register Kryo serialisers?

Best,
Aljoscha

On Fri, 10 Feb 2017 at 18:26 Dmitry Golubets  wrote:

> The docs say that it may improve performance.
>
> How true is it, when custom serializers are provided?
> There is also 'disableAutoTypeRegistration' method in the config class,
> implying Flink registers types automatically.
>
> So, given that I have a hierarchy:
> trait A
> class B extends A
> class C extends A
>
> and I do addDefaultKryoSerializer(classOf[A], classOf[ASerializer])
>
> should I care about registering B and C with 'registerType' method?
>
> It is worth mentioning that when I registered my message class hierarchies,
> I got the following on restoring from a savepoint:
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> java.io.StreamCorruptedException: invalid type code: 00
>
> After some debugging I found that 'registerType' was the cause.
> It might be possible that my code called registerType in different order.
> Could it be a problem?
>
> Best regards,
> Dmitry
>
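Why registration order could matter, as suspected above, can be shown with a toy model. Kryo assigns the next free integer ID to a class that is registered without an explicit ID, and that ID is what gets written in place of the class name. The `OrderedTypeRegistry` below is a hypothetical stand-in for that behaviour, not Kryo or Flink code; real Kryo IDs also do not start at 0, because some IDs are taken by built-in types.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy registry that hands out sequential IDs in registration order. */
final class OrderedTypeRegistry {
    private final Map<String, Integer> ids = new LinkedHashMap<>();

    /** Registers a type name and returns its ID; re-registering keeps the old ID. */
    int register(String typeName) {
        Integer existing = ids.get(typeName);
        if (existing != null) {
            return existing;
        }
        int id = ids.size(); // next free ID depends on how many types came before
        ids.put(typeName, id);
        return id;
    }

    /** Returns the ID of a registered type. */
    int idOf(String typeName) {
        Integer id = ids.get(typeName);
        if (id == null) {
            throw new IllegalArgumentException("not registered: " + typeName);
        }
        return id;
    }
}
```

If the job that wrote the savepoint registered B and then C, while the restoring job registers C and then B, the same ID refers to different classes on the two sides, and bytes written for one class are read back as the other. Whether this is what happened in Dmitry's job is not confirmed in the thread.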


Re: Flink Job Exception

2017-02-16 Thread Aljoscha Krettek
Hi Govindarajan,
the Jira issue that you linked to and which Till is currently fixing will
only fix the obvious type mismatch in the Akka messages. There is also an
underlying problem that causes this message to be sent in the first place.
In the case of the user who originally created the Jira issue the reason
was that the Max-Parallelism was set to a value smaller than the
parallelism. Can you try looking in the JobManager/TaskManager logs and see
if you find the original cause there?

Cheers,
Aljoscha

On Thu, 16 Feb 2017 at 09:36 Till Rohrmann  wrote:

> Hi Govindarajan,
>
> there is a pending PR for this issue. I think I can merge it today.
>
> Cheers,
> Till
>
> On Thu, Feb 16, 2017 at 12:50 AM, Govindarajan Srinivasaraghavan <
> govindragh...@gmail.com> wrote:
>
> Hi All,
>
> I'm trying to run a streaming job with flink 1.2 version and there are 3
> task managers with 12 task slots. Irrespective of the parallelism that I
> give it always fails with the below error and I found a JIRA link
> corresponding to this issue. Can I know by when this will be resolved since
> I'm not able to run any job in my current environment. Thanks.
>
> https://issues.apache.org/jira/browse/FLINK-5773
>
> java.lang.ClassCastException: Cannot cast scala.util.Failure to org.apache.flink.runtime.messages.Acknowledge
>   at java.lang.Class.cast(Class.java:3369)
>   at scala.concurrent.Future$$anonfun$mapTo$1.apply(Future.scala:405)
>   at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
>   at scala.util.Try$.apply(Try.scala:192)
>   at scala.util.Success.map(Try.scala:237)
>   at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
>   at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
>   at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
>   at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>   at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>   at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>   at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
>   at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>   at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
>   at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>   at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>   at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>   at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
>   at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:89)
>   at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:967)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>   at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>


Re: Streaming data from MongoDB using Flink

2017-02-16 Thread Tzu-Li (Gordon) Tai
I would recommend checking out the Flink RabbitMQ Source for examples:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java

For your case, you should extend `RichSourceFunction`, which additionally 
lets you override the `open()` life cycle method.
In that method, you instantiate your MongoDB client connection and fetch the 
cursor. In the `run()` method, you should essentially have a while loop that 
polls the MongoDB cursor and emits the fetched documents using the 
`SourceContext`.
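
To make the open()/run()/cancel() life cycle described above concrete, here is a minimal sketch that deliberately has no Flink or MongoDB dependency. Everything in it is a stand-in and an assumption: the class name `PollingSourceSketch` is hypothetical, the `Iterator` plays the role of the MongoDB cursor, and the `Consumer` plays the role of the `SourceContext`. In a real job the class would extend `RichSourceFunction` and use the MongoDB client instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Dependency-free sketch of a polling source; all names are illustrative. */
class PollingSourceSketch {

    // Stand-in for the collection behind a MongoDB cursor.
    private final List<String> backingData;
    private Iterator<String> cursor;

    // volatile because cancel() is typically called from a different thread than run().
    private volatile boolean running = true;

    PollingSourceSketch(List<String> backingData) {
        this.backingData = backingData;
    }

    /** Plays the role of open(): create the connection and fetch the cursor. */
    void open() {
        cursor = backingData.iterator();
    }

    /** Plays the role of run(): poll the cursor and emit every fetched document. */
    void run(Consumer<String> ctx) {
        while (running && cursor.hasNext()) {
            ctx.accept(cursor.next());
        }
    }

    /** Plays the role of cancel(): make the loop in run() stop. */
    void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        PollingSourceSketch source =
                new PollingSourceSketch(Arrays.asList("doc-1", "doc-2", "doc-3"));
        source.open();
        source.run(emitted::add);
        System.out.println(emitted); // prints [doc-1, doc-2, doc-3]
    }
}
```

Note that the sketch returns once the list is exhausted. A source that should keep delivering documents as they arrive would wait and poll again instead of leaving the loop.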

If you're also looking to implement a MongoDB source that works with Flink’s 
checkpointing for exactly-once, be sure to check out:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#stateful-source-functions
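
The idea behind a checkpoint-aware source can be sketched without Flink as follows. This is only an illustration under assumptions: the class `CheckpointedSourceSketch` and its method names are hypothetical and only loosely modeled on Flink's checkpoint hooks, the list stands in for a replayable MongoDB query, and the private lock object stands in for the checkpoint lock that the `SourceContext` provides. The point it shows is that emitting a record and advancing the offset happen under the same lock, so a snapshot never sees one without the other.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Dependency-free sketch of a source with a checkpointed offset; names are illustrative. */
class CheckpointedSourceSketch {

    private final List<String> documents;               // stand-in for a replayable query result
    private final Object checkpointLock = new Object(); // stand-in for the checkpoint lock
    private long offset = 0;                            // the state that would be checkpointed
    private volatile boolean running = true;

    CheckpointedSourceSketch(List<String> documents) {
        this.documents = documents;
    }

    /** Emits at most maxRecords documents, starting at the current offset. */
    void run(Consumer<String> ctx, int maxRecords) {
        int emitted = 0;
        while (running && emitted < maxRecords) {
            // Emit and advance the offset atomically with respect to snapshots.
            synchronized (checkpointLock) {
                if (offset >= documents.size()) {
                    return;
                }
                ctx.accept(documents.get((int) offset));
                offset++;
            }
            emitted++;
        }
    }

    long snapshotState() {
        synchronized (checkpointLock) {
            return offset;
        }
    }

    void restoreState(long restoredOffset) {
        synchronized (checkpointLock) {
            offset = restoredOffset;
        }
    }

    void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        List<String> docs = Arrays.asList("a", "b", "c", "d");
        List<String> firstAttempt = new ArrayList<>();

        CheckpointedSourceSketch source = new CheckpointedSourceSketch(docs);
        source.run(firstAttempt::add, 2);         // emits a, b
        long checkpoint = source.snapshotState(); // offset 2 is recorded
        source.run(firstAttempt::add, 1);         // emits c, then assume the job fails

        // Recovery: a fresh instance resumes from the checkpointed offset.
        CheckpointedSourceSketch recovered = new CheckpointedSourceSketch(docs);
        recovered.restoreState(checkpoint);
        List<String> afterRecovery = new ArrayList<>();
        recovered.run(afterRecovery::add, Integer.MAX_VALUE);
        System.out.println(afterRecovery);        // prints [c, d]
    }
}
```

The document "c" is emitted again after recovery. That is expected: the source replays from the checkpointed offset, and state further down the pipeline is rolled back to the same checkpoint, so its effect is counted once.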

Cheers,
Gordon
On February 16, 2017 at 5:53:03 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) 
wrote:

Dear Tzu-Li,

Thank you so much for your prompt response.

Let's assume I have a variable in Java, env, which is my 
StreamExecutionEnvironment. When I go ahead and attempt to do:

env.addSource();

it requests an implementation of the SourceFunction interface:

env.addSource(new SourceFunction() {
    @Override
    public void run(SourceFunction.SourceContext ctx) throws Exception {
        // TO DO
    }

    @Override
    public void cancel() {
        // TO DO
    }
});

And this is where I'm stuck. I do not understand how I should access my 
MongoDB cursor in either of these methods (I suppose the most suitable would 
be the "run" method) in a way that would allow me to return each new MongoDB 
document as it arrives in the database from another source.

Once again, thank you so much for your help.

I will wait to hear from you!

Cumprimentos,

Pedro Lima Monteiro

On 16 February 2017 at 09:43, Tzu-Li (Gordon) Tai  wrote:
Hi Pedro!

This is definitely possible, by simply writing a Flink `SourceFunction` that 
uses MongoDB clients to fetch the data.
It should be straightforward and works well with MongoDB’s cursor APIs.

Could you explain a bit which part in particular you were stuck with?

Cheers,
Gordon


On February 16, 2017 at 5:31:39 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) 
wrote:

Good morning,

I am trying to get data from MongoDB to be analysed in Flink.
I would like to know if it is possible to stream data from MongoDB into
Flink. I have looked into implementing a Flink source function to pass to the
addSource method of the StreamExecutionEnvironment, but I had no luck.
Can anyone help me out?
Thanks.

Pedro Lima Monteiro



Re: Flink jdbc

2017-02-16 Thread Punit Tandel
Yes, I have been following the tutorials, and reading from H2 and writing 
to H2 works fine. The problem is that writing data coming from Kafka to the 
H2 engine does not seem to work, and I can't see any error thrown while 
writing into the in-memory H2 database, so I can't say what the error is or 
why the data is not inserted.


I have been trying to find the cause and looking through the logs while Flink 
processes the operations, but I couldn't find any error being thrown at the 
time of writing data. Is there anywhere I can check for logs?


Thanks


On 02/16/2017 01:10 AM, Ted Yu wrote:

See the tutorial at the beginning of:

flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java

Looks like plugging in "org.h2.Driver" should do.

On Wed, Feb 15, 2017 at 4:59 PM, Punit Tandel wrote:


Hi All

Does Flink JDBC support writing data into an H2 database?

Thanks
Punit






Re: Streaming data from MongoDB using Flink

2017-02-16 Thread Pedro Monteiro
Dear Tzu-Li,

Thank you so much for your prompt response.

Let's assume I have a variable in Java, env, which is my
StreamExecutionEnvironment. When I go ahead and attempt to do:

env.addSource();

it requests an implementation of the SourceFunction interface:

env.addSource(new SourceFunction() {
    @Override
    public void run(SourceFunction.SourceContext ctx) throws Exception {
        // TO DO
    }

    @Override
    public void cancel() {
        // TO DO
    }
});

And this is where I'm stuck. I do not understand how I should access my
MongoDB cursor in either of these methods (I suppose the most suitable would
be the "run" method) in a way that would allow me to return each new MongoDB
document as it arrives in the database from another source.

Once again, thank you so much for your help.

I will wait to hear from you!

Cumprimentos,

*Pedro Lima Monteiro*

On 16 February 2017 at 09:43, Tzu-Li (Gordon) Tai 
wrote:

> Hi Pedro!
>
> This is definitely possible, by simply writing a Flink `SourceFunction`
> that uses MongoDB clients to fetch the data.
> It should be straightforward and works well with MongoDB’s cursor APIs.
>
> Could you explain a bit which part in particular you were stuck with?
>
> Cheers,
> Gordon
>
>
> On February 16, 2017 at 5:31:39 PM, Pedro Monteiro (
> pedro.mlmonte...@gmail.com) wrote:
>
> Good morning,
>
> I am trying to get data from MongoDB to be analysed in Flink.
> I would like to know if it is possible to stream data from MongoDB into
> Flink. I have looked into implementing a Flink source function to pass to
> the addSource method of the StreamExecutionEnvironment, but I had no luck.
> Can anyone help me out?
> Thanks.
>
> *Pedro Lima Monteiro*
>
>


Streaming data from MongoDB using Flink

2017-02-16 Thread Pedro Monteiro
Good morning,

I am trying to get data from MongoDB to be analysed in Flink.
I would like to know if it is possible to stream data from MongoDB into
Flink. I have looked into implementing a Flink source function to pass to the
addSource method of the StreamExecutionEnvironment, but I had no luck.
Can anyone help me out?
Thanks.

*Pedro Lima Monteiro*


Re: Flink Job Exception

2017-02-16 Thread Till Rohrmann
Hi Govindarajan,

there is a pending PR for this issue. I think I can merge it today.

Cheers,
Till

On Thu, Feb 16, 2017 at 12:50 AM, Govindarajan Srinivasaraghavan <
govindragh...@gmail.com> wrote:

> Hi All,
>
> I'm trying to run a streaming job with Flink 1.2 on 3 task managers with
> 12 task slots. Irrespective of the parallelism that I set, it always fails
> with the error below. I found a JIRA issue corresponding to this problem.
> Could you tell me when it will be resolved? I'm not able to run any job in
> my current environment. Thanks.
>
> https://issues.apache.org/jira/browse/FLINK-5773
>
> java.lang.ClassCastException: Cannot cast scala.util.Failure to org.apache.flink.runtime.messages.Acknowledge
>   at java.lang.Class.cast(Class.java:3369)
>   at scala.concurrent.Future$$anonfun$mapTo$1.apply(Future.scala:405)
>   at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
>   at scala.util.Try$.apply(Try.scala:192)
>   at scala.util.Success.map(Try.scala:237)
>   at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
>   at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
>   at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
>   at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>   at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>   at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>   at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
>   at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>   at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
>   at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>   at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>   at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>   at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:266)
>   at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:89)
>   at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:967)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>   at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>