Using Kafka and Flink for batch processing of a batch data source

2016-07-19 Thread Leith Mudge
I am currently working on an architecture for a big data streaming and batch
processing platform. I am planning on using Apache Kafka as the distributed
messaging system to handle data from streaming data sources and then pass it on
to Apache Flink for stream processing. I would also like to use Flink's batch
processing capabilities to process batch data.

Does it make sense to pass the batch data through Kafka on a periodic basis as a
source for Flink batch processing (is this even possible?), or should I just
write the batch data to a data store and then process it by reading it into
Flink?
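
For reference, a rough sketch of the second option (reading the periodically
written batch data straight from the store with Flink's DataSet API). This is
only an illustration; the HDFS path and the CSV layout are made-up placeholders:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class BatchFromStore {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the periodically dumped batch data straight from the data store
        // (an HDFS path in this sketch) instead of replaying it through Kafka.
        DataSet<String> records = env.readTextFile("hdfs:///data/batch/2016-07-19");

        // Example batch transformation: count records per key (first CSV column).
        DataSet<Tuple2<String, Integer>> counts = records
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String line) {
                        return new Tuple2<String, Integer>(line.split(",")[0], 1);
                    }
                })
                .groupBy(0)
                .sum(1);

        counts.print();
    }
}

As far as I know the Kafka connectors are streaming sources, so replaying batch
dumps through Kafka would mean consuming them with the DataStream API; reading
them from the store as above keeps the batch job simpler.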





Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse

2016-07-19 Thread Biplob Biswas
Thanks a ton, Till.

That worked. Thank you so much.

-Biplob



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016p8035.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Aggregate events in time window

2016-07-19 Thread Till Rohrmann
Hi Dominique,

your problem sounds like a good use case for session windows [1, 2]. If you
know that there is a bounded maximum gap between your request and the
corresponding response message, then you could create a session window via:

input
    .keyBy("ReqRespID")
    .window(EventTimeSessionWindows.withGap(Time.minutes(MaxTimeBetweenReqResp)))
    .apply(/* calculate time between request and response */);
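
A slightly fuller sketch of the same idea. It assumes the input is a
DataStream<Tuple2<String, Long>> of (ReqRespID, event timestamp) and the
session-window API from the links below; the element type, field layout and
variable names are illustrative, not from this thread:

input
    .keyBy(0)
    .window(EventTimeSessionWindows.withGap(Time.minutes(maxTimeBetweenReqResp)))
    .apply(new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
        @Override
        public void apply(Tuple key, TimeWindow window,
                          Iterable<Tuple2<String, Long>> events,
                          Collector<Tuple2<String, Long>> out) {
            long min = Long.MAX_VALUE;
            long max = Long.MIN_VALUE;
            for (Tuple2<String, Long> e : events) {
                min = Math.min(min, e.f1);
                max = Math.max(max, e.f1);
            }
            String reqRespId = key.getField(0);
            // emit (ReqRespID, time between request and response) once both
            // messages have landed in the same session window
            out.collect(new Tuple2<String, Long>(reqRespId, max - min));
        }
    });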

[1]
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#session-windows
[2] http://data-artisans.com/session-windowing-in-flink/

Cheers,
Till

On Tue, Jul 19, 2016 at 7:04 PM, Sameer W  wrote:

> How about using EventTime windows with watermark assignment and bounded
> delays. That way you allow more than 5 minutes (bounded delay) for your
> request and responses to arrive. Do you have a way to assign timestamp to
> the responses based on the request timestamp (does the response contain the
> request timestamp in some form). That way you add them to the same window.
>
> Sameer
>
> On Tue, Jul 19, 2016 at 12:31 PM, Dominique Rondé <
> dominique.ro...@allsecur.de> wrote:
>
>> Hi all,
>>
>> once again I need a "kick" to the right direction. I have a datastream
>> with request and responses identified by an ReqResp-ID. I like to calculate
>> the (avg, 95%, 99%) time between the request and response and also like to
>> count them. I thought of
>> ".keyBy("ReqRespID").timeWindowAll(Time.minutes(5)).apply(function)" would
>> do the job, but there are some cases were a Request is in the first and the
>> Response is in the second window. But if i use a overlapping time window
>> (i.e. timeWindowAll(Time.minutes(5),Time.seconds(60))) I have a lot of
>> requests more then one time in the apply-function.
>>
>> Do you have any hint for me?
>>
>> Thanks a lot!
>>
>> Dominique
>>
>>
>


Re: DataStreamUtils not working properly

2016-07-19 Thread subash basnet
Hello Till,

Yup, I can see the log output in my console, but there is no information
there about any error in the conversion. Just the normal WARN and INFO
messages as below:
22:09:16,676 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
  - No state backend has been specified, using default state backend
(Memory / JobManager)
22:09:16,676 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
  - State backend is set to heap memory (checkpoint to jobmanager)

The above messages are always there when I run my project. It would be great
if someone could check why collecting the datastream via DataStreamUtils
gives an empty result.
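
Not a definitive diagnosis, but one detail that may matter: Lists.newArrayList(iter)
drains the iterator until the stream ends, so on an unbounded stream it can only
complete when the job finishes. A sketch that consumes the iterator directly
instead (the 10-element cap is purely for illustration, and 'centroids' refers to
the DataStream<Centroid> from the quoted code further down):

import java.util.Iterator;
import org.apache.flink.contrib.streaming.DataStreamUtils;

Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
int seen = 0;
while (iter.hasNext() && seen < 10) {
    // hasNext() blocks until the next element arrives or the job terminates
    System.out.println(iter.next());
    seen++;
}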

Best Regards,
Subash Basnet

On Tue, Jul 19, 2016 at 4:52 PM, Till Rohrmann  wrote:

> It depends if you have a log4j.properties file specified in your
> classpath. If you see log output on the console, then it should also print
> errors there.
>
> Cheers,
> Till
>
> On Tue, Jul 19, 2016 at 3:08 PM, subash basnet  wrote:
>
>> Hello Till,
>>
>> Shouldn't it write something in the eclipse console if there is any error
>> or warning. But nothing about error is printed on the console. And I
>> checked the flink project folder: flink-core, flink streaming as such but
>> couldn't find where the log is written when run via eclipse.
>>
>> Best Regards,
>> Subash Basnet
>>
>> On Tue, Jul 19, 2016 at 2:49 PM, Till Rohrmann 
>> wrote:
>>
>>> Have you checked your logs whether they contain some problems? In
>>> general it is not recommended collecting the streaming result back to your
>>> client. It might also be a problem with `DataStreamUtils.collect`.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jul 19, 2016 at 2:42 PM, subash basnet 
>>> wrote:
>>>
 Hello all,

 I tried to check if it works for tuple but same problem, the collection
 still shows blank result. I took the id of centroid tuple and printed it,
 but the collection displays empty.

 DataStream centroids = newCentroidDataStream.map(new
 TupleCentroidConverter());
 DataStream centroidId = centroids.map(new TestMethod());
 centroidId.print();
 Iterator iter = DataStreamUtils.collect(centroidId);
 Collection testCentroids = Lists.newArrayList(iter);
 for (Tuple1 c : testCentroids) {
 System.out.println(c);
 }
 Output: (Mon Jul 18 17:36:03 CEST 2016) (Mon Jul 18 17:43:58 CEST 2016)
 (Mon Jul 18 17:42:59 CEST 2016) (Mon Jul 18 17:34:01 CEST 2016) (Mon Jul 18
 17:52:00 CEST 2016) (Mon Jul 18 17:40:58 CEST 2016) for centroidId.print(),
 but no output for System.out.println(c); Best Regards, Subash Basnet

 On Tue, Jul 19, 2016 at 10:48 AM, subash basnet 
 wrote:

> Hello all,
>
> I am trying to convert datastream to collection, but it's shows blank
> result. There is a stream of data which can be viewed on the console on
> print(), but the collection of the same stream shows empty after
> conversion. Below is the code:
>
> DataStream centroids = newCentroidDataStream.map(new
> TupleCentroidConverter());
> centroids.print();
> Iterator iter = DataStreamUtils.collect(centroids);
> Collection testCentroids = Lists.newArrayList(iter);
> for(Centroid c: testCentroids){
> System.out.println(c);
> }
>
> The above *centroids.print()* gives the following output in console:
>
> Mon Jul 18 21:29:01 CEST 2016  119.3701 119.4 119.3701 119.38 27400.0
> Mon Jul 18 21:23:00 CEST 2016  119.3463 119.37 119.315 119.37 48200.0
> Mon Jul 18 21:27:59 CEST 2016  119.3401 119.3401 119.26 119.265 50300.0
> Mon Jul 18 21:36:00 CEST 2016  119.48 119.505 119.47 119.4741 37400.0
> Mon Jul 18 21:33:00 CEST 2016  119.535 119.54 119.445 119.455 152900.0
>
> But the next *System.out.println(c) *within the for loop prints
> nothing. What could be the problem.
>
> My maven has following configuration for dataStreamUtils:
> 
> org.apache.flink
> flink-streaming-contrib_2.10
> ${flink.version}
> 
>
>
> Best Regards,
> Subash Basnet
>
>

>>>
>>
>


Re: Aggregate events in time window

2016-07-19 Thread Sameer W
How about using event-time windows with watermark assignment and bounded
delays? That way you allow more than 5 minutes (the bounded delay) for your
requests and responses to arrive. Do you have a way to assign a timestamp to
the responses based on the request timestamp (does the response contain the
request timestamp in some form)? That way you can add them to the same window.
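
For illustration, a minimal sketch of such a bounded-delay watermark assigner
against the AssignerWithPeriodicWatermarks interface; the five-minute bound and
the Tuple2<String, Long> element type are assumptions, not from this thread:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Assigns event timestamps from the element (field f1) and emits watermarks that
// lag behind the largest timestamp seen so far by a fixed bound (here 5 minutes).
public class BoundedDelayAssigner
        implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {

    private static final long MAX_DELAY_MS = 5 * 60 * 1000L;
    private long currentMaxTimestamp = Long.MIN_VALUE + MAX_DELAY_MS;

    @Override
    public long extractTimestamp(Tuple2<String, Long> element, long previousTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, element.f1);
        return element.f1;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - MAX_DELAY_MS);
    }
}

It would be applied with stream.assignTimestampsAndWatermarks(new BoundedDelayAssigner())
before the keyBy/window operations.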

Sameer

On Tue, Jul 19, 2016 at 12:31 PM, Dominique Rondé <
dominique.ro...@allsecur.de> wrote:

> Hi all,
>
> once again I need a "kick" to the right direction. I have a datastream
> with request and responses identified by an ReqResp-ID. I like to calculate
> the (avg, 95%, 99%) time between the request and response and also like to
> count them. I thought of
> ".keyBy("ReqRespID").timeWindowAll(Time.minutes(5)).apply(function)" would
> do the job, but there are some cases were a Request is in the first and the
> Response is in the second window. But if i use a overlapping time window
> (i.e. timeWindowAll(Time.minutes(5),Time.seconds(60))) I have a lot of
> requests more then one time in the apply-function.
>
> Do you have any hint for me?
>
> Thanks a lot!
>
> Dominique
>
>


Aggregate events in time window

2016-07-19 Thread Dominique Rondé

Hi all,

once again I need a "kick" in the right direction. I have a datastream
with requests and responses identified by a ReqResp-ID. I would like to
calculate the (avg, 95%, 99%) time between the request and response and
would also like to count them. I thought
".keyBy("ReqRespID").timeWindowAll(Time.minutes(5)).apply(function)"
would do the job, but there are some cases where a request is in the
first and the response is in the second window. But if I use an
overlapping time window (i.e.
timeWindowAll(Time.minutes(5), Time.seconds(60))) I get a lot of
requests more than once in the apply function.


Do you have any hint for me?

Thanks a lot!

Dominique



Re: DataStreamUtils not working properly

2016-07-19 Thread Till Rohrmann
It depends on whether you have a log4j.properties file on your classpath.
If you see log output on the console, then it should also print errors
there.
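
For reference, a minimal log4j.properties that could be dropped onto the
classpath (e.g. src/main/resources) so that errors show up on the console; the
exact pattern layout is just an example:

log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n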

Cheers,
Till

On Tue, Jul 19, 2016 at 3:08 PM, subash basnet  wrote:

> Hello Till,
>
> Shouldn't it write something in the eclipse console if there is any error
> or warning. But nothing about error is printed on the console. And I
> checked the flink project folder: flink-core, flink streaming as such but
> couldn't find where the log is written when run via eclipse.
>
> Best Regards,
> Subash Basnet
>
> On Tue, Jul 19, 2016 at 2:49 PM, Till Rohrmann 
> wrote:
>
>> Have you checked your logs whether they contain some problems? In general
>> it is not recommended collecting the streaming result back to your client.
>> It might also be a problem with `DataStreamUtils.collect`.
>>
>> Cheers,
>> Till
>>
>> On Tue, Jul 19, 2016 at 2:42 PM, subash basnet 
>> wrote:
>>
>>> Hello all,
>>>
>>> I tried to check if it works for tuple but same problem, the collection
>>> still shows blank result. I took the id of centroid tuple and printed it,
>>> but the collection displays empty.
>>>
>>> DataStream centroids = newCentroidDataStream.map(new
>>> TupleCentroidConverter());
>>> DataStream centroidId = centroids.map(new TestMethod());
>>> centroidId.print();
>>> Iterator iter = DataStreamUtils.collect(centroidId);
>>> Collection testCentroids = Lists.newArrayList(iter);
>>> for (Tuple1 c : testCentroids) {
>>> System.out.println(c);
>>> }
>>> Output: (Mon Jul 18 17:36:03 CEST 2016) (Mon Jul 18 17:43:58 CEST 2016)
>>> (Mon Jul 18 17:42:59 CEST 2016) (Mon Jul 18 17:34:01 CEST 2016) (Mon Jul 18
>>> 17:52:00 CEST 2016) (Mon Jul 18 17:40:58 CEST 2016) for centroidId.print(),
>>> but no output for System.out.println(c); Best Regards, Subash Basnet
>>>
>>> On Tue, Jul 19, 2016 at 10:48 AM, subash basnet 
>>> wrote:
>>>
 Hello all,

 I am trying to convert datastream to collection, but it's shows blank
 result. There is a stream of data which can be viewed on the console on
 print(), but the collection of the same stream shows empty after
 conversion. Below is the code:

 DataStream centroids = newCentroidDataStream.map(new
 TupleCentroidConverter());
 centroids.print();
 Iterator iter = DataStreamUtils.collect(centroids);
 Collection testCentroids = Lists.newArrayList(iter);
 for(Centroid c: testCentroids){
 System.out.println(c);
 }

 The above *centroids.print()* gives the following output in console:

 Mon Jul 18 21:29:01 CEST 2016  119.3701 119.4 119.3701 119.38 27400.0
 Mon Jul 18 21:23:00 CEST 2016  119.3463 119.37 119.315 119.37 48200.0
 Mon Jul 18 21:27:59 CEST 2016  119.3401 119.3401 119.26 119.265 50300.0
 Mon Jul 18 21:36:00 CEST 2016  119.48 119.505 119.47 119.4741 37400.0
 Mon Jul 18 21:33:00 CEST 2016  119.535 119.54 119.445 119.455 152900.0

 But the next *System.out.println(c) *within the for loop prints
 nothing. What could be the problem.

 My maven has following configuration for dataStreamUtils:
 
 org.apache.flink
 flink-streaming-contrib_2.10
 ${flink.version}
 


 Best Regards,
 Subash Basnet


>>>
>>
>


Re: Parallelizing openCV libraries in Flink

2016-07-19 Thread Debaditya Roy
Hello,

I cannot access the web interface from the nodes I am using.
However, I will check the logs for anything suspicious and get back.
Thanks :-)

Regards,
Debaditya

On Tue, Jul 19, 2016 at 4:46 PM, Till Rohrmann  wrote:

> Hi Debaditya,
>
> you can see in the web interface how much data each source has sent to the
> downstream tasks and how much data was consumed by the sinks. This should
> tell you whether your sources have actually read some data. You can also
> check the log files whether you find anything suspicious there.
>
> Cheers,
> Till
>
> On Tue, Jul 19, 2016 at 10:33 AM, Debaditya Roy 
> wrote:
>
>> Hello users,
>>
>> I am currently doing a project in image processing with Open CV library.
>> Have anyone here faced any issue with parallelizing the library in flink? I
>> have written a code which is running fine on local environment, however
>> when I try to run it in distributed environment it writes (it was supposed
>> to write some result) in the sink files. I suspect that it is having
>> problem with reading the video file which I have supplied the source
>> directory.
>> Any comments and similar experience will be extremely helpful.
>>
>> Warm Regards,
>> Debaditya
>>
>
>


Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse

2016-07-19 Thread Till Rohrmann
Hi Biplob,

if you want to start the web interface from within your IDE, then you have
to create a local execution environment as Ufuk told you:

Configuration config = new Configuration();
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(getP, config);

and you have to add the following dependency to your pom.xml:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_2.10</artifactId>
    <version>${flink.version}</version>
</dependency>

Cheers,
Till
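
For a quick end-to-end check, a sketch of a small job that keeps running long
enough for the dashboard at http://localhost:8081 to be inspected; the
parallelism of 1 and the slow counting source are arbitrary choices for the
illustration:

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LocalDashboardExample {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, config);

        // A slow, endless source so the job stays up while you look at the dashboard.
        env.addSource(new SourceFunction<Long>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Long> ctx) throws Exception {
                long i = 0;
                while (running) {
                    ctx.collect(i++);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }).print();

        env.execute("Local job with web dashboard");
    }
}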

On Tue, Jul 19, 2016 at 2:27 PM, Sameer W  wrote:

> Yes you have to provide the path of your jar. The reason is:
> 1. When you start in the pseudo-cluster mode the tasks are started in
> their own JVM's with their own class loader.
> 2. You client program has access to your custom operator classes but the
> remote JVM's don't. Hence you need to ship the JAR file to these remote
> Task nodes. The getRemoteExcecutionEnvironment() method has overloaded
> version which takes a JAR file. Just provide your local path to it and it
> will ship it when it starts
>
> Sameer
>
> On Tue, Jul 19, 2016 at 6:51 AM, Biplob Biswas 
> wrote:
>
>> Hi Sameer,
>>
>> Thanks for that quick reply, I was using flink streaming so the program
>> keeps on running until i close it. But anyway I am ready to try this
>> getRemoteExecutionEnvironment(), I checked but it ask me for the jar file,
>> which is weird because I am running the program directly.
>>
>> Does it mean I create a jar package and then run it via eclipse?
>>
>> If not, could you point me to some resources?
>>
>> Thanks
>> Biplob
>>
>>
>> Sameer W wrote
>> > From Eclipse it creates a local environment and runs in the IDE. When
>> the
>> > program finishes so does the Flink execution instance. I have never
>> tried
>> > accessing the console when the program is running but one the program is
>> > finished there is nothing to connect to.
>> >
>> > If you need to access the dashboard, start Flink in the pseudo-cluster
>> > mode
>> > and connect to it using the getRemoteExecutionEnvironment(). That will
>> > allow you to access the jobs statuses on the dashboard when you finish
>> > running your job.
>> >
>> > Sameer
>> >
>> > On Tue, Jul 19, 2016 at 6:35 AM, Biplob Biswas 
>>
>> > revolutionisme@
>>
>> > 
>> > wrote:
>> >
>> >> Hi,
>> >>
>> >> I am running my flink program using Eclipse and I can't access the
>> >> dashboard
>> >> at http://localhost:8081, can someone help me with this?
>> >>
>> >> I read that I need to check my flink-conf.yaml, but its a maven project
>> >> and
>> >> I don't have a flink-conf.
>> >>
>> >> Any help would be really appreciated.
>> >>
>> >> Thanks a lot
>> >> Biplob
>> >>
>> >>
>> >>
>> >> --
>> >> View this message in context:
>> >>
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016.html
>> >> Sent from the Apache Flink User Mailing List archive. mailing list
>> >> archive
>> >> at Nabble.com.
>> >>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016p8018.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>


Re: Intermediate Data Caching

2016-07-19 Thread Saliya Ekanayake
Thank you, Ufuk!

On Tue, Jul 19, 2016 at 5:51 AM, Ufuk Celebi  wrote:

> PS: I forgot to mention that also constant iteration input is cached.
>
> On Mon, Jul 18, 2016 at 11:27 AM, Ufuk Celebi  wrote:
> > Hey Saliya,
> >
> > the result of each iteration (super step) that is fed back to the
> > iteration is cached. For the iterate operator that is the last partial
> > solution and for the delta iterate operator it's the current solution
> > set (
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/iterations.html
> ).
> >
> > Internally, this works via custom iteration operator implementations
> > for head and tail tasks, which are co-located and share a hash table.
> > I think that the internals of this are not documented, you would have
> > to look into the code for this. Most of the relevant implementations
> > are found in the "org.apache.flink.runtime.iterative.task" package.
> >
> > Hope this helps...
> >
> > Ufuk
> >
> >
> > On Sun, Jul 17, 2016 at 9:36 PM, Saliya Ekanayake 
> wrote:
> >> Hi,
> >>
> >> I am trying to understand what's the intermediate caching support in
> Flink.
> >> For example, when there's an iterative dataset what's being cached
> between
> >> iterations. Is there some documentation on this?
> >>
> >> Thank you,
> >> Saliya
> >>
> >> --
> >> Saliya Ekanayake
> >> Ph.D. Candidate | Research Assistant
> >> School of Informatics and Computing | Digital Science Center
> >> Indiana University, Bloomington
> >>
>



-- 
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington


Re: DataStreamUtils not working properly

2016-07-19 Thread subash basnet
Hello Till,

Shouldn't it write something to the Eclipse console if there is any error
or warning? But nothing about an error is printed on the console. And I
checked the Flink project folders (flink-core, flink-streaming, and so on) but
couldn't find where the log is written when run via Eclipse.

Best Regards,
Subash Basnet

On Tue, Jul 19, 2016 at 2:49 PM, Till Rohrmann  wrote:

> Have you checked your logs whether they contain some problems? In general
> it is not recommended collecting the streaming result back to your client.
> It might also be a problem with `DataStreamUtils.collect`.
>
> Cheers,
> Till
>
> On Tue, Jul 19, 2016 at 2:42 PM, subash basnet  wrote:
>
>> Hello all,
>>
>> I tried to check if it works for tuple but same problem, the collection
>> still shows blank result. I took the id of centroid tuple and printed it,
>> but the collection displays empty.
>>
>> DataStream centroids = newCentroidDataStream.map(new
>> TupleCentroidConverter());
>> DataStream centroidId = centroids.map(new TestMethod());
>> centroidId.print();
>> Iterator iter = DataStreamUtils.collect(centroidId);
>> Collection testCentroids = Lists.newArrayList(iter);
>> for (Tuple1 c : testCentroids) {
>> System.out.println(c);
>> }
>> Output: (Mon Jul 18 17:36:03 CEST 2016) (Mon Jul 18 17:43:58 CEST 2016)
>> (Mon Jul 18 17:42:59 CEST 2016) (Mon Jul 18 17:34:01 CEST 2016) (Mon Jul 18
>> 17:52:00 CEST 2016) (Mon Jul 18 17:40:58 CEST 2016) for centroidId.print(),
>> but no output for System.out.println(c); Best Regards, Subash Basnet
>>
>> On Tue, Jul 19, 2016 at 10:48 AM, subash basnet 
>> wrote:
>>
>>> Hello all,
>>>
>>> I am trying to convert datastream to collection, but it's shows blank
>>> result. There is a stream of data which can be viewed on the console on
>>> print(), but the collection of the same stream shows empty after
>>> conversion. Below is the code:
>>>
>>> DataStream centroids = newCentroidDataStream.map(new
>>> TupleCentroidConverter());
>>> centroids.print();
>>> Iterator iter = DataStreamUtils.collect(centroids);
>>> Collection testCentroids = Lists.newArrayList(iter);
>>> for(Centroid c: testCentroids){
>>> System.out.println(c);
>>> }
>>>
>>> The above *centroids.print()* gives the following output in console:
>>>
>>> Mon Jul 18 21:29:01 CEST 2016  119.3701 119.4 119.3701 119.38 27400.0
>>> Mon Jul 18 21:23:00 CEST 2016  119.3463 119.37 119.315 119.37 48200.0
>>> Mon Jul 18 21:27:59 CEST 2016  119.3401 119.3401 119.26 119.265 50300.0
>>> Mon Jul 18 21:36:00 CEST 2016  119.48 119.505 119.47 119.4741 37400.0
>>> Mon Jul 18 21:33:00 CEST 2016  119.535 119.54 119.445 119.455 152900.0
>>>
>>> But the next *System.out.println(c) *within the for loop prints
>>> nothing. What could be the problem.
>>>
>>> My maven has following configuration for dataStreamUtils:
>>> 
>>> org.apache.flink
>>> flink-streaming-contrib_2.10
>>> ${flink.version}
>>> 
>>>
>>>
>>> Best Regards,
>>> Subash Basnet
>>>
>>>
>>
>


Re: DataStreamUtils not working properly

2016-07-19 Thread Till Rohrmann
Have you checked whether your logs contain any problems? In general
it is not recommended to collect the streaming result back in your client.
It might also be a problem with `DataStreamUtils.collect`.

Cheers,
Till

On Tue, Jul 19, 2016 at 2:42 PM, subash basnet  wrote:

> Hello all,
>
> I tried to check if it works for tuple but same problem, the collection
> still shows blank result. I took the id of centroid tuple and printed it,
> but the collection displays empty.
>
> DataStream centroids = newCentroidDataStream.map(new
> TupleCentroidConverter());
> DataStream centroidId = centroids.map(new TestMethod());
> centroidId.print();
> Iterator iter = DataStreamUtils.collect(centroidId);
> Collection testCentroids = Lists.newArrayList(iter);
> for (Tuple1 c : testCentroids) {
> System.out.println(c);
> }
> Output: (Mon Jul 18 17:36:03 CEST 2016) (Mon Jul 18 17:43:58 CEST 2016)
> (Mon Jul 18 17:42:59 CEST 2016) (Mon Jul 18 17:34:01 CEST 2016) (Mon Jul 18
> 17:52:00 CEST 2016) (Mon Jul 18 17:40:58 CEST 2016) for centroidId.print(),
> but no output for System.out.println(c); Best Regards, Subash Basnet
>
> On Tue, Jul 19, 2016 at 10:48 AM, subash basnet 
> wrote:
>
>> Hello all,
>>
>> I am trying to convert datastream to collection, but it's shows blank
>> result. There is a stream of data which can be viewed on the console on
>> print(), but the collection of the same stream shows empty after
>> conversion. Below is the code:
>>
>> DataStream centroids = newCentroidDataStream.map(new
>> TupleCentroidConverter());
>> centroids.print();
>> Iterator iter = DataStreamUtils.collect(centroids);
>> Collection testCentroids = Lists.newArrayList(iter);
>> for(Centroid c: testCentroids){
>> System.out.println(c);
>> }
>>
>> The above *centroids.print()* gives the following output in console:
>>
>> Mon Jul 18 21:29:01 CEST 2016  119.3701 119.4 119.3701 119.38 27400.0
>> Mon Jul 18 21:23:00 CEST 2016  119.3463 119.37 119.315 119.37 48200.0
>> Mon Jul 18 21:27:59 CEST 2016  119.3401 119.3401 119.26 119.265 50300.0
>> Mon Jul 18 21:36:00 CEST 2016  119.48 119.505 119.47 119.4741 37400.0
>> Mon Jul 18 21:33:00 CEST 2016  119.535 119.54 119.445 119.455 152900.0
>>
>> But the next *System.out.println(c) *within the for loop prints nothing.
>> What could be the problem.
>>
>> My maven has following configuration for dataStreamUtils:
>> 
>> org.apache.flink
>> flink-streaming-contrib_2.10
>> ${flink.version}
>> 
>>
>>
>> Best Regards,
>> Subash Basnet
>>
>>
>


Re: DataStreamUtils not working properly

2016-07-19 Thread subash basnet
Hello all,

I tried to check if it works for a tuple, but it's the same problem: the
collection still shows a blank result. I took the id of the centroid tuple and
printed it, but the collection is still empty.

DataStream<Centroid> centroids =
    newCentroidDataStream.map(new TupleCentroidConverter());
DataStream<Tuple1<String>> centroidId = centroids.map(new TestMethod());
centroidId.print();
Iterator<Tuple1<String>> iter = DataStreamUtils.collect(centroidId);
Collection<Tuple1<String>> testCentroids = Lists.newArrayList(iter);
for (Tuple1<String> c : testCentroids) {
    System.out.println(c);
}
Output for centroidId.print():
(Mon Jul 18 17:36:03 CEST 2016) (Mon Jul 18 17:43:58 CEST 2016)
(Mon Jul 18 17:42:59 CEST 2016) (Mon Jul 18 17:34:01 CEST 2016)
(Mon Jul 18 17:52:00 CEST 2016) (Mon Jul 18 17:40:58 CEST 2016)

But there is no output for System.out.println(c);

Best Regards,
Subash Basnet

On Tue, Jul 19, 2016 at 10:48 AM, subash basnet  wrote:

> Hello all,
>
> I am trying to convert datastream to collection, but it's shows blank
> result. There is a stream of data which can be viewed on the console on
> print(), but the collection of the same stream shows empty after
> conversion. Below is the code:
>
> DataStream centroids = newCentroidDataStream.map(new
> TupleCentroidConverter());
> centroids.print();
> Iterator iter = DataStreamUtils.collect(centroids);
> Collection testCentroids = Lists.newArrayList(iter);
> for(Centroid c: testCentroids){
> System.out.println(c);
> }
>
> The above *centroids.print()* gives the following output in console:
>
> Mon Jul 18 21:29:01 CEST 2016  119.3701 119.4 119.3701 119.38 27400.0
> Mon Jul 18 21:23:00 CEST 2016  119.3463 119.37 119.315 119.37 48200.0
> Mon Jul 18 21:27:59 CEST 2016  119.3401 119.3401 119.26 119.265 50300.0
> Mon Jul 18 21:36:00 CEST 2016  119.48 119.505 119.47 119.4741 37400.0
> Mon Jul 18 21:33:00 CEST 2016  119.535 119.54 119.445 119.455 152900.0
>
> But the next *System.out.println(c) *within the for loop prints nothing.
> What could be the problem.
>
> My maven has following configuration for dataStreamUtils:
> 
> org.apache.flink
> flink-streaming-contrib_2.10
> ${flink.version}
> 
>
>
> Best Regards,
> Subash Basnet
>
>


Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse

2016-07-19 Thread Sameer W
Yes, you have to provide the path of your jar. The reason is:
1. When you start in pseudo-cluster mode, the tasks are started in their
own JVMs with their own class loaders.
2. Your client program has access to your custom operator classes, but the
remote JVMs don't. Hence you need to ship the JAR file to these remote
task nodes. The getRemoteExecutionEnvironment() method has an overloaded
version which takes a JAR file. Just provide your local path to it and it
will ship the jar when it starts.
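
For illustration, a minimal sketch of that call; the host, port and jar path
are placeholders, not values from this thread:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Connect to an already running (pseudo-cluster) Flink instance and ship the
// job jar so the remote task managers can load the user-defined operator classes.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
        "localhost",                   // JobManager host (placeholder)
        6123,                          // JobManager RPC port (Flink's default)
        "/path/to/your-job.jar");      // jar containing your operators (placeholder)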

Sameer

On Tue, Jul 19, 2016 at 6:51 AM, Biplob Biswas 
wrote:

> Hi Sameer,
>
> Thanks for that quick reply, I was using flink streaming so the program
> keeps on running until i close it. But anyway I am ready to try this
> getRemoteExecutionEnvironment(), I checked but it ask me for the jar file,
> which is weird because I am running the program directly.
>
> Does it mean I create a jar package and then run it via eclipse?
>
> If not, could you point me to some resources?
>
> Thanks
> Biplob
>
>
> Sameer W wrote
> > From Eclipse it creates a local environment and runs in the IDE. When the
> > program finishes so does the Flink execution instance. I have never tried
> > accessing the console when the program is running but one the program is
> > finished there is nothing to connect to.
> >
> > If you need to access the dashboard, start Flink in the pseudo-cluster
> > mode
> > and connect to it using the getRemoteExecutionEnvironment(). That will
> > allow you to access the jobs statuses on the dashboard when you finish
> > running your job.
> >
> > Sameer
> >
> > On Tue, Jul 19, 2016 at 6:35 AM, Biplob Biswas 
>
> > revolutionisme@
>
> > 
> > wrote:
> >
> >> Hi,
> >>
> >> I am running my flink program using Eclipse and I can't access the
> >> dashboard
> >> at http://localhost:8081, can someone help me with this?
> >>
> >> I read that I need to check my flink-conf.yaml, but its a maven project
> >> and
> >> I don't have a flink-conf.
> >>
> >> Any help would be really appreciated.
> >>
> >> Thanks a lot
> >> Biplob
> >>
> >>
> >>
> >> --
> >> View this message in context:
> >>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016.html
> >> Sent from the Apache Flink User Mailing List archive. mailing list
> >> archive
> >> at Nabble.com.
> >>
>
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016p8018.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse

2016-07-19 Thread Biplob Biswas
Thanks, Ufuk, for the input. I tried what you suggested as well (as follows):

Configuration config = new Configuration(); 
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); 

StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(getP, config);

But I still get nothing. I tried both 127.0.0.1:8081 and localhost:8081, as I
was getting this message in my log: "No hostname could be resolved for the IP
address 127.0.0.1, using IP address as host name."

But still nothing. 

Anything else I can try, before installing flink and running it via command
line? 

Thanks for such quick replies btw :)

Biplob

Ufuk Celebi wrote
> You can explicitly create a LocalEnvironment and provide a Configuration:
> 
> Configuration config = new Configuration();
> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
> 
> ExecutionEnvironment env = new LocalEnvironment(config);
> ...
> 
> 
> On Tue, Jul 19, 2016 at 1:28 PM, Sameer W 

> sameer@

>  wrote:
>> From Eclipse it creates a local environment and runs in the IDE. When the
>> program finishes so does the Flink execution instance. I have never tried
>> accessing the console when the program is running but one the program is
>> finished there is nothing to connect to.
>>
>> If you need to access the dashboard, start Flink in the pseudo-cluster
>> mode
>> and connect to it using the getRemoteExecutionEnvironment(). That will
>> allow
>> you to access the jobs statuses on the dashboard when you finish running
>> your job.
>>
>> Sameer
>>
>> On Tue, Jul 19, 2016 at 6:35 AM, Biplob Biswas 

> revolutionisme@

> 
>> wrote:
>>>
>>> Hi,
>>>
>>> I am running my flink program using Eclipse and I can't access the
>>> dashboard
>>> at http://localhost:8081, can someone help me with this?
>>>
>>> I read that I need to check my flink-conf.yaml, but its a maven project
>>> and
>>> I don't have a flink-conf.
>>>
>>> Any help would be really appreciated.
>>>
>>> Thanks a lot
>>> Biplob
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016.html
>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>> archive
>>> at Nabble.com.
>>
>>





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016p8020.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse

2016-07-19 Thread Ufuk Celebi
You can explicitly create a LocalEnvironment and provide a Configuration:

Configuration config = new Configuration();
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

ExecutionEnvironment env = new LocalEnvironment(config);
...


On Tue, Jul 19, 2016 at 1:28 PM, Sameer W  wrote:
> From Eclipse it creates a local environment and runs in the IDE. When the
> program finishes so does the Flink execution instance. I have never tried
> accessing the console when the program is running but one the program is
> finished there is nothing to connect to.
>
> If you need to access the dashboard, start Flink in the pseudo-cluster mode
> and connect to it using the getRemoteExecutionEnvironment(). That will allow
> you to access the jobs statuses on the dashboard when you finish running
> your job.
>
> Sameer
>
> On Tue, Jul 19, 2016 at 6:35 AM, Biplob Biswas 
> wrote:
>>
>> Hi,
>>
>> I am running my flink program using Eclipse and I can't access the
>> dashboard
>> at http://localhost:8081, can someone help me with this?
>>
>> I read that I need to check my flink-conf.yaml, but its a maven project
>> and
>> I don't have a flink-conf.
>>
>> Any help would be really appreciated.
>>
>> Thanks a lot
>> Biplob
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016.html
>> Sent from the Apache Flink User Mailing List archive. mailing list archive
>> at Nabble.com.
>
>


Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse

2016-07-19 Thread Biplob Biswas
Hi Sameer,

Thanks for that quick reply. I was using Flink streaming, so the program
keeps on running until I close it. But anyway, I am ready to try this
getRemoteExecutionEnvironment(); I checked, but it asks me for the jar file,
which is weird because I am running the program directly.

Does it mean I should create a jar package and then run it via Eclipse?

If not, could you point me to some resources?

Thanks
Biplob


Sameer W wrote
> From Eclipse it creates a local environment and runs in the IDE. When the
> program finishes so does the Flink execution instance. I have never tried
> accessing the console when the program is running but one the program is
> finished there is nothing to connect to.
> 
> If you need to access the dashboard, start Flink in the pseudo-cluster
> mode
> and connect to it using the getRemoteExecutionEnvironment(). That will
> allow you to access the jobs statuses on the dashboard when you finish
> running your job.
> 
> Sameer
> 
> On Tue, Jul 19, 2016 at 6:35 AM, Biplob Biswas 

> revolutionisme@

> 
> wrote:
> 
>> Hi,
>>
>> I am running my flink program using Eclipse and I can't access the
>> dashboard
>> at http://localhost:8081, can someone help me with this?
>>
>> I read that I need to check my flink-conf.yaml, but its a maven project
>> and
>> I don't have a flink-conf.
>>
>> Any help would be really appreciated.
>>
>> Thanks a lot
>> Biplob
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive
>> at Nabble.com.
>>





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016p8018.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse

2016-07-19 Thread Sameer W
From Eclipse it creates a local environment and runs in the IDE. When the
program finishes, so does the Flink execution instance. I have never tried
accessing the console while the program is running, but once the program is
finished there is nothing to connect to.

If you need to access the dashboard, start Flink in pseudo-cluster mode
and connect to it using the getRemoteExecutionEnvironment(). That will
allow you to access the job statuses on the dashboard when you finish
running your job.

Sameer

On Tue, Jul 19, 2016 at 6:35 AM, Biplob Biswas 
wrote:

> Hi,
>
> I am running my flink program using Eclipse and I can't access the
> dashboard
> at http://localhost:8081, can someone help me with this?
>
> I read that I need to check my flink-conf.yaml, but its a maven project and
> I don't have a flink-conf.
>
> Any help would be really appreciated.
>
> Thanks a lot
> Biplob
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Can't access Flink Dashboard at 8081, running Flink program using Eclipse

2016-07-19 Thread Biplob Biswas
Hi,

I am running my Flink program using Eclipse and I can't access the dashboard
at http://localhost:8081. Can someone help me with this?

I read that I need to check my flink-conf.yaml, but it's a Maven project and
I don't have a flink-conf.

Any help would be really appreciated.

Thanks a lot
Biplob



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Data point goes missing within iteration

2016-07-19 Thread Biplob Biswas
Hi Ufuk,

Thanks for the update. Is there any known way to fix this issue? Any
workaround that you know of which I can try?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Data-point-goes-missing-within-iteration-tp7776p8015.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Unable to get the value of datatype in datastream

2016-07-19 Thread Aljoscha Krettek
Hi,
you have to ensure to filter the data that you send back on the feedback
edge, i.e. the loop.closeWith(newCentroids.broadcast()); statement needs to
take a stream that only has the centroids that you want to send back. And
you need to make sure to emit centroids with a good timestamp if you want
to preserve timestamps.

What you can also do is union the stream of initial centroids with the
new centroids on the feedback edge, i.e.:
loop.closeWith(newCentroids.union(initialCentroids).broadcast())
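
To sketch how that fits into the loop from the quoted code below (the names
points, initialCentroids and the operator classes are taken from that code;
the rest is illustrative, not a tested implementation):

ConnectedIterativeStreams<Point, Centroid> loop =
    points.iterate().withFeedbackType(Centroid.class);

DataStream<Centroid> newCentroids = loop
    .flatMap(new SelectNearestCenter(10))
    .map(new CountAppender())
    .keyBy(0)
    .reduce(new CentroidAccumulator())
    .map(new CentroidAverager());

// union the initial centroids with the updated ones before closing the loop,
// so flatMap2 sees centroids right from the start
DataStream<Centroid> finalCentroids =
    loop.closeWith(newCentroids.union(initialCentroids).broadcast());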

Cheers,
Aljoscha


On Mon, 18 Jul 2016 at 12:59 subash basnet  wrote:

> Hello all,
>
> I am trying to cluster datastream points around a centroid. My input is
> stock data where the centroid id I have taken as the timestamp of the
> stock. The error I am facing is in getting *id *of the *centroid* within
> *flatMap2*. Below is my code if you could look:
>
> ConnectedIterativeStreams<Point, Centroid> loop =
>     points.iterate().withFeedbackType(Centroid.class);
> DataStream<Centroid> newCentroids = loop.flatMap(new SelectNearestCenter(10))
>     .map(new CountAppender()).keyBy(0)
>     .reduce(new CentroidAccumulator()).map(new CentroidAverager());
> DataStream<Centroid> finalCentroids =
>     loop.closeWith(newCentroids.broadcast());
>
> public static final class SelectNearestCenter implements
>     CoFlatMapFunction<Point, Centroid, Tuple2<String, Point>> {
>     private Centroid[] centroids;
>     private int size = 0;
>     private int count = 0;
>     private boolean flag = true;
>
>     public SelectNearestCenter(int size) {
>         this.size = size;
>     }
>
>     @Override
>     public void flatMap1(Point p, Collector<Tuple2<String, Point>> out)
>             throws Exception {
>         double minDistance = Double.MAX_VALUE;
>         *String closestCentroidId = "-1";*
>         if (centroids != null) {
>             // let's assume minimum size 20 for now
>             for (Centroid centroid : centroids) {
>                 // compute distance
>                 double distance = p.euclideanDistance(centroid);
>                 // update nearest cluster if necessary
>                 if (distance < minDistance) {
>                     minDistance = distance;
>                     closestCentroidId = centroid.id;
>                 }
>             }
>         }
>         // emit a new record with the center id and the data point.
>         out.collect(new Tuple2<String, Point>(closestCentroidId, p));
>     }
>
>     @Override
>     public void flatMap2(Centroid value, Collector<Tuple2<String, Point>> out)
>             throws Exception {
>         if (flag) {
>             centroids = new Centroid[size];
>             flag = false;
>         }
>         if (count < size) {
>             *System.out.println(value);*
>             centroids[count] = value;
>             count++;
>         }
>     }
> }
>
>
> The centroid datastreams looks as below with string timestamp as id.
> Fri Jul 15 15:30:55 CEST 2016  117.8818 117.9 117.8 117.835 1383700.0
> Fri Jul 15 15:31:58 CEST 2016  117.835 117.99 117.82 117.885 118900.0
>
> But now if I print the *centroid value *in *flatMap2* it shows with the
> id as '-1':
> -1  117.8818 117.9 117.8 117.835 1383700.0
> -1  117.5309 117.575 117.48245 117.52 707100.0
>
> This '-1' is from *flatMap1 *which get's assigned initially. To get rid
> of this if I put the out.collect statement within the if centroids is not
> null condition, it never goes inside the if condition as intially the
> centroids is null, hence the execution never comes out of *flatMap1*.
> It would be great if you could suggest what could be the probable problem
> or solution to the case.
>
>
> Best Regards,
> Subash Basnet
>


Re: Intermediate Data Caching

2016-07-19 Thread Ufuk Celebi
PS: I forgot to mention that constant iteration input is also cached.
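
For context, a minimal sketch of the batch iterate operator whose fed-back
partial solution is what gets cached between supersteps; the data and the
ten-iteration bound are arbitrary for illustration:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class IterateExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Double> initial = env.fromElements(0.0);

        // open a bulk iteration with at most 10 supersteps; the partial solution
        // fed back at the end of each superstep is what gets cached
        IterativeDataSet<Double> iteration = initial.iterate(10);

        DataSet<Double> step = iteration.map(new MapFunction<Double, Double>() {
            @Override
            public Double map(Double value) {
                return value + 1.0;
            }
        });

        // close the loop: 'step' becomes the next partial solution
        DataSet<Double> result = iteration.closeWith(step);
        result.print();
    }
}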

On Mon, Jul 18, 2016 at 11:27 AM, Ufuk Celebi  wrote:
> Hey Saliya,
>
> the result of each iteration (super step) that is fed back to the
> iteration is cached. For the iterate operator that is the last partial
> solution and for the delta iterate operator it's the current solution
> set 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/iterations.html).
>
> Internally, this works via custom iteration operator implementations
> for head and tail tasks, which are co-located and share a hash table.
> I think that the internals of this are not documented, you would have
> to look into the code for this. Most of the relevant implementations
> are found in the "org.apache.flink.runtime.iterative.task" package.
>
> Hope this helps...
>
> Ufuk
>
>
> On Sun, Jul 17, 2016 at 9:36 PM, Saliya Ekanayake  wrote:
>> Hi,
>>
>> I am trying to understand what's the intermediate caching support in Flink.
>> For example, when there's an iterative dataset what's being cached between
>> iterations. Is there some documentation on this?
>>
>> Thank you,
>> Saliya
>>
>> --
>> Saliya Ekanayake
>> Ph.D. Candidate | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>>


Re: Data point goes missing within iteration

2016-07-19 Thread Ufuk Celebi
Unfortunately, no. It's expected for streaming iterations to lose
data (a known shortcoming), but I don't see why they never see the
initial input. Maybe Gyula or Paris (they worked on this previously)
can chime in.

– Ufuk

On Tue, Jul 19, 2016 at 10:15 AM, Biplob Biswas
 wrote:
> Hi Ufuk,
>
> Did you get time to go through my issue, just wanted to follow up to see
> whether I can get a solution or not.
>
>
>
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Data-point-goes-missing-within-iteration-tp7776p8010.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.


Re: Issue with running Flink Python jobs on cluster

2016-07-19 Thread Maximilian Michels
Hi!

HDFS is mentioned in the docs but not explicitly listed as a requirement:
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/python.html#project-setup

I suppose the Python API could also distribute its libraries through
Flink's BlobServer.

Cheers,
Max

On Tue, Jul 19, 2016 at 9:24 AM, Chesnay Schepler 
wrote:

> Glad to hear it! The HDFS requirement should most definitely be
> documented; i assumed it already was actually...
>
>
> On 19.07.2016 03:42, Geoffrey Mon wrote:
>
> Hello Chesnay,
>
> Thank you very much! With your help I've managed to set up a Flink cluster
> that can run Python jobs successfully. I solved my issue by removing
> local=True and installing HDFS in a separate cluster.
>
> I don't think it was clearly mentioned in the documentation that HDFS was
> required for Python-running clusters. Would it be a good idea to include
> that in the documentation?
>
> Cheers,
> Geoffrey
>
> On Sun, Jul 17, 2016 at 11:58 AM Chesnay Schepler 
> wrote:
>
>> well now i know what the problem could be.
>>
>> You are trying to execute a job on a cluster (== not local), but have set
>> the local flag to true.
>> env.execute(local=True)
>>
>> Due to this flag the files are only copied into the tmp directory of the
>> node where you execute the plan, and are thus not accessible from other
>> worker nodes.
>>
>> In order to use the Python API on a cluster you *must* have a filesystem
>> that is accessible by all workers (like HDFS) to which the files can be
>> copied. From there they can be distributed to the nodes via the DC.
>>
>>
>> On 17.07.2016 17:33, Geoffrey Mon wrote:
>>
>> I haven't yet figured out how to write a Java job to test
>> DistributedCache functionality between machines; I've only gotten worker
>> nodes to create caches from local files (on the same worker nodes), rather
>> than on files from the master node. The DistributedCache test I've been
>> using (based on the DistributedCacheTest unit test) is here:
>> https://gist.github.com/GEOFBOT/041d76b47f08919305493f57ebdde0f7
>>
>> I realized that this test only tested local files because I was getting
>> an error that the file used for the cache was not found until I created
>> that file on the worker node in the location specified in the plan.
>>
>> I've been trying to run a simple Python example that does word counting:
>> https://gist.github.com/GEOFBOT/dbdc30120fb4d71383d9e3eff5f93c1f
>>
>> I've tried three different setups so far: I've tried virtual machines,
>> AWS virtual machine instances, and physical machines. With each setup, I
>> get the same errors.
>>
>> Although with all three of these setups, basic Java jobs can be run (like
>> WordCount, PageRank), Python programs cannot be run because the files
>> needed to run them are not properly distributed to the worker nodes. I've
>> found that although the master node reads the Python libraries and plan
>> files (presumably to send them to the worker), the worker node never writes
>> any of those files to disk, despite the files being added to the list of
>> files in the distributed cache via DistributedCache.writeFileInfotoConfig
>> (which I found via remote debugging).
>>
>> When a Python program is run via pyflink, it executes but crashes as soon
>> as there is any sort of operation requiring mapping. The following
>> exception is thrown:
>>
>> 2016-07-17 09:39:50,857 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph-
>> MapPartition (PythonFlatMap -> PythonMap) (1/1)
>> (12fbc21f0424ab87f3ef579fbe73b0b3) switched from RUNNING to FAILED
>> 2016-07-17 09:39:50,863 INFO
>>  org.apache.flink.runtime.jobmanager.JobManager- Status of
>> job 3685c180d7eb004154e8f9d94996e0cf (Flink Java Job at Sun Jul 17 09:39:49
>> EDT 2016) changed to FAILING.
>> java.lang.Exception: The user defined 'open()' method caused an
>> exception: An error occurred while copying the file.
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)
>> 
>> Caused by: java.lang.RuntimeException: An error occurred while copying
>> the file.
>> at
>> org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
>> at
>> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:102)
>> 
>> Caused by: java.io.FileNotFoundException: File file:/tmp/flink does not
>> exist or the user running Flink ('gmon') has insufficient permissions to
>> access it.
>> at
>> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:109)
>> at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242)
>> at
>> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:322)
>> 
>> ... 1 more
>>
>> If the pyflink library is manually copied into place at /tmp/flink, that
>> error will be replaced by the following:
>>
>> 2016-07-17 00:10:54,342 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph-

Re: Data point goes missing within iteration

2016-07-19 Thread Biplob Biswas
Hi Ufuk,

Did you get time to go through my issue? I just wanted to follow up to see
whether I can get a solution or not.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Data-point-goes-missing-within-iteration-tp7776p8010.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


DataStreamUtils not working properly

2016-07-19 Thread subash basnet
Hello all,

I am trying to convert a datastream to a collection, but it shows a blank
result. There is a stream of data which can be viewed on the console via
print(), but the collection of the same stream is empty after
conversion. Below is the code:

DataStream<Centroid> centroids =
    newCentroidDataStream.map(new TupleCentroidConverter());
centroids.print();
Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
Collection<Centroid> testCentroids = Lists.newArrayList(iter);
for (Centroid c : testCentroids) {
    System.out.println(c);
}

The above *centroids.print()* gives the following output in console:

Mon Jul 18 21:29:01 CEST 2016  119.3701 119.4 119.3701 119.38 27400.0
Mon Jul 18 21:23:00 CEST 2016  119.3463 119.37 119.315 119.37 48200.0
Mon Jul 18 21:27:59 CEST 2016  119.3401 119.3401 119.26 119.265 50300.0
Mon Jul 18 21:36:00 CEST 2016  119.48 119.505 119.47 119.4741 37400.0
Mon Jul 18 21:33:00 CEST 2016  119.535 119.54 119.445 119.455 152900.0

But the next *System.out.println(c)* within the for loop prints nothing.
What could be the problem?

My Maven build has the following configuration for DataStreamUtils:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-contrib_2.10</artifactId>
    <version>${flink.version}</version>
</dependency>


Best Regards,
Subash Basnet


Parallelizing openCV libraries in Flink

2016-07-19 Thread Debaditya Roy
Hello users,

I am currently doing a project in image processing with the OpenCV library.
Has anyone here faced any issues with parallelizing the library in Flink? I
have written code which runs fine in a local environment; however, when I
try to run it in a distributed environment, nothing gets written (it was
supposed to write some results) to the sink files. I suspect that it is
having a problem reading the video file from the source directory which I
have supplied.
Any comments and similar experience will be extremely helpful.
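
One pattern that often matters with native libraries such as OpenCV in a
distributed setting is loading the native code once per task JVM on every
worker. A hedged sketch (assuming the OpenCV 3.x Java bindings; the byte[]
element type and the row count are only illustrative):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.opencv.core.Core;
import org.opencv.core.Mat;
import org.opencv.core.MatOfByte;
import org.opencv.imgcodecs.Imgcodecs;

// Loads the OpenCV native library in open(), which runs once per parallel task
// instance on each worker JVM, before any elements are processed.
public class CountRows extends RichMapFunction<byte[], Integer> {

    @Override
    public void open(Configuration parameters) {
        // the native .so/.dll must be installed (or shipped) on every worker node
        System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
    }

    @Override
    public Integer map(byte[] encodedFrame) {
        // decode raw bytes delivered by the source instead of opening a local
        // file path that may only exist on the machine submitting the job
        Mat img = Imgcodecs.imdecode(new MatOfByte(encodedFrame), Imgcodecs.IMREAD_COLOR);
        return img.rows();
    }
}

Reading the video frames as bytes through the source and decoding them in the
function avoids relying on a path that is only present on the client machine,
which sounds like the suspicion above.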

Warm Regards,
Debaditya


Re: Error using S3a State Backend: Window Operators sending directory instead of fully qualified file?

2016-07-19 Thread Ufuk Celebi
Feel free to do the contribution at any time you like. We can also
always make it part of a bugfix release if it does not make it into
the upcoming 1.1 RC (probably end of this week or beginning of next).
Feel free to ping me if you need any feedback or pointers.

– Ufuk


On Mon, Jul 18, 2016 at 9:52 PM, Clifford Resnick
 wrote:
> In 1.1, AbstractYarnClusterDescriptor pushes contents of flink/lib (local to 
> where the yarn app is launched) to Yarn with a single directory copy. In 
> 1.0.3 it looked like it was copying the individual jars.
>
> So, yes I did actually change HDFSCopyToLocal, which was easy, but the job 
> staging in the above class also needs altering. I’m happy to contribute on 
> both though I won’t be able to get to it until later this week.
>
> -Cliff
>
>
>
> On 7/18/16, 3:38 PM, "Ufuk Celebi"  wrote:
>
> Hey Cliff! Good to see that we came to the same conclusion :-) What do
> you mean with copying of the "lib" folder? This issue should be the
> same for both 1.0 and 1.1. Another work around could be to use the
> fully async RocksDB snapshots with Flink 1.1-SNAPSHOT.
>
> If you like, you could also work on the issue I've created by
> implementing the recursive File copy in Flink (in HDFSCopyToLocal) and
> contribute this via a pull request.
>
> – Ufuk
>
>
> On Mon, Jul 18, 2016 at 7:22 PM, Clifford Resnick
>  wrote:
> > Hi Ufuk,
> >
> > My mail was down, so I missed this response. Thanks for that.
> >
> > On 7/18/16, 10:38 AM, "Ufuk Celebi"  wrote:
> >
> > Hey Cliff!
> >
> > I was able to reproduce this by locally running a job and RocksDB 
> semi
> > asynchronous checkpoints (current default) to S3A. I've created an
> > issue here: https://issues.apache.org/jira/browse/FLINK-4228.
> >
> > Running with S3N it is working as expected. You can use that
> > implementation as a work around. I don't know whether it's possible 
> to
> > disable creation of MD5 hashes for S3A.
> >
> > – Ufuk
> >
> > On Sat, Jul 16, 2016 at 6:26 PM, Clifford Resnick
> >  wrote:
> > > Using Flink 1.1-SNAPSHOT, Hadoop-aws 2.6.4
> > >
> > >
> > >
> > > The error I’m getting is :
> > >
> > >
> > >
> > > 11:05:44,425 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask
> > > - Caught exception while materializing asynchronous checkpoints.
> > >
> > > com.amazonaws.AmazonClientException: Unable to calculate MD5 hash:
> > > 
> /var/folders/t8/k5764ltj4sq4ft06c1zp0nxn928mwr/T/flink-io-247956be-e422-4222-a512-e3ae321b1590/ede87211c622f86d1ef7b2b323076e79/WindowOperator_10_3/dummy_state/31b7ca7b-dc94-4d40-84c7-4f10ebc644a2/local-chk-1
> > > (Is a directory)
> > >
> > > at
> > > 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1266)
> > >
> > > at
> > > 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:131)
> > >
> > > at
> > > 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:123)
> > >
> > > at
> > > 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:139)
> > >
> > > at
> > > 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:47)
> > >
> > > at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > >
> > > at
> > > 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > >
> > > at
> > > 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > >
> > > at java.lang.Thread.run(Thread.java:745)
> > >
> > >
> > >
> > > In the debugger I noticed that some of the uploaded checkpoints 
> are from the
> > > configured /tmp location. These succeed as file in the request is 
> fully
> > > qualified, but I guess it’s different for WindowOperators? Here 
> the file in
> > > the request (using a different /var/folders.. location not 
> configured by me
> > > – must be a mac thing?) is actually a directory. The AWS api is 
> failing when
> > > it tries to calculate an MD5 of the directory. The Flink side of 
> the
> > > codepath is hard to discern from debugging because it’s 
> asynchronous.
> > >
> > >
> > >
> > > I get the same issue whether local or on a 

Re: Issue with running Flink Python jobs on cluster

2016-07-19 Thread Chesnay Schepler
Glad to hear it! The HDFS requirement should most definitely be
documented; I assumed it already was, actually...


On 19.07.2016 03:42, Geoffrey Mon wrote:

Hello Chesnay,

Thank you very much! With your help I've managed to set up a Flink 
cluster that can run Python jobs successfully. I solved my issue by 
removing local=True and installing HDFS in a separate cluster.


I don't think it was clearly mentioned in the documentation that HDFS 
was required for Python-running clusters. Would it be a good idea to 
include that in the documentation?


Cheers,
Geoffrey

On Sun, Jul 17, 2016 at 11:58 AM Chesnay Schepler wrote:


well now i know what the problem could be.

You are trying to execute a job on a cluster (== not local), but
have set the local flag to true.
env.execute(local=True)

Due to this flag the files are only copied into the tmp directory
of the node where you execute the plan, and are thus not
accessible from other worker nodes.

In order to use the Python API on a cluster you *must* have a
filesystem that is accessible by all workers (like HDFS) to which
the files can be copied. From there they can be distributed to the
nodes via the DC.


On 17.07.2016 17:33, Geoffrey Mon wrote:

I haven't yet figured out how to write a Java job to test
DistributedCache functionality between machines; I've only gotten
worker nodes to create caches from local files (on the same
worker nodes), rather than on files from the master node. The
DistributedCache test I've been using (based on the
DistributedCacheTest unit test) is here:
https://gist.github.com/GEOFBOT/041d76b47f08919305493f57ebdde0f7

I realized that this test only tested local files because I was
getting an error that the file used for the cache was not found
until I created that file on the worker node in the location
specified in the plan.

I've been trying to run a simple Python example that does word
counting:
https://gist.github.com/GEOFBOT/dbdc30120fb4d71383d9e3eff5f93c1f

I've tried three different setups so far: I've tried virtual
machines, AWS virtual machine instances, and physical machines.
With each setup, I get the same errors.

Although with all three of these setups, basic Java jobs can be
run (like WordCount, PageRank), Python programs cannot be run
because the files needed to run them are not properly distributed
to the worker nodes. I've found that although the master node
reads the Python libraries and plan files (presumably to send
them to the worker), the worker node never writes any of those
files to disk, despite the files being added to the list of files
in the distributed cache via
DistributedCache.writeFileInfotoConfig (which I found via remote
debugging).

When a Python program is run via pyflink, it executes but crashes
as soon as there is any sort of operation requiring mapping. The
following exception is thrown:

2016-07-17 09:39:50,857 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph-
MapPartition (PythonFlatMap -> PythonMap) (1/1)
(12fbc21f0424ab87f3ef579fbe73b0b3) switched from RUNNING to FAILED
2016-07-17 09:39:50,863 INFO
 org.apache.flink.runtime.jobmanager.JobManager  - Status
of job 3685c180d7eb004154e8f9d94996e0cf (Flink Java Job at Sun
Jul 17 09:39:49 EDT 2016) changed to FAILING.
java.lang.Exception: The user defined 'open()' method caused an
exception: An error occurred while copying the file.
at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481)

Caused by: java.lang.RuntimeException: An error occurred while
copying the file.
at

org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78)
at

org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:102)

Caused by: java.io.FileNotFoundException: File file:/tmp/flink
does not exist or the user running Flink ('gmon') has
insufficient permissions to access it.
at

org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:109)
at
org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242)
at

org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:322)

... 1 more

If the pyflink library is manually copied into place at
/tmp/flink, that error will be replaced by the following:

2016-07-17 00:10:54,342 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph-
MapPartition (PythonFlatMap -> PythonMap) (1/1)
(23591303d5b571a6b3e9b68ef51c5a8e) switched from RUNNING to FAILED
2016-07-17 00:10:54,348 INFO
 org.apache.flink.runtime.jobmanager.JobManager-
Status of job e072403ffec32bd14b54416b53cb46ae (Flink