Re: Flink program compiled with Janino fails

2015-10-05 Thread Till Rohrmann
I’m not a Janino expert, but it might be related to the fact that Janino does
not fully support generic types (see http://unkrig.de/w/Janino under
limitations). Maybe it works if you use the untyped MapFunction type.
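
A minimal, untested sketch of what the raw (non-generic) variant could look
like; RawCountMapper is an illustrative name, and depending on the Flink
version you may additionally have to give the operator an explicit result type:

import org.apache.flink.api.common.functions.MapFunction;

// Implementing the raw MapFunction type means Janino only has to compile the
// erased signature "Object map(Object)" instead of a generic one.
@SuppressWarnings("rawtypes")
public class RawCountMapper implements MapFunction {

    private int count = 0;

    @Override
    public Object map(Object value) throws Exception {
        count += 1;
        return "record " + count;
    }
}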

Cheers,
Till

On Sat, Oct 3, 2015 at 8:04 PM, Giacomo Licari 
wrote:

> Hi guys,
> I'm developing a dynamic Flink program composer, which receives a dataflow
> from a client and converts it into Flink code.
>
> I have tried to compile a test Flink program with Janino, but it fails.
> The error I receive is:
> org.codehaus.commons.compiler.CompileException: Line 66, Column 0:
> Non-abstract class "FlinkExecutor$1" must implement method "public abstract
> java.lang.Object
> org.apache.flink.api.common.functions.MapFunction.map(java.lang.Object)
> throws java.lang.Exception"
>
> It seems Janino doesn't recognize the MapFunction.
>
> If I put this code into a Java file and execute it with Eclipse,
> everything works fine.
>
> Here is the code I used:
>
> package Test;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import com.Flink.Operators.Source;
>
> public class FlinkExecutor {
> public static class RainPOJO {
> private String altitude;
> private String city_name;
> private String latitude;
> private String longitude;
> private String rainfall;
> private String station_name;
> private String time;
> public String getAltitude() {
> return altitude;
> }
> public void setAltitude(String Altitude) {
> this.altitude = Altitude;
> }
> public String getCity_name() {
> return city_name;
> }
> public void setCity_name(String City_name) {
> this.city_name = City_name;
> }
> public String getLatitude() {
> return latitude;
> }
> public void setLatitude(String Latitude) {
> this.latitude = Latitude;
> }
> public String getLongitude() {
> return longitude;
> }
> public void setLongitude(String Longitude) {
> this.longitude = Longitude;
> }
> public String getRainfall() {
> return rainfall;
> }
> public void setRainfall(String Rainfall) {
> this.rainfall = Rainfall;
> }
> public String getStation_name() {
> return station_name;
> }
> public void setStation_name(String Station_name) {
> this.station_name = Station_name;
> }
> public String getTime() {
> return time;
> }
> public void setTime(String Time) {
> this.time = Time;
> }
> }
> public FlinkExecutor() {}
> public static void main(String[] args) throws Exception {
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> env.setDegreeOfParallelism(1);
> Source Source = new Source("sensor", "rain");
> String path_Source = Source.getCSVPath();
> DataSet < RainPOJO > ds_s1 = env.readCsvFile("file://" + path_Source)
> .ignoreFirstLine()
> .pojoType(RainPOJO.class, "altitude", "city_name", "latitude",
> "longitude", "rainfall", "station_name", "time");
> long size = ds_s1.count();
> long startTime = System.currentTimeMillis();
> ds_s1.map(new MapFunction < RainPOJO, String > () {
> int count = 0;
> @Override
> public String map(RainPOJO obj) throws Exception {
> count += 1;
> long endTime = System.currentTimeMillis();
> double elapsed_time = endTime - startTime;
> if (count == size) {
> double d_seconds = elapsed_time / 1000;
> return "Elapsed time => " + elapsed_time + "(millis) " + d_seconds + "
> seconds";
> }
> return " " + count;
> }
> })
> .print();
> }
> }
>


Re: Destroy StreamExecutionEnv

2015-10-05 Thread Matthias J. Sax
Hi,

you just need to terminate your source (i.e., return from the run() method if
you implement your own source function). This will finish the complete
program. For the already available sources, just make sure you read finite
input.
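
A minimal sketch of such a finite source, assuming the run(SourceContext) /
cancel() form of the SourceFunction interface (the exact package and method
signatures may differ between Flink versions; FiniteSource is an illustrative
name):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class FiniteSource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // emit a bounded number of records ...
        for (int i = 0; i < 100 && running; i++) {
            ctx.collect("record-" + i);
        }
        // ... and simply return: this finishes the source and, with it, the job
    }

    @Override
    public void cancel() {
        running = false;
    }
}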

Hope this helps.

-Matthias

On 10/05/2015 12:15 AM, jay vyas wrote:
> Hi folks.
> 
> How do we end a stream execution environment? 
> 
> I have a unit test which runs a streaming job, and want the unit test to
> die after the first round of output is processed...
> 
> 
> DataStream<Tuple2<Map, Integer>> counts =
> dataStream.map(
> new MapFunction<String, Tuple2<Map, Integer>>() {
>   @Override
>   public Tuple2<Map, Integer> map(String s) throws Exception {
> Map transaction = MAPPER.readValue(s, Map.class);
> return new Tuple2<>(transaction, 1);
>   }
> });
> counts.print();
> 
> 
> 
> -- 
> jay vyas





Re: Destroy StreamExecutionEnv

2015-10-05 Thread Stephan Ewen
Matthias' solution should work in most cases.

In cases where you do not control the source (or the source can never be
finite, like the Kafka source), we often use a trick in the tests, which is
throwing a special type of exception (a SuccessException).

You can catch this exception around env.execute() (it will be the nested cause)
and decide that this qualifies the test as successful...
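
A rough sketch of that pattern; SuccessException and runTestJob are
illustrative names, not part of Flink's API:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SuccessExceptionExample {

    /** Thrown from a user function once the test has seen the expected output. */
    public static class SuccessException extends RuntimeException {}

    public static void runTestJob(StreamExecutionEnvironment env) throws Exception {
        try {
            env.execute();
        } catch (Exception e) {
            // The SuccessException arrives wrapped inside the job execution
            // exception, so walk the cause chain to find it.
            for (Throwable t = e; t != null; t = t.getCause()) {
                if (t instanceof SuccessException) {
                    return; // expected output was seen: the test passes
                }
            }
            throw e; // a real failure
        }
    }
}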

Greetings,
Stephan


On Mon, Oct 5, 2015 at 11:20 AM, Matthias J. Sax  wrote:

> Hi,
>
> you just need to terminate your source (ie, return from run() method if
> you implement your own source function). This will finish the complete
> program. For already available sources, just make sure you read finite
> input.
>
> Hope this helps.
>
> -Matthias
>
> On 10/05/2015 12:15 AM, jay vyas wrote:
> > Hi folks.
> >
> > How do we end a stream execution environment?
> >
> > I have a unit test which runs a streaming job, and want the unit test to
> > die after the first round of output is processed...
> >
> >
> > DataStream<Tuple2<Map, Integer>> counts =
> > dataStream.map(
> > new MapFunction<String, Tuple2<Map, Integer>>() {
> >   @Override
> >   public Tuple2<Map, Integer> map(String s) throws Exception {
> > Map transaction = MAPPER.readValue(s, Map.class);
> > return new Tuple2<>(transaction, 1);
> >   }
> > });
> > counts.print();
> >
> >
> >
> > --
> > jay vyas
>
>


Processing S3 data with Apache Flink

2015-10-05 Thread Kostiantyn Kudriavtsev
Hi guys,

I'm trying to get Apache Flink 0.9.1 working on EMR, basically to read
data from S3. I tried the following path for the data,
s3://mybucket.s3.amazonaws.com/folder, but it throws the following
exception:

java.io.IOException: Cannot establish connection to Amazon S3: 
com.amazonaws.services.s3.model.AmazonS3Exception: The request signature 
we calculated does not match the signature you provided. Check your key 
and signing method. (Service: Amazon S3; Status Code: 403;

I added the access and secret keys, so the problem is not there. I'm using
the standard region and gave read permission to everyone.

Any ideas how can it be fixed?

Thank you in advance,
Kostia

Re: Error trying to access JM through proxy

2015-10-05 Thread Robert Metzger
I filed a bug for this issue in our bug tracker
https://issues.apache.org/jira/browse/FLINK-2821 (even though we cannot do
much about it, we should track the resolution of the issue).

On Mon, Oct 5, 2015 at 5:34 AM, Stephan Ewen  wrote:

> I think this is yet another problem caused by Akka's overly strict message
> routing.
>
> An actor system bound to a certain URL can only receive messages that are
> sent to that exact URL. All other messages are dropped.
>
> This has many problems:
>
>   - Proxy routing (as described here, send to the proxy URL, receiver
> recognizes only original URL)
>   - Using hostname / IP interchangeably does not work (we solved this by
> always putting IP addresses into URLs, never hostnames)
>   - Binding to multiple interfaces (any local 0.0.0.0) does not work.
> Still no solution to that (but seems not too much of a restriction)
>
> Since this is inherent in Akka, I am puzzled what we can do about that.
> Just googled a bit, seems other systems that use Akka (like Spark) have
> stumbled across that issue as well...
>
> A good place to ask would be the Akka mailing list. I can dig into this
> after the next release (2 weeks or so); if you want to help us speed this
> up, you could ping the Akka mailing list as well.
>
> Greetings,
> Stephan
>
>
>
>
> On Sun, Oct 4, 2015 at 7:21 AM, Henry Saputra 
> wrote:
>
>> Hi Emmanuel,
>>
>> Could you tell a bit how the proxy being setup?
>>
>> - Henry
>>
>> On Fri, Oct 2, 2015 at 1:55 PM, Emmanuel  wrote:
>> > When trying to access the JM through a proxy I get:
>> >
>> >
>> > 19:26:23,113 ERROR akka.remote.EndpointWriter
>> > - dropping message [class akka.actor.ActorSelectionMessage] for
>> non-local
>> > recipient [Actor[akka.tcp://flink@10.155.241.168:6123/]] arriving at
>> > [akka.tcp://flink@10.155.241.168:6123] inbound addresses are
>> > [akka.tcp://flink@10.152.1.107:6123]
>> >
>> > Is there a way to allow this look up through a proxy?
>> >
>> > Thanks
>> >
>> >
>>
>
>


Re: Processing S3 data with Apache Flink

2015-10-05 Thread Robert Metzger
Hi Kostia,

thank you for writing to the Flink mailing list. I actually started to try
out our S3 File system support after I saw your question on StackOverflow
[1].
I found that our S3 connector is very broken. I had to resolve two more
issues with it before I was able to get the same exception you reported.

Another Flink committer looked into the issue as well and confirmed it, but
there was no solution [2].

So for now, I would say we have to assume that our S3 connector is not
working. I will start a separate discussion on the developer mailing list
about removing our S3 connector.

The good news is that you can just use Hadoop's S3 File System
implementation with Flink.

I used this Flink program to verify that it works:

public class S3FileSystem {
   public static void main(String[] args) throws Exception {
      ExecutionEnvironment ee = ExecutionEnvironment.createLocalEnvironment();
      DataSet<String> myLines = ee.readTextFile("s3n://my-bucket-name/some-test-file.xml");
      myLines.print();
   }
}

Also, you need to make a Hadoop configuration file available to Flink.
When running Flink locally in your IDE, just create a "core-site.xml" in
the src/main/resources folder, with the following content:

<configuration>
  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>putKeyHere</value>
  </property>
  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>putSecretHere</value>
  </property>
  <property>
    <name>fs.s3n.impl</name>
    <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
  </property>
</configuration>

If you are running on a cluster, re-use the existing core-site.xml
file (i.e., edit it) and point to its directory using Flink's
fs.hdfs.hadoopconf configuration option.

With these two things in place, you should be good to go.

[1]
http://stackoverflow.com/questions/32959790/run-apache-flink-with-amazon-s3
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-Amazon-S3-td946.html

On Mon, Oct 5, 2015 at 8:19 PM, Kostiantyn Kudriavtsev <
kudryavtsev.konstan...@gmail.com> wrote:

> Hi guys,
>
> I'm trying to get Apache Flink 0.9.1 working on EMR, basically to read
> data from S3. I tried the following path for the data,
> s3://mybucket.s3.amazonaws.com/folder, but it throws the following
> exception:
>
> java.io.IOException: Cannot establish connection to Amazon S3:
> com.amazonaws.services.s3.model.AmazonS3Exception: The request signature
> we calculated does not match the signature you provided. Check your key
> and signing method. (Service: Amazon S3; Status Code: 403;
>
> I added the access and secret keys, so the problem is not there. I'm using
> the standard region and gave read permission to everyone.
>
> Any ideas how can it be fixed?
>
> Thank you in advance,
> Kostia
>


Re: Error trying to access JM through proxy

2015-10-05 Thread Stephan Ewen
I think this is yet another problem caused by Akka's overly strict message
routing.

An actor system bound to a certain URL can only receive messages that are
sent to that exact URL. All other messages are dropped.

This has many problems:

  - Proxy routing (as described here, send to the proxy URL, receiver
recognizes only original URL)
  - Using hostname / IP interchangeably does not work (we solved this by
always putting IP addresses into URLs, never hostnames)
  - Binding to multiple interfaces (any local 0.0.0.0) does not work. Still
no solution to that (but seems not too much of a restriction)

Since this is inherent in Akka, I am puzzled what we can do about that.
Just googled a bit, seems other systems that use Akka (like Spark) have
stumbled across that issue as well...

A good place to ask would be the Akka mailing list. I can dig into this
after the next release (2 weeks or so); if you want to help us speed this
up, you could ping the Akka mailing list as well.

Greetings,
Stephan




On Sun, Oct 4, 2015 at 7:21 AM, Henry Saputra 
wrote:

> Hi Emmanuel,
>
> Could you tell a bit how the proxy being setup?
>
> - Henry
>
> On Fri, Oct 2, 2015 at 1:55 PM, Emmanuel  wrote:
> > When trying to access the JM through a proxy I get:
> >
> >
> > 19:26:23,113 ERROR akka.remote.EndpointWriter
> > - dropping message [class akka.actor.ActorSelectionMessage] for non-local
> > recipient [Actor[akka.tcp://flink@10.155.241.168:6123/]] arriving at
> > [akka.tcp://flink@10.155.241.168:6123] inbound addresses are
> > [akka.tcp://flink@10.152.1.107:6123]
> >
> > Is there a way to allow this look up through a proxy?
> >
> > Thanks
> >
> >
>


Reading from multiple input files with fewer task slots

2015-10-05 Thread Pieter Hameete
Hello Flinkers!

I run into some strange behavior when reading from a folder of input files.

When the number of input files in the folder exceeds the number of task
slots I noticed that the size of my datasets varies with each run. It seems
as if the transformations don't wait for all input files to be read.

When I have equal or more task slots than there are files, there are no
problems.

I'm using a custom input format. Could there be a problem with my custom
input format, and if so what could I be forgetting?

Kind regards and thank you for your time!

Pieter


Re: Config files content read

2015-10-05 Thread Fabian Hueske
Hi Flavio,

I don't think this is a feature that needs to go into Flink core.
To me it looks like this could be implemented as a utility method by anybody
who needs it without major effort.
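
For reference, a minimal sketch of such a utility built on Flink's
FileSystem/Path classes, along the lines of the snippet in your mail
(FileUtil and readFileAsString are illustrative names):

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class FileUtil {

    /** Reads the file at the given path (local fs or hdfs) into a String. */
    public static String readFileAsString(String filePath) throws Exception {
        StringBuilder content = new StringBuilder();
        Path path = new Path(filePath);
        FSDataInputStream stream = FileSystem.get(path.toUri()).open(path);
        BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                content.append(line);
            }
        } finally {
            reader.close();
        }
        return content.toString();
    }
}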

Best, Fabian


2015-10-02 15:27 GMT+02:00 Flavio Pompermaier :

> Hi to all,
>
> in many of my jobs I have to read a config file that can be either on the
> local fs or on hdfs.
> I'm looking for an intuitive API to read the content of such config files
> (JSON) before converting them to Java objects through Jackson. Is there any
> Flink API to easily achieve this?
> I'd really like something like
>
>- String content = FileSystem.get(myFileUri).readAsString() or
>- String content = new Path(myFilePath).readAsString();
>
> but at the moment the only solution I found is something like:
>
> StringBuffer content = new StringBuffer();
> Path path = new Path(myFilePath);
> FSDataInputStream stream = FileSystem.get(path.toUri()).open(path);
> BufferedReader reader = new BufferedReader(new
> InputStreamReader(stream));
> String line;
> try {
>  while ((line = reader.readLine()) != null) {
>  content.append(line);
>  }
> } finally {
>  reader.close();
> }
> String contentStr = content.toString();
>
>
> Am I the only one that needs such a feature?
> Best,
> Flavio
>


Re: For each element in a dataset, do something with another dataset

2015-10-05 Thread Pieter Hameete
Hi Fabian,

I have a question regarding the first approach. Is there a benefit gained
from choosing a RichMapPartitionFunction over a RichMapFunction in this
case? I assume that each broadcasted dataset is sent only once to each task
manager?

If I would broadcast dataset B, then I could for each element a in A count
the number of elements in B that are smaller than a and output a tuple in a
map operation. This would also save me a step in aggregating the results?

Kind regards,

Pieter

2015-09-30 12:44 GMT+02:00 Pieter Hameete :

> Hi Gabor, Fabian,
>
> thank you for your suggestions. I am intending to scale up so that I'm
> sure that both A and B won't fit in memory. I'll see if I can come up with
> a nice way to partition the datasets but if that will take too much time
> I'll just have to accept that it wont work on large datasets. I'll let you
> know if I managed to work something out, but I wont work on it until the
> weekend :-)
>
> Cheers again,
>
> Pieter
>
> 2015-09-30 12:28 GMT+02:00 Gábor Gévay :
>
>> Hello,
>>
>> Alternatively, if dataset B fits in memory, but dataset A doesn't,
>> then you can do it with broadcasting B to a RichMapPartitionFunction
>> on A:
>> In the open method of mapPartition, you sort B. Then, for each element
>> of A, you do a binary search in B, and look at the index found by the
>> binary search, which will be the count that you are looking for.
>>
>> Best,
>> Gabor
>>
>>
>>
>> 2015-09-30 11:20 GMT+02:00 Fabian Hueske :
>> > The idea is to partition both datasets by range.
>> > Assume your dataset A is [1,2,3,4,5,6] you create two partitions: p1:
>> > [1,2,3] and p2: [4,5,6].
>> > Each partition is given to a different instance of a MapPartition
>> operator
>> > (this is a bit tricky, because you cannot use broadcastSet. You could
>> load
>> > the corresponding partition it in the open() function from HDFS for
>> > example).
>> >
>> > DataSet B is partitioned in the same way, i.e., all elements <= 3 go to
>> > partition 1, everything > 3 goes to p2. You can partition a dataset by
>> range
>> > using the partitionCustom() function. The partitioned dataset is given
>> to
>> > the mapPartition operator that loaded a partition of dataset A in each
>> task
>> > instance.
>> > You do the counting just like before (sorting the partition of dataset
>> A,
>> > binary sort, long[]), but add an additional count for the complete
>> partition
>> > (basically count all elements that arrive in the task instance).
>> >
>> > If you have a dataset B with 1,2,2,3,3,4,5,5,5,5,6,7 the counts for p1
>> would
>> > be [1:0, 2:1, 3:3, all:5] and p2: [4:0, 5:1, 6:5, all:7].
>> > Now you need to compute the final count by adding the "all" counts of
>> the
>> > lower partitions to the counts of the "higher" partitions, i.e., add
>> all:5
>> > of p1 to all counts for p2.
>> >
>> > This approach requires to know the value range and distribution of the
>> > values which makes it a bit difficult. I guess you'll get the best
>> > performance, if you partition in a way, that you have about equally
>> sized
>> > partitions of dataset B with the constraint that the corresponding
>> > partitions of A fit into memory.
>> >
>> > As I said, its a bit cumbersome. I hope you could follow my explanation.
>> > Please ask if something is not clear ;-)
>> >
>> > 2015-09-30 10:46 GMT+02:00 Pieter Hameete :
>> >>
>> >> Hi Fabian,
>> >>
>> >> thanks for your tips!
>> >>
>> >> Do you have some pointers for getting started with the 'tricky range
>> >> partitioning'? I am quite keen to get this working with large datasets
>> ;-)
>> >>
>> >> Cheers,
>> >>
>> >> Pieter
>> >>
>> >> 2015-09-30 10:24 GMT+02:00 Fabian Hueske :
>> >>>
>> >>> Hi Pieter,
>> >>>
>> >>> cross is indeed too expensive for this task.
>> >>>
>> >>> If dataset A fits into memory, you can do the following: Use a
>> >>> RichMapPartitionFunction to process dataset B and add dataset A as a
>> >>> broadcastSet. In the open method of mapPartition, you can load the
>> >>> broadcasted set and sort it by a.propertyX and initialize a long[]
>> for the
>> >>> counts. For each element of dataset B, you do a binary search on the
>> sorted
>> >>> dataset A and increase all counts up to the position in the sorted
>> list.
>> >>> After all elements of dataset B have been processed, return the
>> counts from
>> >>> the long[].
>> >>>
>> >>> If dataset A doesn't fit into memory, things become more cumbersome
>> and
>> >>> we need to play some tricky with range partitioning...
>> >>>
>> >>> Let me know, if you have questions,
>> >>> Fabian
>> >>>
>> >>> 2015-09-29 16:59 GMT+02:00 Pieter Hameete :
>> 
>>  Good day everyone,
>> 
>>  I am looking for a good way to do the following:
>> 
>>  I have dataset A and dataset B, and for each element in dataset A I
>>  would like to filter dataset B and obtain the size of the result. To
>> 

Re: Reading from multiple input files with fewer task slots

2015-10-05 Thread Stephan Ewen
I assume this concerns the streaming API?

Can you share your program and/or the custom input format code?

On Mon, Oct 5, 2015 at 12:33 PM, Pieter Hameete  wrote:

> Hello Flinkers!
>
> I run into some strange behavior when reading from a folder of input files.
>
> When the number of input files in the folder exceeds the number of task
> slots I noticed that the size of my datasets varies with each run. It seems
> as if the transformations don't wait for all input files to be read.
>
> When I have equal or more task slots than there are files, there are no
> problems.
>
> I'm using a custom input format. Could there be a problem with my custom
> input format, and if so what could I be forgetting?
>
> Kind regards and thank you for your time!
>
> Pieter
>


Re: Reading from multiple input files with fewer task slots

2015-10-05 Thread Pieter Hameete
Hi Stephen,

it concerns the DataSet API.

The program im running can be found at
https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/performance/xmark/XMarkQuery11.scala
The Custom Input Format at
https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/parsing/xml/XML2DawnInputFormat.java

Cheers!

2015-10-05 12:38 GMT+02:00 Stephan Ewen :

> I assume this concerns the streaming API?
>
> Can you share your program and/or the custom input format code?
>
> On Mon, Oct 5, 2015 at 12:33 PM, Pieter Hameete 
> wrote:
>
>> Hello Flinkers!
>>
>> I run into some strange behavior when reading from a folder of input
>> files.
>>
>> When the number of input files in the folder exceeds the number of task
>> slots I noticed that the size of my datasets varies with each run. It seems
>> as if the transformations don't wait for all input files to be read.
>>
>> When I have equal or more task slots than there are files, there are no
>> problems.
>>
>> I'm using a custom input format. Could there be a problem with my custom
>> input format, and if so what could I be forgetting?
>>
>> Kind regards and thank you for your time!
>>
>> Pieter
>>
>
>


Re: For each element in a dataset, do something with another dataset

2015-10-05 Thread Fabian Hueske
Hi Pieter,

a FlatMapFunction can only return values when the map() method is called.
However, in your use case, you would like to return values *after* the
function was called the last time. This is not possible with a
FlatMapFunction, because you cannot identify the last map() call.
The MapPartitionFunction is called only once with an iterator over the
whole partition. Hence you can return values after the iterator was fully
consumed.

The broadcast set is sent only once in both cases.

If it is possible to broadcast dataset B, you can also use a MapFunction
and don't need to store the count values.
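
A rough sketch of that map-with-broadcast variant; the element types, the
broadcast name "B" and the class name are illustrative, and the broadcast
list is copied before sorting because it may be shared between functions:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class SmallerThanCount extends RichMapFunction<Long, Tuple2<Long, Integer>> {

    private List<Long> sortedB;

    @Override
    public void open(Configuration parameters) {
        List<Long> broadcastB = getRuntimeContext().getBroadcastVariable("B");
        sortedB = new ArrayList<>(broadcastB);   // copy before sorting
        Collections.sort(sortedB);
    }

    @Override
    public Tuple2<Long, Integer> map(Long a) {
        int pos = Collections.binarySearch(sortedB, a);
        if (pos < 0) {
            pos = -(pos + 1);                    // insertion point = #elements < a
        } else {
            while (pos > 0 && sortedB.get(pos - 1).equals(a)) {
                pos--;                           // step back to the first occurrence of a
            }
        }
        return new Tuple2<>(a, pos);
    }
}

// usage sketch: A.map(new SmallerThanCount()).withBroadcastSet(B, "B");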

Best, Fabian

2015-10-05 11:53 GMT+02:00 Pieter Hameete :

> Hi Fabian,
>
> I have a question regarding the first approach. Is there a benefit gained
> from choosing a RichMapPartitionFunction over a RichMapFunction in this
> case? I assume that each broadcasted dataset is sent only once to each task
> manager?
>
> If I would broadcast dataset B, then I could for each element a in A count
> the number of elements in B that are smaller than a and output a tuple in a
> map operation. This would also save me a step in aggregating the results?
>
> Kind regards,
>
> Pieter
>
> 2015-09-30 12:44 GMT+02:00 Pieter Hameete :
>
>> Hi Gabor, Fabian,
>>
>> thank you for your suggestions. I am intending to scale up so that I'm
>> sure that both A and B won't fit in memory. I'll see if I can come up with
>> a nice way to partition the datasets but if that will take too much time
>> I'll just have to accept that it wont work on large datasets. I'll let you
>> know if I managed to work something out, but I wont work on it until the
>> weekend :-)
>>
>> Cheers again,
>>
>> Pieter
>>
>> 2015-09-30 12:28 GMT+02:00 Gábor Gévay :
>>
>>> Hello,
>>>
>>> Alternatively, if dataset B fits in memory, but dataset A doesn't,
>>> then you can do it with broadcasting B to a RichMapPartitionFunction
>>> on A:
>>> In the open method of mapPartition, you sort B. Then, for each element
>>> of A, you do a binary search in B, and look at the index found by the
>>> binary search, which will be the count that you are looking for.
>>>
>>> Best,
>>> Gabor
>>>
>>>
>>>
>>> 2015-09-30 11:20 GMT+02:00 Fabian Hueske :
>>> > The idea is to partition both datasets by range.
>>> > Assume your dataset A is [1,2,3,4,5,6] you create two partitions: p1:
>>> > [1,2,3] and p2: [4,5,6].
>>> > Each partition is given to a different instance of a MapPartition
>>> operator
>>> > (this is a bit tricky, because you cannot use broadcastSet. You could
>>> load
>>> > the corresponding partition it in the open() function from HDFS for
>>> > example).
>>> >
>>> > DataSet B is partitioned in the same way, i.e., all elements <= 3 go to
>>> > partition 1, everything > 3 goes to p2. You can partition a dataset by
>>> range
>>> > using the partitionCustom() function. The partitioned dataset is given
>>> to
>>> > the mapPartition operator that loaded a partition of dataset A in each
>>> task
>>> > instance.
>>> > You do the counting just like before (sorting the partition of dataset
>>> A,
>>> > binary sort, long[]), but add an additional count for the complete
>>> partition
>>> > (basically count all elements that arrive in the task instance).
>>> >
>>> > If you have a dataset B with 1,2,2,3,3,4,5,5,5,5,6,7 the counts for p1
>>> would
>>> > be [1:0, 2:1, 3:3, all:5] and p2: [4:0, 5:1, 6:5, all:7].
>>> > Now you need to compute the final count by adding the "all" counts of
>>> the
>>> > lower partitions to the counts of the "higher" partitions, i.e., add
>>> all:5
>>> > of p1 to all counts for p2.
>>> >
>>> > This approach requires to know the value range and distribution of the
>>> > values which makes it a bit difficult. I guess you'll get the best
>>> > performance, if you partition in a way, that you have about equally
>>> sized
>>> > partitions of dataset B with the constraint that the corresponding
>>> > partitions of A fit into memory.
>>> >
>>> > As I said, its a bit cumbersome. I hope you could follow my
>>> explanation.
>>> > Please ask if something is not clear ;-)
>>> >
>>> > 2015-09-30 10:46 GMT+02:00 Pieter Hameete :
>>> >>
>>> >> Hi Fabian,
>>> >>
>>> >> thanks for your tips!
>>> >>
>>> >> Do you have some pointers for getting started with the 'tricky range
>>> >> partitioning'? I am quite keen to get this working with large
>>> datasets ;-)
>>> >>
>>> >> Cheers,
>>> >>
>>> >> Pieter
>>> >>
>>> >> 2015-09-30 10:24 GMT+02:00 Fabian Hueske :
>>> >>>
>>> >>> Hi Pieter,
>>> >>>
>>> >>> cross is indeed too expensive for this task.
>>> >>>
>>> >>> If dataset A fits into memory, you can do the following: Use a
>>> >>> RichMapPartitionFunction to process dataset B and add dataset A as a
>>> >>> broadcastSet. In the open method of mapPartition, you can load the
>>> >>> broadcasted set and sort it by a.propertyX and initialize a long[]
>>> for the
>>> >>> 

Re: Reading from multiple input files with fewer task slots

2015-10-05 Thread Stephan Ewen
If you have more files than task slots, then some tasks will get multiple
files. That means that open() and close() are called multiple times on the
input format.

Make sure that your input format tolerates that and does not get confused
with lingering state (maybe create a new SimpleInputProjection as well)
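
A minimal sketch of what that looks like in an input format (not the actual
XML2DawnInputFormat; MyFormat and its fields are illustrative):

import java.io.IOException;

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;

public class MyFormat extends FileInputFormat<String> {

    // per-file state: must be reset for every split/file
    private boolean endReached;

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        // reset everything that belongs to a single file here
        endReached = false;
    }

    @Override
    public boolean reachedEnd() {
        return endReached;
    }

    @Override
    public String nextRecord(String reuse) throws IOException {
        // ... read from this.stream and set endReached = true once the file is done
        endReached = true;
        return null;
    }
}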

On Mon, Oct 5, 2015 at 12:41 PM, Pieter Hameete  wrote:

> Hi Stephen,
>
> it concerns the DataSet API.
>
> The program im running can be found at
> https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/performance/xmark/XMarkQuery11.scala
> The Custom Input Format at
> https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/parsing/xml/XML2DawnInputFormat.java
>
> Cheers!
>
> 2015-10-05 12:38 GMT+02:00 Stephan Ewen :
>
>> I assume this concerns the streaming API?
>>
>> Can you share your program and/or the custom input format code?
>>
>> On Mon, Oct 5, 2015 at 12:33 PM, Pieter Hameete 
>> wrote:
>>
>>> Hello Flinkers!
>>>
>>> I run into some strange behavior when reading from a folder of input
>>> files.
>>>
>>> When the number of input files in the folder exceeds the number of task
>>> slots I noticed that the size of my datasets varies with each run. It seems
>>> as if the transformations don't wait for all input files to be read.
>>>
>>> When I have equal or more task slots than there are files, there are no
>>> problems.
>>>
>>> I'm using a custom input format. Could there be a problem with my custom
>>> input format, and if so what could I be forgetting?
>>>
>>> Kind regards and thank you for your time!
>>>
>>> Pieter
>>>
>>
>>
>


Re: Reading from multiple input files with fewer task slots

2015-10-05 Thread Pieter Hameete
Hi Stephen,

it was not the SimpleInputProjection, because that is a stateless object.
The boolean endReached was not reset upon opening a new file, however, so
for each consecutive file no records were parsed.

Thanks a lot for your help!

- Pieter

2015-10-05 12:50 GMT+02:00 Stephan Ewen :

> If you have more files than task slots, then some tasks will get multiple
> files. That means that open() and close() are called multiple times on the
> input format.
>
> Make sure that your input format tolerates that and does not get confused
> with lingering state (maybe create a new SimpleInputProjection as well)
>
> On Mon, Oct 5, 2015 at 12:41 PM, Pieter Hameete 
> wrote:
>
>> Hi Stephen,
>>
>> it concerns the DataSet API.
>>
>> The program im running can be found at
>> https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/performance/xmark/XMarkQuery11.scala
>> The Custom Input Format at
>> https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/parsing/xml/XML2DawnInputFormat.java
>>
>> Cheers!
>>
>> 2015-10-05 12:38 GMT+02:00 Stephan Ewen :
>>
>>> I assume this concerns the streaming API?
>>>
>>> Can you share your program and/or the custom input format code?
>>>
>>> On Mon, Oct 5, 2015 at 12:33 PM, Pieter Hameete 
>>> wrote:
>>>
 Hello Flinkers!

 I run into some strange behavior when reading from a folder of input
 files.

 When the number of input files in the folder exceeds the number of task
 slots I noticed that the size of my datasets varies with each run. It seems
 as if the transformations don't wait for all input files to be read.

 When I have equal or more task slots than there are files, there are no
 problems.

 I'm using a custom input format. Could there be a problem with my
 custom input format, and if so what could I be forgetting?

 Kind regards and thank you for your time!

 Pieter

>>>
>>>
>>
>


Re: Reading from multiple input files with fewer task slots

2015-10-05 Thread Stephan Ewen
Okay, nice to hear it works out!

On Mon, Oct 5, 2015 at 1:50 PM, Pieter Hameete  wrote:

> Hi Stephen,
>
> it was not the SimpleInputProjection, because that is a stateless object.
> The boolean endReached was not reset upon opening a new file however, so
> for each consecutive file no records were parsed.
>
> Thanks alot for your help!
>
> - Pieter
>
> 2015-10-05 12:50 GMT+02:00 Stephan Ewen :
>
>> If you have more files than task slots, then some tasks will get multiple
>> files. That means that open() and close() are called multiple times on the
>> input format.
>>
>> Make sure that your input format tolerates that and does not get confused
>> with lingering state (maybe create a new SimpleInputProjection as well)
>>
>> On Mon, Oct 5, 2015 at 12:41 PM, Pieter Hameete 
>> wrote:
>>
>>> Hi Stephen,
>>>
>>> it concerns the DataSet API.
>>>
>>> The program im running can be found at
>>> https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/performance/xmark/XMarkQuery11.scala
>>> The Custom Input Format at
>>> https://github.com/PHameete/dawn-flink/blob/development/src/main/scala/wis/dawnflink/parsing/xml/XML2DawnInputFormat.java
>>>
>>> Cheers!
>>>
>>> 2015-10-05 12:38 GMT+02:00 Stephan Ewen :
>>>
 I assume this concerns the streaming API?

 Can you share your program and/or the custom input format code?

 On Mon, Oct 5, 2015 at 12:33 PM, Pieter Hameete 
 wrote:

> Hello Flinkers!
>
> I run into some strange behavior when reading from a folder of input
> files.
>
> When the number of input files in the folder exceeds the number of
> task slots I noticed that the size of my datasets varies with each run. It
> seems as if the transformations don't wait for all input files to be read.
>
> When I have equal or more task slots than there are files, there are
> no problems.
>
> I'm using a custom input format. Could there be a problem with my
> custom input format, and if so what could I be forgetting?
>
> Kind regards and thank you for your time!
>
> Pieter
>


>>>
>>
>


Re: Running Flink on an Amazon Elastic MapReduce cluster

2015-10-05 Thread Maximilian Michels
Hi Hanen,

It appears that the environment variables are not set. Thus, Flink cannot
pick up the Hadoop configuration. Could you please paste the output of
"echo $HADOOP_HOME" and "echo $HADOOP_CONF_DIR" here?

In any case, your problem looks similar to the one discussed here:
http://stackoverflow.com/questions/31991934/cannot-use-apache-flink-in-amazon-emr
Please execute

export HADOOP_CONF_DIR=/etc/hadoop/conf

and you should be good to go.

Cheers,
Max

On Mon, Oct 5, 2015 at 3:37 PM, Hanen Borchani  wrote:

> Hi all,
>
> I tried to start a Yarn session on an Amazon EMR cluster with Hadoop 2.6.0
> following the instructions provided in this link and using Flink 0.9.1 for
> Hadoop 2.6.0
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/yarn_setup.html
>
> Running the following command line: ./bin/yarn-session.sh -n 2 -jm 1024
> -tm 2048 generated the following error message
>
>  
>
> 12:53:47,633 INFO  org.apache.hadoop.yarn.client.RMProxy
>- Connecting to ResourceManager at /0.0.0.0:8032
>
> 12:53:47,805 WARN
> org.apache.hadoop.util.NativeCodeLoader   - Unable to
> load native-hadoop library for your platform... using builtin-java classes
> where applicable
>
> 12:53:48,226 WARN
> org.apache.flink.yarn.FlinkYarnClient - Neither the
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink
> YARN Client needs one of these to be set to properly load the Hadoop
> configuration for accessing YARN.
>
> 12:53:48,227 INFO
> org.apache.flink.yarn.FlinkYarnClient - Using
> values:
>
> 12:53:48,228 INFO
> org.apache.flink.yarn.FlinkYarnClient - TaskManager
> count = 2
>
> 12:53:48,229 INFO
> org.apache.flink.yarn.FlinkYarnClient - JobManager
> memory = 1024
>
> 12:53:48,229 INFO
> org.apache.flink.yarn.FlinkYarnClient - TaskManager
> memory = 2048
>
> 12:53:48,580 WARN  org.apache.flink.yarn.FlinkYarnClient
>   - The file system scheme is 'file'. This indicates that the
> specified Hadoop configuration path is wrong and the sytem is using the
> default Hadoop configuration values.The Flink YARN client needs to store
> its files in a distributed file system
>
> 12:53:48,593 INFO
> org.apache.flink.yarn.Utils   - Copying
> from file:/home/hadoop/flink-0.9.1/lib/flink-dist-0.9.1.jar to
> file:/home/hadoop/.flink/application_1444046049303_0008/flink-dist-0.9.1.jar
>
> 12:53:49,245 INFO
> org.apache.flink.yarn.Utils   - Copying
> from /home/hadoop/flink-0.9.1/conf/flink-conf.yaml to
> file:/home/hadoop/.flink/application_1444046049303_0008/flink-conf.yaml
>
> 12:53:49,251 INFO  org.apache.flink.yarn.Utils
>- Copying from
> file:/home/hadoop/flink-0.9.1/lib/flink-python-0.9.1.jar to
> file:/home/hadoop/.flink/application_1444046049303_0008/flink-python-0.9.1.jar
>
> 12:53:49,278 INFO
> org.apache.flink.yarn.Utils   - Copying
> from file:/home/hadoop/flink-0.9.1/conf/logback.xml to
> file:/home/hadoop/.flink/application_1444046049303_0008/logback.xml
>
> 12:53:49,285 INFO
> org.apache.flink.yarn.Utils   - Copying
> from file:/home/hadoop/flink-0.9.1/conf/log4j.properties to
> file:/home/hadoop/.flink/application_1444046049303_0008/log4j.properties
>
> 12:53:49,304 INFO
> org.apache.flink.yarn.FlinkYarnClient - Submitting
> application master application_1444046049303_0008
>
> 12:53:49,347 INFO
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted
> application application_1444046049303_0008
>
> 12:53:49,347 INFO
> org.apache.flink.yarn.FlinkYarnClient - Waiting for
> the cluster to be allocated
>
> 12:53:49,349 INFO
> org.apache.flink.yarn.FlinkYarnClient - Deploying
> cluster, current state ACCEPTED
>
> 12:53:50,351 INFO
> org.apache.flink.yarn.FlinkYarnClient - Deploying
> cluster, current state ACCEPTED
>
> Error while deploying YARN cluster: The YARN application unexpectedly
> switched to state FAILED during deployment.
>
> Diagnostics from YARN: Application application_1444046049303_0008 failed 1
> times due to AM Container for appattempt_1444046049303_0008_01 exited
> with  exitCode: -1000
>
> For more detailed output, check application tracking page:http://
> ip-172-31-10-16.us-west-2.compute.internal:20888/proxy/application_1444046049303_0008/Then,
> click on links to logs of each attempt.
>
> Diagnostics: File
> file:/home/hadoop/.flink/application_1444046049303_0008/flink-conf.yaml
> does not exist
>
> java.io.FileNotFoundException: File
> file:/home/hadoop/.flink/application_1444046049303_0008/flink-conf.yaml
> does not exist
>
>  at
>