Related issue: https://issues.apache.org/jira/browse/FLINK-2672
On Wed, May 25, 2016 at 9:21 AM, Juho Autio <juho.au...@rovio.com> wrote:

> Thanks, indeed the desired behavior is to flush if the bucket size exceeds a
> limit, but also if the bucket has been open long enough. Contrary to the
> current RollingSink, we don't want to flush every time the bucket changes,
> but instead keep multiple buckets "open" as needed.
>
> In our case the date to use for partitioning comes from an event field,
> but needs to be formatted, too. The partitioning feature should be generic,
> allowing a function to be passed in that formats the bucket path for each
> tuple.
>
> Does it seem like a valid plan to create a sink that internally caches
> multiple rolling sinks?
>
>
> On Tue, May 24, 2016 at 3:50 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
>> Hi Juho,
>>
>> If I understand correctly, you want a custom RollingSink that caches some
>> buckets, one for each topic/date key, and whenever the volume of buffered
>> data exceeds a limit, it flushes to disk, right?
>>
>> If this is the case, then you are right that this is not currently
>> supported out-of-the-box, but it would be interesting to update the
>> RollingSink to support such scenarios.
>>
>> One clarification: when you say that you want to partition by date,
>> you mean the date of the event, right? Not the processing time.
>>
>> Kostas
>>
>> > On May 24, 2016, at 1:22 PM, Juho Autio <juho.au...@rovio.com> wrote:
>> >
>> > Could you suggest how to dynamically partition data with Flink
>> > streaming?
>> >
>> > We've looked at RollingSink, which takes care of writing batches to S3,
>> > but it doesn't allow defining the partition dynamically based on the
>> > tuple fields.
>> >
>> > Our data comes from Kafka and essentially has the Kafka topic and a
>> > date, among other fields. We'd like to consume all topics (and also
>> > automatically subscribe to new ones) and write to S3 partitioned by
>> > topic and date, for example:
>> >
>> > s3://bucket/path/topic=topic2/date=20160522/
>> > s3://bucket/path/topic=topic2/date=20160523/
>> > s3://bucket/path/topic=topic1/date=20160522/
>> > s3://bucket/path/topic=topic1/date=20160523/
>> >
>> > There are two problems with RollingSink as it is now:
>> > - It only allows partitioning by date.
>> > - It flushes the batch every time the path changes. In our case the
>> >   stream can, for example, contain a random mix of different topics,
>> >   which means that RollingSink can't respect the max flush size but
>> >   keeps flushing the files on pretty much every tuple.
>> >
>> > We've thought that we could implement a sink that internally creates
>> > and handles multiple RollingSink instances as needed for the
>> > partitions. But it would be great to first hear any suggestions you
>> > might have.
>> >
>> > If we have to extend RollingSink, it would be nice to make it take a
>> > partitioning function as a parameter. The function would be called for
>> > each tuple to create the output path.
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-partitioning-for-stream-output-tp7122.html
>> > Sent from the Apache Flink User Mailing List archive. mailing list
>> > archive at Nabble.com.
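The "partitioning function" Juho proposes — called for each tuple to produce the output path — can be sketched as a plain Java function that matches the example paths above. This is only an illustrative sketch, not Flink API; the class name, the base-path parameter, and the choice of UTC for formatting are assumptions:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class BucketPath {
    // Formats the event date as yyyyMMdd in UTC, matching the example
    // paths in the thread (date=20160522 etc.). UTC is an assumption here.
    private static final DateTimeFormatter DATE_FMT =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    // Builds a partition path like s3://bucket/path/topic=topic1/date=20160522/
    // from the tuple's own fields (event time, not processing time).
    public static String bucketPath(String basePath, String topic, long eventTimeMillis) {
        String date = DATE_FMT.format(Instant.ofEpochMilli(eventTimeMillis));
        return basePath + "/topic=" + topic + "/date=" + date + "/";
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2016-05-22T12:00:00Z").toEpochMilli();
        // Prints s3://bucket/path/topic=topic1/date=20160522/
        System.out.println(bucketPath("s3://bucket/path", "topic1", ts));
    }
}
```

In a sink, this function would be invoked per tuple to decide which bucket the record belongs to, answering Kostas's clarification: the date comes from the event itself, not from processing time.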
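The buffering policy discussed in the thread — keep several buckets open at once and flush a bucket only when its buffered volume exceeds a limit or it has been open long enough, so a mixed stream of topics does not trigger a flush on every path change — can be modeled with a small in-memory sketch. This is not RollingSink or any Flink API; all names are hypothetical, the "volume" limit is simplified to a record count, and the flush just hands records to a callback:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class MultiBucketBuffer {
    private static final class Bucket {
        final List<String> records = new ArrayList<>();
        final long openedAtMillis;
        Bucket(long now) { this.openedAtMillis = now; }
    }

    private final Map<String, Bucket> openBuckets = new HashMap<>();
    private final int maxRecordsPerBucket;
    private final long maxOpenMillis;
    private final BiConsumer<String, List<String>> flushFn; // (bucketPath, records)

    public MultiBucketBuffer(int maxRecordsPerBucket, long maxOpenMillis,
                             BiConsumer<String, List<String>> flushFn) {
        this.maxRecordsPerBucket = maxRecordsPerBucket;
        this.maxOpenMillis = maxOpenMillis;
        this.flushFn = flushFn;
    }

    // Buffers one record into its bucket. Only that bucket is flushed, and
    // only when it is full or has been open too long; other buckets stay
    // open, so interleaved topics don't force a flush per path change.
    public void write(String bucketPath, String record, long nowMillis) {
        Bucket b = openBuckets.computeIfAbsent(bucketPath, p -> new Bucket(nowMillis));
        b.records.add(record);
        if (b.records.size() >= maxRecordsPerBucket
                || nowMillis - b.openedAtMillis >= maxOpenMillis) {
            flushFn.accept(bucketPath, new ArrayList<>(b.records));
            openBuckets.remove(bucketPath);
        }
    }
}
```

A real sink built this way would additionally need to flush all open buckets on checkpoint and on close to avoid data loss, which this sketch omits.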