You’re welcome :)
> On 23 Oct 2017, at 20:43, Rodrigo Lazoti <rodrigolaz...@gmail.com> wrote:
>
> Piotr,
>
> I did as you suggested and it worked perfectly.
> Thank you! :)
>
> Best,
> Rodrigo
>
> On Thu, Oct 12, 2017 at 5:11 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
>
> There is no straightforward way to do that. First of all, the error you are
> getting is because you are trying to start a new program
> (env.fromElements(items)) inside your reduce function: the closure captures
> the ExecutionEnvironment, which is not serializable, hence the "Task not
> serializable" error.
>
> To do what you want, you have to hash-partition the products by category
> (instead of grouping and reducing) and after that either:
>
> 1. Sort the hash-partitioned products and implement a custom OutputFormat
> (perhaps based on FileOutputFormat) that starts a new file whenever the key
> value changes.
>
> Or
>
> 2. Implement a custom OutputFormat (perhaps based on FileOutputFormat) that
> keeps multiple files open - one per category - and writes each record to the
> matching file.
>
> Note that both options require hash-partitioning the products first. Option 1
> is more CPU- and memory-intensive (the data has to be sorted), while option 2
> can exceed the maximum number of simultaneously open files if the number of
> categories is very high. A minimal sketch of option 2 follows below.
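>
> A minimal, untested sketch of option 2 in Scala (using your Product case
> class; the CategoryOutputFormat name, the output directory, and the CSV
> layout are my own choices, not an existing Flink class):
>
> import java.io.{BufferedWriter, FileWriter}
> import scala.collection.mutable
> import org.apache.flink.api.common.io.OutputFormat
> import org.apache.flink.configuration.Configuration
>
> // One lazily created writer per category. Hash partitioning guarantees that
> // all records of a category end up in the same subtask, so no two subtasks
> // ever write to the same file.
> class CategoryOutputFormat(outputDir: String) extends OutputFormat[Product] {
>   // Not serialized; created on the task managers in open().
>   @transient private var writers: mutable.Map[String, BufferedWriter] = _
>
>   override def configure(parameters: Configuration): Unit = {}
>
>   override def open(taskNumber: Int, numTasks: Int): Unit = {
>     writers = mutable.Map.empty
>   }
>
>   override def writeRecord(record: Product): Unit = {
>     // Assumes outputDir already exists on the task manager's file system.
>     val writer = writers.getOrElseUpdate(
>       record.category,
>       new BufferedWriter(new FileWriter(s"$outputDir/${record.category}.csv")))
>     writer.write(s"${record.name}, ${record.category}\n")
>   }
>
>   override def close(): Unit = {
>     if (writers != null) writers.values.foreach(_.close())
>   }
> }
>
> // Usage:
> products
>   .partitionByHash("category")
>   .output(new CategoryOutputFormat("/tmp/products"))
> env.execute()
>
> For option 1 you would additionally call .sortPartition("category",
> Order.ASCENDING) after partitionByHash, keep a single writer, and roll over
> to a new file inside writeRecord whenever the category changes.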
>
> Piotrek
>
> > On 11 Oct 2017, at 17:47, rlazoti <rodrigolaz...@gmail.com> wrote:
> >
> > Hi,
> >
> > Is there a way to write each group to its own file using the DataSet API
> > (batch)?
> >
> > For example, let's use the following class:
> >
> > case class Product(name: String, category: String)
> >
> > And the following Dataset:
> >
> > val products = env.fromElements(Product("i7", "cpu"), Product("R5", "cpu"),
> > Product("gtx1080", "gpu"), Product("vega64", "gpu"), Product("evo250gb",
> > "ssd"))
> >
> > So in this example my output should be these 3 files:
> >
> > - cpu.csv
> > i7, cpu
> > R5, cpu
> >
> > - gpu.csv
> > gtx1080, gpu
> > vega64, gpu
> >
> > - ssd.csv
> > evo250gb, ssd
> >
> >
> > I tried the following code, but got
> > org.apache.flink.api.common.InvalidProgramException: Task not serializable.
> >
> > products.groupBy("category").reduceGroup { group: Iterator[Product] =>
> > val items = group.toSeq
> > env.fromElements(items).writeAsCsv(s"${items.head.category}.csv")
> > items
> > }
> >
> > I'd welcome any input.
> >
> > Thanks!
> >
> >
> >
>
>