Re: Flink program compiled with Janino fails
I'm not a Janino expert, but this might be related to the fact that Janino does not fully support generic types (see http://unkrig.de/w/Janino under limitations). Maybe it works if you use the untyped MapFunction type.

Cheers,
Till

On Sat, Oct 3, 2015 at 8:04 PM, Giacomo Licari wrote:
> Hi guys,
> I'm developing a dynamic Flink program composer, which receives a dataflow
> from a client and converts it into Flink code.
>
> I have tried to compile a test Flink program with Janino, but it fails.
> The error I receive is:
> org.codehaus.commons.compiler.CompileException: Line 66, Column 0:
> Non-abstract class "FlinkExecutor$1" must implement method "public abstract
> java.lang.Object
> org.apache.flink.api.common.functions.MapFunction.map(java.lang.Object)
> throws java.lang.Exception"
>
> It seems Janino doesn't recognize the MapFunction.
>
> If I put this code into a Java file and execute it with Eclipse,
> everything works fine.
>
> Here is the code I used:
>
> package Test;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import com.Flink.Operators.Source;
>
> public class FlinkExecutor {
>     public static class RainPOJO {
>         private String altitude;
>         private String city_name;
>         private String latitude;
>         private String longitude;
>         private String rainfall;
>         private String station_name;
>         private String time;
>         public String getAltitude() {
>             return altitude;
>         }
>         public void setAltitude(String Altitude) {
>             this.altitude = Altitude;
>         }
>         public String getCity_name() {
>             return city_name;
>         }
>         public void setCity_name(String City_name) {
>             this.city_name = City_name;
>         }
>         public String getLatitude() {
>             return latitude;
>         }
>         public void setLatitude(String Latitude) {
>             this.latitude = Latitude;
>         }
>         public String getLongitude() {
>             return longitude;
>         }
>         public void setLongitude(String Longitude) {
>             this.longitude = Longitude;
>         }
>         public String getRainfall() {
>             return rainfall;
>         }
>         public void setRainfall(String Rainfall) {
>             this.rainfall = Rainfall;
>         }
>         public String getStation_name() {
>             return station_name;
>         }
>         public void setStation_name(String Station_name) {
>             this.station_name = Station_name;
>         }
>         public String getTime() {
>             return time;
>         }
>         public void setTime(String Time) {
>             this.time = Time;
>         }
>     }
>     public FlinkExecutor() {}
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env =
>             ExecutionEnvironment.getExecutionEnvironment();
>         env.setDegreeOfParallelism(1);
>         Source Source = new Source("sensor", "rain");
>         String path_Source = Source.getCSVPath();
>         DataSet<RainPOJO> ds_s1 = env.readCsvFile("file://" + path_Source)
>             .ignoreFirstLine()
>             .pojoType(RainPOJO.class, "altitude", "city_name", "latitude",
>                 "longitude", "rainfall", "station_name", "time");
>         long size = ds_s1.count();
>         long startTime = System.currentTimeMillis();
>         ds_s1.map(new MapFunction<RainPOJO, String>() {
>             int count = 0;
>             @Override
>             public String map(RainPOJO obj) throws Exception {
>                 count += 1;
>                 long endTime = System.currentTimeMillis();
>                 double elapsed_time = endTime - startTime;
>                 if (count == size) {
>                     double d_seconds = elapsed_time / 1000;
>                     return "Elapsed time => " + elapsed_time + "(millis) " + d_seconds + " seconds";
>                 }
>                 return " " + count;
>             }
>         })
>         .print();
>     }
> }
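
Till's suggestion of an untyped MapFunction can be illustrated in plain Java. This is only a sketch of what "untyped" (raw) means: `Mapper` is a hypothetical stand-in for Flink's MapFunction, declared here so the snippet compiles without Flink on the classpath, and `RawMapperSketch` is an illustrative name. Whether Janino accepts the raw form has not been verified here.

```java
// Hypothetical stand-in for org.apache.flink.api.common.functions.MapFunction,
// declared only so that this sketch is self-contained.
interface Mapper<T, O> {
    O map(T value) throws Exception;
}

class RawMapperSketch {
    // Raw (untyped) anonymous class: it implements map(Object) returning Object,
    // which is the erased signature named in the Janino error message.
    @SuppressWarnings("rawtypes")
    static Mapper rawMapper() {
        return new Mapper() {
            int count = 0;

            @Override
            public Object map(Object value) throws Exception {
                // Cast by hand, because the raw type carries no generic information.
                String record = (String) value;
                count += 1;
                return count + ": " + record;
            }
        };
    }
}
```

The trade-off is that the compiler no longer checks the element types, and Flink may then need the result type to be declared explicitly, since a raw function carries no type information.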
Re: Destroy StreamExecutionEnv
Hi,

you just need to terminate your source (i.e., return from the run() method if you implement your own source function). This will finish the complete program. For already available sources, just make sure you read finite input.

Hope this helps.

-Matthias

On 10/05/2015 12:15 AM, jay vyas wrote:
> Hi folks.
>
> How do we end a stream execution environment?
>
> I have a unit test which runs a streaming job, and want the unit test to
> die after the first round of output is processed...
>
>
> DataStream> counts =
> dataStream.map(
> new MapFunction >() {
> @Override
> public Tuple2
Re: Destroy StreamExecutionEnv
Matthias' solution should work in most cases.

In cases where you do not control the source (or the source can never be finite, like the Kafka source), we often use a trick in the tests, which is throwing a special type of exception (a SuccessException).

You can catch this exception on env.execute() (it is the nested cause) and decide that this qualifies the test as successful...

Greetings,
Stephan

On Mon, Oct 5, 2015 at 11:20 AM, Matthias J. Sax wrote:
> Hi,
>
> you just need to terminate your source (ie, return from run() method if
> you implement your own source function). This will finish the complete
> program. For already available sources, just make sure you read finite
> input.
>
> Hope this helps.
>
> -Matthias
>
> On 10/05/2015 12:15 AM, jay vyas wrote:
> > Hi folks.
> >
> > How do we end a stream execution environment?
> >
> > I have a unit test which runs a streaming job, and want the unit test to
> > die after the first round of output is processed...
> >
> >
> > DataStream > counts =
> > dataStream.map(
> > new MapFunction >() {
> > @Override
> > public Tuple2
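
The exception trick Stephan describes can be sketched in plain Java. This is a minimal sketch, not Flink test code: `SuccessException` is defined locally as a hypothetical marker class, `executeJob()` is a stand-in for env.execute() that wraps the user exception the way a runtime would, and the helper names are illustrative.

```java
// Hypothetical marker exception, defined here for the sketch.
class SuccessException extends RuntimeException {
    SuccessException() {
        super("test condition reached");
    }
}

class SuccessExceptionSketch {
    // Walks the cause chain and reports whether any link has the given type.
    static boolean hasCause(Throwable error, Class<? extends Throwable> type) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    // Stand-in for env.execute(): the exception thrown by user code arrives
    // wrapped as the cause of a more general failure.
    static void executeJob() throws Exception {
        try {
            // Thrown from a user function once enough output has been seen.
            throw new SuccessException();
        } catch (RuntimeException e) {
            throw new Exception("Job execution failed", e);
        }
    }

    // Returns true if the job ended via SuccessException, rethrows anything else.
    static boolean runUntilSuccess() throws Exception {
        try {
            executeJob();
            return false;
        } catch (Exception e) {
            if (hasCause(e, SuccessException.class)) {
                return true;
            }
            throw e;
        }
    }
}
```

In a unit test, a `true` result from `runUntilSuccess()` would count as a pass, while any other exception still fails the test.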
Processing S3 data with Apache Flink
Hi guys,

I'm trying to get Apache Flink 0.9.1 working on EMR, basically to read data from S3. I tried the following path for the data: s3://mybucket.s3.amazonaws.com/folder, but it throws the following exception:

java.io.IOException: Cannot establish connection to Amazon S3: com.amazonaws.services.s3.model.AmazonS3Exception: The request signature we calculated does not match the signature you provided. Check your key and signing method. (Service: Amazon S3; Status Code: 403;

I added the access and secret keys, so the problem is not there. I'm using the standard region and granted read access to everyone.

Any ideas how this can be fixed?

Thank you in advance,
Kostia
Re: Error trying to access JM through proxy
I filed a bug for this issue in our bug tracker: https://issues.apache.org/jira/browse/FLINK-2821 (even though we cannot do much about it, we should track the resolution of the issue).

On Mon, Oct 5, 2015 at 5:34 AM, Stephan Ewen wrote:
> I think this is yet another problem caused by Akka's overly strict message
> routing.
>
> An actor system bound to a certain URL can only receive messages that are
> sent to that exact URL. All other messages are dropped.
>
> This has many problems:
>
> - Proxy routing (as described here, send to the proxy URL, receiver
>   recognizes only original URL)
> - Using hostname / IP interchangeably does not work (we solved this by
>   always putting IP addresses into URLs, never hostnames)
> - Binding to multiple interfaces (any local 0.0.0.0) does not work.
>   Still no solution to that (but seems not too much of a restriction)
>
> Since this is inherent in Akka, I am puzzled what we can do about that.
> Just googled a bit, seems other systems that use Akka (like Spark) have
> stumbled across that issue as well...
>
> A good point to ask would be the akka mailing list. I can dig into this
> after the next release (2 weeks or so), if you want to help us speed this
> up, you could ping the Akka mailing list as well.
>
> Greetings,
> Stephan
>
>
> On Sun, Oct 4, 2015 at 7:21 AM, Henry Saputra wrote:
>
>> Hi Emmanuel,
>>
>> Could you tell a bit how the proxy being setup?
>>
>> - Henry
>>
>> On Fri, Oct 2, 2015 at 1:55 PM, Emmanuel wrote:
>> > When trying to access the JM through a proxy I get:
>> >
>> >
>> > 19:26:23,113 ERROR akka.remote.EndpointWriter
>> > - dropping message [class akka.actor.ActorSelectionMessage] for non-local
>> > recipient [Actor[akka.tcp://flink@10.155.241.168:6123/]] arriving at
>> > [akka.tcp://flink@10.155.241.168:6123] inbound addresses are
>> > [akka.tcp://flink@10.152.1.107:6123]
>> >
>> > Is there a way to allow this look up through a proxy?
>> >
>> > Thanks
Re: Processing S3 data with Apache Flink
Hi Kostia,

thank you for writing to the Flink mailing list. I actually started to try out our S3 file system support after I saw your question on StackOverflow [1].
I found that our S3 connector is very broken. I had to resolve two more issues with it before I was able to get the same exception you reported.

Another Flink committer looked into the issue and confirmed it, but there was no solution [2].

So for now, I would say we have to assume that our S3 connector is not working. I will start a separate discussion on the developer mailing list about removing our S3 connector.

The good news is that you can just use Hadoop's S3 file system implementation with Flink.

I used this Flink program to verify it's working:

    public class S3FileSystem {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment ee = ExecutionEnvironment.createLocalEnvironment();
            DataSet<String> myLines = ee.readTextFile("s3n://my-bucket-name/some-test-file.xml");
            myLines.print();
        }
    }

Also, you need to make a Hadoop configuration file available to Flink.
When running Flink locally in your IDE, just create a "core-site.xml" in the src/main/resources folder, with the following content:

    <configuration>
        <property>
            <name>fs.s3n.awsAccessKeyId</name>
            <value>putKeyHere</value>
        </property>
        <property>
            <name>fs.s3n.awsSecretAccessKey</name>
            <value>putSecretHere</value>
        </property>
        <property>
            <name>fs.s3n.impl</name>
            <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
        </property>
    </configuration>

If you are running on a cluster, re-use the existing core-site.xml file (i.e., edit it) and point to its directory using Flink's fs.hdfs.hadoopconf configuration option.

With these two things in place, you should be good to go.

[1] http://stackoverflow.com/questions/32959790/run-apache-flink-with-amazon-s3
[2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-Amazon-S3-td946.html

On Mon, Oct 5, 2015 at 8:19 PM, Kostiantyn Kudriavtsev <kudryavtsev.konstan...@gmail.com> wrote:
> Hi guys,
>
> I,m trying to get work Apache Flink 0.9.1 on EMR, basically to read
> data from S3. I tried the following path for data
> s3://mybucket.s3.amazonaws.com/folder, but it throws me the following
> exception:
>
> java.io.IOException: Cannot establish connection to Amazon S3:
> com.amazonaws.services.s3.model.AmazonS3Exception: The request signature
> we calculated does not match the signature you provided. Check your key
> and signing method. (Service: Amazon S3; Status Code: 403;
>
> I added access and secret keys, so the problem is not here. I'm using
> standard region and gave read credential to everyone.
>
> Any ideas how can it be fixed?
>
> Thank you in advance,
> Kostia
Re: Error trying to access JM through proxy
I think this is yet another problem caused by Akka's overly strict message routing.

An actor system bound to a certain URL can only receive messages that are sent to that exact URL. All other messages are dropped.

This has many problems:

- Proxy routing (as described here: messages are sent to the proxy URL, but the receiver recognizes only the original URL)
- Using hostname / IP interchangeably does not work (we solved this by always putting IP addresses into URLs, never hostnames)
- Binding to multiple interfaces (any local 0.0.0.0) does not work. Still no solution to that (but seems not too much of a restriction)

Since this is inherent in Akka, I am puzzled what we can do about that. Just googled a bit, seems other systems that use Akka (like Spark) have stumbled across that issue as well...

A good place to ask would be the Akka mailing list. I can dig into this after the next release (2 weeks or so). If you want to help us speed this up, you could ping the Akka mailing list as well.

Greetings,
Stephan

On Sun, Oct 4, 2015 at 7:21 AM, Henry Saputra wrote:
> Hi Emmanuel,
>
> Could you tell a bit how the proxy being setup?
>
> - Henry
>
> On Fri, Oct 2, 2015 at 1:55 PM, Emmanuel wrote:
> > When trying to access the JM through a proxy I get:
> >
> >
> > 19:26:23,113 ERROR akka.remote.EndpointWriter
> > - dropping message [class akka.actor.ActorSelectionMessage] for non-local
> > recipient [Actor[akka.tcp://flink@10.155.241.168:6123/]] arriving at
> > [akka.tcp://flink@10.155.241.168:6123] inbound addresses are
> > [akka.tcp://flink@10.152.1.107:6123]
> >
> > Is there a way to allow this look up through a proxy?
> >
> > Thanks
Reading from multiple input files with fewer task slots
Hello Flinkers! I run into some strange behavior when reading from a folder of input files. When the number of input files in the folder exceeds the number of task slots I noticed that the size of my datasets varies with each run. It seems as if the transformations don't wait for all input files to be read. When I have equal or more task slots than there are files, there are no problems. I'm using a custom input format. Could there be a problem with my custom input format, and if so what could I be forgetting? Kind regards and thank you for your time! Pieter
Re: Config files content read
Hi Flavio,

I don't think this is a feature that needs to go into Flink core. To me it looks like this can be implemented as a utility method by anybody who needs it, without major effort.

Best,
Fabian

2015-10-02 15:27 GMT+02:00 Flavio Pompermaier:
> Hi to all,
>
> in many of my jobs I have to read a config file that can be either on
> the local fs or on hdfs.
> I'm looking for an intuitive API to read the content of such config files
> (JSON) before converting them to Java objects through jackson. Is there any
> Flink API to easily achieve this?
> I would really like something like
>
>    - String content = FileSystem.get(myFileUri).readAsString() or
>    - String content = new Path(myFilePath).readAsString();
>
> but at the moment the only solution I found is something like:
>
> StringBuffer content = new StringBuffer();
> Path path = new Path(myFilePath);
> FSDataInputStream stream = FileSystem.get(path.toUri()).open(path);
> BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
> String line;
> try {
>     while ((line = reader.readLine()) != null) {
>         content.append(line);
>     }
> } finally {
>     reader.close();
> }
> String contentStr = content.toString();
>
>
> Am I the only one that needs such a feature?
> Best,
> Flavio
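
The utility method Fabian has in mind could look roughly like this. It is a sketch under assumptions: `ConfigReader` and `readAsString` are illustrative names, the content is assumed to be UTF-8, and the method takes a plain java.io.InputStream so it does not depend on Flink. The stream could come from `FileSystem.get(path.toUri()).open(path)` as in Flavio's snippet.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

class ConfigReader {
    // Reads the whole stream as UTF-8 text and closes it afterwards.
    static String readAsString(InputStream in) throws IOException {
        try (InputStream stream = in) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            // read() returns -1 once the end of the stream is reached.
            while ((n = stream.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        }
    }
}
```

Unlike a readLine() loop that appends each line, this keeps line breaks intact, which rarely matters for JSON but does for other text formats.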
Re: For each element in a dataset, do something with another dataset
Hi Fabian,

I have a question regarding the first approach. Is there a benefit gained from choosing a RichMapPartitionFunction over a RichMapFunction in this case? I assume that each broadcasted dataset is sent only once to each task manager?

If I would broadcast dataset B, then I could for each element a in A count the number of elements in B that are smaller than a and output a tuple in a map operation. This would also save me a step in aggregating the results?

Kind regards,

Pieter

2015-09-30 12:44 GMT+02:00 Pieter Hameete:
> Hi Gabor, Fabian,
>
> thank you for your suggestions. I am intending to scale up so that I'm
> sure that both A and B won't fit in memory. I'll see if I can come up with
> a nice way to partition the datasets but if that will take too much time
> I'll just have to accept that it won't work on large datasets. I'll let you
> know if I managed to work something out, but I won't work on it until the
> weekend :-)
>
> Cheers again,
>
> Pieter
>
> 2015-09-30 12:28 GMT+02:00 Gábor Gévay:
>
>> Hello,
>>
>> Alternatively, if dataset B fits in memory, but dataset A doesn't,
>> then you can do it with broadcasting B to a RichMapPartitionFunction
>> on A:
>> In the open method of mapPartition, you sort B. Then, for each element
>> of A, you do a binary search in B, and look at the index found by the
>> binary search, which will be the count that you are looking for.
>>
>> Best,
>> Gabor
>>
>>
>> 2015-09-30 11:20 GMT+02:00 Fabian Hueske:
>> > The idea is to partition both datasets by range.
>> > Assume your dataset A is [1,2,3,4,5,6] you create two partitions: p1:
>> > [1,2,3] and p2: [4,5,6].
>> > Each partition is given to a different instance of a MapPartition operator
>> > (this is a bit tricky, because you cannot use broadcastSet. You could load
>> > the corresponding partition in the open() function from HDFS for
>> > example).
>> >
>> > DataSet B is partitioned in the same way, i.e., all elements <= 3 go to
>> > partition 1, everything > 3 goes to p2. You can partition a dataset by range
>> > using the partitionCustom() function. The partitioned dataset is given to
>> > the mapPartition operator that loaded a partition of dataset A in each task
>> > instance.
>> > You do the counting just like before (sorting the partition of dataset A,
>> > binary search, long[]), but add an additional count for the complete partition
>> > (basically count all elements that arrive in the task instance).
>> >
>> > If you have a dataset B with 1,2,2,3,3,4,5,5,5,5,6,7 the counts for p1 would
>> > be [1:0, 2:1, 3:3, all:5] and p2: [4:0, 5:1, 6:5, all:7].
>> > Now you need to compute the final count by adding the "all" counts of the
>> > lower partitions to the counts of the "higher" partitions, i.e., add all:5
>> > of p1 to all counts for p2.
>> >
>> > This approach requires to know the value range and distribution of the
>> > values which makes it a bit difficult. I guess you'll get the best
>> > performance, if you partition in a way, that you have about equally sized
>> > partitions of dataset B with the constraint that the corresponding
>> > partitions of A fit into memory.
>> >
>> > As I said, it's a bit cumbersome. I hope you could follow my explanation.
>> > Please ask if something is not clear ;-)
>> >
>> > 2015-09-30 10:46 GMT+02:00 Pieter Hameete:
>> >>
>> >> Hi Fabian,
>> >>
>> >> thanks for your tips!
>> >>
>> >> Do you have some pointers for getting started with the 'tricky range
>> >> partitioning'? I am quite keen to get this working with large datasets ;-)
>> >>
>> >> Cheers,
>> >>
>> >> Pieter
>> >>
>> >> 2015-09-30 10:24 GMT+02:00 Fabian Hueske:
>> >>>
>> >>> Hi Pieter,
>> >>>
>> >>> cross is indeed too expensive for this task.
>> >>>
>> >>> If dataset A fits into memory, you can do the following: Use a
>> >>> RichMapPartitionFunction to process dataset B and add dataset A as a
>> >>> broadcastSet. In the open method of mapPartition, you can load the
>> >>> broadcasted set and sort it by a.propertyX and initialize a long[] for the
>> >>> counts. For each element of dataset B, you do a binary search on the sorted
>> >>> dataset A and increase all counts up to the position in the sorted list.
>> >>> After all elements of dataset B have been processed, return the counts from
>> >>> the long[].
>> >>>
>> >>> If dataset A doesn't fit into memory, things become more cumbersome and
>> >>> we need to play some tricks with range partitioning...
>> >>>
>> >>> Let me know, if you have questions,
>> >>> Fabian
>> >>>
>> >>> 2015-09-29 16:59 GMT+02:00 Pieter Hameete:
>> >>>>
>> >>>> Good day everyone,
>> >>>>
>> >>>> I am looking for a good way to do the following:
>> >>>>
>> >>>> I have dataset A and dataset B, and for each element in dataset A I
>> >>>> would like to filter dataset B and obtain the size of the result. To
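
The range-partitioned counting from Fabian's quoted explanation can be sketched with plain arrays. This is a sketch under assumptions: the class and method names are illustrative, values are plain longs, and the partitions are passed in as arrays rather than produced by partitionCustom(). Instead of increasing a range of counts for every element of B, it records one increment per element and takes a running sum at the end, which yields the same counts.

```java
class RangePartitionedCount {
    // Index of the first element greater than key in a sorted array.
    static int upperBound(long[] sorted, long key) {
        int lo = 0;
        int hi = sorted.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (sorted[mid] <= key) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    // Counts for one partition: result[i] is the number of B values smaller
    // than sortedA[i]; the extra last slot holds the "all" count.
    static long[] countPartition(long[] sortedA, long[] bPartition) {
        long[] bump = new long[sortedA.length + 1];
        for (long b : bPartition) {
            // Every A value from this position onwards is greater than b.
            bump[upperBound(sortedA, b)]++;
        }
        long[] counts = new long[sortedA.length + 1];
        long running = 0;
        for (int i = 0; i < sortedA.length; i++) {
            running += bump[i];
            counts[i] = running;
        }
        counts[sortedA.length] = bPartition.length;
        return counts;
    }

    // Adds the "all" counts of the lower partitions to the counts of the
    // higher partitions, in place. Partitions must be in ascending range order.
    static void addLowerPartitions(long[][] perPartition) {
        long offset = 0;
        for (long[] counts : perPartition) {
            int last = counts.length - 1;
            for (int i = 0; i < last; i++) {
                counts[i] += offset;
            }
            offset += counts[last];
        }
    }
}
```

With the numbers from the explanation, `countPartition` gives [0, 1, 3, all 5] for p1 and [0, 1, 5, all 7] for p2, and `addLowerPartitions` turns the p2 counts into 5, 6 and 10.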
Re: Reading from multiple input files with fewer task slots
I assume this concerns the streaming API?

Can you share your program and/or the custom input format code?

On Mon, Oct 5, 2015 at 12:33 PM, Pieter Hameete wrote:
> Hello Flinkers!
>
> I run into some strange behavior when reading from a folder of input files.
>
> When the number of input files in the folder exceeds the number of task
> slots I noticed that the size of my datasets varies with each run. It seems
> as if the transformations don't wait for all input files to be read.
>
> When I have equal or more task slots than there are files, there are no
> problems.
>
> I'm using a custom input format. Could there be a problem with my custom
> input format, and if so what could I be forgetting?
>
> Kind regards and thank you for your time!
>
> Pieter
Re: Reading from multiple input files with fewer task slots
Hi Stephan,

it concerns the DataSet API.

The program I'm running can be found at https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/performance/xmark/XMarkQuery11.scala
The custom input format is at https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/parsing/xml/XML2DawnInputFormat.java

Cheers!

2015-10-05 12:38 GMT+02:00 Stephan Ewen:
> I assume this concerns the streaming API?
>
> Can you share your program and/or the custom input format code?
>
> On Mon, Oct 5, 2015 at 12:33 PM, Pieter Hameete wrote:
>
>> Hello Flinkers!
>>
>> I run into some strange behavior when reading from a folder of input
>> files.
>>
>> When the number of input files in the folder exceeds the number of task
>> slots I noticed that the size of my datasets varies with each run. It seems
>> as if the transformations don't wait for all input files to be read.
>>
>> When I have equal or more task slots than there are files, there are no
>> problems.
>>
>> I'm using a custom input format. Could there be a problem with my custom
>> input format, and if so what could I be forgetting?
>>
>> Kind regards and thank you for your time!
>>
>> Pieter
Re: For each element in a dataset, do something with another dataset
Hi Pieter,

a FlatMapFunction can only return values when the map() method is called. However, in your use case, you would like to return values *after* the function was called the last time. This is not possible with a FlatMapFunction, because you cannot identify the last map() call.
The MapPartitionFunction is called only once with an iterator over the whole partition. Hence you can return values after the iterator was fully consumed.

The broadcast set is sent only once in both cases.

If it is possible to broadcast dataset B, you can also use a MapFunction and don't need to store the count values.

Best,
Fabian

2015-10-05 11:53 GMT+02:00 Pieter Hameete:
> Hi Fabian,
>
> I have a question regarding the first approach. Is there a benefit gained
> from choosing a RichMapPartitionFunction over a RichMapFunction in this
> case? I assume that each broadcasted dataset is sent only once to each task
> manager?
>
> If I would broadcast dataset B, then I could for each element a in A count
> the number of elements in B that are smaller than a and output a tuple in a
> map operation. This would also save me a step in aggregating the results?
>
> Kind regards,
>
> Pieter
>
> 2015-09-30 12:44 GMT+02:00 Pieter Hameete:
>
>> Hi Gabor, Fabian,
>>
>> thank you for your suggestions. I am intending to scale up so that I'm
>> sure that both A and B won't fit in memory. I'll see if I can come up with
>> a nice way to partition the datasets but if that will take too much time
>> I'll just have to accept that it won't work on large datasets. I'll let you
>> know if I managed to work something out, but I won't work on it until the
>> weekend :-)
>>
>> Cheers again,
>>
>> Pieter
>>
>> 2015-09-30 12:28 GMT+02:00 Gábor Gévay:
>>
>>> Hello,
>>>
>>> Alternatively, if dataset B fits in memory, but dataset A doesn't,
>>> then you can do it with broadcasting B to a RichMapPartitionFunction
>>> on A:
>>> In the open method of mapPartition, you sort B. Then, for each element
>>> of A, you do a binary search in B, and look at the index found by the
>>> binary search, which will be the count that you are looking for.
>>>
>>> Best,
>>> Gabor
>>>
>>>
>>> 2015-09-30 11:20 GMT+02:00 Fabian Hueske:
>>> > The idea is to partition both datasets by range.
>>> > Assume your dataset A is [1,2,3,4,5,6] you create two partitions: p1:
>>> > [1,2,3] and p2: [4,5,6].
>>> > Each partition is given to a different instance of a MapPartition operator
>>> > (this is a bit tricky, because you cannot use broadcastSet. You could load
>>> > the corresponding partition in the open() function from HDFS for
>>> > example).
>>> >
>>> > DataSet B is partitioned in the same way, i.e., all elements <= 3 go to
>>> > partition 1, everything > 3 goes to p2. You can partition a dataset by range
>>> > using the partitionCustom() function. The partitioned dataset is given to
>>> > the mapPartition operator that loaded a partition of dataset A in each task
>>> > instance.
>>> > You do the counting just like before (sorting the partition of dataset A,
>>> > binary search, long[]), but add an additional count for the complete partition
>>> > (basically count all elements that arrive in the task instance).
>>> >
>>> > If you have a dataset B with 1,2,2,3,3,4,5,5,5,5,6,7 the counts for p1 would
>>> > be [1:0, 2:1, 3:3, all:5] and p2: [4:0, 5:1, 6:5, all:7].
>>> > Now you need to compute the final count by adding the "all" counts of the
>>> > lower partitions to the counts of the "higher" partitions, i.e., add all:5
>>> > of p1 to all counts for p2.
>>> >
>>> > This approach requires to know the value range and distribution of the
>>> > values which makes it a bit difficult. I guess you'll get the best
>>> > performance, if you partition in a way, that you have about equally sized
>>> > partitions of dataset B with the constraint that the corresponding
>>> > partitions of A fit into memory.
>>> >
>>> > As I said, it's a bit cumbersome. I hope you could follow my explanation.
>>> > Please ask if something is not clear ;-)
>>> >
>>> > 2015-09-30 10:46 GMT+02:00 Pieter Hameete:
>>> >>
>>> >> Hi Fabian,
>>> >>
>>> >> thanks for your tips!
>>> >>
>>> >> Do you have some pointers for getting started with the 'tricky range
>>> >> partitioning'? I am quite keen to get this working with large datasets ;-)
>>> >>
>>> >> Cheers,
>>> >>
>>> >> Pieter
>>> >>
>>> >> 2015-09-30 10:24 GMT+02:00 Fabian Hueske:
>>> >>>
>>> >>> Hi Pieter,
>>> >>>
>>> >>> cross is indeed too expensive for this task.
>>> >>>
>>> >>> If dataset A fits into memory, you can do the following: Use a
>>> >>> RichMapPartitionFunction to process dataset B and add dataset A as a
>>> >>> broadcastSet. In the open method of mapPartition, you can load the
>>> >>> broadcasted set and sort it by a.propertyX and initialize a long[] for the
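
The per-element variant discussed in this thread (broadcast B, sort it once, then binary-search for each element of A) can be sketched with plain arrays. This is a sketch under assumptions: `SmallerCount` and its methods are illustrative names, values are plain longs, and the Flink wiring (broadcast set, open(), map()) is only indicated in comments. A hand-written lower bound is used because java.util.Arrays.binarySearch does not specify which index it returns when B contains duplicates.

```java
import java.util.Arrays;

class SmallerCount {
    // Index of the first element >= key in a sorted array, which equals
    // the number of elements strictly smaller than key.
    static int lowerBound(long[] sorted, long key) {
        int lo = 0;
        int hi = sorted.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (sorted[mid] < key) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    // For each element of A, counts the elements of B that are smaller.
    static long[] countSmaller(long[] a, long[] b) {
        long[] sortedB = b.clone();
        Arrays.sort(sortedB); // done once per task, e.g. in open()
        long[] counts = new long[a.length];
        for (int i = 0; i < a.length; i++) {
            // done per element, e.g. in map(); nothing has to be stored
            counts[i] = lowerBound(sortedB, a[i]);
        }
        return counts;
    }
}
```

For A = [1,2,3,4,5,6] and B = 1,2,2,3,3,4,5,5,5,5,6,7 this gives 0, 1, 3, 5, 6, 10, the same totals as the range-partitioned example quoted above.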
Re: Reading from multiple input files with fewer task slots
If you have more files than task slots, then some tasks will get multiple files. That means that open() and close() are called multiple times on the input format.

Make sure that your input format tolerates that and does not get confused with lingering state (maybe create a new SimpleInputProjection as well).

On Mon, Oct 5, 2015 at 12:41 PM, Pieter Hameete wrote:
> Hi Stephen,
>
> it concerns the DataSet API.
>
> The program im running can be found at
> https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/performance/xmark/XMarkQuery11.scala
> The Custom Input Format at
> https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/parsing/xml/XML2DawnInputFormat.java
>
> Cheers!
>
> 2015-10-05 12:38 GMT+02:00 Stephan Ewen:
>
>> I assume this concerns the streaming API?
>>
>> Can you share your program and/or the custom input format code?
>>
>> On Mon, Oct 5, 2015 at 12:33 PM, Pieter Hameete wrote:
>>
>>> Hello Flinkers!
>>>
>>> I run into some strange behavior when reading from a folder of input
>>> files.
>>>
>>> When the number of input files in the folder exceeds the number of task
>>> slots I noticed that the size of my datasets varies with each run. It seems
>>> as if the transformations don't wait for all input files to be read.
>>>
>>> When I have equal or more task slots than there are files, there are no
>>> problems.
>>>
>>> I'm using a custom input format. Could there be a problem with my custom
>>> input format, and if so what could I be forgetting?
>>>
>>> Kind regards and thank you for your time!
>>>
>>> Pieter
Re: Reading from multiple input files with fewer task slots
Hi Stephan,

it was not the SimpleInputProjection, because that is a stateless object. However, the boolean endReached was not reset upon opening a new file, so for each consecutive file no records were parsed.

Thanks a lot for your help!

- Pieter

2015-10-05 12:50 GMT+02:00 Stephan Ewen:
> If you have more files than task slots, then some tasks will get multiple
> files. That means that open() and close() are called multiple times on the
> input format.
>
> Make sure that your input format tolerates that and does not get confused
> with lingering state (maybe create a new SimpleInputProjection as well)
>
> On Mon, Oct 5, 2015 at 12:41 PM, Pieter Hameete wrote:
>
>> Hi Stephen,
>>
>> it concerns the DataSet API.
>>
>> The program im running can be found at
>> https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/performance/xmark/XMarkQuery11.scala
>> The Custom Input Format at
>> https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/parsing/xml/XML2DawnInputFormat.java
>>
>> Cheers!
>>
>> 2015-10-05 12:38 GMT+02:00 Stephan Ewen:
>>
>>> I assume this concerns the streaming API?
>>>
>>> Can you share your program and/or the custom input format code?
>>>
>>> On Mon, Oct 5, 2015 at 12:33 PM, Pieter Hameete wrote:
>>>
>>>> Hello Flinkers!
>>>>
>>>> I run into some strange behavior when reading from a folder of input
>>>> files.
>>>>
>>>> When the number of input files in the folder exceeds the number of task
>>>> slots I noticed that the size of my datasets varies with each run. It seems
>>>> as if the transformations don't wait for all input files to be read.
>>>>
>>>> When I have equal or more task slots than there are files, there are no
>>>> problems.
>>>>
>>>> I'm using a custom input format. Could there be a problem with my custom
>>>> input format, and if so what could I be forgetting?
>>>>
>>>> Kind regards and thank you for your time!
>>>>
>>>> Pieter
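
The bug Pieter describes can be sketched with a small stand-in class. This is not Flink's InputFormat interface and not the XML2DawnInputFormat code: `LineFormatSketch` is a hypothetical class that borrows only the method names open(), reachedEnd(), nextRecord() and close(), and treats a "file" as a list of strings. The point is the reset of per-file state in open().

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a file-based input format.
class LineFormatSketch {
    private Iterator<String> current;
    private boolean endReached;

    // Called once per file; all per-file state has to be reset here.
    void open(List<String> file) {
        current = file.iterator();
        // Without this reset, every file after the first yields no records.
        endReached = false;
    }

    boolean reachedEnd() {
        if (!endReached && !current.hasNext()) {
            endReached = true;
        }
        return endReached;
    }

    String nextRecord() {
        return current.next();
    }

    void close() {
        current = null;
    }

    // Reads several files with one instance, as a task does when there
    // are more files than task slots.
    static int countRecords(List<List<String>> files) {
        LineFormatSketch format = new LineFormatSketch();
        int n = 0;
        for (List<String> file : files) {
            format.open(file);
            while (!format.reachedEnd()) {
                format.nextRecord();
                n++;
            }
            format.close();
        }
        return n;
    }
}
```

Removing the `endReached = false` line reproduces the symptom from this thread: only the first file opened by each instance contributes records, so the result size depends on how files are assigned to tasks.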
Re: Reading from multiple input files with fewer task slots
Okay, nice to hear it works out!

On Mon, Oct 5, 2015 at 1:50 PM, Pieter Hameete wrote:
> Hi Stephen,
>
> it was not the SimpleInputProjection, because that is a stateless object.
> The boolean endReached was not reset upon opening a new file however, so
> for each consecutive file no records were parsed.
>
> Thanks alot for your help!
>
> - Pieter
>
> 2015-10-05 12:50 GMT+02:00 Stephan Ewen:
>
>> If you have more files than task slots, then some tasks will get multiple
>> files. That means that open() and close() are called multiple times on the
>> input format.
>>
>> Make sure that your input format tolerates that and does not get confused
>> with lingering state (maybe create a new SimpleInputProjection as well)
>>
>> On Mon, Oct 5, 2015 at 12:41 PM, Pieter Hameete wrote:
>>
>>> Hi Stephen,
>>>
>>> it concerns the DataSet API.
>>>
>>> The program im running can be found at
>>> https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/performance/xmark/XMarkQuery11.scala
>>> The Custom Input Format at
>>> https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/parsing/xml/XML2DawnInputFormat.java
>>>
>>> Cheers!
>>>
>>> 2015-10-05 12:38 GMT+02:00 Stephan Ewen:
>>>
>>>> I assume this concerns the streaming API?
>>>>
>>>> Can you share your program and/or the custom input format code?
>>>>
>>>> On Mon, Oct 5, 2015 at 12:33 PM, Pieter Hameete wrote:
>>>>
>>>>> Hello Flinkers!
>>>>>
>>>>> I run into some strange behavior when reading from a folder of input
>>>>> files.
>>>>>
>>>>> When the number of input files in the folder exceeds the number of
>>>>> task slots I noticed that the size of my datasets varies with each run. It
>>>>> seems as if the transformations don't wait for all input files to be read.
>>>>>
>>>>> When I have equal or more task slots than there are files, there are
>>>>> no problems.
>>>>>
>>>>> I'm using a custom input format. Could there be a problem with my
>>>>> custom input format, and if so what could I be forgetting?
>>>>>
>>>>> Kind regards and thank you for your time!
>>>>>
>>>>> Pieter
Re: Running Flink on an Amazon Elastic MapReduce cluster
Hi Hanen,

It appears that the environment variables are not set. Thus, Flink cannot pick up the Hadoop configuration. Could you please paste the output of "echo $HADOOP_HOME" and "echo $HADOOP_CONF_DIR" here?

In any case, your problem looks similar to the one discussed here: http://stackoverflow.com/questions/31991934/cannot-use-apache-flink-in-amazon-emr

Please execute

export HADOOP_CONF_DIR=/etc/hadoop/conf

and you should be good to go.

Cheers,
Max

On Mon, Oct 5, 2015 at 3:37 PM, Hanen Borchani wrote:
> Hi all,
>
> I tried to start a Yarn session on an Amazon EMR cluster with Hadoop 2.6.0
> following the instructions provided in this link and using Flink 0.9.1 for
> Hadoop 2.6.0
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/yarn_setup.html
>
> Running the following command line: ./bin/yarn-session.sh -n 2 -jm 1024
> -tm 2048 generated the following error message
>
> 12:53:47,633 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at /0.0.0.0:8032
> 12:53:47,805 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 12:53:48,226 WARN org.apache.flink.yarn.FlinkYarnClient - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
> 12:53:48,227 INFO org.apache.flink.yarn.FlinkYarnClient - Using values:
> 12:53:48,228 INFO org.apache.flink.yarn.FlinkYarnClient - TaskManager count = 2
> 12:53:48,229 INFO org.apache.flink.yarn.FlinkYarnClient - JobManager memory = 1024
> 12:53:48,229 INFO org.apache.flink.yarn.FlinkYarnClient - TaskManager memory = 2048
> 12:53:48,580 WARN org.apache.flink.yarn.FlinkYarnClient - The file system scheme is 'file'. This indicates that the specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values.The Flink YARN client needs to store its files in a distributed file system
> 12:53:48,593 INFO org.apache.flink.yarn.Utils - Copying from file:/home/hadoop/flink-0.9.1/lib/flink-dist-0.9.1.jar to file:/home/hadoop/.flink/application_1444046049303_0008/flink-dist-0.9.1.jar
> 12:53:49,245 INFO org.apache.flink.yarn.Utils - Copying from /home/hadoop/flink-0.9.1/conf/flink-conf.yaml to file:/home/hadoop/.flink/application_1444046049303_0008/flink-conf.yaml
> 12:53:49,251 INFO org.apache.flink.yarn.Utils - Copying from file:/home/hadoop/flink-0.9.1/lib/flink-python-0.9.1.jar to file:/home/hadoop/.flink/application_1444046049303_0008/flink-python-0.9.1.jar
> 12:53:49,278 INFO org.apache.flink.yarn.Utils - Copying from file:/home/hadoop/flink-0.9.1/conf/logback.xml to file:/home/hadoop/.flink/application_1444046049303_0008/logback.xml
> 12:53:49,285 INFO org.apache.flink.yarn.Utils - Copying from file:/home/hadoop/flink-0.9.1/conf/log4j.properties to file:/home/hadoop/.flink/application_1444046049303_0008/log4j.properties
> 12:53:49,304 INFO org.apache.flink.yarn.FlinkYarnClient - Submitting application master application_1444046049303_0008
> 12:53:49,347 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1444046049303_0008
> 12:53:49,347 INFO org.apache.flink.yarn.FlinkYarnClient - Waiting for the cluster to be allocated
> 12:53:49,349 INFO org.apache.flink.yarn.FlinkYarnClient - Deploying cluster, current state ACCEPTED
> 12:53:50,351 INFO org.apache.flink.yarn.FlinkYarnClient - Deploying cluster, current state ACCEPTED
>
> Error while deploying YARN cluster: The YARN application unexpectedly
> switched to state FAILED during deployment.
>
> Diagnostics from YARN: Application application_1444046049303_0008 failed 1
> times due to AM Container for appattempt_1444046049303_0008_01 exited
> with exitCode: -1000
>
> For more detailed output, check application tracking page:http://ip-172-31-10-16.us-west-2.compute.internal:20888/proxy/application_1444046049303_0008/Then,
> click on links to logs of each attempt.
>
> Diagnostics: File
> file:/home/hadoop/.flink/application_1444046049303_0008/flink-conf.yaml
> does not exist
>
> java.io.FileNotFoundException: File
> file:/home/hadoop/.flink/application_1444046049303_0008/flink-conf.yaml
> does not exist
>
> at