[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15448497#comment-15448497
 ] 

ASF GitHub Bot commented on FLINK-4190:
---

Github user joshfg closed the pull request at:

https://github.com/apache/flink/pull/2269


> Generalise RollingSink to work with arbitrary buckets
> -
>
> Key: FLINK-4190
> URL: https://issues.apache.org/jira/browse/FLINK-4190
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector, Streaming Connectors
>Reporter: Josh Forman-Gornall
>Assignee: Josh Forman-Gornall
>Priority: Minor
> Fix For: 1.2.0
>
>
> The current RollingSink implementation appears to be intended for writing to 
> directories that are bucketed by system time (e.g. minutely) and to only be 
> writing to one file within one bucket at any point in time. When the system 
> time determines that the current bucket should be changed, the current bucket 
> and file are closed and a new bucket and file are created. The sink cannot be 
> used for the more general problem of writing to arbitrary buckets, perhaps 
> determined by an attribute on the element/tuple being processed.
> There are three limitations which prevent the existing sink from being used 
> for more general problems:
> - Only bucketing by the current system time is supported, and not by e.g. an 
> attribute of the element being processed by the sink.
> - Whenever the sink sees a change in the bucket being written to, it flushes 
> the file and moves on to the new bucket. Therefore the sink cannot have more 
> than one bucket/file open at a time. Additionally the checkpointing mechanics 
> only support saving the state of one active bucket and file.
> - The sink determines that it should 'close' an active bucket and file when 
> the bucket path changes. We need another way to determine when a bucket has 
> become inactive and needs to be closed.
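
As a rough illustration of the first two limitations, the sketch below routes each element to a bucket derived from one of its attributes and keeps several buckets open at once. It is plain Java with no Flink dependency. `AttributeBucketer`, `MultiBucketWriter`, and the in-memory buffers standing in for files are hypothetical names invented for this sketch, not the API added by the pull request.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical: maps an element to the relative path of its bucket.
interface AttributeBucketer<T> {
    String bucketPath(T element);
}

// Hypothetical stand-in for a sink that keeps one open "file" per active bucket.
// Files are modelled as in-memory buffers so the sketch runs anywhere.
class MultiBucketWriter<T> {
    private final AttributeBucketer<T> bucketer;
    private final Map<String, StringBuilder> openBuckets = new LinkedHashMap<>();

    MultiBucketWriter(AttributeBucketer<T> bucketer) {
        this.bucketer = bucketer;
    }

    // Write to the element's bucket, opening it on first use instead of
    // closing whichever bucket was written to previously.
    void write(T element) {
        String path = bucketer.bucketPath(element);
        openBuckets.computeIfAbsent(path, p -> new StringBuilder())
                   .append(element)
                   .append('\n');
    }

    int openBucketCount() {
        return openBuckets.size();
    }

    // Returns what has been written to a bucket so far, or "" if it was never opened.
    String contents(String path) {
        StringBuilder buffer = openBuckets.get(path);
        return buffer == null ? "" : buffer.toString();
    }
}
```

For example, a writer built with `s -> "country=" + s.substring(0, 2)` would send `uk:a` and `uk:c` to one bucket and `de:b` to another, leaving both open. Deciding when to close a bucket is a separate question, which is the third limitation above.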



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15448498#comment-15448498
 ] 

ASF GitHub Bot commented on FLINK-4190:
---

Github user joshfg commented on the issue:

https://github.com/apache/flink/pull/2269
  
That's great, thanks Aljoscha! 




[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15448459#comment-15448459
 ] 

ASF GitHub Bot commented on FLINK-4190:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2269
  
I merged it. Could you please close this PR if it's not closed 
automatically?

Thanks for your work, and sorry again for taking so long to get it in!  




[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15439249#comment-15439249
 ] 

ASF GitHub Bot commented on FLINK-4190:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2269
  
Hi,
I'm very sorry for the delays! I still have this sitting at the top of my list 
and I'm hoping to get this in by the beginning of next week.




[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-08-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15438770#comment-15438770
 ] 

ASF GitHub Bot commented on FLINK-4190:
---

Github user joshfg commented on the issue:

https://github.com/apache/flink/pull/2269
  
Hi Aljoscha, just wanted to remind you about this - any idea when the 
changes will be merged in? Thanks!




[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15391952#comment-15391952
 ] 

ASF GitHub Bot commented on FLINK-4190:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2269
  
Thanks for moving the tests! It should be ok to leave these other two 
ITCases.

I'll merge the commits into one once I can find some time to look at the 
tests and merge it.




[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15391934#comment-15391934
 ] 

ASF GitHub Bot commented on FLINK-4190:
---

Github user joshfg commented on the issue:

https://github.com/apache/flink/pull/2269
  
Ok I've migrated `BucketingSinkITCase` and 
`BucketingSinkMultipleActiveBucketsCase` over to `BucketingSinkTest` using the 
test harness with `TimeServiceProvider`. I've left the two fault tolerance IT 
cases as they are because it looks like they need to run a proper Flink job 
with a custom source/mappers. Does that sound OK?

If you think it's ready to merge, should I move all the commits into a 
single commit for FLINK-4190?




[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15391842#comment-15391842
 ] 

ASF GitHub Bot commented on FLINK-4190:
---

Github user joshfg commented on the issue:

https://github.com/apache/flink/pull/2269
  
That works, thanks! :)




[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15391765#comment-15391765
 ] 

ASF GitHub Bot commented on FLINK-4190:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2269
  
This is caused by `setInputType()` not being called on 
`SequenceFileWriter`. In the test, you can call `setInputType` on the 
`BucketingSink` once with the input `TypeInformation` and a `new 
ExecutionConfig()`. That should do the trick.
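
The failure mode can be modelled without Flink: a writer that learns its key class only through a `setInputType`-style call will refuse to open until that call has happened, and a sink that wraps it must forward the call. The classes below are hypothetical stand-ins written for this sketch (they are not Flink's `SequenceFileWriter` or `BucketingSink`), and the single `Class<?>` parameter is a simplification of the `TypeInformation` and `ExecutionConfig` arguments mentioned above.

```java
// Hypothetical writer: it cannot open a file until it has been told the key class.
class ModelWriter {
    private Class<?> keyClass;

    void setInputType(Class<?> keyClass) {
        this.keyClass = keyClass;
    }

    void open() {
        if (keyClass == null) {
            throw new IllegalStateException("Key Class has not been initialized.");
        }
    }
}

// Hypothetical sink: forwards the type information to the writer it wraps.
class ModelSink {
    private final ModelWriter writer = new ModelWriter();

    void setInputType(Class<?> keyClass) {
        writer.setInputType(keyClass);
    }

    // Opens the writer when an element arrives, as a sink would when starting a part file.
    void invoke(Object element) {
        writer.open();
    }
}
```

In this model, calling `invoke` on a fresh `ModelSink` throws the same kind of `IllegalStateException`, while calling `setInputType(String.class)` first lets it proceed. A test that drives the sink directly through a harness presumably has to make this call by hand because it skips the job-construction step where it would otherwise happen.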




[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15391760#comment-15391760
 ] 

ASF GitHub Bot commented on FLINK-4190:
---

Github user joshfg commented on the issue:

https://github.com/apache/flink/pull/2269
  
Ah I see, that makes sense.
I've begun refactoring the tests here: 
https://github.com/joshfg/flink/blob/flink-4190/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java

But have run into this strange exception in the 
`testNonRollingSequenceFileWithoutCompressionWriter` test:
```
java.lang.IllegalStateException: Key Class has not been initialized.
    at org.apache.flink.streaming.connectors.fs.SequenceFileWriter.open(SequenceFileWriter.java:84)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:500)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:396)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
    at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:226)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testNonRollingSequenceFileWithoutCompressionWriter(BucketingSinkTest.java:220)
```

Any ideas what would cause that? I've copied the HDFS cluster 
initialisation exactly as it was in the original tests...







[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15391647#comment-15391647
 ] 

ASF GitHub Bot commented on FLINK-4190:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2269
  
Not really. The reason for having an ITCase is just that it really 
exercises the sink embedded in a proper Flink job, which might bring up 
interactions that were overlooked when writing a test case. I think with 
proper test cases we could go completely without an ITCase, which would also 
improve testing time.

The purpose of the `MiniDFSCluster` is to test the sink against an actual 
HDFS cluster because there is some stuff in there that would only be tested 
when using HDFS. For example, the truncate support.

tl;dr Go for it and move everything to a test case.  




[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15391621#comment-15391621
 ] 

ASF GitHub Bot commented on FLINK-4190:
---

Github user joshfg commented on the issue:

https://github.com/apache/flink/pull/2269
  
Thanks! Oh nice, this looks like a better solution for checking for bucket 
inactivity...
For the tests, is there any reason not to fold all of those tests into the 
new `BucketingSinkTest`? Currently there are four: `BucketingSinkITCase`, 
`BucketingSinkFaultToleranceITCase`, `BucketingSinkFaultTolerance2ITCase` and 
`BucketingSinkMultipleActiveBucketsCase`.

Also, do you know what's the purpose of using MiniDFSCluster in the tests? 
Could we rewrite the other tests in the same way as your example test, without 
running a local HDFS cluster?




[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-21 Thread Josh Forman-Gornall (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387961#comment-15387961
 ] 

Josh Forman-Gornall commented on FLINK-4190:


Thanks! Oh nice, this looks like a better solution for checking for bucket 
inactivity...

For the tests, is there any reason not to fold all of those tests into the new 
`BucketingSinkTest`? Currently there are four: `BucketingSinkITCase`, 
`BucketingSinkFaultToleranceITCase`, `BucketingSinkFaultTolerance2ITCase` and 
`BucketingSinkMultipleActiveBucketsCase`.



[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15387767#comment-15387767
 ] 

ASF GitHub Bot commented on FLINK-4190:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2269
  
Very good work!

I know we discussed before whether to check for inactivity in a different 
thread or in `invoke()`. There's actually a third option that I'm showcasing in 
the PR I did against your PR.  The StreamTask already has a TimerService that 
can be set for testing. If we use the appropriate methods in the bucketing sink 
then we get testability with a settable clock for free.
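
To illustrate why a settable clock helps, here is a minimal plain-Java model of an inactivity check driven by an injected time source. `ManualClock` and `InactivityTracker` are hypothetical names for this sketch and are not Flink's timer service or the code in the pull request; the threshold passed in is likewise arbitrary.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical time source that a test can advance by hand.
class ManualClock {
    private long now;

    long currentTimeMillis() {
        return now;
    }

    void advance(long millis) {
        now += millis;
    }
}

// Hypothetical tracker: remembers when each bucket was last written and
// reports (and forgets) the buckets that have been idle for too long.
class InactivityTracker {
    private final ManualClock clock;
    private final long inactiveThresholdMillis;
    private final Map<String, Long> lastWrite = new HashMap<>();

    InactivityTracker(ManualClock clock, long inactiveThresholdMillis) {
        this.clock = clock;
        this.inactiveThresholdMillis = inactiveThresholdMillis;
    }

    void recordWrite(String bucket) {
        lastWrite.put(bucket, clock.currentTimeMillis());
    }

    // Intended to be called from a timer callback rather than a separate thread.
    List<String> closeInactiveBuckets() {
        List<String> closed = new ArrayList<>();
        long now = clock.currentTimeMillis();
        Iterator<Map.Entry<String, Long>> it = lastWrite.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (now - entry.getValue() >= inactiveThresholdMillis) {
                closed.add(entry.getKey());
                it.remove();
            }
        }
        return closed;
    }
}
```

A test can then call `clock.advance(...)` and assert on the list returned by `closeInactiveBuckets()` without sleeping or relying on wall-clock time.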

I also added a `BucketingSinkTest`. I think it would be good if the 
`BucketingSinkITCase` and `BucketingSinkMultipleActiveBucketsCase` could be folded 
into this one because having the ITCases means having a lot of overhead and our 
build is already taking quite long.

What do you think?




[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-19 Thread Josh Forman-Gornall (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15384302#comment-15384302
 ] 

Josh Forman-Gornall commented on FLINK-4190:


Hmm it is showing up here for me as a "links to"... Here it is anyway: 
https://github.com/apache/flink/pull/2269



[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-19 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15384249#comment-15384249
 ] 

Aljoscha Krettek commented on FLINK-4190:
-

Did you open the PR with the correct FLINK-4190 tag? Normally, it should show 
up here.

Could you post a link to the PR?



[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-19 Thread Josh Forman-Gornall (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15384206#comment-15384206
 ] 

Josh Forman-Gornall commented on FLINK-4190:


Ok cool, thanks! I've just submitted a pull request. 



[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-18 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15382297#comment-15382297
 ] 

Aljoscha Krettek commented on FLINK-4190:
-

I think checking in invoke is the right thing to do. Having extra threads is 
almost always a bad idea.

Could you please go ahead and open a PR once you have moved to a {{bucketing}} 
(or something similar) package? I had a quick glance at the code and it seemed fine. 
I would do a thorough review once the PR is open.



[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-13 Thread Josh Forman-Gornall (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15374589#comment-15374589
 ] 

Josh Forman-Gornall commented on FLINK-4190:


Ok I'll do that. Can you see any issues with the code itself? The main thing I 
wasn't sure about was whether the inactive buckets check should occur in a 
separate thread spawned by a timer/scheduled executor, or whether it should 
occur in the {{invoke}} method, when new elements arrive. I decided to do the 
latter since I thought it might be better running on the main operator thread. 
But it has the disadvantage that if no new elements arrive, the inactive 
buckets would not be closed (although I guess this is unlikely to happen in 
practice).
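
The trade-off described above can be sketched in plain Java. This is an illustration under stated assumptions, not the code from the branch: `InvokeCheckedSink`, its fields and its `invoke(bucket, nowMs)` signature are hypothetical, and buckets are reduced to a map of last-write timestamps. The check runs on the calling thread, so it only happens when an element arrives.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the inactivity check piggybacks on invoke(),
// so no extra thread is needed, but nothing is closed while the sink is idle.
class InvokeCheckedSink {
    private final long checkIntervalMs;
    private final long inactiveThresholdMs;
    // Bucket path -> time of the last write to that bucket.
    private final Map<String, Long> lastWrite = new HashMap<>();
    private long lastCheckMs;

    InvokeCheckedSink(long checkIntervalMs, long inactiveThresholdMs) {
        this.checkIntervalMs = checkIntervalMs;
        this.inactiveThresholdMs = inactiveThresholdMs;
    }

    // Stand-in for the sink's invoke method; the time is passed in explicitly
    // to keep the example deterministic.
    void invoke(String bucket, long nowMs) {
        lastWrite.put(bucket, nowMs);
        if (nowMs - lastCheckMs >= checkIntervalMs) {
            lastCheckMs = nowMs;
            // "Close" every bucket that has not been written to recently.
            lastWrite.entrySet().removeIf(e -> nowMs - e.getValue() >= inactiveThresholdMs);
        }
    }

    boolean isActive(String bucket) {
        return lastWrite.containsKey(bucket);
    }
}
```

In this sketch a bucket written at time 0 stays open indefinitely until some later `invoke` call triggers the check, which is the disadvantage mentioned above.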



[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-13 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15374568#comment-15374568
 ] 

Aljoscha Krettek commented on FLINK-4190:
-

What you could also do is create a new package and put the new sink and related 
classes in there. This way you wouldn't have to rename {{Bucketer}} to 
{{BucketingFunction}} and stuff would be nicely "isolated".



[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-12 Thread Josh Forman-Gornall (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372902#comment-15372902
 ] 

Josh Forman-Gornall commented on FLINK-4190:


I've made those changes to my branch - I also added back the {{Bucketer}} 
interface and its subclasses (all marked as deprecated) and called the new 
version {{BucketingFunction}}



[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-12 Thread Josh Forman-Gornall (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372649#comment-15372649
 ] 

Josh Forman-Gornall commented on FLINK-4190:


Ah yeah that's a good point! I'll add the deprecated RollingSink back in then. 



[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-11 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371342#comment-15371342
 ] 

Aljoscha Krettek commented on FLINK-4190:
-

I'm very happy you're working on this! What I would suggest is to leave the 
existing {{RollingSink}} as is and deprecate it while adding the new sink under 
the {{BucketingSink}} name. I don't like breaking code of people that are 
already using the current sink and we can remove it once Flink 2.0 is released.

What do you think?



[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-11 Thread Josh Forman-Gornall (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370921#comment-15370921
 ] 

Josh Forman-Gornall commented on FLINK-4190:


I needed this feature and already have a working solution - the changes I made 
can be found here:
https://github.com/joshfg/flink/tree/flink-4190

If people think these changes are good and will be useful to others, I will 
submit a PR.

There are three main changes:
- The Bucketer interface now takes the sink's input element as a generic 
parameter, enabling us to bucket based on attributes of the sink's input.
- While maintaining the same rolling mechanics of the existing implementation 
(e.g. rolling when the file size reaches a threshold), the sink implementation 
can now have many 'active' buckets at any point in time. The checkpointing 
mechanics have been extended to support maintaining the state of multiple 
active buckets and files, instead of just one.
- For use cases where the buckets being written to are changing over time, the 
sink now needs to determine when a bucket has become 'inactive', in order to 
flush and close the file. In the existing implementation, this is simply when 
the bucket path changes. Instead, we now consider a bucket inactive if it 
hasn't been written to recently. To support this there are two additional 
user-configurable settings: inactiveBucketCheckInterval and inactiveBucketThreshold.
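
As a rough illustration of the three changes listed above, here is a self-contained Java sketch. It is not the code from the linked branch: `ElementBucketer`, `MultiBucketSketch` and their methods are hypothetical names, files are replaced by in-memory lists, and time is passed in explicitly. Only the setting name inactiveBucketThreshold is taken from the description.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical bucketer: the element type is a generic parameter, so the
// bucket can depend on an attribute of the element instead of system time.
interface ElementBucketer<T> {
    String getBucketPath(String basePath, T element);
}

// Hypothetical in-memory stand-in for a sink with several active buckets.
class MultiBucketSketch<T> {
    private final String basePath;
    private final ElementBucketer<T> bucketer;
    private final long inactiveBucketThreshold; // milliseconds
    private final Map<String, List<T>> openBuckets = new HashMap<>();
    private final Map<String, Long> lastWritten = new HashMap<>();
    private final List<String> closedBuckets = new ArrayList<>();

    MultiBucketSketch(String basePath, ElementBucketer<T> bucketer, long inactiveBucketThreshold) {
        this.basePath = basePath;
        this.bucketer = bucketer;
        this.inactiveBucketThreshold = inactiveBucketThreshold;
    }

    // Route the element to its bucket, opening the bucket on first use.
    void write(T element, long now) {
        String path = bucketer.getBucketPath(basePath, element);
        openBuckets.computeIfAbsent(path, p -> new ArrayList<>()).add(element);
        lastWritten.put(path, now);
    }

    // Close every bucket that has not been written to within the threshold.
    void closeInactiveBuckets(long now) {
        Iterator<Map.Entry<String, Long>> it = lastWritten.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (now - entry.getValue() >= inactiveBucketThreshold) {
                openBuckets.remove(entry.getKey());
                closedBuckets.add(entry.getKey());
                it.remove();
            }
        }
    }

    int openBucketCount() {
        return openBuckets.size();
    }

    List<String> closedBuckets() {
        return closedBuckets;
    }
}
```

A real sink would also have to include the per-bucket state in its checkpoints, which this sketch leaves out.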

Also, I've renamed RollingSink to BucketingSink to reflect its more general use.

Any comments are welcome, thanks!




[jira] [Commented] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

2016-07-11 Thread Josh Forman-Gornall (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370596#comment-15370596
 ] 

Josh Forman-Gornall commented on FLINK-4190:


I needed this feature and already have a working solution - will submit a PR 
today (comments welcome!)
