Using Kafka and Flink for batch processing of a batch data source
I am currently working on an architecture for a big data streaming and batch processing platform. I am planning on using Apache Kafka as the distributed messaging system to handle data from streaming data sources and then pass it on to Apache Flink for stream processing. I would also like to use Flink's batch processing capabilities to process batch data. Does it make sense to pass the batched data through Kafka on a periodic basis as a source for Flink batch processing (is this even possible?), or should I just write the batch data to a data store and then process it by reading it into Flink?
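For the second option the poster mentions (write the batch data to a store, then read it with Flink's batch API), a minimal sketch could look like the following. The paths, job name, and processing step are placeholders, not details from the question. Flink's DataSet API expects a bounded input, which a periodic file drop provides naturally; a Kafka topic is unbounded, so reading a "batch" from Kafka would mean bounding it yourself (e.g. by offsets).

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BatchFromStore {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the periodically dropped batch from a bounded store
        // (placeholder path -- could be HDFS, S3, a local directory, ...).
        DataSet<String> batch = env.readTextFile("hdfs:///data/incoming/2016-07-19");

        batch.map(new MapFunction<String, String>() {
                 @Override
                 public String map(String line) {
                     return line.toUpperCase(); // stand-in for real processing
                 }
             })
             .writeAsText("hdfs:///data/processed/2016-07-19"); // placeholder path

        env.execute("batch-from-store"); // placeholder job name
    }
}
```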
Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse
Thanks a ton, Till. That worked. -Biplob -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016p8035.html Sent from the Apache Flink User Mailing List archive at Nabble.com.
Re: Aggregate events in time window
Hi Dominique, your problem sounds like a good use case for session windows [1, 2]. If you know that there is at most a maximum gap between your request and response message, then you could create a session window via: input.keyBy("ReqRespID").window(EventTimeSessionWindows.withGap(Time.minutes(MaxTimeBetweenReqResp))).apply(/* calculate time */); [1] https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#session-windows [2] http://data-artisans.com/session-windowing-in-flink/ Cheers, Till On Tue, Jul 19, 2016 at 7:04 PM, Sameer W wrote: > How about using EventTime windows with watermark assignment and bounded > delays. That way you allow more than 5 minutes (bounded delay) for your > request and responses to arrive. Do you have a way to assign timestamp to > the responses based on the request timestamp (does the response contain the > request timestamp in some form). That way you add them to the same window. > > Sameer > > On Tue, Jul 19, 2016 at 12:31 PM, Dominique Rondé < > dominique.ro...@allsecur.de> wrote: > >> Hi all, >> >> once again I need a "kick" to the right direction. I have a datastream >> with request and responses identified by an ReqResp-ID. I like to calculate >> the (avg, 95%, 99%) time between the request and response and also like to >> count them. I thought of >> ".keyBy("ReqRespID").timeWindowAll(Time.minutes(5)).apply(function)" would >> do the job, but there are some cases were a Request is in the first and the >> Response is in the second window. But if i use a overlapping time window >> (i.e. timeWindowAll(Time.minutes(5),Time.seconds(60))) I have a lot of >> requests more then one time in the apply-function. >> >> Do you have any hint for me? >> >> Thanks a lot! >> >> Dominique >> >> >
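Till's snippet, filled out as a compilable sketch. The event type `ReqRespEvent`, its accessors, and the latency calculation are illustrative assumptions, not code from the thread; it also assumes timestamps and watermarks have already been assigned on the stream.

```java
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Assumes ReqRespEvent is a POJO with a ReqRespID field and a getTimestamp()
// accessor; input is the keyable stream of request and response events.
DataStream<ReqRespEvent> input = ...;

input.keyBy("ReqRespID")
     .window(EventTimeSessionWindows.withGap(Time.minutes(maxTimeBetweenReqResp)))
     .apply(new WindowFunction<ReqRespEvent, Long, Tuple, TimeWindow>() {
         @Override
         public void apply(Tuple key, TimeWindow window,
                           Iterable<ReqRespEvent> events, Collector<Long> out) {
             // With request and response in the same session, the latency is
             // the span between the earliest and latest event timestamp.
             long first = Long.MAX_VALUE, last = Long.MIN_VALUE;
             for (ReqRespEvent e : events) {
                 first = Math.min(first, e.getTimestamp());
                 last = Math.max(last, e.getTimestamp());
             }
             out.collect(last - first);
         }
     });
```

The avg/95%/99% aggregates Dominique asked about would then be computed over this stream of per-pair latencies in a downstream window.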
Re: DataStreamUtils not working properly
Hello Till, Yup I can see the log output in my console, but there is no information there regarding if there is any error in conversion. Just normal warn and info as below: 22:09:16,676 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager) 22:09:16,676 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager) The above message is always there when I run my project. It would be great if someone could check why the collection of datastream via DataStreamUtils is giving empty result. Best Regards, Subash Basnet On Tue, Jul 19, 2016 at 4:52 PM, Till Rohrmannwrote: > It depends if you have a log4j.properties file specified in your > classpath. If you see log output on the console, then it should also print > errors there. > > Cheers, > Till > > On Tue, Jul 19, 2016 at 3:08 PM, subash basnet wrote: > >> Hello Till, >> >> Shouldn't it write something in the eclipse console if there is any error >> or warning. But nothing about error is printed on the console. And I >> checked the flink project folder: flink-core, flink streaming as such but >> couldn't find where the log is written when run via eclipse. >> >> Best Regards, >> Subash Basnet >> >> On Tue, Jul 19, 2016 at 2:49 PM, Till Rohrmann >> wrote: >> >>> Have you checked your logs whether they contain some problems? In >>> general it is not recommended collecting the streaming result back to your >>> client. It might also be a problem with `DataStreamUtils.collect`. >>> >>> Cheers, >>> Till >>> >>> On Tue, Jul 19, 2016 at 2:42 PM, subash basnet >>> wrote: >>> Hello all, I tried to check if it works for tuple but same problem, the collection still shows blank result. I took the id of centroid tuple and printed it, but the collection displays empty. 
DataStream centroids = newCentroidDataStream.map(new TupleCentroidConverter()); DataStream centroidId = centroids.map(new TestMethod()); centroidId.print(); Iterator iter = DataStreamUtils.collect(centroidId); Collection testCentroids = Lists.newArrayList(iter); for (Tuple1 c : testCentroids) { System.out.println(c); } Output: (Mon Jul 18 17:36:03 CEST 2016) (Mon Jul 18 17:43:58 CEST 2016) (Mon Jul 18 17:42:59 CEST 2016) (Mon Jul 18 17:34:01 CEST 2016) (Mon Jul 18 17:52:00 CEST 2016) (Mon Jul 18 17:40:58 CEST 2016) for centroidId.print(), but no output for System.out.println(c); Best Regards, Subash Basnet On Tue, Jul 19, 2016 at 10:48 AM, subash basnet wrote: > Hello all, > > I am trying to convert datastream to collection, but it's shows blank > result. There is a stream of data which can be viewed on the console on > print(), but the collection of the same stream shows empty after > conversion. Below is the code: > > DataStream centroids = newCentroidDataStream.map(new > TupleCentroidConverter()); > centroids.print(); > Iterator iter = DataStreamUtils.collect(centroids); > Collection testCentroids = Lists.newArrayList(iter); > for(Centroid c: testCentroids){ > System.out.println(c); > } > > The above *centroids.print()* gives the following output in console: > > Mon Jul 18 21:29:01 CEST 2016 119.3701 119.4 119.3701 119.38 27400.0 > Mon Jul 18 21:23:00 CEST 2016 119.3463 119.37 119.315 119.37 48200.0 > Mon Jul 18 21:27:59 CEST 2016 119.3401 119.3401 119.26 119.265 50300.0 > Mon Jul 18 21:36:00 CEST 2016 119.48 119.505 119.47 119.4741 37400.0 > Mon Jul 18 21:33:00 CEST 2016 119.535 119.54 119.445 119.455 152900.0 > > But the next *System.out.println(c) *within the for loop prints > nothing. What could be the problem. > > My maven has following configuration for dataStreamUtils: > > org.apache.flink > flink-streaming-contrib_2.10 > ${flink.version} > > > > Best Regards, > Subash Basnet > > >>> >> >
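One property of `DataStreamUtils.collect` that is easy to miss, and a plausible (though unconfirmed here) cause of the empty collection: in the contrib implementation, `collect()` launches the job itself in a background thread and feeds the iterator while the job runs. Copying the iterator into a list up front blocks until the stream finishes, which for an unbounded source never happens. A hedged usage sketch (the flattened Maven fragment in the message above is a `<dependency>` on `org.apache.flink:flink-streaming-contrib_2.10`):

```java
import java.util.Iterator;
import org.apache.flink.contrib.streaming.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<Centroid> centroids = ...; // as in the poster's code

// collect() triggers execution of the job itself; do NOT also call
// env.execute() afterwards.
Iterator<Centroid> iter = DataStreamUtils.collect(centroids);

// Drain lazily while the job is producing, instead of materializing the
// whole stream with Lists.newArrayList(iter).
while (iter.hasNext()) {
    System.out.println(iter.next());
}
```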
Re: Aggregate events in time window
How about using EventTime windows with watermark assignment and bounded delays. That way you allow more than 5 minutes (bounded delay) for your request and responses to arrive. Do you have a way to assign timestamp to the responses based on the request timestamp (does the response contain the request timestamp in some form). That way you add them to the same window. Sameer On Tue, Jul 19, 2016 at 12:31 PM, Dominique Rondé < dominique.ro...@allsecur.de> wrote: > Hi all, > > once again I need a "kick" to the right direction. I have a datastream > with request and responses identified by an ReqResp-ID. I like to calculate > the (avg, 95%, 99%) time between the request and response and also like to > count them. I thought of > ".keyBy("ReqRespID").timeWindowAll(Time.minutes(5)).apply(function)" would > do the job, but there are some cases were a Request is in the first and the > Response is in the second window. But if i use a overlapping time window > (i.e. timeWindowAll(Time.minutes(5),Time.seconds(60))) I have a lot of > requests more then one time in the apply-function. > > Do you have any hint for me? > > Thanks a lot! > > Dominique > >
Aggregate events in time window
Hi all, once again I need a "kick" in the right direction. I have a datastream with requests and responses identified by a ReqResp-ID. I would like to calculate the (avg, 95%, 99%) time between request and response and also count them. I thought ".keyBy("ReqRespID").timeWindowAll(Time.minutes(5)).apply(function)" would do the job, but there are some cases where a request is in the first and the response is in the second window. But if I use an overlapping time window (i.e. timeWindowAll(Time.minutes(5), Time.seconds(60))), I get a lot of requests more than once in the apply function. Do you have any hint for me? Thanks a lot! Dominique
Re: DataStreamUtils not working properly
It depends if you have a log4j.properties file specified in your classpath. If you see log output on the console, then it should also print errors there. Cheers, Till On Tue, Jul 19, 2016 at 3:08 PM, subash basnetwrote: > Hello Till, > > Shouldn't it write something in the eclipse console if there is any error > or warning. But nothing about error is printed on the console. And I > checked the flink project folder: flink-core, flink streaming as such but > couldn't find where the log is written when run via eclipse. > > Best Regards, > Subash Basnet > > On Tue, Jul 19, 2016 at 2:49 PM, Till Rohrmann > wrote: > >> Have you checked your logs whether they contain some problems? In general >> it is not recommended collecting the streaming result back to your client. >> It might also be a problem with `DataStreamUtils.collect`. >> >> Cheers, >> Till >> >> On Tue, Jul 19, 2016 at 2:42 PM, subash basnet >> wrote: >> >>> Hello all, >>> >>> I tried to check if it works for tuple but same problem, the collection >>> still shows blank result. I took the id of centroid tuple and printed it, >>> but the collection displays empty. >>> >>> DataStream centroids = newCentroidDataStream.map(new >>> TupleCentroidConverter()); >>> DataStream centroidId = centroids.map(new TestMethod()); >>> centroidId.print(); >>> Iterator iter = DataStreamUtils.collect(centroidId); >>> Collection testCentroids = Lists.newArrayList(iter); >>> for (Tuple1 c : testCentroids) { >>> System.out.println(c); >>> } >>> Output: (Mon Jul 18 17:36:03 CEST 2016) (Mon Jul 18 17:43:58 CEST 2016) >>> (Mon Jul 18 17:42:59 CEST 2016) (Mon Jul 18 17:34:01 CEST 2016) (Mon Jul 18 >>> 17:52:00 CEST 2016) (Mon Jul 18 17:40:58 CEST 2016) for centroidId.print(), >>> but no output for System.out.println(c); Best Regards, Subash Basnet >>> >>> On Tue, Jul 19, 2016 at 10:48 AM, subash basnet >>> wrote: >>> Hello all, I am trying to convert datastream to collection, but it's shows blank result. 
There is a stream of data which can be viewed on the console on print(), but the collection of the same stream shows empty after conversion. Below is the code: DataStream centroids = newCentroidDataStream.map(new TupleCentroidConverter()); centroids.print(); Iterator iter = DataStreamUtils.collect(centroids); Collection testCentroids = Lists.newArrayList(iter); for(Centroid c: testCentroids){ System.out.println(c); } The above *centroids.print()* gives the following output in console: Mon Jul 18 21:29:01 CEST 2016 119.3701 119.4 119.3701 119.38 27400.0 Mon Jul 18 21:23:00 CEST 2016 119.3463 119.37 119.315 119.37 48200.0 Mon Jul 18 21:27:59 CEST 2016 119.3401 119.3401 119.26 119.265 50300.0 Mon Jul 18 21:36:00 CEST 2016 119.48 119.505 119.47 119.4741 37400.0 Mon Jul 18 21:33:00 CEST 2016 119.535 119.54 119.445 119.455 152900.0 But the next *System.out.println(c) *within the for loop prints nothing. What could be the problem. My maven has following configuration for dataStreamUtils: org.apache.flink flink-streaming-contrib_2.10 ${flink.version} Best Regards, Subash Basnet >>> >> >
Re: Parallelizing openCV libraries in Flink
Hello, I cannot access the web interface from the nodes I am using. However, I will check the logs for anything suspicious and get back. Thanks :-) Regards, Debaditya On Tue, Jul 19, 2016 at 4:46 PM, Till Rohrmann wrote: > Hi Debaditya, > > you can see in the web interface how much data each source has sent to the > downstream tasks and how much data was consumed by the sinks. This should > tell you whether your sources have actually read some data. You can also > check the log files whether you find anything suspicious there. > > Cheers, > Till > > On Tue, Jul 19, 2016 at 10:33 AM, Debaditya Roy > wrote: > >> Hello users, >> >> I am currently doing a project in image processing with the OpenCV library. >> Has anyone here faced any issues with parallelizing the library in Flink? I >> have written code which runs fine in a local environment, however >> when I try to run it in a distributed environment it writes nothing (it was supposed >> to write some results) into the sink files. I suspect that it is having a >> problem with reading the video file from the source >> directory I supplied. >> Any comments and similar experience will be extremely helpful. >> >> Warm Regards, >> Debaditya >> > >
Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse
Hi Biplob, if you want to start the web interface from within your IDE, then you have to create a local execution environment as Ufuk told you: Configuration config = new Configuration(); config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(getP, config); and you have to add the following dependency to your pom.xml: org.apache.flink flink-runtime-web_2.10 ${flink.version} Cheers, Till On Tue, Jul 19, 2016 at 2:27 PM, Sameer Wwrote: > Yes you have to provide the path of your jar. The reason is: > 1. When you start in the pseudo-cluster mode the tasks are started in > their own JVM's with their own class loader. > 2. You client program has access to your custom operator classes but the > remote JVM's don't. Hence you need to ship the JAR file to these remote > Task nodes. The getRemoteExcecutionEnvironment() method has overloaded > version which takes a JAR file. Just provide your local path to it and it > will ship it when it starts > > Sameer > > On Tue, Jul 19, 2016 at 6:51 AM, Biplob Biswas > wrote: > >> Hi Sameer, >> >> Thanks for that quick reply, I was using flink streaming so the program >> keeps on running until i close it. But anyway I am ready to try this >> getRemoteExecutionEnvironment(), I checked but it ask me for the jar file, >> which is weird because I am running the program directly. >> >> Does it mean I create a jar package and then run it via eclipse? >> >> If not, could you point me to some resources? >> >> Thanks >> Biplob >> >> >> Sameer W wrote >> > From Eclipse it creates a local environment and runs in the IDE. When >> the >> > program finishes so does the Flink execution instance. I have never >> tried >> > accessing the console when the program is running but one the program is >> > finished there is nothing to connect to. 
>> > >> > If you need to access the dashboard, start Flink in the pseudo-cluster >> > mode >> > and connect to it using the getRemoteExecutionEnvironment(). That will >> > allow you to access the jobs statuses on the dashboard when you finish >> > running your job. >> > >> > Sameer >> > >> > On Tue, Jul 19, 2016 at 6:35 AM, Biplob Biswas >> >> > revolutionisme@ >> >> > >> > wrote: >> > >> >> Hi, >> >> >> >> I am running my flink program using Eclipse and I can't access the >> >> dashboard >> >> at http://localhost:8081, can someone help me with this? >> >> >> >> I read that I need to check my flink-conf.yaml, but its a maven project >> >> and >> >> I don't have a flink-conf. >> >> >> >> Any help would be really appreciated. >> >> >> >> Thanks a lot >> >> Biplob >> >> >> >> >> >> >> >> -- >> >> View this message in context: >> >> >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016.html >> >> Sent from the Apache Flink User Mailing List archive. mailing list >> >> archive >> >> at Nabble.com. >> >> >> >> >> >> >> >> -- >> View this message in context: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016p8018.html >> Sent from the Apache Flink User Mailing List archive. mailing list >> archive at Nabble.com. >> > >
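Till's reply above, reassembled as a sketch (extraction flattened both the code and the pom.xml fragment; the parallelism value below is illustrative, and the original snippet's truncated "getP" argument is not reconstructed here):

```java
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/*
 * The flattened Maven fragment in Till's reply corresponds to this
 * dependency, which puts the dashboard's web server on the classpath:
 *
 *   <dependency>
 *     <groupId>org.apache.flink</groupId>
 *     <artifactId>flink-runtime-web_2.10</artifactId>
 *     <version>${flink.version}</version>
 *   </dependency>
 */
Configuration config = new Configuration();
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

// Second argument is the desired parallelism (illustrative value).
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironment(4, config);
```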
Re: Intermediate Data Caching
Thank you, Ufuk! On Tue, Jul 19, 2016 at 5:51 AM, Ufuk Celebiwrote: > PS: I forgot to mention that also constant iteration input is cached. > > On Mon, Jul 18, 2016 at 11:27 AM, Ufuk Celebi wrote: > > Hey Saliya, > > > > the result of each iteration (super step) that is fed back to the > > iteration is cached. For the iterate operator that is the last partial > > solution and for the delta iterate operator it's the current solution > > set ( > https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/iterations.html > ). > > > > Internally, this works via custom iteration operator implementations > > for head and tail tasks, which are co-located and share a hash table. > > I think that the internals of this are not documented, you would have > > to look into the code for this. Most of the relevant implementations > > are found in the "org.apache.flink.runtime.iterative.task" package. > > > > Hope this helps... > > > > Ufuk > > > > > > On Sun, Jul 17, 2016 at 9:36 PM, Saliya Ekanayake > wrote: > >> Hi, > >> > >> I am trying to understand what's the intermediate caching support in > Flink. > >> For example, when there's an iterative dataset what's being cached > between > >> iterations. Is there some documentation on this? > >> > >> Thank you, > >> Saliya > >> > >> -- > >> Saliya Ekanayake > >> Ph.D. Candidate | Research Assistant > >> School of Informatics and Computing | Digital Science Center > >> Indiana University, Bloomington > >> > -- Saliya Ekanayake Ph.D. Candidate | Research Assistant School of Informatics and Computing | Digital Science Center Indiana University, Bloomington
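As a concrete anchor for Ufuk's explanation, here is a minimal bulk iteration (not from the thread; the step function is a trivial placeholder). The partial solution that `iterate()` feeds back each superstep is exactly the piece Flink caches between iterations.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Start a bulk iteration with at most 100 supersteps.
IterativeDataSet<Long> initial = env.fromElements(0L).iterate(100);

// Step function: its result becomes the next superstep's (cached) input.
DataSet<Long> step = initial.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long v) {
        return v + 1;
    }
});

// Close the loop; the fed-back partial solution is cached internally.
DataSet<Long> result = initial.closeWith(step);
result.print();
```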
Re: DataStreamUtils not working properly
Hello Till, Shouldn't it write something in the eclipse console if there is any error or warning. But nothing about error is printed on the console. And I checked the flink project folder: flink-core, flink streaming as such but couldn't find where the log is written when run via eclipse. Best Regards, Subash Basnet On Tue, Jul 19, 2016 at 2:49 PM, Till Rohrmannwrote: > Have you checked your logs whether they contain some problems? In general > it is not recommended collecting the streaming result back to your client. > It might also be a problem with `DataStreamUtils.collect`. > > Cheers, > Till > > On Tue, Jul 19, 2016 at 2:42 PM, subash basnet wrote: > >> Hello all, >> >> I tried to check if it works for tuple but same problem, the collection >> still shows blank result. I took the id of centroid tuple and printed it, >> but the collection displays empty. >> >> DataStream centroids = newCentroidDataStream.map(new >> TupleCentroidConverter()); >> DataStream centroidId = centroids.map(new TestMethod()); >> centroidId.print(); >> Iterator iter = DataStreamUtils.collect(centroidId); >> Collection testCentroids = Lists.newArrayList(iter); >> for (Tuple1 c : testCentroids) { >> System.out.println(c); >> } >> Output: (Mon Jul 18 17:36:03 CEST 2016) (Mon Jul 18 17:43:58 CEST 2016) >> (Mon Jul 18 17:42:59 CEST 2016) (Mon Jul 18 17:34:01 CEST 2016) (Mon Jul 18 >> 17:52:00 CEST 2016) (Mon Jul 18 17:40:58 CEST 2016) for centroidId.print(), >> but no output for System.out.println(c); Best Regards, Subash Basnet >> >> On Tue, Jul 19, 2016 at 10:48 AM, subash basnet >> wrote: >> >>> Hello all, >>> >>> I am trying to convert datastream to collection, but it's shows blank >>> result. There is a stream of data which can be viewed on the console on >>> print(), but the collection of the same stream shows empty after >>> conversion. 
Below is the code: >>> >>> DataStream centroids = newCentroidDataStream.map(new >>> TupleCentroidConverter()); >>> centroids.print(); >>> Iterator iter = DataStreamUtils.collect(centroids); >>> Collection testCentroids = Lists.newArrayList(iter); >>> for(Centroid c: testCentroids){ >>> System.out.println(c); >>> } >>> >>> The above *centroids.print()* gives the following output in console: >>> >>> Mon Jul 18 21:29:01 CEST 2016 119.3701 119.4 119.3701 119.38 27400.0 >>> Mon Jul 18 21:23:00 CEST 2016 119.3463 119.37 119.315 119.37 48200.0 >>> Mon Jul 18 21:27:59 CEST 2016 119.3401 119.3401 119.26 119.265 50300.0 >>> Mon Jul 18 21:36:00 CEST 2016 119.48 119.505 119.47 119.4741 37400.0 >>> Mon Jul 18 21:33:00 CEST 2016 119.535 119.54 119.445 119.455 152900.0 >>> >>> But the next *System.out.println(c) *within the for loop prints >>> nothing. What could be the problem. >>> >>> My maven has following configuration for dataStreamUtils: >>> >>> org.apache.flink >>> flink-streaming-contrib_2.10 >>> ${flink.version} >>> >>> >>> >>> Best Regards, >>> Subash Basnet >>> >>> >> >
Re: DataStreamUtils not working properly
Have you checked your logs whether they contain some problems? In general it is not recommended collecting the streaming result back to your client. It might also be a problem with `DataStreamUtils.collect`. Cheers, Till On Tue, Jul 19, 2016 at 2:42 PM, subash basnetwrote: > Hello all, > > I tried to check if it works for tuple but same problem, the collection > still shows blank result. I took the id of centroid tuple and printed it, > but the collection displays empty. > > DataStream centroids = newCentroidDataStream.map(new > TupleCentroidConverter()); > DataStream centroidId = centroids.map(new TestMethod()); > centroidId.print(); > Iterator iter = DataStreamUtils.collect(centroidId); > Collection testCentroids = Lists.newArrayList(iter); > for (Tuple1 c : testCentroids) { > System.out.println(c); > } > Output: (Mon Jul 18 17:36:03 CEST 2016) (Mon Jul 18 17:43:58 CEST 2016) > (Mon Jul 18 17:42:59 CEST 2016) (Mon Jul 18 17:34:01 CEST 2016) (Mon Jul 18 > 17:52:00 CEST 2016) (Mon Jul 18 17:40:58 CEST 2016) for centroidId.print(), > but no output for System.out.println(c); Best Regards, Subash Basnet > > On Tue, Jul 19, 2016 at 10:48 AM, subash basnet > wrote: > >> Hello all, >> >> I am trying to convert datastream to collection, but it's shows blank >> result. There is a stream of data which can be viewed on the console on >> print(), but the collection of the same stream shows empty after >> conversion. 
Below is the code: >> >> DataStream centroids = newCentroidDataStream.map(new >> TupleCentroidConverter()); >> centroids.print(); >> Iterator iter = DataStreamUtils.collect(centroids); >> Collection testCentroids = Lists.newArrayList(iter); >> for(Centroid c: testCentroids){ >> System.out.println(c); >> } >> >> The above *centroids.print()* gives the following output in console: >> >> Mon Jul 18 21:29:01 CEST 2016 119.3701 119.4 119.3701 119.38 27400.0 >> Mon Jul 18 21:23:00 CEST 2016 119.3463 119.37 119.315 119.37 48200.0 >> Mon Jul 18 21:27:59 CEST 2016 119.3401 119.3401 119.26 119.265 50300.0 >> Mon Jul 18 21:36:00 CEST 2016 119.48 119.505 119.47 119.4741 37400.0 >> Mon Jul 18 21:33:00 CEST 2016 119.535 119.54 119.445 119.455 152900.0 >> >> But the next *System.out.println(c) *within the for loop prints nothing. >> What could be the problem. >> >> My maven has following configuration for dataStreamUtils: >> >> org.apache.flink >> flink-streaming-contrib_2.10 >> ${flink.version} >> >> >> >> Best Regards, >> Subash Basnet >> >> >
Re: DataStreamUtils not working properly
Hello all, I tried to check if it works for a tuple, but same problem: the collection still shows a blank result. I took the id of the centroid tuple and printed it, but the collection displays empty. DataStream centroids = newCentroidDataStream.map(new TupleCentroidConverter()); DataStream centroidId = centroids.map(new TestMethod()); centroidId.print(); Iterator iter = DataStreamUtils.collect(centroidId); Collection testCentroids = Lists.newArrayList(iter); for (Tuple1 c : testCentroids) { System.out.println(c); } Output: (Mon Jul 18 17:36:03 CEST 2016) (Mon Jul 18 17:43:58 CEST 2016) (Mon Jul 18 17:42:59 CEST 2016) (Mon Jul 18 17:34:01 CEST 2016) (Mon Jul 18 17:52:00 CEST 2016) (Mon Jul 18 17:40:58 CEST 2016) for centroidId.print(), but no output for System.out.println(c); Best Regards, Subash Basnet On Tue, Jul 19, 2016 at 10:48 AM, subash basnet wrote: > Hello all, > > I am trying to convert a datastream to a collection, but it shows a blank > result. There is a stream of data which can be viewed on the console on > print(), but the collection of the same stream shows empty after > conversion. Below is the code: > > DataStream centroids = newCentroidDataStream.map(new > TupleCentroidConverter()); > centroids.print(); > Iterator iter = DataStreamUtils.collect(centroids); > Collection testCentroids = Lists.newArrayList(iter); > for(Centroid c: testCentroids){ > System.out.println(c); > } > > The above *centroids.print()* gives the following output in console: > > Mon Jul 18 21:29:01 CEST 2016 119.3701 119.4 119.3701 119.38 27400.0 > Mon Jul 18 21:23:00 CEST 2016 119.3463 119.37 119.315 119.37 48200.0 > Mon Jul 18 21:27:59 CEST 2016 119.3401 119.3401 119.26 119.265 50300.0 > Mon Jul 18 21:36:00 CEST 2016 119.48 119.505 119.47 119.4741 37400.0 > Mon Jul 18 21:33:00 CEST 2016 119.535 119.54 119.445 119.455 152900.0 > > But the next *System.out.println(c)* within the for loop prints nothing. > What could be the problem?
> > My maven has following configuration for dataStreamUtils: > > org.apache.flink > flink-streaming-contrib_2.10 > ${flink.version} > > > > Best Regards, > Subash Basnet > >
Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse
Yes, you have to provide the path of your jar. The reason is: 1. When you start in pseudo-cluster mode, the tasks are started in their own JVMs with their own class loaders. 2. Your client program has access to your custom operator classes, but the remote JVMs don't. Hence you need to ship the JAR file to these remote task nodes. The getRemoteExecutionEnvironment() method has an overloaded version which takes a JAR file. Just provide your local path to it and it will ship the jar when it starts. Sameer On Tue, Jul 19, 2016 at 6:51 AM, Biplob Biswas wrote: > Hi Sameer, > > Thanks for that quick reply, I was using flink streaming so the program > keeps on running until i close it. But anyway I am ready to try this > getRemoteExecutionEnvironment(), I checked but it ask me for the jar file, > which is weird because I am running the program directly. > > Does it mean I create a jar package and then run it via eclipse? > > If not, could you point me to some resources? > > Thanks > Biplob > > > Sameer W wrote > > From Eclipse it creates a local environment and runs in the IDE. When the > > program finishes so does the Flink execution instance. I have never tried > > accessing the console when the program is running but one the program is > > finished there is nothing to connect to. > > > > If you need to access the dashboard, start Flink in the pseudo-cluster > > mode > > and connect to it using the getRemoteExecutionEnvironment(). That will > > allow you to access the jobs statuses on the dashboard when you finish > > running your job. > > > > Sameer > > > > On Tue, Jul 19, 2016 at 6:35 AM, Biplob Biswas > > > revolutionisme@ > > > > > wrote: > > > >> Hi, > >> > >> I am running my flink program using Eclipse and I can't access the > >> dashboard > >> at http://localhost:8081, can someone help me with this? > >> > >> I read that I need to check my flink-conf.yaml, but its a maven project > >> and > >> I don't have a flink-conf. > >> > >> Any help would be really appreciated. 
> >> > >> Thanks a lot > >> Biplob > >> > >> > >> > >> -- > >> View this message in context: > >> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016.html > >> Sent from the Apache Flink User Mailing List archive. mailing list > >> archive > >> at Nabble.com. > >> > > > > > > -- > View this message in context: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016p8018.html > Sent from the Apache Flink User Mailing List archive. mailing list archive > at Nabble.com. >
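Sameer's point about shipping the jar, as a sketch. The concrete API for this is a remote environment that takes jar paths; host, port, and jar path below are placeholders (6123 is only the default JobManager RPC port), not values from the thread.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Connect to a running (pseudo-)cluster from the IDE. The listed jars are
// shipped to the remote TaskManagers so their class loaders can find the
// user-defined operator classes.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
        "localhost", 6123, "target/my-flink-job.jar"); // placeholders
```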
Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse
Thanks Ufuk, for the input. I tried what you suggested as well (as follows): Configuration config = new Configuration(); config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(getP, config); But still I get nothing. I tried both 127.0.0.1:8081 and localhost:8081, as I was getting this message in my log: "No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name." But still nothing. Anything else I can try before installing Flink and running it via the command line? Thanks for such quick replies btw :) Biplob Ufuk Celebi wrote > You can explicitly create a LocalEnvironment and provide a Configuration: > > Configuration config = new Configuration(); > config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); > > ExecutionEnvironment env = new LocalEnvironment(config); > ... > > > On Tue, Jul 19, 2016 at 1:28 PM, Sameer W > sameer@ > wrote: >> From Eclipse it creates a local environment and runs in the IDE. When the >> program finishes so does the Flink execution instance. I have never tried >> accessing the console when the program is running but one the program is >> finished there is nothing to connect to. >> >> If you need to access the dashboard, start Flink in the pseudo-cluster >> mode >> and connect to it using the getRemoteExecutionEnvironment(). That will >> allow >> you to access the jobs statuses on the dashboard when you finish running >> your job. >> >> Sameer >> >> On Tue, Jul 19, 2016 at 6:35 AM, Biplob Biswas > revolutionisme@ > >> wrote: >>> >>> Hi, >>> >>> I am running my flink program using Eclipse and I can't access the >>> dashboard >>> at http://localhost:8081, can someone help me with this? >>> >>> I read that I need to check my flink-conf.yaml, but its a maven project >>> and >>> I don't have a flink-conf. >>> >>> Any help would be really appreciated. 
>>> >>> Thanks a lot >>> Biplob >>> >>> >>> -- >>> View this message in context: >>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016.html >>> Sent from the Apache Flink User Mailing List archive. mailing list >>> archive >>> at Nabble.com. >> >>
Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse
You can explicitly create a LocalEnvironment and provide a Configuration: Configuration config = new Configuration(); config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); ExecutionEnvironment env = new LocalEnvironment(config); ... On Tue, Jul 19, 2016 at 1:28 PM, Sameer Wwrote: > From Eclipse it creates a local environment and runs in the IDE. When the > program finishes so does the Flink execution instance. I have never tried > accessing the console when the program is running but one the program is > finished there is nothing to connect to. > > If you need to access the dashboard, start Flink in the pseudo-cluster mode > and connect to it using the getRemoteExecutionEnvironment(). That will allow > you to access the jobs statuses on the dashboard when you finish running > your job. > > Sameer > > On Tue, Jul 19, 2016 at 6:35 AM, Biplob Biswas > wrote: >> >> Hi, >> >> I am running my flink program using Eclipse and I can't access the >> dashboard >> at http://localhost:8081, can someone help me with this? >> >> I read that I need to check my flink-conf.yaml, but its a maven project >> and >> I don't have a flink-conf. >> >> Any help would be really appreciated. >> >> Thanks a lot >> Biplob >> >> >> >> -- >> View this message in context: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016.html >> Sent from the Apache Flink User Mailing List archive. mailing list archive >> at Nabble.com. > >
Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse
Hi Sameer,

Thanks for that quick reply. I was using Flink streaming, so the program keeps running until I close it. But anyway, I am ready to try this getRemoteExecutionEnvironment(). I checked, but it asks me for the jar file, which is weird because I am running the program directly. Does that mean I should create a jar package and then run it via Eclipse? If not, could you point me to some resources?

Thanks
Biplob

Sameer W wrote:
> From Eclipse it creates a local environment and runs in the IDE. When the
> program finishes, so does the Flink execution instance. [...]

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016p8018.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
Re: Can't access Flink Dashboard at 8081, running Flink program using Eclipse
From Eclipse it creates a local environment and runs in the IDE. When the program finishes, so does the Flink execution instance. I have never tried accessing the console while the program is running, but once the program is finished there is nothing to connect to.

If you need to access the dashboard, start Flink in pseudo-cluster mode and connect to it using getRemoteExecutionEnvironment(). That will allow you to access the job statuses on the dashboard when you finish running your job.

Sameer

On Tue, Jul 19, 2016 at 6:35 AM, Biplob Biswas wrote:
> Hi,
>
> I am running my Flink program using Eclipse and I can't access the dashboard
> at http://localhost:8081, can someone help me with this? [...]
Can't access Flink Dashboard at 8081, running Flink program using Eclipse
Hi,

I am running my Flink program using Eclipse and I can't access the dashboard at http://localhost:8081; can someone help me with this?

I read that I need to check my flink-conf.yaml, but it's a Maven project and I don't have a flink-conf.

Any help would be really appreciated.

Thanks a lot
Biplob

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-access-Flink-Dashboard-at-8081-running-Flink-program-using-Eclipse-tp8016.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
Re: Data point goes missing within iteration
Hi Ufuk,

Thanks for the update. Is there any known way to fix this issue? Any workaround that you know of which I can try?

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Data-point-goes-missing-within-iteration-tp7776p8015.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
Re: Unable to get the value of datatype in datastream
Hi,

you have to ensure that you filter the data that you send back on the feedback edge, i.e. the

loop.closeWith(newCentroids.broadcast());

statement needs to take a stream that only has the centroids that you want to send back. And you need to make sure to emit centroids with a good timestamp if you want to preserve timestamps. What you can also do is to union the stream of initial centroids with the new centroids on the feedback edge, i.e.:

loop.closeWith(newCentroids.union(initialCentroids).broadcast())

Cheers,
Aljoscha

On Mon, 18 Jul 2016 at 12:59 Subash Basnet wrote:
> Hello all,
>
> I am trying to cluster datastream points around a centroid. My input is
> stock data where I have taken the timestamp of the stock as the centroid id.
> The error I am facing is in getting the *id* of the *centroid* within
> *flatMap2*. Below is my code if you could look:
>
> ConnectedIterativeStreams<Point, Centroid> loop =
>     points.iterate().withFeedbackType(Centroid.class);
> DataStream<Centroid> newCentroids = loop.flatMap(new SelectNearestCenter(10))
>     .map(new CountAppender()).keyBy(0)
>     .reduce(new CentroidAccumulator()).map(new CentroidAverager());
> DataStream<Centroid> finalCentroids = loop.closeWith(newCentroids.broadcast());
>
> public static final class SelectNearestCenter
>         implements CoFlatMapFunction<Point, Centroid, Tuple2<String, Point>> {
>     private Centroid[] centroids;
>     private int size = 0;
>     private int count = 0;
>     private boolean flag = true;
>
>     public SelectNearestCenter(int size) {
>         this.size = size;
>     }
>
>     @Override
>     public void flatMap1(Point p, Collector<Tuple2<String, Point>> out) throws Exception {
>         double minDistance = Double.MAX_VALUE;
>         *String closestCentroidId = "-1";*
>         if (centroids != null) {
>             // let's assume minimum size 20 for now
>             for (Centroid centroid : centroids) {
>                 // compute distance
>                 double distance = p.euclideanDistance(centroid);
>                 // update nearest cluster if necessary
>                 if (distance < minDistance) {
>                     minDistance = distance;
>                     closestCentroidId = centroid.id;
>                 }
>             }
>         }
>         // emit a new record with the center id and the data point
>         out.collect(new Tuple2<String, Point>(closestCentroidId, p));
>     }
>
>     @Override
>     public void flatMap2(Centroid value, Collector<Tuple2<String, Point>> out) throws Exception {
>         if (flag) {
>             centroids = new Centroid[size];
>             flag = false;
>         }
>         if (count < size) {
>             *System.out.println(value);*
>             centroids[count] = value;
>             count++;
>         }
>     }
> }
>
> The centroid datastream looks as below, with the string timestamp as id:
> Fri Jul 15 15:30:55 CEST 2016 117.8818 117.9 117.8 117.835 1383700.0
> Fri Jul 15 15:31:58 CEST 2016 117.835 117.99 117.82 117.885 118900.0
>
> But now if I print the *centroid value* in *flatMap2*, it shows with the
> id as '-1':
> -1 117.8818 117.9 117.8 117.835 1383700.0
> -1 117.5309 117.575 117.48245 117.52 707100.0
>
> This '-1' is from *flatMap1*, which gets assigned initially. To get rid of
> this, if I put the out.collect statement within the "if centroids is not
> null" condition, it never goes inside the if condition, as initially the
> centroids are null; hence the execution never comes out of *flatMap1*.
> It would be great if you could suggest what could be the probable problem
> or solution to this case.
>
> Best Regards,
> Subash Basnet
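The nearest-centroid step in flatMap1 above is plain arithmetic and can be checked outside Flink. A minimal sketch, assuming simple double[] stand-ins for Point and Centroid (the names here are illustrative, not Flink API):

```java
import java.util.List;

public class NearestCentroid {

    // Euclidean distance between two points of equal dimension.
    public static double euclideanDistance(double[] a, double[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    // Returns the index of the closest centroid, or -1 if no centroids
    // have arrived yet -- mirroring the "-1" sentinel id in the thread.
    public static int closestCentroid(double[] point, List<double[]> centroids) {
        int closest = -1;
        double minDistance = Double.MAX_VALUE;
        for (int i = 0; i < centroids.size(); i++) {
            double distance = euclideanDistance(point, centroids.get(i));
            if (distance < minDistance) {
                minDistance = distance;
                closest = i;
            }
        }
        return closest;
    }
}
```

As the thread notes, the sentinel shows up whenever flatMap2 has not yet populated the centroid array; the selection logic itself is not at fault.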
Re: Intermediate Data Caching
PS: I forgot to mention that constant iteration input is also cached.

On Mon, Jul 18, 2016 at 11:27 AM, Ufuk Celebi wrote:
> Hey Saliya,
>
> the result of each iteration (superstep) that is fed back to the
> iteration is cached. For the iterate operator that is the last partial
> solution, and for the delta iterate operator it's the current solution set
> (https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/iterations.html).
>
> Internally, this works via custom iteration operator implementations
> for head and tail tasks, which are co-located and share a hash table.
> I think that the internals of this are not documented; you would have
> to look into the code for this. Most of the relevant implementations
> are found in the "org.apache.flink.runtime.iterative.task" package.
>
> Hope this helps...
>
> Ufuk
>
> On Sun, Jul 17, 2016 at 9:36 PM, Saliya Ekanayake wrote:
>> Hi,
>>
>> I am trying to understand what the intermediate caching support in Flink is.
>> For example, when there's an iterative dataset, what's being cached between
>> iterations? Is there some documentation on this?
>>
>> Thank you,
>> Saliya
>>
>> --
>> Saliya Ekanayake
>> Ph.D. Candidate | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
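The superstep feedback Ufuk describes can be pictured with a plain-Java analogy (an illustration of the iteration pattern only, not Flink's head/tail task implementation): the partial solution produced by one superstep is kept and handed to the next superstep, instead of being recomputed from the original input.

```java
import java.util.function.UnaryOperator;

public class IterateAnalogy {

    // Runs "step" for a fixed number of supersteps, feeding each result
    // back as the next input -- the role the cached partial solution
    // plays in Flink's iterate operator.
    public static double iterate(double initial, UnaryOperator<Double> step, int supersteps) {
        double partialSolution = initial; // retained between supersteps
        for (int i = 0; i < supersteps; i++) {
            partialSolution = step.apply(partialSolution);
        }
        return partialSolution;
    }
}
```

In Flink the "retained" value lives in the shared hash table of the co-located head and tail tasks rather than in a local variable, but the data flow is the same: only the feedback value crosses superstep boundaries.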
Re: Data point goes missing within iteration
Unfortunately, no. It's expected for streaming iterations to lose data (a known shortcoming), but I don't see why they never see the initial input. Maybe Gyula or Paris (they worked on this previously) can chime in.

– Ufuk

On Tue, Jul 19, 2016 at 10:15 AM, Biplob Biswas wrote:
> Hi Ufuk,
>
> Did you get time to go through my issue? Just wanted to follow up to see
> whether I can get a solution or not.
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Data-point-goes-missing-within-iteration-tp7776p8010.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
Re: Issue with running Flink Python jobs on cluster
Hi!

HDFS is mentioned in the docs but not explicitly listed as a requirement:
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/python.html#project-setup

I suppose the Python API could also distribute its libraries through Flink's BlobServer.

Cheers,
Max

On Tue, Jul 19, 2016 at 9:24 AM, Chesnay Schepler wrote:
> Glad to hear it! The HDFS requirement should most definitely be
> documented; I assumed it already was, actually...
>
> On 19.07.2016 03:42, Geoffrey Mon wrote:
>
> Hello Chesnay,
>
> Thank you very much! With your help I've managed to set up a Flink cluster
> that can run Python jobs successfully. I solved my issue by removing
> local=True and installing HDFS in a separate cluster.
>
> I don't think it was clearly mentioned in the documentation that HDFS was
> required for Python-running clusters. Would it be a good idea to include
> that in the documentation?
>
> Cheers,
> Geoffrey
>
> On Sun, Jul 17, 2016 at 11:58 AM Chesnay Schepler wrote:
>
>> Well, now I know what the problem could be.
>>
>> You are trying to execute a job on a cluster (== not local), but have set
>> the local flag to true:
>> env.execute(local=True)
>>
>> Due to this flag the files are only copied into the tmp directory of the
>> node where you execute the plan, and are thus not accessible from other
>> worker nodes.
>>
>> In order to use the Python API on a cluster you *must* have a filesystem
>> that is accessible by all workers (like HDFS) to which the files can be
>> copied. From there they can be distributed to the nodes via the DC.
>>
>> On 17.07.2016 17:33, Geoffrey Mon wrote:
>>
>> I haven't yet figured out how to write a Java job to test
>> DistributedCache functionality between machines; I've only gotten worker
>> nodes to create caches from local files (on the same worker nodes), rather
>> than on files from the master node.
The DistributedCache test I've been >> using (based on the DistributedCacheTest unit test) is here: >> https://gist.github.com/GEOFBOT/041d76b47f08919305493f57ebdde0f7 >> >> I realized that this test only tested local files because I was getting >> an error that the file used for the cache was not found until I created >> that file on the worker node in the location specified in the plan. >> >> I've been trying to run a simple Python example that does word counting: >> https://gist.github.com/GEOFBOT/dbdc30120fb4d71383d9e3eff5f93c1f >> >> I've tried three different setups so far: I've tried virtual machines, >> AWS virtual machine instances, and physical machines. With each setup, I >> get the same errors. >> >> Although with all three of these setups, basic Java jobs can be run (like >> WordCount, PageRank), Python programs cannot be run because the files >> needed to run them are not properly distributed to the worker nodes. I've >> found that although the master node reads the Python libraries and plan >> files (presumably to send them to the worker), the worker node never writes >> any of those files to disk, despite the files being added to the list of >> files in the distributed cache via DistributedCache.writeFileInfotoConfig >> (which I found via remote debugging). >> >> When a Python program is run via pyflink, it executes but crashes as soon >> as there is any sort of operation requiring mapping. The following >> exception is thrown: >> >> 2016-07-17 09:39:50,857 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph- >> MapPartition (PythonFlatMap -> PythonMap) (1/1) >> (12fbc21f0424ab87f3ef579fbe73b0b3) switched from RUNNING to FAILED >> 2016-07-17 09:39:50,863 INFO >> org.apache.flink.runtime.jobmanager.JobManager- Status of >> job 3685c180d7eb004154e8f9d94996e0cf (Flink Java Job at Sun Jul 17 09:39:49 >> EDT 2016) changed to FAILING. 
>> java.lang.Exception: The user defined 'open()' method caused an >> exception: An error occurred while copying the file. >> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481) >> >> Caused by: java.lang.RuntimeException: An error occurred while copying >> the file. >> at >> org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78) >> at >> org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:102) >> >> Caused by: java.io.FileNotFoundException: File file:/tmp/flink does not >> exist or the user running Flink ('gmon') has insufficient permissions to >> access it. >> at >> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:109) >> at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242) >> at >> org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:322) >> >> ... 1 more >> >> If the pyflink library is manually copied into place at /tmp/flink, that >> error will be replaced by the following: >> >> 2016-07-17 00:10:54,342 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph-
Re: Data point goes missing within iteration
Hi Ufuk,

Did you get time to go through my issue? Just wanted to follow up to see whether I can get a solution or not.

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Data-point-goes-missing-within-iteration-tp7776p8010.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
DataStreamUtils not working properly
Hello all,

I am trying to convert a datastream to a collection, but it shows an empty result. There is a stream of data which can be viewed on the console via print(), but the collection of the same stream shows as empty after conversion. Below is the code:

DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
centroids.print();
Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
Collection<Centroid> testCentroids = Lists.newArrayList(iter);
for (Centroid c : testCentroids) {
    System.out.println(c);
}

The above *centroids.print()* gives the following output in the console:

Mon Jul 18 21:29:01 CEST 2016 119.3701 119.4 119.3701 119.38 27400.0
Mon Jul 18 21:23:00 CEST 2016 119.3463 119.37 119.315 119.37 48200.0
Mon Jul 18 21:27:59 CEST 2016 119.3401 119.3401 119.26 119.265 50300.0
Mon Jul 18 21:36:00 CEST 2016 119.48 119.505 119.47 119.4741 37400.0
Mon Jul 18 21:33:00 CEST 2016 119.535 119.54 119.445 119.455 152900.0

But the next *System.out.println(c)* within the for loop prints nothing. What could be the problem?

My Maven POM has the following dependency for DataStreamUtils:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-contrib_2.10</artifactId>
    <version>${flink.version}</version>
</dependency>

Best Regards,
Subash Basnet
Parallelizing openCV libraries in Flink
Hello users,

I am currently doing a project in image processing with the OpenCV library. Has anyone here faced any issues with parallelizing the library in Flink? I have written code which runs fine in the local environment; however, when I try to run it in a distributed environment it writes nothing (it was supposed to write some results) to the sink files. I suspect that it is having a problem reading the video file from the source directory I supplied. Any comments or similar experiences will be extremely helpful.

Warm Regards,
Debaditya
Re: Error using S3a State Backend: Window Operators sending directory instead of fully qualified file?
Feel free to do the contribution at any time you like. We can also always make it part of a bugfix release if it does not make it into the upcoming 1.1 RC (probably end of this week or beginning of next). Feel free to ping me if you need any feedback or pointers.

– Ufuk

On Mon, Jul 18, 2016 at 9:52 PM, Clifford Resnick wrote:
> In 1.1, AbstractYarnClusterDescriptor pushes the contents of flink/lib (local to
> where the yarn app is launched) to Yarn with a single directory copy. In
> 1.0.3 it looked like it was copying the individual jars.
>
> So, yes, I did actually change HDFSCopyToLocal, which was easy, but the job
> staging in the above class also needs altering. I'm happy to contribute on
> both, though I won't be able to get to it until later this week.
>
> -Cliff
>
> On 7/18/16, 3:38 PM, "Ufuk Celebi" wrote:
>
> Hey Cliff! Good to see that we came to the same conclusion :-) What do
> you mean with copying of the "lib" folder? This issue should be the
> same for both 1.0 and 1.1. Another workaround could be to use the
> fully async RocksDB snapshots with Flink 1.1-SNAPSHOT.
>
> If you like, you could also work on the issue I've created by
> implementing the recursive file copy in Flink (in HDFSCopyToLocal) and
> contribute this via a pull request.
>
> – Ufuk
>
> On Mon, Jul 18, 2016 at 7:22 PM, Clifford Resnick wrote:
> > Hi Ufuk,
> >
> > My mail was down, so I missed this response. Thanks for that.
> >
> > On 7/18/16, 10:38 AM, "Ufuk Celebi" wrote:
> >
> > Hey Cliff!
> >
> > I was able to reproduce this by locally running a job with RocksDB
> > semi-asynchronous checkpoints (current default) to S3A. I've created an
> > issue here: https://issues.apache.org/jira/browse/FLINK-4228.
> >
> > Running with S3N it is working as expected. You can use that
> > implementation as a workaround. I don't know whether it's possible to
> > disable creation of MD5 hashes for S3A.
> > > > – Ufuk > > > > On Sat, Jul 16, 2016 at 6:26 PM, Clifford Resnick > > wrote: > > > Using Flink 1.1-SNAPSHOT, Hadoop-aws 2.6.4 > > > > > > > > > > > > The error I’m getting is : > > > > > > > > > > > > 11:05:44,425 ERROR > org.apache.flink.streaming.runtime.tasks.StreamTask > > > - Caught exception while materializing asynchronous checkpoints. > > > > > > com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: > > > > /var/folders/t8/k5764ltj4sq4ft06c1zp0nxn928mwr/T/flink-io-247956be-e422-4222-a512-e3ae321b1590/ede87211c622f86d1ef7b2b323076e79/WindowOperator_10_3/dummy_state/31b7ca7b-dc94-4d40-84c7-4f10ebc644a2/local-chk-1 > > > (Is a directory) > > > > > > at > > > > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1266) > > > > > > at > > > > com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:131) > > > > > > at > > > > com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:123) > > > > > > at > > > > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:139) > > > > > > at > > > > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:47) > > > > > > at > java.util.concurrent.FutureTask.run(FutureTask.java:266) > > > > > > at > > > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > > > > > > at > > > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > > > > > > at java.lang.Thread.run(Thread.java:745) > > > > > > > > > > > > In the debugger I noticed that some of the uploaded checkpoints > are from the > > > configured /tmp location. These succeed as file in the request is > fully > > > qualified, but I guess it’s different for WindowOperators? Here > the file in > > > the request (using a different /var/folders.. location not > configured by me > > > – must be a mac thing?) is actually a directory. 
The AWS api is > failing when > > > it tries to calculate an MD5 of the directory. The Flink side of > the > > > codepath is hard to discern from debugging because it’s > asynchronous. > > > > > > > > > > > > I get the same issue whether local or on a
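The recursive-copy fix suggested earlier in this thread can be sketched with plain java.nio (a hypothetical stand-in, not Flink's actual HDFSCopyToLocal): walk the checkpoint directory and copy each entry individually, so a directory path is never handed to a single-object upload that tries to MD5 it as a file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

public class RecursiveCopy {

    // Copies a single file, or walks a directory tree copying every entry.
    // A directory-valued checkpoint path needs this kind of recursion
    // before its contents can be shipped as individual objects.
    public static void copyRecursively(Path source, Path target) throws IOException {
        try (Stream<Path> paths = Files.walk(source)) {
            paths.forEach(p -> {
                try {
                    Path dest = target.resolve(source.relativize(p).toString());
                    if (Files.isDirectory(p)) {
                        Files.createDirectories(dest);
                    } else {
                        Files.createDirectories(dest.getParent());
                        Files.copy(p, dest, StandardCopyOption.REPLACE_EXISTING);
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }
}
```

A real S3 upload would replace the inner Files.copy with a per-file put request; the point is only that recursion happens before the transfer layer ever sees a directory.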
Re: Issue with running Flink Python jobs on cluster
Glad to hear it! The HDFS requirement should most definitely be documented; I assumed it already was, actually...

On 19.07.2016 03:42, Geoffrey Mon wrote:

Hello Chesnay,

Thank you very much! With your help I've managed to set up a Flink cluster that can run Python jobs successfully. I solved my issue by removing local=True and installing HDFS in a separate cluster.

I don't think it was clearly mentioned in the documentation that HDFS was required for Python-running clusters. Would it be a good idea to include that in the documentation?

Cheers,
Geoffrey

On Sun, Jul 17, 2016 at 11:58 AM Chesnay Schepler wrote:

Well, now I know what the problem could be.

You are trying to execute a job on a cluster (== not local), but have set the local flag to true:

env.execute(local=True)

Due to this flag the files are only copied into the tmp directory of the node where you execute the plan, and are thus not accessible from other worker nodes.

In order to use the Python API on a cluster you *must* have a filesystem that is accessible by all workers (like HDFS) to which the files can be copied. From there they can be distributed to the nodes via the DC.

On 17.07.2016 17:33, Geoffrey Mon wrote:

I haven't yet figured out how to write a Java job to test DistributedCache functionality between machines; I've only gotten worker nodes to create caches from local files (on the same worker nodes), rather than on files from the master node.
I've been trying to run a simple Python example that does word counting: https://gist.github.com/GEOFBOT/dbdc30120fb4d71383d9e3eff5f93c1f I've tried three different setups so far: I've tried virtual machines, AWS virtual machine instances, and physical machines. With each setup, I get the same errors. Although with all three of these setups, basic Java jobs can be run (like WordCount, PageRank), Python programs cannot be run because the files needed to run them are not properly distributed to the worker nodes. I've found that although the master node reads the Python libraries and plan files (presumably to send them to the worker), the worker node never writes any of those files to disk, despite the files being added to the list of files in the distributed cache via DistributedCache.writeFileInfotoConfig (which I found via remote debugging). When a Python program is run via pyflink, it executes but crashes as soon as there is any sort of operation requiring mapping. The following exception is thrown: 2016-07-17 09:39:50,857 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- MapPartition (PythonFlatMap -> PythonMap) (1/1) (12fbc21f0424ab87f3ef579fbe73b0b3) switched from RUNNING to FAILED 2016-07-17 09:39:50,863 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 3685c180d7eb004154e8f9d94996e0cf (Flink Java Job at Sun Jul 17 09:39:49 EDT 2016) changed to FAILING. java.lang.Exception: The user defined 'open()' method caused an exception: An error occurred while copying the file. at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:481) Caused by: java.lang.RuntimeException: An error occurred while copying the file. 
at org.apache.flink.api.common.cache.DistributedCache.getFile(DistributedCache.java:78) at org.apache.flink.python.api.streaming.data.PythonStreamer.startPython(PythonStreamer.java:102) Caused by: java.io.FileNotFoundException: File file:/tmp/flink does not exist or the user running Flink ('gmon') has insufficient permissions to access it. at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:109) at org.apache.flink.runtime.filecache.FileCache.copy(FileCache.java:242) at org.apache.flink.runtime.filecache.FileCache$CopyProcess.call(FileCache.java:322) ... 1 more If the pyflink library is manually copied into place at /tmp/flink, that error will be replaced by the following: 2016-07-17 00:10:54,342 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- MapPartition (PythonFlatMap -> PythonMap) (1/1) (23591303d5b571a6b3e9b68ef51c5a8e) switched from RUNNING to FAILED 2016-07-17 00:10:54,348 INFO org.apache.flink.runtime.jobmanager.JobManager- Status of job e072403ffec32bd14b54416b53cb46ae (Flink