[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6075 @zhangminglei For your interest - there is a new Bucketing Sink in the Flink master (called `StreamingFileSink`), with a different design: Managing all state in Flink state (so it is consistent), with a new File System writer abstraction to generalize across HDFS, POSIX, and S3 (S3 still WIP) and with a more pluggable way to add encoders, like parquet and orc. As an example, we added a Parquet writer, which is quite straightforward and flexible with the new interface. Would be great to get your opinion on that and see if your ORC writer code also works with that. If it works out, the new StreamingFileSink could replace the current BucketingSink. ---
[GitHub] flink issue #6353: [FLINK-9875][runtime] Add concurrent creation of executio...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6353 Maybe we can solve this more simply? Avoiding concurrency in the execution graph creation keeps the code simpler and more robust - very desirable for an already fairly complex construct. The issue here is the time it takes to create the splits, so we could see if we can parallelize that, rather than parallelizing the job vertex creation. I would think in the direction of having a Future that supplies the input splits and computing that future in the IOExecutor. That would parallelize the core problem and leave the execution graph as it is. ---
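[Editor's sketch] The direction suggested in the comment above — computing the input splits asynchronously and handing out a future, rather than making ExecutionGraph construction concurrent — could look roughly like this. All names here are hypothetical; the actual Flink runtime code differs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Sketch: compute input splits on the I/O executor instead of blocking graph construction. */
class AsyncSplitSupplier {

    private final CompletableFuture<List<String>> splitsFuture;

    AsyncSplitSupplier(Executor ioExecutor) {
        // The potentially slow split enumeration runs asynchronously on the I/O executor,
        // so the execution graph can be assembled without waiting for it.
        this.splitsFuture = CompletableFuture.supplyAsync(AsyncSplitSupplier::computeSplits, ioExecutor);
    }

    /** Callers block (or chain a continuation) only when the splits are actually needed. */
    List<String> getSplits() {
        return splitsFuture.join();
    }

    private static List<String> computeSplits() {
        // Placeholder for the real, potentially expensive, split computation.
        return Arrays.asList("split-0", "split-1");
    }
}
```

The point of the design is that only the expensive part is parallelized; the graph data structures themselves stay single-threaded.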
[GitHub] flink pull request #:
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/commit/8231b62ff42aae53ca3a7b552980838ccab824ab#commitcomment-29792609 In flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java: In flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java on line 81: Looks redundant, I agree. ---
[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6300 Nice feature, thanks a lot. Merged this into the 1.6 and 1.7 branches ---
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r202556032 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java --- @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.Writer; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.ResumableWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.writers.StringWriter; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * Sink that emits its input elements to {@link FileSystem} files within buckets. This is + * integrated with the checkpointing mechanism to provide exactly once semantics. 
+ * + * When creating the sink a {@code basePath} must be specified. The base directory contains + * one directory for every bucket. The bucket directories themselves contain several part files, + * with at least one for each parallel subtask of the sink which is writing data to that bucket. + * These part files contain the actual output data. + * + * The sink uses a {@link Bucketer} to determine in which bucket directory each element should + * be written inside the base directory. The {@code Bucketer} can, for example, use time or + * a property of the element to determine the bucket directory. The default {@code Bucketer} is a + * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify + * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}. + * + * The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink + * and a rolling counter. For example, the file {@code "part-1-17"} contains the data from + * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask. + * When a part file becomes bigger than the user-specified part size or when the part file becomes older + * than the user-specified roll over interval, the current part file is closed, the part counter is increased + * and a new part file is created. The batch size defaults to {@code 384MB}; this can be configured + * using {@link #setPartFileSize(long)}. The roll over interval defaults to {@code Long.MAX_VALUE} and
[GitHub] flink issue #6326: Mutual authentication for internal communication
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6326 Thanks for the review and for merging. @NicoK has an end-to-end test for SSL PR already (#6327) which would be great to rebase on top of this change. ---
[GitHub] flink issue #6326: Mutual authentication for internal communication
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6326 Pushed another commit that rebuilds the generated config docs ---
[GitHub] flink issue #6302: [FLINK-9061][checkpointing] add entropy to s3 path for be...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6302 Thanks for this contribution, that's a valuable fix. I have a few thoughts and suggestions on how we might improve the feature a bit further: - Can we get rid of the `commons-text` dependency? The fewer dependencies, the fewer possible problems for users due to dependency clashes. It seems a bit heavy to add a new library for just one random string generation. - The feature is configured through additional constructor parameters. I am wondering if we may want to move this to the `Configuration`. That would allow the "ops side of things" to configure this for a setup (setting entropy key and checkpoints directory) without needing everyone that writes a Flink program to be aware of this. - If I read the code correctly, the code logs warnings for every file in case the feature is not activated. That will probably confuse a lot of users and make them dig into whether they have a wrong setup, when they simply don't use this new feature. ---
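[Editor's sketch] Dropping `commons-text` for "just one random string generation", as the comment above suggests, is straightforward with the JDK alone. This is an illustrative helper, not the code from the PR:

```java
import java.util.concurrent.ThreadLocalRandom;

/** Sketch: JDK-only random alphanumeric string, avoiding a commons-text dependency. */
final class EntropyStrings {

    private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz0123456789";

    /** Returns a random string of the given length, drawn from a fixed alphabet. */
    static String randomEntropy(int length) {
        final StringBuilder sb = new StringBuilder(length);
        final ThreadLocalRandom rnd = ThreadLocalRandom.current();
        for (int i = 0; i < length; i++) {
            sb.append(ALPHABET.charAt(rnd.nextInt(ALPHABET.length())));
        }
        return sb.toString();
    }

    private EntropyStrings() {}
}
```

For the S3 entropy use case, uniqueness and spread across key prefixes matter more than cryptographic strength, so `ThreadLocalRandom` suffices.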
[GitHub] flink issue #6321: [FLINK-9829] fix the wrapper classes be compared by symbo...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6321 Is the issue addressed here a bug? If not, and if it seems that the original authors of the code had an intention of writing the code as it is now, I would suggest leaving it as it is. ---
[GitHub] flink issue #6321: [FLINK-9829] fix the wrapper classes be compared by symbo...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6321 This would be clearly not a hotfix. As per the pull request template, contributors should use hotfixes mainly for typos and JavaDoc updates. ---
[GitHub] flink issue #6326: Mutual authentication for internal communication
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6326 @EronWright Given our last discussion, I think this should be interesting to you. ---
[GitHub] flink issue #6050: [FLINK-9404][flink-connector-filesystem] Adapter viewfs i...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6050 I think this makes sense, merging... ---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5982 @sihuazhou I got caught up in some other tasks - will try to get back to this here soon, I would like to have this feature in as a base for "search for completed checkpoint". ---
[GitHub] flink issue #6116: [FLINK-9498][build] Disable dependency convergence for fl...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6116 Is there a workaround for users to disable dependency convergence? It is actually a problem that we don't control the convergence of some dependency that is used with varying versions (Hadoop), but rely on it for our own builds to succeed. In the long run, we may want to work only "hadoop free", referencing for compilation-only our own shaded Hadoop (maybe even move it to flink-shaded) and let users export the Hadoop Classpath for their own Hadoop (rather than changing the dependency when building Flink). ---
[GitHub] flink issue #6281: [FLINK-9750] Add new StreamingFileSink with ResumableWrit...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6281 @xndai The umbrella issue is [FLINK-9749](https://issues.apache.org/jira/browse/FLINK-9749) and some parts on the specifics of block formats (ORC / Parquet) are in [FLINK-9753](https://issues.apache.org/jira/browse/FLINK-9753) ---
[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6300 I like the idea of this - should make it much easier to use. Have you run this code on some heavier data stream to validate that it works well in practice? If yes, I would be +1 to this ---
[GitHub] flink issue #6324: [FLINK-9424] [security] Set default cipher suite to a mor...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6324 Thanks, merging... ---
[GitHub] flink issue #6328: [FLINK-9816][network] add option to configure SSL engine ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6328 Could you rebase this on top of #6326 ? That PR makes sure SSLEngine factories are used everywhere, giving a single point to integrate the provider such that it is available for all SSL endpoints. ---
[GitHub] flink pull request #5966: [FLINK-9312] [security] Add mutual authentication ...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/5966 ---
[GitHub] flink issue #5966: [FLINK-9312] [security] Add mutual authentication for RPC...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5966 This PR is subsumed by #6326 ---
[GitHub] flink issue #5966: [FLINK-9312] [security] Add mutual authentication for RPC...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5966 @EronWright Just saw this - I have concurrently reworked this PR into #6326 which does things more cleanly. I would like to get that PR in for 1.6 (got many users asking for this). I would be happy if you want to build on top of that for the next steps... ---
[GitHub] flink issue #6327: [FLINK-9839][e2e] add a streaming allround test with SSL ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6327 Could we save testing time by just activating SSL for existing test jobs? Please also check the update of the SSL config keys that may come through #6326 ---
[GitHub] flink pull request #6326: Mutual authentication for internal communication
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/6326 Mutual authentication for internal communication ** This is based on #6324 - hence the first commit in this PR should be discarded from review** ## What is the purpose of the change Splits the SSL configuration into **internal communication** *(RPC, data transport, blob server)* and **external/REST** communication. Also activates mutual authentication for all internal communication. This continues the security features of Flink as outlined in http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLIP-26-SSL-Mutual-Authentication-td22188.html Most of these changes are straightforward; the most important thing for reviewers, in my opinion, is to check whether the configuration keys make sense: - One can configure SSL independently for internal and external/REST communication - This is due to feedback from users that internal communication needs to be protected more, and by Flink itself, while external communication is frequently protected by REST proxies (often as side car processes to the JobManager / Dispatcher) - All keystore and password settings now exist additionally in the `security.ssl.internal.*` and `security.ssl.rest.*` key namespace. The `security.ssl.*` config keys still exist and are used if the more specific key is not set. This is meant both for backwards compatibility, and to make it easy to use a uniform config across internal/external communication. ## Brief change log - Introduces new config option families: `security.ssl.internal.*` and `security.ssl.rest.*` - Adds code to fall back to the `security.ssl.*` keys if no internal- or rest-specific options are set - Refactors all instantiation of `SSLEngine` and `SSL(Server)Socket` to go through factories. That way, the different endpoint instantiations do not need to apply configurations themselves. 
- Activates mutual auth for akka/rpc via akka config, plus adds a test - Activates mutual auth in the SSL Socket/Engine factories (netty / blob) and adds a test ## Verifying this change - There are additional unit tests checking that clients with untrusted certificates cannot connect. - Verifying end-to-end works by building the code, enabling internal SSL in the flink-conf.yaml, starting a standalone cluster, checking the logs and akka urls for SSL entries ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **yes** - If yes, how is the feature documented? Docs coming in a separate PR once we have agreement on the config keys You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink client_auth Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6326.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6326 commit 37abf46f6030b6404707958e5a3a3fae0051dbea Author: Stephan Ewen Date: 2018-07-13T07:31:18Z [FLINK-9424] [security] Set default cipher suite to a more compatible cipher suite. The upgraded ciphers are not yet supported on all platforms and JDK versions, making the getting-started process rough. Instead, we document our recommendation to set these values in the configuration. 
This reverts "[FLINK-9310] [security] Update standard cipher suites for secure mode" commit 0d33c8ab2be6502a56d8ea97a72fda5ec8b865c0 Author: Stephan Ewen Date: 2018-07-12T09:28:57Z [FLINK-9313] [security] (part 1) Instantiate all SSLSocket and SSLServerSocket through factories. This removes hostname verification from SSL client sockets. With client authentication, this is no longer needed and it is not compatible with various container environments. commit 80cd8bec111bb91943bd691adf80275c79b57ca0 Author: Stephan Ewen Date: 2018-05-07T17:44:33Z [FLINK-9313] [security] (part 3) Activate mutual authentication for RPC/akka commit 97425b2962861922ac3d7e64fb57400de787966d Author: Stephan Ewen Date: 2018-07-12T15:20:30Z [FLINK-9313] [security] (part 2) Split SSL configuration into internal (rpc, data transport, b
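[Editor's sketch] The key-fallback behavior described in the #6326 description above (scoped `security.ssl.internal.*` / `security.ssl.rest.*` keys falling back to the generic `security.ssl.*` keys) can be illustrated with a plain map lookup. This is a hypothetical helper, not the actual Flink configuration code:

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: prefer the scoped config key, fall back to the generic security.ssl.* key. */
final class SslConfigFallback {

    /** Returns the value of scopedKey if set, otherwise the value of genericKey (or null). */
    static String lookup(Map<String, String> conf, String scopedKey, String genericKey) {
        final String scoped = conf.get(scopedKey);
        return scoped != null ? scoped : conf.get(genericKey);
    }

    private SslConfigFallback() {}
}
```

This is the pattern that gives both backwards compatibility (old configs keep working) and the ability to diverge internal and REST settings when needed.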
[GitHub] flink pull request #6324: [FLINK-9424] [security] Set default cipher suite t...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/6324 [FLINK-9424] [security] Set default cipher suite to a more compatible cipher suite ## What is the purpose of the change This reverts "[FLINK-9310] [security] Update standard cipher suites for secure mode" The upgraded ciphers are not yet supported on all platforms and JDK versions, making the getting-started process rough. Instead, we document our recommendation to set these values in the configuration. ## Brief change log - Reverts "[FLINK-9310] [security] Update standard cipher suites for secure mode" - Add docs to manually configure the stronger cipher suites ## Documentation Adds a section to the SSL docs. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink downgrade_ciphers Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6324.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6324 commit 37abf46f6030b6404707958e5a3a3fae0051dbea Author: Stephan Ewen Date: 2018-07-13T07:31:18Z [FLINK-9424] [security] Set default cipher suite to a more compatible cipher suite. The upgraded ciphers are not yet supported on all platforms and JDK versions, making the getting-started process rough. Instead, we document our recommendation to set these values in the configuration. This reverts "[FLINK-9310] [security] Update standard cipher suites for secure mode" ---
[GitHub] flink pull request #6304: [FLINK-9801][build] Add missing example dependenci...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6304#discussion_r201937327 --- Diff: flink-dist/pom.xml --- @@ -140,6 +140,22 @@ under the License. + --- End diff -- There is a section further down in the POM, with a comment "dependencies for "/examples" in scope provided". How about moving these entries there? Then we get the comments for "why it is like that" for free... ---
[GitHub] flink issue #6309: [FLINK-9809] [DataSteam API] Allow setting co-location co...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6309 Thanks, merging once CI is green... ---
[GitHub] flink pull request #6309: [FLINK-9809] [DataSteam API] Allow setting co-loca...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/6309 [FLINK-9809] [DataSteam API] Allow setting co-location constraints on StreamTransformations ## What is the purpose of the change Flink supports co location constraints for operator placement during scheduling. This is used internally for iterations, for example, but is not exposed to users. This PR adds a way for expert users to set these constraints. As a first step, it adds them to the `StreamTransformation`, which is not part of the public user-facing classes, but a more internal class in the DataStream API. That way we make this initially a hidden feature and can gradually expose it more prominently when we agree that this would be a good idea. You can use them as follows: ```java DataStream stream = ... stream.getTransformation().setCoLocationGroupKey("group2"); ``` ## Verifying this change - You can test setting the constraints as in the example above. - The unit test `StreamGraphCoLocationConstraintTest` adds further tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? A *hidden* feature. - If yes, how is the feature documented? 
(not applicable / docs / JavaDocs / **not documented**) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink colocation Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6309.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6309 commit bd58f39290842e9088dfdb997b0704ada7bd79c8 Author: Stephan Ewen Date: 2018-07-11T15:48:10Z [FLINK-9809] [DataSteam API] Allow setting co-location constraints on StreamTransformations. This feature is currently only exposed on StreamTransformations (internal API) rather than in the public API, because it is a hidden expert feature. ---
[GitHub] flink issue #6305: [FLINK-9807][tests] Optimize EventTimeWindowCheckpointITC...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6305 Please update the description so that reviewers can take a look. ---
[GitHub] flink issue #6275: [FLINK-9776] [runtime] Stop sending periodic interrupts o...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6275 All right, thanks, merging! ---
[GitHub] flink issue #6290: [Flink-9691] [Kinesis Connector] Attempt to call getRecor...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6290 Thanks, merging this... ---
[GitHub] flink issue #6286: [FLINK-9754][release] Remove references to scala profiles
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6286 +1 ---
[GitHub] flink issue #6285: [FLINK-9768][release] Speed up binary release
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6285 +1 ---
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201374783 --- Diff: flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs.local; + +import org.apache.flink.core.fs.AbstractResumableWriterTest; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; + +import org.junit.ClassRule; +import org.junit.rules.TemporaryFolder; + +/** + * Tests for the {@link LocalResumableWriter}. + */ +public class LocalFileSystemResumableWriterTest extends AbstractResumableWriterTest { + + @ClassRule + public static TemporaryFolder tempFolder = new TemporaryFolder(); --- End diff -- It is good practice to make these final ---
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201400623 --- Diff: flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java --- @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.fs; + +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import static org.junit.Assert.fail; + +public abstract class AbstractResumableWriterTest extends TestLogger { + + private static final Random RND = new Random(); + + private static final String testData1 = "THIS IS A TEST 1."; + private static final String testData2 = "THIS IS A TEST 2."; + private static final String testData3 = "THIS IS A TEST 3."; + + private Path basePathForTest; + + private static FileSystem fileSystem; + + public abstract Path getBasePath() throws Exception; + + public abstract FileSystem initializeFileSystem(); + + public Path getBasePathForTest() { + return basePathForTest; + } + + private FileSystem getFileSystem() { + if (fileSystem == null) { + fileSystem = initializeFileSystem(); + } + return fileSystem; + } + + private ResumableWriter getNewFileSystemWriter() throws IOException { + return getFileSystem().createRecoverableWriter(); + } + + @Before + public void prepare() throws Exception { + basePathForTest = new Path(getBasePath(), randomName()); + getFileSystem().mkdirs(basePathForTest); + } + + @After + public void cleanup() throws Exception { + getFileSystem().delete(basePathForTest, true); + } + + @Test + public void testCloseWithNoData() throws Exception { + final ResumableWriter writer = getNewFileSystemWriter(); + + final Path testDir = getBasePathForTest(); + + final Path path = new Path(testDir + File.separator + "part-0"); --- End diff -- Avoid `File.separator` for cross platform path, use `new Path(testDir, "part-0");`. ---
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201369555 --- Diff: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java --- @@ -253,4 +265,39 @@ public CommitRecoverable getRecoverable() { return recoverable; } } + + /** +* Called when resuming execution after a failure and waits until the lease +* of the file we are resuming is free. +* +* The lease of the file we are resuming writing/committing to may still +* belong to the process that failed previously and whose state we are +* recovering. +* +* @param path The path to the file we want to resume writing to. +*/ + private boolean waitUntilLeaseIsRevoked(final Path path) throws IOException { + Preconditions.checkState(fs instanceof DistributedFileSystem); + + final DistributedFileSystem dfs = (DistributedFileSystem) fs; + dfs.recoverLease(path); + boolean isclosed = dfs.isFileClosed(path); + + final StopWatch sw = new StopWatch(); --- End diff -- Let's use `Deadline` from the Flink utils instead to reduce external dependencies. ---
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201374633 --- Diff: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java --- @@ -253,4 +265,39 @@ public CommitRecoverable getRecoverable() { return recoverable; } } + + /** +* Called when resuming execution after a failure and waits until the lease +* of the file we are resuming is free. +* +* The lease of the file we are resuming writing/committing to may still +* belong to the process that failed previously and whose state we are +* recovering. +* +* @param path The path to the file we want to resume writing to. +*/ + private boolean waitUntilLeaseIsRevoked(final Path path) throws IOException { + Preconditions.checkState(fs instanceof DistributedFileSystem); + + final DistributedFileSystem dfs = (DistributedFileSystem) fs; + dfs.recoverLease(path); + boolean isclosed = dfs.isFileClosed(path); + + final StopWatch sw = new StopWatch(); + sw.start(); + + while (!isclosed) { + if (sw.getTime() > LEASE_TIMEOUT) { + break; + } + + try { --- End diff -- This basically locks the thread in for up to LEASE_TIMEOUT time, making it not possible to cancel. I would either propagate the InterruptedException, or rethrow it as an IOException indicating that recovering the lease failed (because this is a single-purpose util function that works here). ---
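[Editor's sketch] Combining the two review suggestions above — a deadline instead of a `StopWatch`, and surfacing interrupts as an `IOException` rather than swallowing them — the wait loop could look like this. A sketch with hypothetical names, not the merged Flink code:

```java
import java.io.IOException;
import java.util.function.BooleanSupplier;

/** Sketch: wait until a condition holds or a deadline passes, without swallowing interrupts. */
final class LeaseWait {

    /**
     * Polls the condition until it is true or the timeout elapses.
     * Returns false if the deadline passes; throws IOException if interrupted.
     */
    static boolean waitUntil(BooleanSupplier isClosed, long timeoutMillis) throws IOException {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!isClosed.getAsBoolean()) {
            if (System.nanoTime() >= deadline) {
                return false; // lease not recovered within the timeout
            }
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fail the recovery attempt instead of looping on,
                // so the task stays cancellable.
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for lease recovery", e);
            }
        }
        return true;
    }

    private LeaseWait() {}
}
```

In the real sink, `isClosed` would wrap `dfs.isFileClosed(path)` after the `recoverLease(path)` call.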
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201375290 --- Diff: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java --- @@ -130,7 +130,7 @@ public static boolean hasHDFSDelegationToken() throws Exception { */ public static boolean isMinHadoopVersion(int major, int minor) throws FlinkRuntimeException { String versionString = VersionInfo.getVersion(); - String[] versionParts = versionString.split("."); + String[] versionParts = versionString.split("\\."); --- End diff -- Good catch! ---
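[Editor's sketch] The bug fixed above is easy to reproduce: `String.split` takes a regular expression, so an unescaped `"."` matches every character, leaving only empty tokens (which `split` discards when trailing), while `"\\."` splits on the literal dot:

```java
/** Demonstrates why escaping the dot matters: String.split takes a regex. */
final class SplitDemo {

    /** Returns the number of parts the version string splits into under the given regex. */
    static int parts(String version, String regex) {
        return version.split(regex).length;
    }

    private SplitDemo() {}
}
```

With `"2.8.3"`, splitting on `"\\."` yields three parts, while splitting on `"."` yields a zero-length array, which is why the original Hadoop version check could never work.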
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201401033 --- Diff: flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java --- @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.fs; + +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import static org.junit.Assert.fail; + +public abstract class AbstractResumableWriterTest extends TestLogger { + + private static final Random RND = new Random(); + + private static final String testData1 = "THIS IS A TEST 1."; + private static final String testData2 = "THIS IS A TEST 2."; + private static final String testData3 = "THIS IS A TEST 3."; + + private Path basePathForTest; + + private static FileSystem fileSystem; + + public abstract Path getBasePath() throws Exception; + + public abstract FileSystem initializeFileSystem(); + + public Path getBasePathForTest() { + return basePathForTest; + } + + private FileSystem getFileSystem() { + if (fileSystem == null) { + fileSystem = initializeFileSystem(); + } + return fileSystem; + } + + private ResumableWriter getNewFileSystemWriter() throws IOException { + return getFileSystem().createRecoverableWriter(); + } + + @Before + public void prepare() throws Exception { + basePathForTest = new Path(getBasePath(), randomName()); + getFileSystem().mkdirs(basePathForTest); + } + + @After + public void cleanup() throws Exception { + getFileSystem().delete(basePathForTest, true); + } + + @Test + public void testCloseWithNoData() throws Exception { + final ResumableWriter writer = getNewFileSystemWriter(); + + final Path testDir = getBasePathForTest(); + + final Path path = new Path(testDir + File.separator + "part-0"); + + final RecoverableFsDataOutputStream stream = writer.open(path); + for (Map.Entry fileContents : 
getFileContentByPath(testDir).entrySet()) { + Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress.")); + Assert.assertTrue(fileContents.getValue().isEmpty()); + } + + stream.closeForCommit().commit(); + + for (Map.Entry fileContents : getFileContentByPath(testDir).entrySet()) { + Assert.assertEquals("part-0", fileContents.getKey().getName()); + Assert.assertTrue(fileContents.getValue().isEmpty()); + } + } + + @Test + public void testCommitAfterNormalClose() throws Exception { + final ResumableWriter writer = getNewFileSystemWriter(); + + final Path testDir = getBasePathForTest(); + + final Path path = new Path(testDir.getPath() + File.separator + "part-0"); + + try (final RecoverableFsDataOutputStream stream = writer.open(path)) { + stream.write(testData1.getBytes(Charset.forName("UTF-8"))); --- End diff -- Use `StandardCharsets.UTF_8` instead of "UTF-8". ---
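The two forms produce identical bytes; the constant is preferred because it skips the runtime charset lookup and cannot fail on a typo in the name:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class CharsetDemo {
    public static void main(String[] args) {
        byte[] byName = "THIS IS A TEST 1.".getBytes(Charset.forName("UTF-8"));
        // preferred: a compile-time constant, no lookup, no possible
        // UnsupportedCharsetException from a misspelled name
        byte[] byConstant = "THIS IS A TEST 1.".getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.equals(byName, byConstant)); // prints "true"
    }
}
```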
[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6281#discussion_r201375170 --- Diff: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java --- @@ -41,6 +44,8 @@ @Internal class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream { + private static final long LEASE_TIMEOUT = 10L; --- End diff -- Can we add digit grouping chars here? Makes it easier to read... ---
[GitHub] flink pull request #6275: [FLINK-9776] [runtime] Stop sending periodic inter...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6275#discussion_r200814568 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java --- @@ -1563,7 +1573,7 @@ public void run() { // log stack trace where the executing thread is stuck and // interrupt the running thread periodically while it is still alive - while (executerThread.isAlive()) { + while (task.shouldInterruptOnCancel() && executerThread.isAlive()) { --- End diff -- True, this is not a 100% guarantee that interrupts do not come. That would need an atomic "interrupt if flag is set" call, but I don't know if that is possible in Java without introducing a locked code block, which I wanted to avoid. It may also not be necessary. I think the variant here is already strictly better than the current state, which is already correct. The current state mainly suffers from shutdowns "looking rough" due to interruptions. This change should fix the majority of that, because in the vast majority of shutdowns, the thread exits before the first of the "repeated interrupts". The thread only experiences the initial interrupt. In some sense, only clearing the initial interrupt flag would probably help > 90% of the cases already. This solves a few more % of the cases by guarding the repeated interrupts. ---
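For illustration, the atomic "interrupt if flag is set" variant mentioned above could be built with a shared monitor, at the cost of exactly the locked code block the comment wants to avoid. All names here are illustrative; this is not the code from the PR:

```java
public class GuardedInterrupter {

    private final Object lock = new Object();
    private boolean shouldInterrupt = true;

    /** Called by the task thread when it leaves user code. */
    void disableInterrupts() {
        synchronized (lock) {
            shouldInterrupt = false;
        }
    }

    /** Called by the canceler thread: the flag check and the interrupt
     *  happen atomically with respect to disableInterrupts(). */
    void interruptIfEnabled(Thread target) {
        synchronized (lock) {
            if (shouldInterrupt) {
                target.interrupt();
            }
        }
    }

    public static void main(String[] args) {
        GuardedInterrupter g = new GuardedInterrupter();
        g.disableInterrupts();
        g.interruptIfEnabled(Thread.currentThread());
        // the flag was cleared first, so no interrupt was delivered
        System.out.println(Thread.currentThread().isInterrupted()); // prints "false"
    }
}
```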
[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6075 The dependencies of `flink-connector-filesystem` are already not well set up, with both an Avro dependency and a Hadoop dependency. I agree that it would be good not to introduce yet more dependencies, or, at the very least, to make them optional. FYI: In the re-work of the BucketingSink under [FLINK-9749](https://issues.apache.org/jira/browse/FLINK-9749), we want to introduce a `BulkEncoder` interface in `flink-core` that is used by the BucketingSink, and can be implemented by classes in `flink-orc` (and later a new `flink-parquet` project). That way we cleanly separate the dependencies of the projects. ---
[GitHub] flink issue #6235: [FLINK-9377] [core] Remove serializers from checkpointed ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6235 Took a look at this WIP and I think it goes in a good direction. My most important comment is that I think it would help to move "ensureCompatibility" into the config snapshot, for the following reasons: - Clearer separation of concerns: the serializer contains only the serialization logic and the creation of the snapshot. Compatibility is not the serializer's immediate concern. - The current design means that the serializer mutates internal fields on reconfiguration. That is error-prone. Consider a serializer like the KryoSerializer, where the configuration is not fully deep-copied on duplication (which makes sense, as it is read-only during serialization). Mutating that configuration would change the behavior of other previously duplicated serializers as well, which is unexpected. Thoughts for improvements with lower priority: - Can we avoid setting the ClassLoader into a field in the config snapshot and then deserializing? I think such solutions are fragile and should be avoided if possible. The ClassLoader is not really part of the snapshot's state; it is an auxiliary to the deserialization and should, as such, be passed as an argument to the read method: read(in, classloader). This means that the TypeSerializerConfigSnapshot would not implement `IOReadableWritable`, but that might not be a problem. - Is the TypeSerializerConfigSnapshotSerializationProxy needed? It seems like an unnecessary indirection, given that it is used exclusively in the TypeSerializerSerializationUtil and could be a static util method instead. ---
[GitHub] flink pull request #6275: [FLINK-9776] [runtime] Stop sending periodic inter...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/6275 [FLINK-9776] [runtime] Stop sending periodic interrupts once executing thread leaves user function / operator code. ## What is the purpose of the change Upon cancellation, the task thread is periodically interrupted. This helps to pull the thread out of blocking operations in the user code. However, once the thread leaves the user code, the repeated interrupts may interfere with the shutdown cleanup logic, causing confusing exceptions. This PR changes the behavior to stop sending the periodic interrupts once the thread leaves the user code. ## Brief change log - `AbstractInvokable` maintains a flag indicating whether interrupts should be sent. - `StreamTask` sets the flag to stop receiving interrupts after coming out of the user code ## Verifying this change This change is a trivial rework that currently only avoids the throwing and catching of InterruptedExceptions that may cause noise in the logs. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? 
**not applicable** You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink stop_interrupts Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6275.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6275 commit 73d31551574f3c18e4cbc079681ed93f9ec2ef34 Author: Stephan Ewen Date: 2018-07-06T11:34:27Z [FLINK-9776] [runtime] Stop sending periodic interrupts once executing thread leaves user function / operator code. ---
[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6075 If it is not a problem that this can lead to poor compression when checkpoint intervals are short, we could think about merging this as a temporary solution until [FLINK-9749](https://issues.apache.org/jira/browse/FLINK-9749) is fully developed. I would suggest making the ORC and table dependencies optional, though, so that not every user of the BucketingSink needs them. ---
[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6075 Hi @zhangminglei Sorry for the late response - I thought about this solution quite a bit and came to the conclusion that we may need to do a bit more for efficient results: Please take a look at [FLINK-9749](https://issues.apache.org/jira/browse/FLINK-9749) and the subtask [FLINK-9753](https://issues.apache.org/jira/browse/FLINK-9753) The description outlines why I believe the simple approach suggested here may not be enough (will frequently result in badly compressed ORC/Parquet). We have already started this effort to completely redesign the BucketingSink. The initial work-in-progress looks quite promising. ---
[GitHub] flink issue #6118: [FLINK-9525][filesystem] Add missing `META-INF/services/*...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6118 Where in the loading of the factories do you see the error? My suspicion is still an issue with inverted class loading. To confirm, can we check the following? - Are you running this on Flink 1.4.0 or 1.4.1? - Do you have `hadoop-common` in the job's jar, or in the `flink/lib` folder? - Does the error go away if you set "classloader.resolve-order: parent-first" in the config? ---
[GitHub] flink issue #6118: [FLINK-9525][filesystem] Add missing `META-INF/services/*...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6118 I think this is a misdiagnosis; this should not be merged. Flink does not need a file system factory for Hadoop; it uses Hadoop's FS as the general fallback for all schemes for which it does not have a factory. The exception in the linked JIRA comes from Hadoop's own File System discovery. There is probably a casting error or similar (possibly due to inverted class loading). ---
[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6103 That all depends on why the failure happens in the first place. It seems to happen if the receiver of a channel starts much faster than the sender. The longest part of the deployment is library distribution, which happens only once. After one failure / recovery, the library should be cached and the next attempt to start the task should be very fast. ---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5448 There is no problem reusing old keys if their default unit was "bytes", because `MemorySize.parse(...)` interprets a plain number as bytes when no unit is attached to it. I did not realize that you had already switched the config keys - in that case we need to support the old keys backwards-compatibly as well. Also, we need to update all the shell scripts (`config.sh`, `jobmanager.sh`, `taskmanager.sh` and so on) to be consistent with the new config keys. ---
[GitHub] flink issue #6108: [FLINK-9367] [Streaming Connectors] Allow to do truncate(...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6108 @kl0u please link the issue once you created it. This is currently very early, in design discussions between @kl0u, me, and @aljoscha. The main points about the rewrite are - Use Flink's FileSystem abstraction, to make it work with shaded S3, swift, etc and give an easier interface - Add a proper "ChunkedWriter" abstraction to the FileSystems, which handles write, persist-on-checkpoint, and rollback-to-checkpoint in a FileSystem specific way. For example, use truncate()/append() on POSIX and HDFS, use MultiPartUploads on S3, ... - Add support for gathering large chunks across checkpoints, to make Parquet and ORC compression more effective. ---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5448 Okay, after taking a look, I think we need to add a few changes: - We need to add an additional `MemoryUnit.parse()` method that takes the "default" unit, so that we parse the old heap sizes such that they are in MB if nothing else is specified. - We should either change the return value of `getMebiBytes()` to `int` or have a `getMebiBytesAsInt()` method that uses a `MathUtils.checkedDownCast()` to avoid unnoticed overflow errors. Open question: As we are changing the value type of the heap size config options, should we deprecate the current config keys and introduce new ones (like `jobmanager.heap-size`)? ---
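A hedged sketch of the two suggestions, with hypothetical helper names (not the final Flink API): parsing a bare number in a caller-supplied default unit, and down-casting to `int` with an overflow check in the spirit of `MathUtils.checkedDownCast()`.

```java
public class MemorySizeSketch {

    /** Parses a size string; a bare number is interpreted in the given default unit. */
    static long parseBytes(String text, long defaultUnitMultiplier) {
        String trimmed = text.trim();
        if (trimmed.endsWith("mb")) {
            return Long.parseLong(trimmed.substring(0, trimmed.length() - 2).trim()) * (1L << 20);
        }
        // bare number: interpret in the caller-supplied default unit
        return Long.parseLong(trimmed) * defaultUnitMultiplier;
    }

    /** Down-casts to int, failing loudly instead of silently overflowing. */
    static int checkedDownCast(long value) {
        int cast = (int) value;
        if ((long) cast != value) {
            throw new IllegalArgumentException("value does not fit into an int: " + value);
        }
        return cast;
    }

    public static void main(String[] args) {
        // legacy heap-size keys were measured in megabytes, so MiB is the default unit
        long mebi = 1L << 20;
        System.out.println(parseBytes("1024", mebi));  // prints "1073741824"
        System.out.println(parseBytes("64 mb", 1L));   // prints "67108864"
        System.out.println(checkedDownCast(parseBytes("1024", mebi) >> 20)); // prints "1024"
    }
}
```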
[GitHub] flink issue #6094: [FLINK-9468][filesystem] fix calculate outputLimit incorr...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6094 Good catch. Apparently the tests don't check all combinations of settings, otherwise this would have shown up. Merging this... ---
[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6103 How critical is it to change this setting? I would assume this should be caught by the regular recovery, so unless this occurs very often and thus leads to confusing exceptions in the log, should we maybe leave it as it is? ---
[GitHub] flink issue #6108: [FLINK-9367] [Streaming Connectors] Allow to do truncate(...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6108 Do you have a Hadoop version older than 2.7? We are currently attempting to rewrite the Bucketing Sink completely, for better compatibility with S3 and better support for Parquet / ORC. We were actually thinking of dropping support for file systems that do not support `truncate()` - so getting this feedback would be good. ---
[GitHub] flink issue #6111: [FLINK-9504]Change the log level of checkpoint duration t...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6111 I have often wondered about this as well. There sure are a lot of log lines. - The state backends themselves should probably log on `debug` level, otherwise there is a line per operator in the task - The task itself should still log on info level, so that one has information to analyze checkpoints from the logs ---
[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5448 Will try and take a look at this soon... Sorry for the delay. What I would consider very important is that users who don't change their configuration do not get different behavior all of a sudden. Meaning in the absence of a "unit" we do not always interpret the value as a "byte" but as whatever the config value was measured in before (such as MBs, ...). ---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5982 I think we need to have a special output stream type (`AtomicCreatingFsDataOutputStream` or similar) as the return type of `FileSystem.createAtomic()`. Otherwise, how can a user actually create a file? The `closeAndPublish()` method is not part of any API class. ---
[GitHub] flink issue #6073: [FLINK-9091] [table] Fix dependency convergence for flink...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6073 DependencyManagement in the root pom should cover dependencies that we share and expose across modules. Enforcing convergence with one module (`flink-table`) for a dependency that is hidden (shaded) should be handled in that module, not clutter the project-global configuration. ---
[GitHub] flink pull request #6073: [FLINK-9091] [table] Fix dependency convergence fo...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6073#discussion_r190855869 --- Diff: flink-libraries/flink-table/pom.xml --- @@ -146,6 +147,12 @@ under the License. flink-test-utils_${scala.binary.version} ${project.version} test + --- End diff -- Why does `flink-test-utils` even have a Guava dependency? Can we fix that? ---
[GitHub] flink issue #5843: [FLINK-8744][docs] Generate "Common Option" section
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5843 One could add an int to the annotation, as "priority / position", and sort by that. Not sure it is nice, but it could be okay. I think it was nice for users that the most common options (the ones you need first) were at the top of the list. Out of curiosity, what happens to options like `env.java.opts`, which are shell-script-only options but very common? ---
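The "int on the annotation" idea could look like the following sketch. The annotation name and option keys are illustrative, not Flink's actual documentation annotations:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class CommonOptionOrdering {

    @Retention(RetentionPolicy.RUNTIME)
    @interface CommonOption {
        int position() default Integer.MAX_VALUE; // unpositioned options sort last
    }

    static class Options {
        @CommonOption(position = 1) static String HOST_PORT = "jobmanager.rpc.address";
        @CommonOption(position = 3) static String PARALLELISM = "parallelism.default";
        @CommonOption(position = 2) static String HEAP = "jobmanager.heap.mb";
    }

    public static void main(String[] args) {
        List<Field> common = new ArrayList<>();
        for (Field f : Options.class.getDeclaredFields()) {
            if (f.isAnnotationPresent(CommonOption.class)) {
                common.add(f);
            }
        }
        // the docs generator would sort by the annotation's position
        common.sort(Comparator.comparingInt(f -> f.getAnnotation(CommonOption.class).position()));
        for (Field f : common) {
            System.out.println(f.getName());
        }
        // prints HOST_PORT, HEAP, PARALLELISM (positions 1, 2, 3)
    }
}
```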
[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6043 @cjolif I agree, let's do something here. @tzulitai what do you think about trying to use the switch to REST to make a clean cut and start a new connector project (without dependency on `flink-connector-elasticsearch-base`). As an experiment, we could try how much code we would actually need to copy into the new project. @aljoscha and @patricklucas I remember you also had some thoughts on the elasticsearch connectors. I am +1 for seeing if we can drop ElasticSearch 1.x and 2.x support, but that should be a separate thread. ---
[GitHub] flink issue #5963: [FLINK-9305][s3] also register flink-s3-fs-hadoop's facto...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5963 Merging this... ---
[GitHub] flink issue #5954: [FLINK-9276] Improve error message when TaskManager fails
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5954 This looks fine from my side. Would like to get a +1 from @tillrohrmann before merging this... ---
[GitHub] flink issue #5891: [FLINK-9088][nifi-connector][build] Bump nifi-site-to-sit...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5891 All new dependencies are okay with the Apache License (not for shading, though!) Because nothing gets shaded here and we left dependency management to the user, this upgrade is okay. Merging this... ---
[GitHub] flink issue #5891: [FLINK-9088][nifi-connector][build] Bump nifi-site-to-sit...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5891 To get such dependency upgrade PRs merged, it is useful to always add the relevant parts of the dependency tree after the update: Old: ``` [INFO] org.apache.flink:flink-connector-nifi_2.11:jar:1.6-SNAPSHOT [INFO] +- org.apache.nifi:nifi-site-to-site-client:jar:0.6.1:compile [INFO] | +- org.apache.nifi:nifi-api:jar:0.6.1:compile [INFO] | +- org.apache.nifi:nifi-utils:jar:0.6.1:compile [INFO] | +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile [INFO] | | \- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile [INFO] | \- org.apache.nifi:nifi-client-dto:jar:0.6.1:compile [INFO] | \- com.wordnik:swagger-annotations:jar:1.5.3-M1:compile ``` New: ``` [INFO] +- org.apache.nifi:nifi-site-to-site-client:jar:1.6.0:compile [INFO] | +- org.apache.nifi:nifi-api:jar:1.6.0:compile [INFO] | +- org.apache.nifi:nifi-framework-api:jar:1.6.0:compile [INFO] | +- org.apache.nifi:nifi-utils:jar:1.6.0:compile [INFO] | +- org.apache.nifi:nifi-security-utils:jar:1.6.0:compile [INFO] | | +- commons-codec:commons-codec:jar:1.10:compile [INFO] | | +- org.bouncycastle:bcprov-jdk15on:jar:1.59:compile [INFO] | | +- org.bouncycastle:bcpkix-jdk15on:jar:1.59:compile [INFO] | | \- org.apache.nifi:nifi-properties:jar:1.6.0:compile [INFO] | +- org.apache.commons:commons-lang3:jar:3.3.2:compile [INFO] | +- com.fasterxml.jackson.core:jackson-databind:jar:2.9.4:compile [INFO] | | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.9.0:compile [INFO] | | \- com.fasterxml.jackson.core:jackson-core:jar:2.9.4:compile [INFO] | +- org.apache.nifi:nifi-client-dto:jar:1.6.0:compile [INFO] | | +- io.swagger:swagger-annotations:jar:1.5.16:compile [INFO] | | \- org.apache.nifi.registry:nifi-registry-data-model:jar:0.1.0:compile [INFO] | | +- javax.validation:validation-api:jar:2.0.0.Final:compile [INFO] | | \- javax.ws.rs:javax.ws.rs-api:jar:2.1:compile [INFO] | +- 
org.apache.httpcomponents:httpclient:jar:4.5.3:compile [INFO] | | +- org.apache.httpcomponents:httpcore:jar:4.4.6:compile [INFO] | | \- commons-logging:commons-logging:jar:1.1.3:compile [INFO] | \- org.apache.httpcomponents:httpasyncclient:jar:4.1.3:compile [INFO] | \- org.apache.httpcomponents:httpcore-nio:jar:4.4.6:compile ``` ---
[GitHub] flink issue #5948: [FLINK-9286][docs] Update classloading docs
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5948 Thanks. Will merge this, possibly add one more sentence in the process... ---
[GitHub] flink issue #5843: [FLINK-8744][docs] Generate "Common Option" section
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5843 Looks pretty good. Is there a way we can "sort" the common options? Something like - host:port (for standalone setups) - java memory - default parallelism / slots - fault tolerance - HA - security ---
[GitHub] flink issue #5857: [FLINK-9187][METRICS] add prometheus pushgateway reporter
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5857 I think this is a nice addition. Basically turns the prometheus "pull model" into a "push model". @lamber-ken Can you check that the new dependency is correctly shaded? @zentol Do you think this is good in the same project as the prometheus reporter, or should this be in a separate project? ---
[GitHub] flink issue #6015: [FLINK-8933] Avoid calling Class#newInstance(part 1)
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6015 Similar to #6016: Should we close this PR until we have consensus on whether we want to change this? Especially the performance implications for methods/classes on the "hot code paths" make this a tricky change... ---
[GitHub] flink issue #6016: [FLINK-8933] Avoid calling Class#newInstance(part 2)
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6016 Should we close this PR until we have consensus on whether we want to change this? Especially the performance implications for methods/classes on the "hot code paths" make this a tricky change... ---
[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6043 As a high-level comment, I think we may want to start making the ElasticSearch connector projects independent of each other. We previously tried to share code between versions, which has made things clumsy, both in dependency management and in the implementation (API bridges, etc.). It also couples different versions, such that a bug fix in one connector version often affects other connectors as well. The REST-based client may be a good opportunity to start clean: create a new project with no dependencies on the base connector project, and copy the necessary code over. What do you think? ---
[GitHub] flink pull request #6066: [FLINK-9428] [checkpointing] Allow operators to fl...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/6066 ---
[GitHub] flink issue #6066: [FLINK-9428] [checkpointing] Allow operators to flush dat...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6066 Closed in e1d1234477c731fe3f398c7f3f12123f73764242 ---
[GitHub] flink issue #6066: [FLINK-9428] [checkpointing] Allow operators to flush dat...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6066 Thanks for the review. Addressing the comments and merging this... ---
[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r190554149 --- Diff: pom.xml --- @@ -308,7 +308,7 @@ under the License. errors. [1] https://github.com/netty/netty/issues/3704 --> --- End diff -- Looks like you can remove this comment now. ---
[GitHub] flink pull request #6066: [FLINK-9428] [checkpointing] Allow operators to fl...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6066#discussion_r190543815 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java --- @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.runtime.io.RecordWriterOutput; +import org.apache.flink.streaming.runtime.operators.StreamOperatorChainingTest; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider; +import org.apache.flink.streaming.runtime.tasks.OperatorChain.BroadcastingOutputCollector; +import org.apache.flink.streaming.runtime.tasks.OperatorChain.ChainingOutput; +import org.apache.flink.streaming.runtime.tasks.OperatorChain.WatermarkGaugeExposingOutput; + +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +/** + * This class test the {@link OperatorChain}. + * + * It takes a different (simpler) approach at testing the operator chain than + * {@link StreamOperatorChainingTest}. 
+ */ +public class OperatorChainTest { + + @Test + public void testPrepareCheckpointPreBarrier() throws Exception { + final AtomicInteger intRef = new AtomicInteger(); + + final OneInputStreamOperator<String, String> one = new ValidatingOperator(intRef, 0); + final OneInputStreamOperator<String, String> two = new ValidatingOperator(intRef, 1); + final OneInputStreamOperator<String, String> three = new ValidatingOperator(intRef, 2); + + final OperatorChain chain = setupOperatorChain(one, two, three); + chain.prepareSnapshotPreBarrier(ValidatingOperator.CHECKPOINT_ID); + + assertEquals(3, intRef.get()); + } + + // + // Operator Chain Setup Utils + // + + @SafeVarargs + private static <T, OP extends StreamOperator> OperatorChain<T, OP> setupOperatorChain( --- End diff -- This is a lot of mocking, but the alternative approach ties itself not only to the internals of the `OperatorChain`, but also to the stream config specifics. In that sense, I would like to keep this, because it at least ties itself to the details of one component, rather than two components. This hints that OperatorChain could really use some refactoring. ---
[GitHub] flink pull request #6066: [FLINK-9428] [checkpointing] Allow operators to fl...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/6066 [FLINK-9428] [checkpointing] Allow operators to flush data on checkpoint pre-barrier ## What is the purpose of the change Some operators maintain small transient state that may be inefficient to checkpoint, especially when it would need to be checkpointed in a re-scalable way. Examples are opportunistic pre-aggregation operators, which have small pre-aggregation state that is frequently flushed downstream. Rather than persisting that state in a checkpoint, it can make sense to flush the data downstream upon a checkpoint, to let it be part of the downstream operator's state. This feature is sensitive, because flushing state has a direct implication on the downstream operator's checkpoint alignment. However, used with care, and with the new back-pressure-based checkpoint alignment, this feature can be very useful. Because it is sensitive, this PR makes this an internal feature (accessible to operators) and does NOT expose it in the public API. ## Brief change log - Adds the `prepareSnapshotPreBarrier(long checkpointId)` call to `(Abstract)StreamOperator`, with an empty default implementation. - Adds a call on `OperatorChain` to call this in front-to-back order on the operators. ## Verifying this change - This change does not yet alter any behavior; it adds only a plug point for future stream operators. - The `OperatorChainTest` Unit Test validates that the call happens, and that operators are called in the right order. 
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable** You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink pre_barrier Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6066.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6066 ---
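The front-to-back invocation described in the change log can be sketched as a small, self-contained model. The names below (`PreBarrierOperator`, `PreBarrierChainSketch`, `validating`) are illustrative stand-ins, not the actual Flink interfaces; in the PR, `(Abstract)StreamOperator` carries the hook with an empty default implementation, and `OperatorChain` drives the calls.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative stand-in for the new hook on (Abstract)StreamOperator.
interface PreBarrierOperator {
    void prepareSnapshotPreBarrier(long checkpointId);
}

public class PreBarrierChainSketch {

    // Invokes the hook in front-to-back order, as the OperatorChain change does.
    static void prepareSnapshotPreBarrier(List<PreBarrierOperator> chain, long checkpointId) {
        for (PreBarrierOperator op : chain) {
            op.prepareSnapshotPreBarrier(checkpointId);
        }
    }

    // An operator that verifies the position at which it is called,
    // mirroring the ValidatingOperator in the OperatorChainTest.
    static PreBarrierOperator validating(AtomicInteger counter, int expectedPosition) {
        return checkpointId -> {
            if (counter.getAndIncrement() != expectedPosition) {
                throw new IllegalStateException("operator called out of order");
            }
        };
    }

    // Three chained operators must be called at positions 0, 1, 2.
    public static int runDemo() {
        AtomicInteger counter = new AtomicInteger();
        List<PreBarrierOperator> chain = Arrays.asList(
                validating(counter, 0), validating(counter, 1), validating(counter, 2));
        prepareSnapshotPreBarrier(chain, 42L);
        return counter.get();
    }
}
```

Front-to-back ordering matters here: an upstream operator's pre-barrier flush must reach the downstream operator before the downstream operator takes its own pre-barrier action.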
[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5995 Looks good, thanks! +1 to merge this ---
[GitHub] flink issue #5996: [FLINK-9343] [Example] Add Async Example with External Re...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5996 Async I/O works best with asynchronous clients. For synchronous clients, you need a thread pool or something else to concurrently fire off requests. ---
[GitHub] flink issue #5996: [FLINK-9343] [Example] Add Async Example with External Re...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5996 I had a quick look at the code example, and it looks like it might not actually do asynchronous I/O. It dispatches a synchronous HTTP request on a direct executor (`onComplete`s in a direct executor as well), which should result in purely synchronous operations. Have you verified that this does in fact send off multiple requests concurrently (beyond the parallelism)? ---
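The distinction being made in the two comments above can be sketched as follows. The blocking `fetch` call is a hypothetical stand-in for a synchronous HTTP client; the point is that the calls only overlap when dispatched on a real thread pool, whereas a direct executor would run each one on the caller's thread and serialize them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncWithSyncClientSketch {

    // Stand-in for a blocking (synchronous) HTTP call.
    static String fetch(String key) {
        try {
            Thread.sleep(50); // simulated network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "result-" + key;
    }

    // Fires the blocking calls concurrently by running them on a thread pool.
    public static List<String> fetchAll(List<String> keys, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String key : keys) {
                // supplyAsync on the pool, NOT on a direct executor; a direct
                // executor would block the caller for each request in turn.
                futures.add(CompletableFuture.supplyAsync(() -> fetch(key), pool));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> f : futures) {
                results.add(f.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

With `poolSize >= keys.size()`, the total latency is roughly one request's latency instead of the sum of all of them.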
[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5995 Added a few more comments, most importantly around exception wrapping. Otherwise, looking good... ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189197766 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + private static final long serialVersionUID = -6766681879020862312L; + + /** Class to deserialize to. */ + private final Class recordClazz; + + /** Schema in case of GenericRecord for serialization purpose. */ + private final String schemaString; + + /** Reader that deserializes byte array into a record. */ + private transient GenericDatumReader datumReader; + + /** Input stream to read message from. */ + private transient MutableByteArrayInputStream inputStream; + + /** Avro decoder that decodes binary data. */ + private transient Decoder decoder; + + /** Avro schema for the reader. */ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. 
Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; + this.inputStream = new MutableByteArrayInputStream(); + this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + this.reader = reader; + if (reader != null) { + this.schemaString = reader.toString(); + } else { + this.schemaString = null; + } + } + + /** +* Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema. +* +* @param schema schema of produced records +* @return deserialized record in form of {@link GenericRecord} +*/ + public static AvroDeserializationSchema forGeneric(Schema schema) { + return new AvroDeserializationSchema<>(GenericRecord.class, schema); + } + + /** +* Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro schema. +* +* @param tClass class of record to be produced +* @return deserialized record +*/ + public static AvroDeserializationSchema forSpecific(Class
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189197633 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + private static final long serialVersionUID = -6766681879020862312L; + + /** Class to deserialize to. */ + private final Class recordClazz; + + /** Schema in case of GenericRecord for serialization purpose. */ + private final String schemaString; + + /** Reader that deserializes byte array into a record. */ + private transient GenericDatumReader datumReader; + + /** Input stream to read message from. */ + private transient MutableByteArrayInputStream inputStream; + + /** Avro decoder that decodes binary data. */ + private transient Decoder decoder; + + /** Avro schema for the reader. */ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. 
Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; + this.inputStream = new MutableByteArrayInputStream(); + this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + this.reader = reader; + if (reader != null) { + this.schemaString = reader.toString(); + } else { + this.schemaString = null; + } + } + + /** +* Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema. +* +* @param schema schema of produced records +* @return deserialized record in form of {@link GenericRecord} +*/ + public static AvroDeserializationSchema forGeneric(Schema schema) { --- End diff -- Minor comment: I found it helps code structure/readability to move static/factory methods either to the top or the bottom of the class. ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189195014 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java --- @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +/** + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}. + * + * @param type of record it produces + */ +public class RegistryAvroDeserializationSchema extends AvroDeserializationSchema { + + private static final long serialVersionUID = -884738268437806062L; + + /** Provider for schema coder. Used for initializing in each task. */ + private final SchemaCoder.SchemaCoderProvider schemaCoderProvider; + + /** Coder used for reading schema from incoming stream. 
*/ + private transient SchemaCoder schemaCoder; + + /** +* Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}. +* +* @param recordClazz class to which deserialize. Should be either +*{@link SpecificRecord} or {@link GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +* @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for +*schema reading +*/ + protected RegistryAvroDeserializationSchema(Class recordClazz, @Nullable Schema reader, + SchemaCoder.SchemaCoderProvider schemaCoderProvider) { + super(recordClazz, reader); + this.schemaCoderProvider = schemaCoderProvider; + this.schemaCoder = schemaCoderProvider.get(); + } + + @Override + public T deserialize(byte[] message) { + // read record + try { + checkAvroInitialized(); + getInputStream().setBuffer(message); + Schema writerSchema = schemaCoder.readSchema(getInputStream()); + Schema readerSchema = getReaderSchema(); + + GenericDatumReader datumReader = getDatumReader(); + + datumReader.setSchema(writerSchema); + datumReader.setExpected(readerSchema); + + return datumReader.read(null, getDecoder()); + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize Row.", e); --- End diff -- The method `deserialize()` can throw an IOException. That got dropped from the signature, and exceptions are now wrapped into a RuntimeException. That makes exception stack traces more complicated, and hides the fact that "there is a possible exceptional case to handle" from the consumers of that code. I think this makes for a general rule: Whenever using `RuntimeException`, take a step back and look at the exception structure and signatures, and see if something is not declared well. ---
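The review point above, sketched with illustrative names (not the actual Flink classes): keeping the checked `IOException` in the signature makes the failure mode visible to callers, while wrapping produces nested stack traces and hides it.

```java
import java.io.IOException;

public class CheckedExceptionSketch {

    // Before: the checked exception is swallowed by wrapping; callers can no
    // longer see from the signature that there is an exceptional case to handle.
    static byte[] deserializeWrapping(byte[] message) {
        try {
            return doRead(message);
        } catch (IOException e) {
            throw new RuntimeException("Failed to deserialize.", e); // nested stack traces
        }
    }

    // After: the IOException stays in the signature, as the review suggests,
    // and consumers are forced to decide how to handle it.
    static byte[] deserializeDeclaring(byte[] message) throws IOException {
        return doRead(message);
    }

    // Stand-in for the actual Avro decoding step.
    private static byte[] doRead(byte[] message) throws IOException {
        if (message == null || message.length == 0) {
            throw new IOException("empty message");
        }
        return message;
    }

    // Small self-check: the declaring variant succeeds on a valid message.
    public static boolean demo() {
        try {
            return deserializeDeclaring(new byte[]{7}).length == 1;
        } catch (IOException e) {
            return false;
        }
    }
}
```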
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189185420 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + private static final long serialVersionUID = -6766681879020862312L; + + /** Class to deserialize to. */ + private final Class recordClazz; + + /** Schema in case of GenericRecord for serialization purpose. */ + private final String schemaString; + + /** Reader that deserializes byte array into a record. */ + private transient GenericDatumReader datumReader; + + /** Input stream to read message from. */ + private transient MutableByteArrayInputStream inputStream; + + /** Avro decoder that decodes binary data. */ + private transient Decoder decoder; + + /** Avro schema for the reader. */ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. 
Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; + this.inputStream = new MutableByteArrayInputStream(); --- End diff -- I would skip the initialization in the constructor, if you have the initialization in `checkAvroInitialized()`. Simpler, and avoids having two places that do the initialization which have to be kept in sync. ---
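The pattern suggested in the comment above, as a self-contained sketch (class and field names hypothetical): transient members are left null in the constructor and created lazily in one place, so first use and use after Java deserialization (where transient fields are null anyway) share a single initialization path.

```java
import java.io.ByteArrayInputStream;
import java.io.Serializable;

public class LazyInitSketch implements Serializable {

    private final String config;

    // NOT created in the constructor: after Java deserialization this field
    // would be null regardless, so there is exactly one place that builds it.
    private transient ByteArrayInputStream inputStream;

    public LazyInitSketch(String config) {
        this.config = config;
    }

    // Single initialization point, analogous to checkAvroInitialized().
    private void checkInitialized() {
        if (inputStream == null) {
            inputStream = new ByteArrayInputStream(config.getBytes());
        }
    }

    public int readFirstByte() {
        checkInitialized(); // safe both on first use and after deserialization
        return inputStream.read();
    }
}
```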
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r189185186 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; + + private String schemaString = null; + + /** +* Reader that deserializes byte array into a record. +*/ + private transient GenericDatumReader datumReader; + + /** +* Input stream to read message from. +*/ + private transient MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data. +*/ + private transient Decoder decoder; + + /** +* Avro schema for the reader. +*/ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. 
Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; + this.inputStream = new MutableByteArrayInputStream(); + this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + this.r
[GitHub] flink pull request #:
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/commit/c8fa8d025684c2225824c54a7285bbfdec7cfddc#commitcomment-29021995 In flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java: In flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java on line 25: true, did not see that, and checkstyle is off for that class. The class is removed now in a follow-up commit anyways, so the problem is fixed now. ---
[GitHub] flink issue #6039: [hotfix] [docs] Add Release Notes for Flink 1.5.
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6039 Very nice, very helpful for users. +1 to merge this ---
[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5979 Okay, looks really good from my side. Would be good if @StefanRRichter or @azagrebin could double check the change; otherwise good to go. ---
[GitHub] flink issue #5966: [FLINK-9312] [security] Add mutual authentication for RPC...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5966 I would move ahead with this PR as follows: - Separate internal and external SSL config - Activate SSL client auth for akka, netty, and blob server (pure internal communication) Let's discuss external connectivity on FLIP-26 ---
[GitHub] flink issue #6001: [FLINK-9299] ProcessWindowFunction documentation Java exa...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6001 Looks good, thanks, merging this... ---
[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5979 Could you share some micro-benchmark numbers? When we change something that we know works well to something new, it would be good to understand what benefits we are talking about. ---
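For quick comparisons like the one requested above, a minimal warmed-up timing loop already gives a rough number (JMH is the more rigorous option; this sketch only illustrates the shape, and all names are illustrative):

```java
import java.util.function.Supplier;

public class MicroBenchSketch {

    // Times `iterations` calls of the candidate after a warm-up pass of the
    // same length, returning average nanoseconds per call.
    public static long nanosPerCall(Supplier<?> candidate, int iterations) {
        for (int i = 0; i < iterations; i++) {
            candidate.get(); // warm-up: give the JIT a chance to compile the hot path
        }
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            candidate.get();
        }
        return (System.nanoTime() - start) / iterations;
    }
}
```

Running the old and new code path through the same loop and comparing the two averages gives the kind of before/after number the comment asks for.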
[GitHub] flink issue #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5970 Merging this... ---
[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5977#discussion_r188725700 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java --- @@ -71,6 +71,17 @@ @PublicEvolving MetricGroup getMetricGroup(); + /** +* Returned value is guaranteed to be unique between operators within the same job and to be +* stable and the same across job submissions. +* +* This operation is currently only supported in Streaming (DataStream) contexts. +* +* @return String representation of the operator's unique id. +*/ + @PublicEvolving + String getOperatorUniqueID(); --- End diff -- I am still much in favor of not exposing this in the RuntimeContext: - Having the state accesses in the RuntimeContext was a necessity of that moment, because there was no `initializeState()` and it is crucial to be exposed to users. - This operatorID is not crucial to be exposed to users, hence a very different case to me. - It is super easy to expose it later; it is much harder (even if marked as PublicEvolving) to hide it later. When a quick decision is needed, not exposing an addition publicly should always be the default choice, also beyond this specific case. ---
[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5995 I would actually keep the package name for now. It makes sense, because the connection to the registry is avro-specific at the moment... ---
[GitHub] flink pull request #6001: [FLINK-9299] ProcessWindowFunction documentation J...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6001#discussion_r188529928 --- Diff: docs/dev/stream/operators/windows.md --- @@ -797,7 +797,7 @@ DataStream input = ...; input .keyBy() - .timeWindow() + .timeWindow() --- End diff -- How about using "duration" instead of "time size". I think "time size" is not a commonly used term... ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188340240 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; + + private String schemaString = null; + + /** +* Reader that deserializes byte array into a record. +*/ + private transient GenericDatumReader datumReader; + + /** +* Input stream to read message from. +*/ + private transient MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data. +*/ + private transient Decoder decoder; + + /** +* Avro schema for the reader. +*/ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. 
Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; + this.inputStream = new MutableByteArrayInputStream(); + this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + this.r
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188316643 --- Diff: flink-formats/flink-avro-confluent-registry/pom.xml --- @@ -0,0 +1,94 @@ + + +http://maven.apache.org/POM/4.0.0; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + + flink-formats + org.apache.flink + 1.6-SNAPSHOT + + 4.0.0 + + flink-avro-confluent-registry + + + + confluent + http://packages.confluent.io/maven/ + + + + + + io.confluent + kafka-schema-registry-client + 3.3.1 + + + org.apache.avro + avro + + + org.slf4j + slf4j-log4j12 + + + + + org.apache.flink + flink-core + ${project.version} + provided + + + org.apache.flink + flink-avro + ${project.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-flink + package + + shade + + + + + com.fasterxml.jackson.core + org.apache.flink.shaded.com.fasterxml.jackson.core --- End diff -- We may need to qualify this further by this project, because we have that relocation pattern already in other places, for potentially different jackson versions. ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188325819

--- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import org.apache.avro.Schema;
+
+import java.io.DataInputStream;
+import java.io.InputStream;
+
+/**
+ * Reads schema using Confluent Schema Registry protocol.
+ */
+public class ConfluentSchemaRegistryCoder implements SchemaCoder {
+
+	private final SchemaRegistryClient schemaRegistryClient;
+
+	/**
+	 * Creates {@link SchemaCoder} that uses provided {@link SchemaRegistryClient} to connect to
+	 * schema registry.
+	 *
+	 * @param schemaRegistryClient client to connect schema registry
+	 */
+	public ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient) {
+		this.schemaRegistryClient = schemaRegistryClient;
+	}
+
+	@Override
+	public Schema readSchema(InputStream in) throws Exception {
+		DataInputStream dataInputStream = new DataInputStream(in);
+
+		if (dataInputStream.readByte() != 0) {
+			throw new RuntimeException("Unknown data format. Magic number does not match");

--- End diff --

RuntimeExceptions (unchecked exceptions) are usually used to indicate programming errors, or (as a workaround) if the scope does not allow throwing any exception. This here is a case for a checked exception, in my opinion, like an `IOException`, `FlinkException`, etc.

---
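The suggested change (a checked `IOException` instead of `RuntimeException` for a bad magic byte) can be sketched with plain `java.io`. The wire-format assumption here is Confluent's documented framing (one zero magic byte followed by a 4-byte schema id); the method below is an illustration, not the actual PR code:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

public class MagicByteCheck {

	/**
	 * Reads the Confluent wire-format prefix: a 0x0 magic byte followed by a
	 * 4-byte schema id. Throws a checked IOException rather than a
	 * RuntimeException, since a malformed byte stream is an input error the
	 * caller should handle, not a programming error.
	 */
	public static int readSchemaId(InputStream in) throws IOException {
		DataInputStream dataInputStream = new DataInputStream(in);
		if (dataInputStream.readByte() != 0) {
			throw new IOException("Unknown data format. Magic number does not match");
		}
		return dataInputStream.readInt();
	}

	public static void main(String[] args) throws IOException {
		byte[] valid = {0, 0, 0, 0, 42}; // magic byte 0, then schema id 42
		System.out.println(readSchemaId(new ByteArrayInputStream(valid))); // prints 42
	}
}
```

Callers then get a compile-time reminder to handle the failure, instead of an unchecked exception escaping from deep inside deserialization.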
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188328236

--- Diff: flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ConfluentSchemaRegistryCoder}.
+ */
+public class ConfluentSchemaRegistryCoderTest {

--- End diff --

Do we want to test the magic byte verification?

---
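A magic-byte verification test, as suggested above, would feed the coder a message whose first byte is not 0 and assert that reading fails. The real test would call `readSchema(...)` on a coder built with a `MockSchemaRegistryClient`; the sketch below inlines the wire-format check so it runs without the Confluent dependency:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Self-contained sketch of the suggested magic-byte test: a valid prefix
// passes, a wrong first byte is rejected with an exception.
public class MagicByteVerificationTest {

	/** Inlined stand-in for the coder's wire-format check. */
	public static void verifyMagicByte(byte[] message) throws IOException {
		DataInputStream in = new DataInputStream(new ByteArrayInputStream(message));
		if (in.readByte() != 0) {
			throw new IOException("Unknown data format. Magic number does not match");
		}
	}

	public static void main(String[] args) throws IOException {
		verifyMagicByte(new byte[] {0, 0, 0, 0, 1}); // valid prefix, passes

		boolean rejected = false;
		try {
			verifyMagicByte(new byte[] {5, 0, 0, 0, 1}); // wrong magic byte
		} catch (IOException expected) {
			rejected = true;
		}
		System.out.println("wrong magic byte rejected: " + rejected);
	}
}
```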