Piotr,

I did as you suggested and it worked perfectly.
Thank you! :)

Best,
Rodrigo

On Thu, Oct 12, 2017 at 5:11 AM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> There is no straightforward way to do that. First of all, the error you
> are getting is because you are trying to start new application (
> env.fromElements(items) ) inside your reduce function.
>
> To do what you want, you have to hash partition the products based on
> category (instead of grouping by and reducing) and after that either:
>
> 1. Sort the hash partitioned products and implement custom OutputFormat
> (maybe based on FileOutputFormat), that would start a new file when key
> value has changed.
>
> Or
>
> 2. Implement custom OutputFormat (maybe based on FileOutputFormat), that
> would keep multiple opened files - one file per category - and write
> records accordingly.
>
> Note that both options require first to hash partition the products. 1.
> Will be more CPU and memory consuming (have to sort the data), 2. Can
> exceed the maximum number of simultaneously opened file if number of
> categories is very high.
>
> Piotrek
>
> > On 11 Oct 2017, at 17:47, rlazoti <rodrigolaz...@gmail.com> wrote:
> >
> > Hi,
> >
> > Is there a way to write each group to its own file using the Dataset api
> > (Batch)?
> >
> > For example, lets use the following class:
> >
> > case class Product(name: String, category: String)
> >
> > And the following Dataset:
> >
> > val products = env.fromElements(Product("i7", "cpu"), Product("R5",
> "cpu"),
> > Product("gtx1080", "gpu"), Product("vega64", "gpu"), Product("evo250gb",
> > "ssd"))
> >
> > So in this example my output should be these 3 files:
> >
> > - cpu.csv
> > i7, cpu
> > R5, cpu
> >
> > - gpu.csv
> > gtx1080, gpu
> > vega64, gpu
> >
> > - ssd.csv
> > evo250gb, ssd
> >
> >
> > I tried the following code, but got
> > org.apache.flink.api.common.InvalidProgramException: Task not
> serializable.
> >
> > products.groupBy("category").reduceGroup { group: Iterator[Product] =>
> >  val items = group.toSeq
> >  env.fromElements(items).writeAsCsv(s"${items.head.category}.csv")
> >  items
> > }
> >
> > I welcome any of your inputs.
> >
> > Thanks!
> >
> >
> >
> > --
> > Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>
>

Reply via email to