Great, thanks!

On 28 Nov 2016 8:54 p.m., "Fabian Hueske" <fhue...@gmail.com> wrote:
> Hi Flavio,
>
> sure.
> This code should be close to what you need:
>
> public static class BatchingMapper implements MapPartitionFunction<String, String[]> {
>
>     int cnt = 0;
>     String[] batch = new String[1000];
>
>     @Override
>     public void mapPartition(Iterable<String> values, Collector<String[]> out) throws Exception {
>         for (String v : values) {
>             batch[cnt++] = v;
>             if (cnt == 1000) {
>                 // emit batch
>                 out.collect(batch);
>                 Arrays.fill(batch, null);
>                 cnt = 0;
>             }
>         }
>         // handle the last (possibly partial) batch
>         if (cnt > 0) {
>             String[] lastBatch = Arrays.copyOf(batch, cnt);
>             out.collect(lastBatch);
>         }
>     }
> }
>
> Cheers, Fabian
>
> 2016-11-28 20:44 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
>> Thanks for the support Fabian!
>> I think I'll try the tumbling window method, it seems cleaner. Btw, just for the sake of completeness, can you show me a brief snippet (also in pseudocode) of a mapPartition that groups elements together into chunks of size n?
>>
>> Best,
>> Flavio
>>
>> On Mon, Nov 28, 2016 at 8:24 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> I think the easiest solution is to read the CSV file with the CsvInputFormat and use a subsequent MapPartition to batch 1000 rows together. In each partition, you might end up with one incomplete batch.
>>> However, I don't yet see how you can feed these batches into the JdbcInputFormat, which does not accept a DataSet as input. You could create a RichMapFunction that contains the logic of the JdbcInputFormat and directly query the database with the input of the MapPartition operator.
>>>
>>> If you want to use the DataStream API, you can use a tumbling count window to group IDs together and query the external database in a subsequent Map operator.
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> 2016-11-28 18:32 GMT+01:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>
>>>> Hi to all,
>>>>
>>>> I have a use case where I have to read a huge CSV containing ids to fetch from a table in a db.
>>>> The JDBC input format can handle parameterized queries, so I was thinking to fetch data using 1000 ids at a time. What is the easiest way to divide a dataset into slices of 1000 ids each (in order to create parameters for my JDBC input format)? Is that possible?
>>>> Or maybe there's an easier solution using the streaming API?
>>>>
>>>> Best,
>>>> Flavio
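
For completeness, here is a minimal end-to-end sketch of the DataSet approach described above: the ids are read from the CSV, grouped into arrays of 1000 by the BatchingMapper from Fabian's snippet, and each batch is turned into a single parameterized IN-query inside a RichFlatMapFunction (since the JdbcInputFormat cannot take a DataSet as input). The file path, JDBC URL, table and column names, and the JdbcLookupMapper class itself are illustrative assumptions, not part of any Flink API.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class JdbcBatchLookupJob {

    /** Queries the database once per batch of ids and emits one record per matching row. */
    public static class JdbcLookupMapper extends RichFlatMapFunction<String[], String> {

        private transient Connection conn;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Hypothetical connection details; replace with the real driver, URL and credentials.
            conn = DriverManager.getConnection("jdbc:postgresql://host:5432/mydb", "user", "secret");
        }

        @Override
        public void flatMap(String[] ids, Collector<String> out) throws Exception {
            // Build an IN clause with one placeholder per id in this batch.
            StringBuilder sql = new StringBuilder("SELECT payload FROM my_table WHERE id IN (");
            for (int i = 0; i < ids.length; i++) {
                sql.append(i == 0 ? "?" : ", ?");
            }
            sql.append(")");

            try (PreparedStatement stmt = conn.prepareStatement(sql.toString())) {
                for (int i = 0; i < ids.length; i++) {
                    stmt.setString(i + 1, ids[i]);
                }
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        out.collect(rs.getString(1));
                    }
                }
            }
        }

        @Override
        public void close() throws Exception {
            if (conn != null) {
                conn.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // One id per line (assumption); a CsvInputFormat works the same way for multi-column files.
        DataSet<String> ids = env.readTextFile("hdfs:///path/to/ids.csv");

        ids.mapPartition(new BatchingMapper())   // BatchingMapper from Fabian's snippet above
           .flatMap(new JdbcLookupMapper())
           .print();
    }
}

Opening the connection in open() means each parallel task keeps a single connection and reuses it for all of its batches.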
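
And a corresponding sketch of the DataStream alternative Fabian mentions: a non-keyed tumbling count window of 1000 ids, followed by the same database lookup. The file path and window size are again assumptions, and JdbcLookupMapper is the hypothetical lookup function from the previous sketch.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class JdbcCountWindowJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // One id per line (assumption).
        DataStream<String> ids = env.readTextFile("hdfs:///path/to/ids.csv");

        ids.countWindowAll(1000)  // tumbling count window: fires every 1000 ids
           .apply(new AllWindowFunction<String, String[], GlobalWindow>() {
               @Override
               public void apply(GlobalWindow window, Iterable<String> values, Collector<String[]> out) {
                   // Collect the ids of this window into one batch.
                   List<String> batch = new ArrayList<>();
                   for (String id : values) {
                       batch.add(id);
                   }
                   out.collect(batch.toArray(new String[0]));
               }
           })
           .flatMap(new JdbcLookupMapper())  // hypothetical lookup function from the DataSet sketch
           .print();

        env.execute("csv ids -> jdbc lookup");
    }
}

Note that a non-keyed count window runs with parallelism 1, and on a bounded input a final batch of fewer than 1000 ids may never fire, so the DataSet variant is probably the better fit for a one-off CSV job.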