Thanks, indeed the desired behavior is to flush if bucket size exceeds a
limit but also if the bucket has been open long enough. Contrary to the
current RollingSink we don't want to flush all the time if the bucket
changes but have multiple buckets "open" as needed.

In our case the date to use for partitioning comes from an event field, but
needs to be formatted, too. The partitioning feature should be generic,
allowing to pass a function that formats the bucket path for each tuple.

Does it seem like a valid plan to create a sink that internally caches
multiple rolling sinks?

On Tue, May 24, 2016 at 3:50 PM, Kostas Kloudas <k.klou...@data-artisans.com
> wrote:

> Hi Juho,
>
> If I understand correctly, you want a custom RollingSink that caches some
> buckets, one for each topic/date key, and whenever the volume of data
> buffered
> exceeds a limit, then it flushes to disk, right?
>
> If this is the case, then you are right that this is not currently
> supported
> out-of-the-box, but it would be interesting to update the RollingSink
> to support such scenarios.
>
> One clarification: when you say that you want partition by date,
> you mean the date of the event, right? Not the processing time.
>
> Kostas
>
> > On May 24, 2016, at 1:22 PM, Juho Autio <juho.au...@rovio.com> wrote:
> >
> > Could you suggest how to dynamically partition data with Flink streaming?
> >
> > We've looked at RollingSink, that takes care of writing batches to S3,
> but
> > it doesn't allow defining the partition dynamically based on the tuple
> > fields.
> >
> > Our data is coming from Kafka and essentially has the kafka topic and a
> > date, among other fields.
> >
> > We'd like to consume all topics (also automatically subscribe to new
> ones)
> > and write to S3 partitioned by topic and date, for example:
> >
> > s3://bucket/path/topic=topic2/date=20160522/
> > s3://bucket/path/topic=topic2/date=20160523/
> > s3://bucket/path/topic=topic1/date=20160522/
> > s3://bucket/path/topic=topic1/date=20160523/
> >
> > There are two problems with RollingSink as it is now:
> > - Only allows partitioning by date
> > - Flushes the batch every time the path changes. In our case the stream
> can
> > for example have a random mix of different topics and that would mean
> that
> > RollingSink isn't able to respect the max flush size but keeps flushing
> the
> > files pretty much on every tuple.
> >
> > We've thought that we could implement a sink that internally creates and
> > handles multiple RollingSink instances as needed for partitions. But it
> > would be great to first hear any suggestions that you might have.
> >
> > If we have to extend RollingSink, it would be nice to make it take a
> > partitioning function as a parameter. The function would be called for
> each
> > tuple to create the output path.
> >
> >
> >
> > --
> > View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-partitioning-for-stream-output-tp7122.html
> > Sent from the Apache Flink User Mailing List archive. mailing list
> archive at Nabble.com.
>

Reply via email to