Hi Saliya,

the number of parallel splits is controlled by the number of input splits
returned by the InputFormat.createInputSplits() method. This method
receives a parameter minNumSplits with is equal to the number of DataSource
tasks.

Flink handles input splits a bit different from Hadoop. In Hadoop, each
input split corresponds to one map task. In Flink you have a fixed number
of DataSource tasks and input splits are lazily distributed to source
tasks. If you have more splits than tasks, a data source requests a new
split when it is done with its last split until all splits are assigned. If
your createInputSplits method returns less splits than minNumSplits, some
source tasks won't receive a split.

If you read files from a local FS in a distributed (multi-node) setup, you
have to be careful. Each node must have an exact copy of the data at
exactly the same location. Otherwise, it won't work.

Best, Fabian

2016-01-25 16:46 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:

> Hi Fabian,
>
> Thank you for the information.
>
> So, is there a way I can get the task number within the InputFormat? That
> way I can use it to offset the block of rows.
>
> The file size is large to fit in a single process' memory, so the current
> setup in MPI and Hadoop use the rank (task number) info to memory map the
> corresponding block of rows. In our experiments, we found this approach to
> be the fastest because of the memory mapping rather buffered reads. Also,
> the file is replicated across nodes and the reading (mapping) happens only
> once.
>
> Thank you,
> Saliya
>
> On Mon, Jan 25, 2016 at 4:38 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Saliya,
>>
>> yes that is possible, however the requirements for reading a binary file
>> from local fs are basically the same as for reading it from HDSF.
>> In order to be able to start reading different sections of a file in
>> parallel, you need to know the different starting positions. This can be
>> done by either having fixed offsets for blocks or adding some meta
>> information for the block start positions. InputFormats can divide the work
>> of reading a file by generating multiple input splits. Each input split
>> defines the file, the start offset and the length to read.
>>
>> However, are you sure that reading a file in parallel will be faster than
>> reading it sequentially?
>> At least for HDDs, IO-bound workloads with "random" reading patterns are
>> usually much slower than sequential reads.
>>
>> Cheers, Fabian
>>
>> 2016-01-24 19:10 GMT+01:00 Suneel Marthi <suneel.mar...@gmail.com>:
>>
>>> There should be a env.readbinaryfile() IIRC, check that
>>>
>>> Sent from my iPhone
>>>
>>> On Jan 24, 2016, at 12:44 PM, Saliya Ekanayake <esal...@gmail.com>
>>> wrote:
>>>
>>> Thank you for the response on this, but I still have some doubt. Simply,
>>> the files is not in HDFS, it's in local storage. In Flink if I run the
>>> program with, say 5 parallel tasks, what I would like to do is to read a
>>> block of rows in each task as shown below. I looked at the simple CSV
>>> reader and was thinking to create a custom one like that, but I would need
>>> to know the task number to read the relevant block. Is this possible?
>>>
>>> <image.png>
>>>
>>> Thank you,
>>> Saliya
>>>
>>> On Wed, Jan 20, 2016 at 12:47 PM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> With readHadoopFile you can use all of Hadoop’s FileInputFormats and
>>>> thus you can also do everything with Flink, what you can do with Hadoop.
>>>> Simply take the same Hadoop FileInputFormat which you would take for
>>>> your MapReduce job.
>>>>
>>>> Cheers,
>>>> Till
>>>> ​
>>>>
>>>> On Wed, Jan 20, 2016 at 3:16 PM, Saliya Ekanayake <esal...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thank you, I saw the readHadoopFile, but I was not sure how it can be
>>>>> used to the following, which is what I need. The logic of the code 
>>>>> requires
>>>>> an entire row to operate on, so in our current implementation with P 
>>>>> tasks,
>>>>> each of them will read a rectangular block of (N/P) x N from the matrix. 
>>>>> Is
>>>>> this possible with readHadoopFile? Also, the file may not be in hdfs, so 
>>>>> is
>>>>> it possible to refer to local disk in doing this?
>>>>>
>>>>> Thank you
>>>>>
>>>>> On Wed, Jan 20, 2016 at 1:31 AM, Chiwan Park <chiwanp...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Saliya,
>>>>>>
>>>>>> You can use the input format from Hadoop in Flink by using
>>>>>> readHadoopFile method. The method returns a dataset which of type is
>>>>>> Tuple2<Key, Value>. Note that MapReduce equivalent transformation in 
>>>>>> Flink
>>>>>> is composed of map, groupBy, and reduceGroup.
>>>>>>
>>>>>> > On Jan 20, 2016, at 3:04 PM, Suneel Marthi <smar...@apache.org>
>>>>>> wrote:
>>>>>> >
>>>>>> > Guess u r looking for Flink's BinaryInputFormat to be able to read
>>>>>> blocks of data from HDFS
>>>>>> >
>>>>>> >
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java/org/apache/flink/api/common/io/BinaryInputFormat.html
>>>>>> >
>>>>>> > On Wed, Jan 20, 2016 at 12:45 AM, Saliya Ekanayake <
>>>>>> esal...@gmail.com> wrote:
>>>>>> > Hi,
>>>>>> >
>>>>>> > I am trying to use Flink perform a parallel batch operation on a
>>>>>> NxN matrix represented as a binary file. Each (i,j) element is stored as 
>>>>>> a
>>>>>> Java Short value. In a typical MapReduce programming with Hadoop, each 
>>>>>> map
>>>>>> task will read a block of rows of this matrix and perform computation on
>>>>>> that block and emit result to the reducer.
>>>>>> >
>>>>>> > How is this done in Flink? I am new to Flink and couldn't find a
>>>>>> binary reader so far. Any help is greatly appreciated.
>>>>>> >
>>>>>> > Thank you,
>>>>>> > Saliya
>>>>>> >
>>>>>> > --
>>>>>> > Saliya Ekanayake
>>>>>> > Ph.D. Candidate | Research Assistant
>>>>>> > School of Informatics and Computing | Digital Science Center
>>>>>> > Indiana University, Bloomington
>>>>>> > Cell 812-391-4914
>>>>>> > http://saliya.org
>>>>>> >
>>>>>>
>>>>>> Regards,
>>>>>> Chiwan Park
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Saliya Ekanayake
>>>>> Ph.D. Candidate | Research Assistant
>>>>> School of Informatics and Computing | Digital Science Center
>>>>> Indiana University, Bloomington
>>>>> Cell 812-391-4914
>>>>> http://saliya.org
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Saliya Ekanayake
>>> Ph.D. Candidate | Research Assistant
>>> School of Informatics and Computing | Digital Science Center
>>> Indiana University, Bloomington
>>> Cell 812-391-4914
>>> http://saliya.org
>>>
>>>
>>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
> Cell 812-391-4914
> http://saliya.org
>

Reply via email to