[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...
Github user joshfg commented on the issue: https://github.com/apache/flink/pull/2269 That's great, thanks Aljoscha! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2269 I merged it. Could you please close this PR if it's not closed automatically? Thanks for your work, and sorry again for taking so long to get it in!
[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2269 Hi, I'm very sorry for the delays! I still have this sitting at the top of my list and I'm hoping to get this in by the beginning of next week.
[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...
Github user joshfg commented on the issue: https://github.com/apache/flink/pull/2269 Hi Aljoscha, just wanted to remind you about this - any idea when the changes will be merged in? Thanks!
[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2269 Thanks for moving the tests! It should be ok to leave these other two ITCases. I'll merge the commits into one once I can find some time to look at the tests and merge it.
[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...
Github user joshfg commented on the issue: https://github.com/apache/flink/pull/2269 Ok I've migrated `BucketingSinkITCase` and `BucketingSinkMultipleActiveBucketsCase` over to `BucketingSinkTest` using the test harness with `TimeServiceProvider`. I've left the two fault tolerance IT cases as they are because it looks like they need to run a proper Flink job with a custom source/mappers. Does that sound OK? If you think it's ready to merge, should I move all the commits into a single commit for FLINK-4190?
[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...
Github user joshfg commented on the issue: https://github.com/apache/flink/pull/2269 That works, thanks! :)
[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2269 This is caused by `setInputType()` not being called on `SequenceFileWriter`. In the test, you can call `setInputType` on the `BucketingSink` once with the input `TypeInformation` and a `new ExecutionConfig()`. That should do the trick.
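To illustrate the failure mode described in this comment, here is a minimal, self-contained sketch. It does not use the Flink API: `ToyWriter` and `ToySink` are hypothetical stand-ins for `SequenceFileWriter` and `BucketingSink`, and a plain `Class<?>` stands in for `TypeInformation`. The assumption being modelled is that the sink forwards type information to its writer, and that a test harness driving the sink directly never triggers that call on its own.

```java
/** Hypothetical stand-in for a writer that must know its key class before open(). */
class ToyWriter {
    private Class<?> keyClass;
    private boolean open;

    // Stores the type information; a Class<?> is used here purely for illustration.
    void setInputType(Class<?> keyClass) {
        this.keyClass = keyClass;
    }

    // Fails in the same way as the stack trace in the thread when no type was set.
    void open() {
        if (keyClass == null) {
            throw new IllegalStateException("Key Class has not been initialized.");
        }
        open = true;
    }

    boolean isOpen() {
        return open;
    }
}

/** Hypothetical sink that forwards type information to its writer. */
class ToySink {
    private final ToyWriter writer;

    ToySink(ToyWriter writer) {
        this.writer = writer;
    }

    // The call the test has to make by hand before processing elements.
    void setInputType(Class<?> keyClass) {
        writer.setInputType(keyClass);
    }

    // Opens the writer lazily on the first element, like opening a new part file.
    void invoke(String element) {
        if (!writer.isOpen()) {
            writer.open();
        }
    }
}

public class InputTypeSketch {
    public static void main(String[] args) {
        ToyWriter writer = new ToyWriter();
        ToySink sink = new ToySink(writer);

        try {
            sink.invoke("first"); // no type set yet, so open() throws
        } catch (IllegalStateException e) {
            System.out.println("without setInputType: " + e.getMessage());
        }

        sink.setInputType(String.class); // the manual step suggested above
        sink.invoke("second");
        System.out.println("writer open: " + writer.isOpen());
    }
}
```

In the real test the equivalent step would be the single `setInputType` call on the `BucketingSink` that the comment describes, made before the first element is processed.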
[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...
Github user joshfg commented on the issue: https://github.com/apache/flink/pull/2269 Ah I see, that makes sense. I've begun refactoring the tests here: https://github.com/joshfg/flink/blob/flink-4190/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java But I have run into this strange exception in the `testNonRollingSequenceFileWithoutCompressionWriter` test:
```
java.lang.IllegalStateException: Key Class has not been initialized.
    at org.apache.flink.streaming.connectors.fs.SequenceFileWriter.open(SequenceFileWriter.java:84)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:500)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:396)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
    at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:226)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testNonRollingSequenceFileWithoutCompressionWriter(BucketingSinkTest.java:220)
```
Any ideas what would cause that? I've copied the HDFS cluster initialisation exactly as it was in the original tests...
[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2269 Not really, the reason for having an ITCase is just that it really exercises the sink embedded in a proper Flink job, which might bring up interactions that were overlooked when writing a test case. I think with proper test cases we could go completely without an ITCase, which would also improve testing time. The purpose of the `MiniDFSCluster` is to test the sink against an actual HDFS cluster because there is some stuff in there that would only be tested when using HDFS, for example the truncate support. tl;dr Go for it and move everything to a test case.
[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...
Github user joshfg commented on the issue: https://github.com/apache/flink/pull/2269 Thanks! Oh nice, this looks like a better solution for checking for bucket inactivity... For the tests, is there any reason not to fold all of those tests into the new `BucketingSinkTest`? Currently there are four: `BucketingSinkITCase`, `BucketingSinkFaultToleranceITCase`, `BucketingSinkFaultTolerance2ITCase` and `BucketingSinkMultipleActiveBucketsCase`. Also, do you know what the purpose of using `MiniDFSCluster` in the tests is? Could we rewrite the other tests in the same way as your example test, without running a local HDFS cluster?
[GitHub] flink issue #2269: [FLINK-4190] Generalise RollingSink to work with arbitrar...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2269 Very good work! I know we discussed before whether to check for inactivity in a different thread or in `invoke()`. There's actually a third option that I'm showcasing in the PR I did against your PR. The StreamTask already has a TimerService that can be set for testing. If we use the appropriate methods in the bucketing sink then we get testability with a settable clock for free. I also added a `BucketingSinkTest`. I think it would be good if the `BucketingSinkITCase` and `BucketingSinkMultipleActiveBucketsCase` could be folded into this one because having the ITCases means having a lot of overhead and our build is already taking quite long. What do you think?
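The "settable clock" idea in this comment can be sketched without any Flink dependencies. Everything below is hypothetical and only illustrates the pattern: `MillisClock`, `ManualClock` and `ToyBucketingSink` are invented names, not the TimerService or the real `BucketingSink`. The point is that when the sink reads time through an injected clock, a test can advance time by hand instead of sleeping or starting a separate thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical clock abstraction, standing in for a time service that tests can replace. */
interface MillisClock {
    long currentTimeMillis();
}

/** Test clock whose time only moves when the test advances it. */
class ManualClock implements MillisClock {
    private long now;

    ManualClock(long start) {
        this.now = start;
    }

    @Override
    public long currentTimeMillis() {
        return now;
    }

    void advance(long millis) {
        now += millis;
    }
}

/** Toy sink that records the last write time per bucket and closes idle buckets. */
class ToyBucketingSink {
    private final MillisClock clock;
    private final long inactivityThresholdMillis;
    private final Map<String, Long> lastWrite = new HashMap<>();
    private final List<String> closed = new ArrayList<>();

    ToyBucketingSink(MillisClock clock, long inactivityThresholdMillis) {
        this.clock = clock;
        this.inactivityThresholdMillis = inactivityThresholdMillis;
    }

    // Records a write to the given bucket at the clock's current time.
    void write(String bucket) {
        lastWrite.put(bucket, clock.currentTimeMillis());
    }

    // Closes every bucket that has been idle for at least the threshold.
    void closeInactiveBuckets() {
        long now = clock.currentTimeMillis();
        Iterator<Map.Entry<String, Long>> it = lastWrite.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (now - entry.getValue() >= inactivityThresholdMillis) {
                closed.add(entry.getKey());
                it.remove();
            }
        }
    }

    List<String> closedBuckets() {
        return closed;
    }

    int activeBucketCount() {
        return lastWrite.size();
    }
}

public class SettableClockSketch {
    public static void main(String[] args) {
        ManualClock clock = new ManualClock(0L);
        ToyBucketingSink sink = new ToyBucketingSink(clock, 1000L);

        sink.write("a");      // bucket "a" written at t=0
        clock.advance(600);
        sink.write("b");      // bucket "b" written at t=600
        clock.advance(600);   // t=1200: "a" idle for 1200 ms, "b" for 600 ms

        sink.closeInactiveBuckets();
        System.out.println(sink.closedBuckets()); // prints [a]
    }
}
```

Because the test controls time directly, it runs instantly and deterministically, which is the kind of overhead reduction the comment is after when it suggests folding the ITCases into a unit test.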