Piotr, I did as you suggested and it worked perfectly. Thank you! :)
Best,
Rodrigo

On Thu, Oct 12, 2017 at 5:11 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hi,
>
> There is no straightforward way to do that. First of all, the error you are
> getting is because you are trying to start a new application
> (env.fromElements(items)) inside your reduce function.
>
> To do what you want, you have to hash partition the products based on
> category (instead of grouping by and reducing) and after that either:
>
> 1. Sort the hash partitioned products and implement a custom OutputFormat
> (maybe based on FileOutputFormat) that starts a new file whenever the key
> value changes.
>
> Or
>
> 2. Implement a custom OutputFormat (maybe based on FileOutputFormat) that
> keeps multiple files open - one file per category - and writes records
> accordingly.
>
> Note that both options require hash partitioning the products first. Option
> 1 is more CPU and memory intensive (it has to sort the data); option 2 can
> exceed the maximum number of simultaneously open files if the number of
> categories is very high.
>
> Piotrek
>
> > On 11 Oct 2017, at 17:47, rlazoti <rodrigolaz...@gmail.com> wrote:
> >
> > Hi,
> >
> > Is there a way to write each group to its own file using the DataSet API
> > (batch)?
> >
> > For example, let's use the following class:
> >
> > case class Product(name: String, category: String)
> >
> > And the following DataSet:
> >
> > val products = env.fromElements(
> >   Product("i7", "cpu"), Product("R5", "cpu"),
> >   Product("gtx1080", "gpu"), Product("vega64", "gpu"),
> >   Product("evo250gb", "ssd"))
> >
> > So in this example my output should be these 3 files:
> >
> > - cpu.csv
> > i7, cpu
> > R5, cpu
> >
> > - gpu.csv
> > gtx1080, gpu
> > vega64, gpu
> >
> > - ssd.csv
> > evo250gb, ssd
> >
> > I tried the following code, but got
> > org.apache.flink.api.common.InvalidProgramException: Task not serializable.
> >
> > products.groupBy("category").reduceGroup { group: Iterator[Product] =>
> >   val items = group.toSeq
> >   env.fromElements(items).writeAsCsv(s"${items.head.category}.csv")
> >   items
> > }
> >
> > I welcome any input.
> >
> > Thanks!
> >
> > --
> > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
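
For reference, here is a minimal, untested sketch of Piotrek's option 2: a
custom OutputFormat that keeps one open file per category. For simplicity it
implements OutputFormat directly with java.io writers rather than extending
FileOutputFormat; the class name CategoryCsvOutputFormat and the outputDir
parameter are illustrative assumptions, not part of Flink's API.

    import java.io.{BufferedWriter, File, FileWriter}

    import org.apache.flink.api.common.io.OutputFormat
    import org.apache.flink.configuration.Configuration

    import scala.collection.mutable

    // The Product case class from the thread.
    case class Product(name: String, category: String)

    class CategoryCsvOutputFormat(outputDir: String) extends OutputFormat[Product] {

      // Created in open() on the task manager, never serialized with the format.
      @transient private var writers: mutable.Map[String, BufferedWriter] = _

      override def configure(parameters: Configuration): Unit = ()

      override def open(taskNumber: Int, numTasks: Int): Unit = {
        writers = mutable.Map.empty
      }

      override def writeRecord(record: Product): Unit = {
        // Lazily open one file per category seen by this parallel task.
        val writer = writers.getOrElseUpdate(
          record.category,
          new BufferedWriter(
            new FileWriter(new File(outputDir, s"${record.category}.csv"))))
        writer.write(s"${record.name}, ${record.category}")
        writer.newLine()
      }

      override def close(): Unit = {
        // Flush and close every per-category file.
        if (writers != null) writers.values.foreach(_.close())
      }
    }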
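
And a sketch of the driver, following Piotrek's note that the data must be
hash partitioned first so that all records of one category land on the same
parallel task (otherwise two tasks could try to write the same file):

    import org.apache.flink.api.scala._

    object WritePerCategory {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        val products = env.fromElements(
          Product("i7", "cpu"), Product("R5", "cpu"),
          Product("gtx1080", "gpu"), Product("vega64", "gpu"),
          Product("evo250gb", "ssd"))

        // Route every record of a category to exactly one task, then write.
        products
          .partitionByHash("category")
          .output(new CategoryCsvOutputFormat("."))

        env.execute("one csv file per category")
      }
    }

Option 1 would instead add .sortPartition("category", Order.ASCENDING) after
the partitioning and roll to a new file inside writeRecord whenever the
category changes, trading the number of open files for the cost of a sort.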