Re: MatrixMultiplication

2016-01-25 Thread Till Rohrmann
Hi Lydia,

Since matrix multiplication is O(n^3), I would assume that it would simply
take 1000 times longer than the multiplication of the 100 x 100 matrix.
Have you waited so long to see whether it completes or is there another
problem?
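
(Back-of-the-envelope: going from n = 100 to n = 1000 multiplies n by 10, so an
O(n^3) algorithm does roughly 10^3 = 1000 times the work. On top of that, the
self-join in the program below materializes on the order of n^3 = 10^9
intermediate records for the 1000x1000 case, which is why it appears to hang in
the join phase.)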

Cheers,
Till

On Mon, Jan 25, 2016 at 2:13 PM, Lydia Ickler 
wrote:

> Hi,
>
> I want to do a simple MatrixMultiplication and use the following code (see
> bottom).
> For matrices of 50x50 or 100x100 it is no problem. But already with matrices
> of 1000x1000 it does not work anymore and gets stuck in the joining part.
> What am I doing wrong?
>
> Best regards,
> Lydia
>
> package de.tuberlin.dima.aim3.assignment3;
>
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.CsvReader;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
>
>
> public class MatrixMultiplication {
>
>    static String input = null;
>    static String output = null;
>
>    public void run() throws Exception {
>       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>       DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);
>
>       matrixA.join(matrixA).where(1).equalTo(0)
>             .map(new ProjectJoinResultMapper())
>             .groupBy(0, 1)
>             .sum(2)
>             .writeAsCsv(output);
>
>       env.execute();
>    }
>
>    public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(
>          ExecutionEnvironment env, String filePath) {
>       CsvReader csvReader = env.readCsvFile(filePath);
>       csvReader.fieldDelimiter(',');
>       csvReader.includeFields("fttt");
>       return csvReader.types(Integer.class, Integer.class, Double.class);
>    }
>
>    public static final class ProjectJoinResultMapper implements
>          MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
>                      Tuple3<Integer, Integer, Double>> {
>       @Override
>       public Tuple3<Integer, Integer, Double> map(
>             Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>             throws Exception {
>          Integer row = value.f0.f0;
>          Integer column = value.f1.f1;
>          Double product = value.f0.f2 * value.f1.f2;
>          return new Tuple3<Integer, Integer, Double>(row, column, product);
>       }
>    }
>
>    public static void main(String[] args) throws Exception {
>       if (args.length < 2) {
>          System.err.println("Usage: MatrixMultiplication <input path> <output path>");
>          System.exit(0);
>       }
>       input = args[0];
>       output = args[1];
>       new MatrixMultiplication().run();
>    }
>
> }
>
>
>


Re: Redeployements and state

2016-01-25 Thread Stephan Ewen
Hi Niels!

There is a slight mismatch between your thoughts and the current design,
but not much.

What you describe (at the start of the job, the latest checkpoint is
automatically loaded) is basically what the high-availability setup does if
the master dies. The new master loads all jobs and continues them from the
latest checkpoint.
If you run an HA setup, and you stop/restart your jobs not by stopping the
jobs, but by killing the cluster, you should get that behavior.

Once a job is properly stopped, and you start a new job, there is no way
for Flink to tell that this is in fact the same job and that it should resume
from where the recently stopped one left off. Also, "same" should be a fuzzy
"same", to allow for slight changes in the job (bug fixes). Savepoints let you
put the persistent part of the job somewhere, to tell a new job where to pick
up from.
  - Makes it work in non-HA setups
  - Allows you to keep multiple savepoints (like "versions", say one per day
or so) to roll back to
  - Can have multiple versions of the same job resuming from one savepoint
(what-if or A/B tests, or seamless version upgrades)


There is something on the roadmap that would make your use case very easy:
"StopWithSavepoint"

There is an open pull request to cleanly stop() a streaming program. The
next enhancement is to stop it and let it draw a savepoint as part of that.
Then you can simply script a stop/start like that:

# stop with savepoint
bin/flink stop -s <savepoint dir> jobid

# resume
bin/flink run -s <savepoint dir> job


Hope that helps,
Stephan


On Fri, Jan 22, 2016 at 3:06 PM, Niels Basjes  wrote:

> Hi,
>
> @Max: Thanks for the new URL. I noticed that a lot (in fact almost all) of
> links in the new manuals lead to 404 errors. Maybe you should run an
> automated test to find them all.
>
> I did a bit of reading about the savepoints and saw that they are in fact
> described as "Allow to trigger checkpoints manually".
>
> Let me sketch what I think I need:
> 1) I need recovery of the topology in case of partial failure (i.e. a
> single node dies).
> 2) I need recovery of the topology in case of full topology failure (i.e.
> Hadoop security tokens cause the entire thing to die, or I need to deploy a
> fixed version of my software).
>
> Now what I understand is that the checkpoints are managed by Flink and as
> such allow me to run the topology without any manual actions. These are
> cleaned automatically when no longer needed.
> These savepoints, however, appear to need external 'intervention'; they are
> intended as 'manual'. So in addition to my topology I need something extra
> that periodically (i.e. every minute) fires a command to persist a
> checkpoint into a savepoint and to clean up the 'old' ones.
>
> What I want is something that works roughly as follows:
> 1) I configure everything (i.e. assign IDs, configure the checkpoint
> directory, etc.)
> 2) The framework saves and cleans the checkpoints automatically when the
> topology is running.
> 3) I simply start the topology without any special options.
>
> My idea is essentially that at the startup of a topology the system looks
> at the configured checkpoint persistence and recovers the most recent one.
>
> Apparently there is a mismatch between what I think is useful and what has
> been implemented so far.
> Am I missing something or should I submit this as a Jira ticket for a
> later version?
>
> Niels Basjes
>
>
>
>
>
>
> On Mon, Jan 18, 2016 at 12:13 PM, Maximilian Michels 
> wrote:
>
>> The documentation layout changed in the master. The new URL:
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
>>
>> On Thu, Jan 14, 2016 at 2:21 PM, Niels Basjes  wrote:
>> > Yes, that is exactly the type of solution I was looking for.
>> >
>> > I'll dive into this.
>> > Thanks guys!
>> >
>> > Niels
>> >
>> > On Thu, Jan 14, 2016 at 11:55 AM, Ufuk Celebi  wrote:
>> >>
>> >> Hey Niels,
>> >>
>> >> as Gabor wrote, this feature has been merged to the master branch
>> >> recently.
>> >>
>> >> The docs are online here:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/savepoints.html
>> >>
>> >> Feel free to report back your experience with it if you give it a try.
>> >>
>> >> – Ufuk
>> >>
>> >> > On 14 Jan 2016, at 11:09, Gábor Gévay  wrote:
>> >> >
>> >> > Hello,
>> >> >
>> >> > You are probably looking for this feature:
>> >> > https://issues.apache.org/jira/browse/FLINK-2976
>> >> >
>> >> > Best,
>> >> > Gábor
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > 2016-01-14 11:05 GMT+01:00 Niels Basjes :
>> >> >> Hi,
>> >> >>
>> >> >> I'm working on a streaming application using Flink.
>> >> >> Several steps in the processing are stateful (I use custom Windows
>> >> >> and
>> >> >> stateful operators).
>> >> >>
>> >> >> Now if during a normal run a worker fails, the checkpointing system
>> >> >> will be
>> >> >> used to recover.
>> >> >>
>> >> >> But what if the 

Hello, a question about Dashboard in Flink

2016-01-25 Thread Philip Lee
Hello,

According to
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-Web-Dashboard-Completed-Job-history-td4067.html,
I cannot retrieve the job history from the Dashboard after turning off the JM.

But as Fabian mentioned here,
"However, you can query all stats that are displayed by the dashboard via a
REST API [1] while the JM is running and save them yourself. This way you
can analyze the data also after the JM was stopped". Could you explain
this sentence in more detail?

I want to evaluate the timeline view of each function after a job is done.

Thanks,
Phil


Fwd: Flink 0.10.1 and HBase

2016-01-25 Thread Christophe Salperwyck
Hi all,

I have an issue with Flink 0.10.1, HBase and Guava, it seems to be related
to this JIRA:
https://issues.apache.org/jira/browse/FLINK-3158

If I removed the com.google.common.* class files from the jar file, it
works then.

Is there any other way to deal with this problem?

Thanks for your work!


Re: Flink 0.10.1 and HBase

2016-01-25 Thread Robert Metzger
Hi Christophe,

I'm sorry that you ran into the issue. Right now, there is no better fix.
For the next releases, I'll take care that this doesn't happen again.

Maybe we'll do a 0.10.2 before 1.0.0 (you are the third user who, however
implicitly, has publicly requested a Flink 0.10.2 release).


On Mon, Jan 25, 2016 at 3:46 PM, Christophe Salperwyck <
christophe.salperw...@gmail.com> wrote:

> Hi all,
>
> I have an issue with Flink 0.10.1, HBase and Guava, it seems to be related
> to this JIRA:
> https://issues.apache.org/jira/browse/FLINK-3158
>
> If I removed the com.google.common.* class files from the jar file, it
> works then.
>
> Is there any other way to deal with this problem?
>
> Thanks for your work!
>
>


maxtime / watermark for GlobalWindow

2016-01-25 Thread Radu Tudoran
Hi,

I am using a global window to collect some events. I use a trigger to fire the 
processing.
Is there any way to get the time of the event that has triggered the processing?

I am asking this as the getMaxTime() field of the GlobalWindow returns MaxLong.


The code skeleton is:

stream
   .windowAll(GlobalWindows.create())
   .trigger(new MyTrigger())
   .apply(new AllWindowFunction<...>() {
      @Override
      public void apply(GlobalWindow arg0,
            Iterable<Tuple1<...>> arg1,
            Collector<...> arg2)
            throws Exception {

         // - get the event timestamp

      }
   })



Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173




continuous time trigger

2016-01-25 Thread Radu Tudoran
Re-Hi,

I have another question regarding the triggering of the processing of a window. 
Can this be done in some way at specific time intervals, independent of whether 
 an event has been received or not, via a trigger?

The reason why I am considering a trigger rather than timeWindow(All) is that
timeWindow will end up generating multiple windows and duplicating data, while
having the option from the trigger to actually fire the processing at certain
times (independent of when the events arrived) would enable operating with a
single window.
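
For illustration, a rough sketch of that idea, assuming the
ContinuousProcessingTimeTrigger class that ships with Flink is usable with a
global window in the version at hand (the 30-second interval and the class
name below are illustrative, not something from this thread; signatures may
need adjusting to the exact Flink version):

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class PeriodicTriggerSketch {

   public static AllWindowedStream<Tuple1<Long>, GlobalWindow> periodically(
         DataStream<Tuple1<Long>> stream) {
      return stream
            // a single global window over the whole stream
            .windowAll(GlobalWindows.create())
            // fire every 30 seconds of processing time, whether or not new
            // events arrived in the meantime
            .trigger(ContinuousProcessingTimeTrigger.<GlobalWindow>of(Time.seconds(30)));
            // ...then attach the existing AllWindowFunction via apply(...)
   }
}

Note that a global window with a trigger that only fires (and never purges)
keeps accumulating elements, so in practice a purging trigger or an evictor
would probably be needed as well.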

Regards,

Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173




Re: Reading Binary Data (Matrix) with Flink

2016-01-25 Thread Fabian Hueske
Hi Saliya,

the number of parallel splits is controlled by the number of input splits
returned by the InputFormat.createInputSplits() method. This method
receives a parameter minNumSplits which is equal to the number of DataSource
tasks.

Flink handles input splits a bit different from Hadoop. In Hadoop, each
input split corresponds to one map task. In Flink you have a fixed number
of DataSource tasks and input splits are lazily distributed to source
tasks. If you have more splits than tasks, a data source requests a new
split when it is done with its last split until all splits are assigned. If
your createInputSplits method returns fewer splits than minNumSplits, some
source tasks won't receive a split.

If you read files from a local FS in a distributed (multi-node) setup, you
have to be careful. Each node must have an exact copy of the data at
exactly the same location. Otherwise, it won't work.
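
To make the split mechanics concrete, here is a rough, untested sketch of a
custom binary input format along those lines, assuming the n x n matrix of
Java shorts discussed in this thread and one split per block of rows. The
class name, the row-block layout, and the use of FileInputFormat's protected
"stream" field are my assumptions, not code from this thread:

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Reads an n x n matrix of shorts, one row per record, split into blocks of rows. */
public class ShortMatrixRowFormat extends FileInputFormat<short[]> {

   private final Path path;
   private final int n;              // matrix dimension, assumed known up front

   private DataInputStream din;
   private long rowsInSplit;
   private long rowsRead;

   public ShortMatrixRowFormat(Path path, int n) {
      super(path);
      this.path = path;
      this.n = n;
   }

   @Override
   public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
      // minNumSplits equals the number of DataSource tasks; one split per row block
      long rowBytes = 2L * n;                                // one row of n shorts
      int rowsPerSplit = (n + minNumSplits - 1) / minNumSplits;
      List<FileInputSplit> splits = new ArrayList<FileInputSplit>();
      int splitNum = 0;
      for (int firstRow = 0; firstRow < n; firstRow += rowsPerSplit) {
         long rows = Math.min(rowsPerSplit, n - firstRow);
         splits.add(new FileInputSplit(splitNum++, path,
               firstRow * rowBytes, rows * rowBytes, new String[0]));
      }
      return splits.toArray(new FileInputSplit[splits.size()]);
   }

   @Override
   public void open(FileInputSplit split) throws IOException {
      super.open(split);             // assumed to position 'stream' at split.getStart()
      din = new DataInputStream(stream);
      rowsInSplit = split.getLength() / (2L * n);
      rowsRead = 0;
   }

   @Override
   public boolean reachedEnd() throws IOException {
      return rowsRead >= rowsInSplit;
   }

   @Override
   public short[] nextRecord(short[] reuse) throws IOException {
      short[] row = (reuse != null && reuse.length == n) ? reuse : new short[n];
      for (int i = 0; i < n; i++) {
         row[i] = din.readShort();
      }
      rowsRead++;
      return row;
   }
}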

Best, Fabian

2016-01-25 16:46 GMT+01:00 Saliya Ekanayake :

> Hi Fabian,
>
> Thank you for the information.
>
> So, is there a way I can get the task number within the InputFormat? That
> way I can use it to offset the block of rows.
>
> The file size is too large to fit in a single process' memory, so the current
> setup in MPI and Hadoop use the rank (task number) info to memory map the
> corresponding block of rows. In our experiments, we found this approach to
> be the fastest because of the memory mapping rather than buffered reads. Also,
> the file is replicated across nodes and the reading (mapping) happens only
> once.
>
> Thank you,
> Saliya
>
> On Mon, Jan 25, 2016 at 4:38 AM, Fabian Hueske  wrote:
>
>> Hi Saliya,
>>
>> yes that is possible, however the requirements for reading a binary file
>> from local fs are basically the same as for reading it from HDFS.
>> In order to be able to start reading different sections of a file in
>> parallel, you need to know the different starting positions. This can be
>> done by either having fixed offsets for blocks or adding some meta
>> information for the block start positions. InputFormats can divide the work
>> of reading a file by generating multiple input splits. Each input split
>> defines the file, the start offset and the length to read.
>>
>> However, are you sure that reading a file in parallel will be faster than
>> reading it sequentially?
>> At least for HDDs, IO-bound workloads with "random" reading patterns are
>> usually much slower than sequential reads.
>>
>> Cheers, Fabian
>>
>> 2016-01-24 19:10 GMT+01:00 Suneel Marthi :
>>
>>> There should be a env.readbinaryfile() IIRC, check that
>>>
>>> Sent from my iPhone
>>>
>>> On Jan 24, 2016, at 12:44 PM, Saliya Ekanayake 
>>> wrote:
>>>
>>> Thank you for the response on this, but I still have some doubt. Simply,
>>> the file is not in HDFS, it's in local storage. In Flink if I run the
>>> program with, say 5 parallel tasks, what I would like to do is to read a
>>> block of rows in each task as shown below. I looked at the simple CSV
>>> reader and was thinking to create a custom one like that, but I would need
>>> to know the task number to read the relevant block. Is this possible?
>>>
>>> 
>>>
>>> Thank you,
>>> Saliya
>>>
>>> On Wed, Jan 20, 2016 at 12:47 PM, Till Rohrmann 
>>> wrote:
>>>
 With readHadoopFile you can use all of Hadoop’s FileInputFormats and
 thus you can also do everything with Flink that you can do with Hadoop.
 Simply take the same Hadoop FileInputFormat which you would take for
 your MapReduce job.

 Cheers,
 Till
 ​

 On Wed, Jan 20, 2016 at 3:16 PM, Saliya Ekanayake 
 wrote:

> Thank you, I saw the readHadoopFile, but I was not sure how it can be
> used to the following, which is what I need. The logic of the code 
> requires
> an entire row to operate on, so in our current implementation with P 
> tasks,
> each of them will read a rectangular block of (N/P) x N from the matrix. 
> Is
> this possible with readHadoopFile? Also, the file may not be in hdfs, so 
> is
> it possible to refer to local disk in doing this?
>
> Thank you
>
> On Wed, Jan 20, 2016 at 1:31 AM, Chiwan Park 
> wrote:
>
>> Hi Saliya,
>>
>> You can use the input format from Hadoop in Flink by using the
>> readHadoopFile method. The method returns a dataset whose type is
>> Tuple2<key, value>. Note that the MapReduce-equivalent transformation in
>> Flink
>> is composed of map, groupBy, and reduceGroup.
>>
>> > On Jan 20, 2016, at 3:04 PM, Suneel Marthi 
>> wrote:
>> >
>> > Guess u r looking for Flink's BinaryInputFormat to be able to read
>> blocks of data from HDFS
>> >
>> >
>> 

Re: MatrixMultiplication

2016-01-25 Thread Lydia Ickler
Hi Till,

thanks for your reply :)
Yes, it finished after ~27 minutes…

Best regards, 
Lydia

> On 25.01.2016 at 14:27, Till Rohrmann  wrote:
> 
> Hi Lydia,
> 
> Since matrix multiplication is O(n^3), I would assume that it would simply 
> take 1000 times longer than the multiplication of the 100 x 100 matrix. Have 
> you waited so long to see whether it completes or is there another problem?
> 
> Cheers,
> Till
> 
> On Mon, Jan 25, 2016 at 2:13 PM, Lydia Ickler  > wrote:
> Hi, 
> 
> I want to do a simple MatrixMultiplication and use the following code (see
> bottom).
> For matrices of 50x50 or 100x100 it is no problem. But already with matrices of
> 1000x1000 it does not work anymore and gets stuck in the joining part.
> What am I doing wrong?
> 
> Best regards, 
> Lydia
> 
> package de.tuberlin.dima.aim3.assignment3;
>
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.CsvReader;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
>
>
> public class MatrixMultiplication {
>
>    static String input = null;
>    static String output = null;
>
>    public void run() throws Exception {
>       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>       DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);
>
>       matrixA.join(matrixA).where(1).equalTo(0)
>             .map(new ProjectJoinResultMapper())
>             .groupBy(0, 1)
>             .sum(2)
>             .writeAsCsv(output);
>
>       env.execute();
>    }
>
>    public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(
>          ExecutionEnvironment env, String filePath) {
>       CsvReader csvReader = env.readCsvFile(filePath);
>       csvReader.fieldDelimiter(',');
>       csvReader.includeFields("fttt");
>       return csvReader.types(Integer.class, Integer.class, Double.class);
>    }
>
>    public static final class ProjectJoinResultMapper implements
>          MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
>                      Tuple3<Integer, Integer, Double>> {
>       @Override
>       public Tuple3<Integer, Integer, Double> map(
>             Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>             throws Exception {
>          Integer row = value.f0.f0;
>          Integer column = value.f1.f1;
>          Double product = value.f0.f2 * value.f1.f2;
>          return new Tuple3<Integer, Integer, Double>(row, column, product);
>       }
>    }
>
>    public static void main(String[] args) throws Exception {
>       if (args.length < 2) {
>          System.err.println("Usage: MatrixMultiplication <input path> <output path>");
>          System.exit(0);
>       }
>       input = args[0];
>       output = args[1];
>       new MatrixMultiplication().run();
>    }
>
> }
> 
> 



Re: Hello, a question about Dashboard in Flink

2016-01-25 Thread Fabian Hueske
You can start a job and then periodically request and store information
about the running job and its vertices using the corresponding REST calls [1].
The data will be in JSON format.
After the job finished, you can stop requesting data.

Next you parse the JSON, extract the information you need and give it to
some plotting library.
As I said, it is not possible to pass this data back into Flink's
dashboard, but you have to process and plot it yourself.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-master/internals/monitoring_rest_api.html#overview-of-jobs
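
As a rough illustration of that pattern (the host/port, file names, and poll
interval below are made up; which JSON fields you extract depends on what you
need for the timeline):

import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class JobStatsPoller {

   public static void main(String[] args) throws Exception {
      String jobManager = "http://localhost:8081";   // default web monitor address
      String jobId = args[0];                        // id of the running job

      // Poll the monitoring REST API while the job runs and dump the raw JSON;
      // stop polling once the job has finished.
      for (int i = 0; i < 1000; i++) {
         URL url = new URL(jobManager + "/jobs/" + jobId);
         try (InputStream in = url.openStream()) {
            Files.copy(in, Paths.get("job-" + jobId + "-" + i + ".json"),
                  StandardCopyOption.REPLACE_EXISTING);
         }
         Thread.sleep(5000);                         // every 5 seconds
      }
   }
}

The per-vertex entries in those responses should contain the start and end
timestamps that the dashboard uses for its timeline view, so they can be
parsed with any JSON library and plotted afterwards.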



2016-01-25 16:15 GMT+01:00 Philip Lee :

> Hello,
>
> According to
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-Web-Dashboard-Completed-Job-history-td4067.html,
> I cannot retrieve the job history from the Dashboard after turning off the JM.
>
> But as Fabian mentioned here,
> "However, you can query all stats that are displayed by the dashboard via
> a REST API [1] while the JM is running and save them yourself. This way you
> can analyze the data also after the JM was stopped". Could you explain
> this sentence in more detail?
>
> I want to evaluate the timeline view of each function after a job is done.
>
> Thanks,
> Phil
>


Re: Flink 0.10.1 and HBase

2016-01-25 Thread Nick Dimiduk
Hi Christophe,

What HBase version are you using? Have you looked at using the shaded
client jars? Those should at least isolate HBase/Hadoop's Guava version
from that used by your application.

-n

On Monday, January 25, 2016, Christophe Salperwyck <
christophe.salperw...@gmail.com> wrote:

> Hi all,
>
> I have an issue with Flink 0.10.1, HBase and Guava, it seems to be related
> to this JIRA:
> https://issues.apache.org/jira/browse/FLINK-3158
>
> If I removed the com.google.common.* class files from the jar file, it
> works then.
>
> Is there any other way to deal with this problem?
>
> Thanks for your work!
>
>


Re: Reading Binary Data (Matrix) with Flink

2016-01-25 Thread Saliya Ekanayake
Hi Fabian,

Thank you, I think I've got a better picture of this now. I think if I set the
number of DataSource tasks (a config option, I guess?) equal to the number of
input splits, that would do what I expected.

Yes, I will keep it at the same place across nodes.
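
For what it is worth, a hypothetical snippet of how that could look, reusing
the row-block input format sketched earlier in this thread (the file path,
matrix dimension, and split count are placeholders, not values from this
discussion):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;

public class MatrixSourceParallelism {

   public static void main(String[] args) throws Exception {
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      int numSplits = 8;   // e.g. one split per block of rows

      // one DataSource task per split: set the source parallelism explicitly
      DataSet<short[]> rows = env
            .createInput(new ShortMatrixRowFormat(new Path("file:///data/matrix.bin"), 1024))
            .setParallelism(numSplits);

      rows.first(1).print();   // any sink, just to complete the sketch
   }
}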

Thank you,
Saliya

On Mon, Jan 25, 2016 at 10:59 AM, Fabian Hueske  wrote:

> Hi Saliya,
>
> the number of parallel splits is controlled by the number of input splits
> returned by the InputFormat.createInputSplits() method. This method
> receives a parameter minNumSplits which is equal to the number of DataSource
> tasks.
>
> Flink handles input splits a bit different from Hadoop. In Hadoop, each
> input split corresponds to one map task. In Flink you have a fixed number
> of DataSource tasks and input splits are lazily distributed to source
> tasks. If you have more splits than tasks, a data source requests a new
> split when it is done with its last split until all splits are assigned. If
> your createInputSplits method returns fewer splits than minNumSplits, some
> source tasks won't receive a split.
>
> If you read files from a local FS in a distributed (multi-node) setup, you
> have to be careful. Each node must have an exact copy of the data at
> exactly the same location. Otherwise, it won't work.
>
> Best, Fabian
>
> 2016-01-25 16:46 GMT+01:00 Saliya Ekanayake :
>
>> Hi Fabian,
>>
>> Thank you for the information.
>>
>> So, is there a way I can get the task number within the InputFormat? That
>> way I can use it to offset the block of rows.
>>
>> The file size is too large to fit in a single process' memory, so the current
>> setup in MPI and Hadoop use the rank (task number) info to memory map the
>> corresponding block of rows. In our experiments, we found this approach to
>> be the fastest because of the memory mapping rather than buffered reads. Also,
>> the file is replicated across nodes and the reading (mapping) happens only
>> once.
>>
>> Thank you,
>> Saliya
>>
>> On Mon, Jan 25, 2016 at 4:38 AM, Fabian Hueske  wrote:
>>
>>> Hi Saliya,
>>>
>>> yes that is possible, however the requirements for reading a binary file
>>> from local fs are basically the same as for reading it from HDFS.
>>> In order to be able to start reading different sections of a file in
>>> parallel, you need to know the different starting positions. This can be
>>> done by either having fixed offsets for blocks or adding some meta
>>> information for the block start positions. InputFormats can divide the work
>>> of reading a file by generating multiple input splits. Each input split
>>> defines the file, the start offset and the length to read.
>>>
>>> However, are you sure that reading a file in parallel will be faster
>>> than reading it sequentially?
>>> At least for HDDs, IO-bound workloads with "random" reading patterns are
>>> usually much slower than sequential reads.
>>>
>>> Cheers, Fabian
>>>
>>> 2016-01-24 19:10 GMT+01:00 Suneel Marthi :
>>>
 There should be a env.readbinaryfile() IIRC, check that

 Sent from my iPhone

 On Jan 24, 2016, at 12:44 PM, Saliya Ekanayake 
 wrote:

 Thank you for the response on this, but I still have some doubt.
 Simply, the file is not in HDFS, it's in local storage. In Flink if I run
 the program with, say 5 parallel tasks, what I would like to do is to read
 a block of rows in each task as shown below. I looked at the simple CSV
 reader and was thinking to create a custom one like that, but I would need
 to know the task number to read the relevant block. Is this possible?

 

 Thank you,
 Saliya

 On Wed, Jan 20, 2016 at 12:47 PM, Till Rohrmann 
 wrote:

> With readHadoopFile you can use all of Hadoop’s FileInputFormats and
> thus you can also do everything with Flink that you can do with Hadoop.
> Simply take the same Hadoop FileInputFormat which you would take for
> your MapReduce job.
>
> Cheers,
> Till
> ​
>
> On Wed, Jan 20, 2016 at 3:16 PM, Saliya Ekanayake 
> wrote:
>
>> Thank you, I saw the readHadoopFile, but I was not sure how it can be
>> used to the following, which is what I need. The logic of the code 
>> requires
>> an entire row to operate on, so in our current implementation with P 
>> tasks,
>> each of them will read a rectangular block of (N/P) x N from the matrix. 
>> Is
>> this possible with readHadoopFile? Also, the file may not be in hdfs, so 
>> is
>> it possible to refer to local disk in doing this?
>>
>> Thank you
>>
>> On Wed, Jan 20, 2016 at 1:31 AM, Chiwan Park 
>> wrote:
>>
>>> Hi Saliya,
>>>
>>> You can use the input format from Hadoop in Flink by using the
>>> readHadoopFile method. The method returns a dataset whose type is
>>> Tuple2<key, value>. 

Re: Reading Binary Data (Matrix) with Flink

2016-01-25 Thread Fabian Hueske
Hi Saliya,

yes that is possible, however the requirements for reading a binary file
from local fs are basically the same as for reading it from HDFS.
In order to be able to start reading different sections of a file in
parallel, you need to know the different starting positions. This can be
done by either having fixed offsets for blocks or adding some meta
information for the block start positions. InputFormats can divide the work
of reading a file by generating multiple input splits. Each input split
defines the file, the start offset and the length to read.

However, are you sure that reading a file in parallel will be faster than
reading it sequentially?
At least for HDDs, IO-bound workloads with "random" reading patterns are
usually much slower than sequential reads.

Cheers, Fabian

2016-01-24 19:10 GMT+01:00 Suneel Marthi :

> There should be a env.readbinaryfile() IIRC, check that
>
> Sent from my iPhone
>
> On Jan 24, 2016, at 12:44 PM, Saliya Ekanayake  wrote:
>
> Thank you for the response on this, but I still have some doubt. Simply,
> the file is not in HDFS, it's in local storage. In Flink if I run the
> program with, say 5 parallel tasks, what I would like to do is to read a
> block of rows in each task as shown below. I looked at the simple CSV
> reader and was thinking to create a custom one like that, but I would need
> to know the task number to read the relevant block. Is this possible?
>
> 
>
> Thank you,
> Saliya
>
> On Wed, Jan 20, 2016 at 12:47 PM, Till Rohrmann 
> wrote:
>
>> With readHadoopFile you can use all of Hadoop’s FileInputFormats and
>> thus you can also do everything with Flink that you can do with Hadoop.
>> Simply take the same Hadoop FileInputFormat which you would take for
>> your MapReduce job.
>>
>> Cheers,
>> Till
>> ​
>>
>> On Wed, Jan 20, 2016 at 3:16 PM, Saliya Ekanayake 
>> wrote:
>>
>>> Thank you, I saw the readHadoopFile, but I was not sure how it can be
>>> used to the following, which is what I need. The logic of the code requires
>>> an entire row to operate on, so in our current implementation with P tasks,
>>> each of them will read a rectangular block of (N/P) x N from the matrix. Is
>>> this possible with readHadoopFile? Also, the file may not be in hdfs, so is
>>> it possible to refer to local disk in doing this?
>>>
>>> Thank you
>>>
>>> On Wed, Jan 20, 2016 at 1:31 AM, Chiwan Park 
>>> wrote:
>>>
 Hi Saliya,

 You can use the input format from Hadoop in Flink by using the
 readHadoopFile method. The method returns a dataset whose type is
 Tuple2<key, value>. Note that the MapReduce-equivalent transformation in Flink
 is composed of map, groupBy, and reduceGroup.

 > On Jan 20, 2016, at 3:04 PM, Suneel Marthi 
 wrote:
 >
 > Guess u r looking for Flink's BinaryInputFormat to be able to read
 blocks of data from HDFS
 >
 >
 https://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java/org/apache/flink/api/common/io/BinaryInputFormat.html
 >
 > On Wed, Jan 20, 2016 at 12:45 AM, Saliya Ekanayake 
 wrote:
 > Hi,
 >
 > I am trying to use Flink perform a parallel batch operation on a NxN
 matrix represented as a binary file. Each (i,j) element is stored as a Java
 Short value. In a typical MapReduce programming with Hadoop, each map task
 will read a block of rows of this matrix and perform computation on that
 block and emit result to the reducer.
 >
 > How is this done in Flink? I am new to Flink and couldn't find a
 binary reader so far. Any help is greatly appreciated.
 >
 > Thank you,
 > Saliya
 >
 > --
 > Saliya Ekanayake
 > Ph.D. Candidate | Research Assistant
 > School of Informatics and Computing | Digital Science Center
 > Indiana University, Bloomington
 > Cell 812-391-4914
 > http://saliya.org
 >

 Regards,
 Chiwan Park


>>>
>>>
>>> --
>>> Saliya Ekanayake
>>> Ph.D. Candidate | Research Assistant
>>> School of Informatics and Computing | Digital Science Center
>>> Indiana University, Bloomington
>>> Cell 812-391-4914
>>> http://saliya.org
>>>
>>
>>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
> Cell 812-391-4914
> http://saliya.org
>
>