Hi Saliya,

the number of parallel splits is controlled by the number of input splits returned by the InputFormat.createInputSplits() method. This method receives a parameter minNumSplits which is equal to the number of DataSource tasks.
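[Editor's note: for a binary N x N matrix of Java shorts, such a createInputSplits() implementation could derive the splits from fixed offsets alone, since every row has the same byte length. Below is a minimal sketch of that arithmetic in plain Java, without Flink types; the class and method names are made up for illustration.]

```java
public class RowBlockSplits {

    /**
     * Computes row-block splits for an n x n matrix of Java shorts stored
     * row-major in a single binary file. Returns one {startByte, lengthBytes}
     * pair per split; the last block may cover fewer rows.
     */
    public static long[][] createSplits(long n, int minNumSplits) {
        long rowBytes = n * Short.BYTES;                            // bytes per matrix row
        long rowsPerSplit = (n + minNumSplits - 1) / minNumSplits;  // ceil(n / minNumSplits)
        int numSplits = (int) ((n + rowsPerSplit - 1) / rowsPerSplit);
        long[][] splits = new long[numSplits][2];
        for (int i = 0; i < numSplits; i++) {
            long firstRow = i * rowsPerSplit;
            long rows = Math.min(rowsPerSplit, n - firstRow);
            splits[i][0] = firstRow * rowBytes;  // start offset of this row block
            splits[i][1] = rows * rowBytes;      // number of bytes to read
        }
        return splits;
    }

    public static void main(String[] args) {
        // A 10 x 10 short matrix (20 bytes per row) over 4 source tasks:
        // rows are grouped 3 + 3 + 3 + 1.
        for (long[] s : createSplits(10, 4)) {
            System.out.println("start=" + s[0] + " length=" + s[1]);
        }
    }
}
```

In a real Flink InputFormat these pairs would be wrapped in FileInputSplit objects; the point is only that fixed-size records make the block start positions computable without any extra metadata.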
Flink handles input splits a bit differently from Hadoop. In Hadoop, each input split corresponds to one map task. In Flink you have a fixed number of DataSource tasks, and input splits are lazily distributed to source tasks. If you have more splits than tasks, a data source requests a new split when it is done with its last split, until all splits are assigned. If your createInputSplits method returns fewer splits than minNumSplits, some source tasks won't receive a split.

If you read files from a local FS in a distributed (multi-node) setup, you have to be careful: each node must have an exact copy of the data at exactly the same location. Otherwise, it won't work.

Best, Fabian

2016-01-25 16:46 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
> Hi Fabian,
>
> Thank you for the information.
>
> So, is there a way I can get the task number within the InputFormat? That way I can use it to offset the block of rows.
>
> The file is too large to fit in a single process' memory, so the current setup in MPI and Hadoop uses the rank (task number) info to memory-map the corresponding block of rows. In our experiments, we found this approach to be the fastest because of the memory mapping rather than buffered reads. Also, the file is replicated across nodes and the reading (mapping) happens only once.
>
> Thank you,
> Saliya
>
> On Mon, Jan 25, 2016 at 4:38 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Saliya,
>>
>> yes, that is possible; however, the requirements for reading a binary file from the local FS are basically the same as for reading it from HDFS.
>> In order to be able to start reading different sections of a file in parallel, you need to know the different starting positions. This can be done by either having fixed offsets for blocks or adding some meta information for the block start positions. InputFormats can divide the work of reading a file by generating multiple input splits.
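[Editor's note: the memory-mapped block read that Saliya describes can be sketched in plain Java NIO, independent of Flink. The file layout assumed below (row-major shorts, big-endian, one complete copy per node) is an illustration, and writeTestMatrix is a made-up helper that only produces a small test file.]

```java
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

public class MappedBlockRead {

    /**
     * Memory-maps rows [firstRow, firstRow + numRows) of an n x n short matrix
     * stored row-major (big-endian) in the given file and copies them out.
     */
    public static short[] mapRows(Path file, long n, long firstRow, int numRows)
            throws Exception {
        long rowBytes = n * Short.BYTES;
        try (FileChannel ch = FileChannel.open(file)) {
            MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_ONLY,
                    firstRow * rowBytes, numRows * rowBytes);
            short[] block = new short[(int) (numRows * n)];
            buf.asShortBuffer().get(block);   // bulk copy out of the mapping
            return block;
        }
    }

    /** Writes a tiny 4 x 4 test matrix with element (i, j) = i * 4 + j. */
    public static Path writeTestMatrix() throws Exception {
        ByteBuffer bb = ByteBuffer.allocate(4 * 4 * Short.BYTES);
        for (short v = 0; v < 16; v++) bb.putShort(v);
        Path f = Files.createTempFile("matrix", ".bin");
        Files.write(f, bb.array());
        return f;
    }

    public static void main(String[] args) throws Exception {
        short[] block = mapRows(writeTestMatrix(), 4, 1, 2);  // rows 1 and 2
        System.out.println(block[0] + " .. " + block[block.length - 1]);  // 4 .. 11
    }
}
```

Inside a custom InputFormat's open(split) method, firstRow and numRows would come from the assigned input split's offset and length rather than from a task index.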
>> Each input split defines the file, the start offset, and the length to read.
>>
>> However, are you sure that reading a file in parallel will be faster than reading it sequentially?
>> At least for HDDs, IO-bound workloads with "random" reading patterns are usually much slower than sequential reads.
>>
>> Cheers, Fabian
>>
>> 2016-01-24 19:10 GMT+01:00 Suneel Marthi <suneel.mar...@gmail.com>:
>>
>>> There should be an env.readbinaryfile() IIRC, check that
>>>
>>> Sent from my iPhone
>>>
>>> On Jan 24, 2016, at 12:44 PM, Saliya Ekanayake <esal...@gmail.com> wrote:
>>>
>>> Thank you for the response on this, but I still have some doubt. Simply, the file is not in HDFS, it's in local storage. In Flink, if I run the program with, say, 5 parallel tasks, what I would like to do is to read a block of rows in each task as shown below. I looked at the simple CSV reader and was thinking to create a custom one like that, but I would need to know the task number to read the relevant block. Is this possible?
>>>
>>> <image.png>
>>>
>>> Thank you,
>>> Saliya
>>>
>>> On Wed, Jan 20, 2016 at 12:47 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> With readHadoopFile you can use all of Hadoop's FileInputFormats, and thus you can also do everything with Flink that you can do with Hadoop. Simply take the same Hadoop FileInputFormat which you would take for your MapReduce job.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Jan 20, 2016 at 3:16 PM, Saliya Ekanayake <esal...@gmail.com> wrote:
>>>>
>>>>> Thank you, I saw readHadoopFile, but I was not sure how it can be used for the following, which is what I need. The logic of the code requires an entire row to operate on, so in our current implementation with P tasks, each of them will read a rectangular block of (N/P) x N from the matrix. Is this possible with readHadoopFile?
>>>>> Also, the file may not be in HDFS, so is it possible to refer to the local disk in doing this?
>>>>>
>>>>> Thank you
>>>>>
>>>>> On Wed, Jan 20, 2016 at 1:31 AM, Chiwan Park <chiwanp...@apache.org> wrote:
>>>>>
>>>>>> Hi Saliya,
>>>>>>
>>>>>> You can use the input format from Hadoop in Flink by using the readHadoopFile method. The method returns a DataSet whose type is Tuple2<Key, Value>. Note that the MapReduce-equivalent transformation in Flink is composed of map, groupBy, and reduceGroup.
>>>>>>
>>>>>> > On Jan 20, 2016, at 3:04 PM, Suneel Marthi <smar...@apache.org> wrote:
>>>>>> >
>>>>>> > Guess you are looking for Flink's BinaryInputFormat to be able to read blocks of data from HDFS
>>>>>> >
>>>>>> > https://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java/org/apache/flink/api/common/io/BinaryInputFormat.html
>>>>>> >
>>>>>> > On Wed, Jan 20, 2016 at 12:45 AM, Saliya Ekanayake <esal...@gmail.com> wrote:
>>>>>> > Hi,
>>>>>> >
>>>>>> > I am trying to use Flink to perform a parallel batch operation on an NxN matrix represented as a binary file. Each (i,j) element is stored as a Java Short value. In typical MapReduce programming with Hadoop, each map task will read a block of rows of this matrix, perform computation on that block, and emit the result to the reducer.
>>>>>> >
>>>>>> > How is this done in Flink? I am new to Flink and couldn't find a binary reader so far. Any help is greatly appreciated.
>>>>>> >
>>>>>> > Thank you,
>>>>>> > Saliya
>>>>>> >
>>>>>> > --
>>>>>> > Saliya Ekanayake
>>>>>> > Ph.D. Candidate | Research Assistant
>>>>>> > School of Informatics and Computing | Digital Science Center
>>>>>> > Indiana University, Bloomington
>>>>>> > Cell 812-391-4914
>>>>>> > http://saliya.org
>>>>>> >
>>>>>>
>>>>>> Regards,
>>>>>> Chiwan Park
>>>>>
>>>>> --
>>>>> Saliya Ekanayake
>>>>> Ph.D. Candidate | Research Assistant
>>>>> School of Informatics and Computing | Digital Science Center
>>>>> Indiana University, Bloomington
>>>>> Cell 812-391-4914
>>>>> http://saliya.org
>>>
>>> --
>>> Saliya Ekanayake
>>> Ph.D. Candidate | Research Assistant
>>> School of Informatics and Computing | Digital Science Center
>>> Indiana University, Bloomington
>>> Cell 812-391-4914
>>> http://saliya.org
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
> Cell 812-391-4914
> http://saliya.org
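[Editor's note: Fabian's point about lazy split assignment, in the first message of this thread, can be illustrated with a small stand-alone simulation: splits sit in a queue, and each source task pulls the next split when it finishes its last one. The round-robin polling below is a simplifying assumption standing in for tasks that finish at similar speeds.]

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

public class LazySplitAssignment {

    /**
     * Simulates pull-based split assignment: numSplits splits are queued and
     * p source tasks repeatedly request the next split until none remain.
     * Returns perTask[i] = number of splits task i ended up processing.
     */
    public static int[] assign(int numSplits, int p) {
        Queue<Integer> pending = new ArrayDeque<>();
        for (int s = 0; s < numSplits; s++) {
            pending.add(s);
        }
        int[] perTask = new int[p];
        while (!pending.isEmpty()) {
            // One "round": each task that is done asks for one more split.
            for (int t = 0; t < p && pending.poll() != null; t++) {
                perTask[t]++;
            }
        }
        return perTask;
    }

    public static void main(String[] args) {
        // More splits than tasks: tasks keep pulling until all 7 are assigned.
        System.out.println(Arrays.toString(assign(7, 3)));   // [3, 2, 2]
        // Fewer splits than tasks: tasks 2..4 never receive a split.
        System.out.println(Arrays.toString(assign(2, 5)));   // [1, 1, 0, 0, 0]
    }
}
```

This also shows why a custom InputFormat should not rely on a task index to pick its block: which task processes which split is decided at runtime, not by the split's position in the array returned by createInputSplits.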