[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...

2016-08-30 Thread joshfg
Github user joshfg commented on the issue:

https://github.com/apache/flink/pull/2269
  
That's great, thanks Aljoscha! 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...

2016-08-30 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2269
  
I merged it. Could you please close this PR if it's not closed
automatically?

Thanks for your work, and sorry again for taking so long to get it in! 👍 




[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...

2016-08-26 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2269
  
Hi,
I'm very sorry for the delays! I still have this sitting at the top of my list
and I'm hoping to get this in by the beginning of next week.




[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...

2016-08-26 Thread joshfg
Github user joshfg commented on the issue:

https://github.com/apache/flink/pull/2269
  
Hi Aljoscha, just wanted to remind you about this - any idea when the 
changes will be merged in? Thanks!




[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...

2016-07-25 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2269
  
Thanks for moving the tests! It should be ok to leave these other two 
ITCases.

I'll merge the commits into one once I can find some time to look at the 
tests and merge it.




[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...

2016-07-25 Thread joshfg
Github user joshfg commented on the issue:

https://github.com/apache/flink/pull/2269
  
Ok I've migrated `BucketingSinkITCase` and 
`BucketingSinkMultipleActiveBucketsCase` over to `BucketingSinkTest` using the 
test harness with `TimeServiceProvider`. I've left the two fault tolerance IT 
cases as they are because it looks like they need to run a proper Flink job 
with a custom source/mappers. Does that sound OK?

If you think it's ready to merge, should I move all the commits into a 
single commit for FLINK-4190?




[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...

2016-07-25 Thread joshfg
Github user joshfg commented on the issue:

https://github.com/apache/flink/pull/2269
  
That works, thanks! :)




[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...

2016-07-25 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2269
  
This is caused by `setInputType()` not being called on 
`SequenceFileWriter`. In the test, you can call `setInputType` on the 
`BucketingSink` once with the input `TypeInformation` and a `new 
ExecutionConfig()`. That should do the trick.
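
As a rough illustration of why the call matters, here is a minimal, self-contained sketch. `TypedWriter` and `ForwardingSink` are hypothetical stand-ins invented for this example, not the real `SequenceFileWriter` or `BucketingSink`, and they take a plain `Class<?>` instead of a `TypeInformation` and an `ExecutionConfig`. The assumption is that the sink forwards the type information to its writer, and that in a test harness nothing does this for you.

```java
/** Hypothetical stand-in for a writer that needs type info before it can open a file. */
class TypedWriter {
    private Class<?> keyClass;

    void setInputType(Class<?> keyClass) {
        this.keyClass = keyClass;
    }

    void open() {
        // Mirrors the failure mode in the stack trace: opening without type info fails.
        if (keyClass == null) {
            throw new IllegalStateException("Key Class has not been initialized.");
        }
    }
}

/** Hypothetical stand-in for a sink that forwards the input type to its writer. */
class ForwardingSink {
    private final TypedWriter writer;

    ForwardingSink(TypedWriter writer) {
        this.writer = writer;
    }

    void setInputType(Class<?> keyClass) {
        writer.setInputType(keyClass);
    }

    void invoke(Object element) {
        // The first element triggers opening a part file via the writer.
        writer.open();
    }
}

class SetInputTypeDemo {
    public static void main(String[] args) {
        ForwardingSink sink = new ForwardingSink(new TypedWriter());
        // In a test, nothing calls this for us, so we call it once by hand.
        sink.setInputType(String.class);
        sink.invoke("element");
    }
}
```

Without the explicit `setInputType` call, `invoke` in this sketch fails with the same kind of `IllegalStateException` as in the test.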




[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...

2016-07-25 Thread joshfg
Github user joshfg commented on the issue:

https://github.com/apache/flink/pull/2269
  
Ah I see, that makes sense.
I've begun refactoring the tests here:
https://github.com/joshfg/flink/blob/flink-4190/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java

But have run into this strange exception in the 
`testNonRollingSequenceFileWithoutCompressionWriter` test:
```
java.lang.IllegalStateException: Key Class has not been initialized.
    at org.apache.flink.streaming.connectors.fs.SequenceFileWriter.open(SequenceFileWriter.java:84)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:500)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:396)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
    at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:226)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testNonRollingSequenceFileWithoutCompressionWriter(BucketingSinkTest.java:220)
```

Any ideas what would cause that? I've copied the HDFS cluster 
initialisation exactly as it was in the original tests...







[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...

2016-07-25 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2269
  
Not really, the reason for having an ITCase is just that they really
exercise the sink embedded in a proper Flink job, which might bring up
interactions that were overlooked when writing a test case. I think with
proper test cases we could go completely without an ITCase, which would also
reduce testing time.

The purpose of the `MiniDFSCluster` is to test the sink against an actual 
HDFS cluster because there is some stuff in there that would only be tested 
when using HDFS. For example, the truncate support.

tl;dr Go for it and move everything to a test case. 😃 




[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...

2016-07-25 Thread joshfg
Github user joshfg commented on the issue:

https://github.com/apache/flink/pull/2269
  
Thanks! Oh nice, this looks like a better solution for checking for bucket 
inactivity...
For the tests, is there any reason not to fold all of those tests into the
new `BucketingSinkTest`? Currently there are 4 (BucketingSinkITCase,
BucketingSinkFaultToleranceITCase, BucketingSinkFaultTolerance2ITCase,
BucketingSinkMultipleActiveBucketsCase).

Also, do you know what's the purpose of using MiniDFSCluster in the tests? 
Could we rewrite the other tests in the same way as your example test, without 
running a local HDFS cluster?




[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...

2016-07-21 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2269
  
Very good work!

I know we discussed before whether to check for inactivity in a different 
thread or in `invoke()`. There's actually a third option that I'm showcasing in 
the PR I did against your PR. 😃 The StreamTask already has a TimerService 
that can be set for testing. If we use the appropriate methods in the bucketing 
sink then we get testability with a settable clock for free.
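
To make the settable-clock idea concrete, here is a minimal, self-contained sketch. `Clock`, `ManualClock` and `InactivityTracker` are hypothetical names invented for this example; they are not the Flink timer service API, and the real sink may track inactivity differently. The point is only that once the sink reads time through an injected clock, a test can advance time by hand instead of sleeping.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical clock abstraction; not a Flink interface. */
interface Clock {
    long currentTimeMillis();
}

/** Test clock whose time only moves when the test advances it. */
class ManualClock implements Clock {
    private long now;

    @Override
    public long currentTimeMillis() {
        return now;
    }

    void advance(long millis) {
        now += millis;
    }
}

/** Hypothetical stand-in for the part of a sink that closes inactive buckets. */
class InactivityTracker {
    private final Clock clock;
    private final long thresholdMillis;
    private final Map<String, Long> lastWrite = new HashMap<>();

    InactivityTracker(Clock clock, long thresholdMillis) {
        this.clock = clock;
        this.thresholdMillis = thresholdMillis;
    }

    /** Records that the given bucket was written to at the current clock time. */
    void recordWrite(String bucket) {
        lastWrite.put(bucket, clock.currentTimeMillis());
    }

    /** Drops buckets idle for at least the threshold; returns how many were closed. */
    int closeInactive() {
        long now = clock.currentTimeMillis();
        int closed = 0;
        Iterator<Map.Entry<String, Long>> it = lastWrite.entrySet().iterator();
        while (it.hasNext()) {
            if (now - it.next().getValue() >= thresholdMillis) {
                it.remove();
                closed++;
            }
        }
        return closed;
    }

    int activeBuckets() {
        return lastWrite.size();
    }
}
```

In a test, one would write to a bucket, call `advance` on the `ManualClock` past the threshold, and then check that `closeInactive` reports the bucket as closed, all without any real waiting.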

I also added a `BucketingSinkTest`. I think it would be good if the
`BucketingSinkITCase` and `BucketingSinkMultipleActiveBucketsCase` could be folded
into this one because having the ITCases means having a lot of overhead and our
build is already taking quite long.

What do you think?

