[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544494#comment-16544494 ] ASF GitHub Bot commented on FLINK-8558: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6323 > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to a target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
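The discovery mechanism described above can be illustrated with a small sketch. The names {{FormatFactory}}, {{Format}}, {{formatType()}}, and {{deserialize()}} are assumptions for illustration, not the final Flink API:

{code:java}
import java.io.IOException;
import java.util.ServiceLoader;

// Assumed shape of a format: converts raw bytes to the target record type.
interface Format<T> {
	T deserialize(byte[] bytes) throws IOException;
}

// Assumed factory contract, registered via META-INF/services.
interface FormatFactory<T> {
	String formatType();   // e.g. "json", "avro"
	Format<T> createFormat();
}

class FormatDiscovery {
	// Scans the classpath with the Java ServiceLoader mechanism,
	// analogous to how Flink discovers file systems today.
	@SuppressWarnings({"rawtypes", "unchecked"})
	static <T> FormatFactory<T> find(String formatType) {
		for (FormatFactory factory : ServiceLoader.load(FormatFactory.class)) {
			if (factory.formatType().equals(formatType)) {
				return (FormatFactory<T>) factory;
			}
		}
		throw new IllegalStateException("No factory found for format: " + formatType);
	}
}
{code}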
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6323 ---
[GitHub] flink pull request #6201: [FLINK-8866][Table API & SQL] Add support for unif...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6201 ---
[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544478#comment-16544478 ] ASF GitHub Bot commented on FLINK-8858: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6332#discussion_r202536081 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java --- @@ -173,55 +180,92 @@ public void open() { if (line == null || line.equals("")) { continue; } + parseAndCall(line); + } + } - final SqlCommandCall cmdCall = SqlCommandParser.parse(line); + /** +* Submits a SQL update statement and prints status information and/or errors on the terminal. +* +* @param statement SQL update statement +* @return flag to indicate if the submission was successful or not +*/ + public boolean submitUpdate(String statement) { --- End diff -- I added some unit tests. > Add support for INSERT INTO in SQL Client > - > > Key: FLINK-8858 > URL: https://issues.apache.org/jira/browse/FLINK-8858 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.6.0 >Reporter: Renjie Liu >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > The current design of SQL Client embedded mode doesn't support long running > queries. It would be useful for simple jobs that can be expressed in a single > sql statement if we can submit sql statements stored in files as long running > queries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
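The control flow implied by the new method can be assembled into a self-contained sketch. Only {{submitUpdate(String)}} and its contract come from the diff above; the {{Executor}} stand-in and the message strings are assumptions:

{code:java}
import java.io.PrintWriter;

public class SubmitUpdateSketch {

	/** Minimal stand-in for the SQL Client executor; assumed, not Flink API. */
	interface Executor {
		void executeUpdate(String statement) throws Exception;
	}

	private final Executor executor;
	private final PrintWriter writer;

	SubmitUpdateSketch(Executor executor, PrintWriter writer) {
		this.executor = executor;
		this.writer = writer;
	}

	/**
	 * Submits a SQL update statement, prints status information and/or
	 * errors on the terminal, and reports success through the flag.
	 */
	public boolean submitUpdate(String statement) {
		try {
			executor.executeUpdate(statement);
			writer.println("[INFO] Statement submitted.");
			return true;
		} catch (Exception e) {
			writer.println("[ERROR] " + e.getMessage());
			return false;
		}
	}
}
{code}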
[jira] [Resolved] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-8866. - Resolution: Fixed Fix Version/s: 1.6.0 Fixed in 1.6.0:
9597248a41b34e126aac6a807651b1d376dc6de1 [FLINK-8866] [table] Create unified interfaces to configure and instantiate TableSinks
abbb89059f2a83705f41e405da14073800fb1870 [FLINK-8866] [table] Merge table source/sink/format factories
09fbfdfa76b068fcc8de249fe7cdcd01fd1f350e [FLINK-8866] [table] Move table type out of descriptors
0e5ac4d791a8e35b3e65836bd08c8f96fd900e0b [FLINK-8866] [table] Make source/sink factories environment-dependent
> Create unified interfaces to configure and instantiate TableSinks > > > Key: FLINK-8866 > URL: https://issues.apache.org/jira/browse/FLINK-8866 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Similar to the efforts done in FLINK-8240. We need unified ways to configure > and instantiate TableSinks. Among other applications, this is necessary in > order to declare table sinks in an environment file of the SQL client, such > that the sink can be used for {{INSERT INTO}} statements. > Below are a few major changes we have in mind. > 1) Add TableSinkFactory/TableSinkFactoryService similar to > TableSourceFactory/TableSourceFactoryService > 2) Add a common property called "type" with values (source, sink and both) > for both TableSource and TableSink. > 3) In the YAML file, replace "sources" with "tables", and use tableType to > identify whether it's a source or a sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
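Change 1) can be pictured with a short interface sketch modeled on the existing TableSourceFactory. The method set shown is an assumption for illustration; the merged commits above are authoritative:

{code:java}
import java.util.List;
import java.util.Map;
import org.apache.flink.table.sinks.TableSink;

// Hedged sketch of a TableSinkFactory mirroring TableSourceFactory.
public interface TableSinkFactory<T> {

	// Context properties that must match for this factory to be selected,
	// e.g. "connector.type" -> "kafka".
	Map<String, String> requiredContext();

	// All property keys this factory understands.
	List<String> supportedProperties();

	// Instantiates the sink from normalized string properties, so a sink
	// declared in a SQL Client environment file can serve as the target
	// of an INSERT INTO statement.
	TableSink<T> createTableSink(Map<String, String> properties);
}
{code}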
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544495#comment-16544495 ] ASF GitHub Bot commented on FLINK-8866: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6201 > Create unified interfaces to configure and instantiate TableSinks > > > Key: FLINK-8866 > URL: https://issues.apache.org/jira/browse/FLINK-8866 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > > > Similar to the efforts done in FLINK-8240. We need unified ways to configure > and instantiate TableSinks. Among other applications, this is necessary in > order to declare table sinks in an environment file of the SQL client, such > that the sink can be used for {{INSERT INTO}} statements. > Below are a few major changes we have in mind. > 1) Add TableSinkFactory/TableSinkFactoryService similar to > TableSourceFactory/TableSourceFactoryService > 2) Add a common property called "type" with values (source, sink and both) > for both TableSource and TableSink. > 3) In the YAML file, replace "sources" with "tables", and use tableType to > identify whether it's a source or a sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9850) Add a string to the print method to identify output for DataStream
Hequn Cheng created FLINK-9850: -- Summary: Add a string to the print method to identify output for DataStream Key: FLINK-9850 URL: https://issues.apache.org/jira/browse/FLINK-9850 Project: Flink Issue Type: New Feature Components: DataStream API Reporter: Hequn Cheng The output of the print method of {{DataSet}} allows the user to supply a String to identify the output (see [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But {{DataStream}} doesn't support this yet. It would be valuable to add this feature for {{DataStream}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
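A minimal sketch of the requested symmetry: the DataSet overload exists today (FLINK-1486), while the DataStream overload shown below is the proposed feature, not current API:

{code:java}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrintIdentifierSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<String> dataSet = batchEnv.fromElements("a", "b");
		// Existing DataSet behavior (FLINK-1486): each printed line is
		// prefixed with the supplied identifier, e.g. "batch-out> a".
		dataSet.print("batch-out");
		batchEnv.execute("dataset-print");

		StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> dataStream = streamEnv.fromElements("a", "b");
		// Proposed DataStream counterpart; this overload does not exist yet.
		dataStream.print("stream-out");
		streamEnv.execute("datastream-print");
	}
}
{code}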
[jira] [Commented] (FLINK-9675) Avoid FileInputStream/FileOutputStream
[ https://issues.apache.org/jira/browse/FLINK-9675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544452#comment-16544452 ] ASF GitHub Bot commented on FLINK-9675: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6335 Thanks @yanghua I will push the rest of the code then and fix the travis error. > Avoid FileInputStream/FileOutputStream > -- > > Key: FLINK-9675 > URL: https://issues.apache.org/jira/browse/FLINK-9675 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: zhangminglei >Priority: Minor > Labels: filesystem, pull-request-available > > They rely on finalizers (before Java 11), which create unnecessary GC load. > The alternatives, Files.newInputStream, are as easy to use and don't have > this issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6335 Thanks @yanghua I will push the rest of the code then and fix the travis error. ---
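As a minimal illustration of the swap the ticket describes (the file name is only an example):

{code:java}
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;

public class StreamFactorySketch {
	public static void main(String[] args) throws IOException {
		File file = new File("example.txt");

		// Instead of new FileOutputStream(file), which (before Java 11)
		// registers a finalizer and adds GC load, use the NIO factory:
		try (OutputStream out = Files.newOutputStream(file.toPath())) {
			out.write("hello".getBytes());
		}

		// Likewise, Files.newInputStream replaces new FileInputStream(file)
		// and returns a channel-backed stream without a finalizer.
		try (InputStream in = Files.newInputStream(file.toPath())) {
			int b;
			while ((b = in.read()) >= 0) {
				System.out.print((char) b);
			}
		}
	}
}
{code}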
[jira] [Created] (FLINK-9852) Expose descriptor-based sink creation in table environments
Timo Walther created FLINK-9852: --- Summary: Expose descriptor-based sink creation in table environments Key: FLINK-9852 URL: https://issues.apache.org/jira/browse/FLINK-9852 Project: Flink Issue Type: New Feature Components: Table API SQL Reporter: Timo Walther Assignee: Timo Walther Currently, only a table source can be created using the unified table descriptors with {{tableEnv.from(...)}}. A similar approach should be supported for defining sinks or even both types at the same time. I suggest the following syntax:
{code}
tableEnv.connect(Kafka(...)).registerSource("name")
tableEnv.connect(Kafka(...)).registerSink("name")
tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
{code}
A table could then access the registered source/sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6336: [FLINK-9630] [connector] Kafka09PartitionDiscovere...
GitHub user ubyyj opened a pull request: https://github.com/apache/flink/pull/6336 [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection … …leak on TopicAuthorizationException **(The sections below can be removed for hotfixes of typos)**
## What is the purpose of the change
Fix the bug that Kafka09PartitionDiscoverer can cause a TCP connection leak if getAllPartitionsForTopics() gets a TopicAuthorizationException.
## Brief change log
Catch TopicAuthorizationException and close the kafkaConsumer in getAllPartitionsForTopics().
## Verifying this change
This change added tests and can be verified as follows: - *Manually verified the change by running a job that consumes from a non-existent Kafka topic, and verified that the number of opened TCP connections and file handles of the task manager process did not increase. The fix has been running in our production for weeks now without problems.*
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running: $ git pull https://github.com/ubyyj/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6336.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6336 commit 0aa8d75af085c2465e8cfd9e5a572770a5d95738 Author: yuanyoujun Date: 2018-07-15T13:07:49Z [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException ---
[jira] [Updated] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9630: -- Labels: pull-request-available (was: ) > Kafka09PartitionDiscoverer cause connection leak on > TopicAuthorizationException > --- > > Key: FLINK-9630 > URL: https://issues.apache.org/jira/browse/FLINK-9630 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.5.0, 1.4.2 > Environment: Linux 2.6, java 8, Kafka broker 0.10.x >Reporter: Youjun Yuan >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2 > > > When the Kafka topic gets deleted, during the task starting process, > Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in > getAllPartitionsForTopics(), and it gets no chance to close the > kafkaConsumer, resulting in a TCP connection leak (to the Kafka broker). > > *This issue can bring down the whole Flink cluster*, because, in a default > setup (fixedDelay with INT.MAX restart attempts), the job manager will randomly > schedule the job to any TaskManager that has a free slot, and each attempt will > cause the TaskManager to leak a TCP connection; eventually almost every > TaskManager will run out of file handles, so no TaskManager can make a > snapshot or accept a new job. This effectively stops the whole cluster. > > The leak happens when StreamTask.invoke() calls openAllOperators(), then > FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(), > and kafkaConsumer.partitionsFor(topic) in > KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a > *TopicAuthorizationException* that no one catches. > Though StreamTask.open catches Exception and invokes the dispose() method of > each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), > it does not close the kafkaConsumer in partitionDiscoverer, and does not even > invoke partitionDiscoverer.wakeup(), because the discoveryLoopThread was > null. > > Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:
> {code:java}
> public void cancel() {
>     // set ourselves as not running;
>     // this would let the main discovery loop escape as soon as possible
>     running = false;
>
>     if (discoveryLoopThread != null) {
>         if (partitionDiscoverer != null) {
>             // we cannot close the discoverer here, as it is error-prone to concurrent access;
>             // only wakeup the discoverer, the discovery loop will clean itself up after it escapes
>             partitionDiscoverer.wakeup();
>         }
>
>         // the discovery loop may currently be sleeping in-between
>         // consecutive discoveries; interrupt to shutdown faster
>         discoveryLoopThread.interrupt();
>     }
>
>     // abort the fetcher, if there is one
>     if (kafkaFetcher != null) {
>         kafkaFetcher.cancel();
>     }
> }
> {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
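The fix described in the PR can be sketched as follows. This is a hedged illustration modeled on, but not copied from, the actual Kafka09PartitionDiscoverer code; the kafkaProperties are assumed to be fully configured:

{code:java}
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TopicAuthorizationException;

public class DiscovererFixSketch {
	// Sketch of the fix: if the partition metadata lookup fails with
	// TopicAuthorizationException, close the consumer before rethrowing
	// so the TCP connection to the broker is not leaked.
	static List<PartitionInfo> getAllPartitionsForTopic(Properties kafkaProperties, String topic) {
		KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(kafkaProperties);
		try {
			return kafkaConsumer.partitionsFor(topic);
		} catch (TopicAuthorizationException e) {
			kafkaConsumer.close(); // release the broker connection
			throw e;
		}
	}
}
{code}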
[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544603#comment-16544603 ] ASF GitHub Bot commented on FLINK-9061: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6302 Thanks for this contribution, that's a valuable fix. I have a few thoughts and suggestions on how we might improve the feature a bit still: - Can we get rid of the `commons-text` dependency? The fewer dependencies, the fewer possible problems for users due to dependency clashes. It seems a bit heavy to add a new library for just one random string generation. - The feature is configured through additional constructor parameters. I am wondering if we may want to move this to the `Configuration`. That would allow the "ops side of things" to configure this for a setup (setting entropy key and checkpoints directory) without needing everyone that writes a Flink program to be aware of this. - If I read the code correctly, the code logs warnings for every file in case the feature is not activated. That will probably confuse a lot of users and make them dig into whether they have a wrong setup, when they simply don't use this new feature. > add entropy to s3 path for better scalability > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jamie Grier >Assignee: Indrajit Roychoudhury >Priority: Critical > Labels: pull-request-available > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6302: [FLINK-9061][checkpointing] add entropy to s3 path for be...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6302 Thanks for this contribution, that's a valuable fix. I have a few thoughts and suggestions on how we might improve the feature a bit still: - Can we get rid of the `commons-text` dependency? The fewer dependencies, the fewer possible problems for users due to dependency clashes. It seems a bit heavy to add a new library for just one random string generation. - The feature is configured through additional constructor parameters. I am wondering if we may want to move this to the `Configuration`. That would allow the "ops side of things" to configure this for a setup (setting entropy key and checkpoints directory) without needing everyone that writes a Flink program to be aware of this. - If I read the code correctly, the code logs warnings for every file in case the feature is not activated. That will probably confuse a lot of users and make them dig into whether they have a wrong setup, when they simply don't use this new feature. ---
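The rewrite sketched in the ticket can be expressed compactly. This is an illustrative hash-prefix sketch under assumed names, not the implementation under review in #6302:

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class EntropyPathSketch {
	// Rewrites s3://bucket/flink/checkpoints to s3://bucket/<HASH>/flink/checkpoints,
	// spreading the checkpoint write load across S3 partition-key prefixes.
	static String injectEntropy(String bucket, String checkpointPath) {
		String hash = Integer.toHexString(
				Arrays.hashCode(checkpointPath.getBytes(StandardCharsets.UTF_8)));
		return "s3://" + bucket + "/" + hash + "/" + checkpointPath;
	}

	public static void main(String[] args) {
		System.out.println(injectEntropy("bucket", "flink/checkpoints"));
	}
}
{code}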
[GitHub] flink pull request #5516: [FLINK-8544] [Kafka Connector] Handle null message...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5516 ---
[GitHub] flink pull request #6109: [FLINK-9483] 'Building Flink' doc doesn't highligh...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6109 ---
[jira] [Commented] (FLINK-9483) "Building Flink" doc doesn't highlight quick build command
[ https://issues.apache.org/jira/browse/FLINK-9483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544630#comment-16544630 ] ASF GitHub Bot commented on FLINK-9483: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6109 > "Building Flink" doc doesn't highlight quick build command > -- > > Key: FLINK-9483 > URL: https://issues.apache.org/jira/browse/FLINK-9483 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.6.0 > Environment: see difference between red and blue parts >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Labels: pull-request-available > Fix For: 1.6.0 > > Attachments: Screen Shot 2018-05-31 at 4.12.32 PM.png > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6260: [FLINK-9758] Fix ContinuousFileProcessingTest fail...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6260 ---
[GitHub] flink issue #6329: [FLINK-9841] Web UI only show partial taskmanager log
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6329 @dawidwys can you review this PR? ---
[jira] [Commented] (FLINK-7251) Merge the flink-java8 project into flink-core
[ https://issues.apache.org/jira/browse/FLINK-7251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544663#comment-16544663 ] ASF GitHub Bot commented on FLINK-7251: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6120#discussion_r202553410 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java --- @@ -584,21 +581,6 @@ private static void registerFactory(Type t, Class fac // number of parameters the SAM of implemented interface has; the parameter indexing applies to this range final int baseParametersLen = sam.getParameterTypes().length; - // executable references "this" implicitly - if (paramLen <= 0) { - // executable declaring class can also be a super class of the input type - // we only validate if the executable exists in input type - validateInputContainsExecutable(exec, inType); - } - else { - final Type input = TypeExtractionUtils.extractTypeFromLambda( - exec, - lambdaInputTypeArgumentIndices, --- End diff -- Good point. I will remove it. > Merge the flink-java8 project into flink-core > - > > Key: FLINK-7251 > URL: https://issues.apache.org/jira/browse/FLINK-7251 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state
[ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544671#comment-16544671 ] ASF GitHub Bot commented on FLINK-9489: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202553358 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java --- @@ -85,10 +85,10 @@ public int compare(T o1, T o2) { return ((Comparable) o1).compareTo(o2); } - // we catch this case before moving to more expensive tie breaks. - if (o1.equals(o2)) { - return 0; - } +// // we catch this case before moving to more expensive tie breaks. --- End diff -- I think this is some commented out code which should be removed. > Checkpoint timers as part of managed keyed state instead of raw keyed state > --- > > Key: FLINK-9489 > URL: https://issues.apache.org/jira/browse/FLINK-9489 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Timer state should now become part of the keyed state backend snapshot, i.e., > stored inside the managed keyed state. This means that we have to connect our > preparation for asynchronous checkpoints with the backend, so that the timers > are written as part of the state for each key-group. This means that we will > also free up the raw keyed state and might expose it to user functions in the > future. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202553358 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java --- @@ -85,10 +85,10 @@ public int compare(T o1, T o2) { return ((Comparable) o1).compareTo(o2); } - // we catch this case before moving to more expensive tie breaks. - if (o1.equals(o2)) { - return 0; - } +// // we catch this case before moving to more expensive tie breaks. --- End diff -- I think this is some commented out code which should be removed. ---
[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state
[ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544673#comment-16544673 ] ASF GitHub Bot commented on FLINK-9489: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202552616 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java --- @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nonnull; + +/** + * --- End diff -- JavaDocs missing > Checkpoint timers as part of managed keyed state instead of raw keyed state > --- > > Key: FLINK-9489 > URL: https://issues.apache.org/jira/browse/FLINK-9489 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Timer state should now become part of the keyed state backend snapshot, i.e., > stored inside the managed keyed state. This means that we have to connect our > preparation for asynchronous checkpoints with the backend, so that the timers > are written as part of the state for each key-group. This means that we will > also free up the raw keyed state and might expose it to user functions in the > future. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state
[ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544675#comment-16544675 ] ASF GitHub Bot commented on FLINK-9489: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202553765 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -342,16 +379,20 @@ private void restorePartitionedState(Collection state) throws for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) { restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo); - StateTable stateTable = stateTables.get(restoredMetaInfo.getName()); + StateSnapshotRestore snapshotRestore = registeredStates.get(restoredMetaInfo.getName()); //important: only create a new table we did not already create it previously - if (null == stateTable) { + if (null == snapshotRestore) { - RegisteredKeyedBackendStateMetaInfo registeredKeyedBackendStateMetaInfo = - new RegisteredKeyedBackendStateMetaInfo<>(restoredMetaInfo); + if (restoredMetaInfo.getBackendStateType() == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE) { + RegisteredKeyValueStateBackendMetaInfo registeredKeyedBackendStateMetaInfo = + new RegisteredKeyValueStateBackendMetaInfo<>(restoredMetaInfo); - stateTable = snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo); - stateTables.put(restoredMetaInfo.getName(), stateTable); + snapshotRestore = snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo); + registeredStates.put(restoredMetaInfo.getName(), snapshotRestore); + } else { --- End diff -- Maybe check that `(restoredMetaInfo.getBackendStateType() == PRIORITY_QUEUE` > Checkpoint timers as part of managed keyed state instead of raw keyed state > --- > > Key: FLINK-9489 > URL: https://issues.apache.org/jira/browse/FLINK-9489 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Timer state should now become part of the keyed state backend snapshot, i.e., > stored inside the managed keyed state. This means that we have to connect our > preparation for asynchronous checkpoints with the backend, so that the timers > are written as part of the state for each key-group. This means that we will > also free up the raw keyed state and might expose it to user functions in the > future. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state
[ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544669#comment-16544669 ] ASF GitHub Bot commented on FLINK-9489: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202552563 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java --- @@ -264,6 +265,42 @@ public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) } } + public static StateSnapshotKeyGroupReader createKeyGroupPartitionReader( + @Nonnull ElementReaderFunction readerFunction, + @Nonnull KeyGroupElementsConsumer elementConsumer) { --- End diff -- Indenting these parameters one more level would help to distinguish the body from the parameter list. > Checkpoint timers as part of managed keyed state instead of raw keyed state > --- > > Key: FLINK-9489 > URL: https://issues.apache.org/jira/browse/FLINK-9489 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Timer state should now become part of the keyed state backend snapshot, i.e., > stored inside the managed keyed state. This means that we have to connect our > preparation for asynchronous checkpoints with the backend, so that the timers > are written as part of the state for each key-group. This means that we will > also free up the raw keyed state and might expose it to user functions in the > future. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202552563 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java --- @@ -264,6 +265,42 @@ public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) } } + public static StateSnapshotKeyGroupReader createKeyGroupPartitionReader( + @Nonnull ElementReaderFunction readerFunction, + @Nonnull KeyGroupElementsConsumer elementConsumer) { --- End diff -- Indenting these parameters one more level would help to distinguish the body from the parameter list. ---
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202553887 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -446,8 +485,10 @@ public String toString() { @Override public int numStateEntries() { int sum = 0; - for (StateTable stateTable : stateTables.values()) { - sum += stateTable.size(); + for (StateSnapshotRestore stateTable : registeredStates.values()) { + if (stateTable instanceof StateTable) { + sum += ((StateTable) stateTable).size(); + } --- End diff -- Why don't the timers count toward the total number of state entries? ---
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202552705 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java --- @@ -24,37 +24,46 @@ import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; -public class RegisteredBroadcastBackendStateMetaInfo extends RegisteredStateMetaInfoBase { +public class RegisteredBroadcastStateBackendMetaInfo extends RegisteredStateMetaInfoBase { /** The mode how elements in this state are assigned to tasks during restore. */ + @Nonnull private final OperatorStateHandle.Mode assignmentMode; /** The type serializer for the keys in the map state. */ + @Nonnull private final TypeSerializer keySerializer; /** The type serializer for the values in the map state. */ + @Nonnull private final TypeSerializer valueSerializer; - public RegisteredBroadcastBackendStateMetaInfo( - final String name, - final OperatorStateHandle.Mode assignmentMode, - final TypeSerializer keySerializer, - final TypeSerializer valueSerializer) { + /** The precomputed immutable snapshot of this state */ + @Nullable + private StateMetaInfoSnapshot precomputedSnapshot; --- End diff -- nit: Maybe rename to `precomputedStateMetaInfoSnapshot` ---
[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state
[ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544672#comment-16544672 ] ASF GitHub Bot commented on FLINK-9489: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202552483 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java --- @@ -28,9 +28,22 @@ @FunctionalInterface public interface KeyExtractorFunction { + KeyExtractorFunction> FOR_KEYED_OBJECTS = new KeyExtractorFunction>() { + @Nonnull + @Override + public Object extractKeyFromElement(@Nonnull Keyed element) { + return element.getKey(); + } + }; --- End diff -- Could we move this extractor into its own `KeyedKeyExtractorFunction` singleton? > Checkpoint timers as part of managed keyed state instead of raw keyed state > --- > > Key: FLINK-9489 > URL: https://issues.apache.org/jira/browse/FLINK-9489 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Timer state should now become part of the keyed state backend snapshot, i.e., > stored inside the managed keyed state. This means that we have to connect our > preparation for asynchronous checkpoints with the backend, so that the timers > are written as part of the state for each key-group. This means that we will > also free up the raw keyed state and might expose it to user functions in the > future. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202552876 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java --- @@ -63,54 +72,46 @@ public RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMe } @SuppressWarnings("unchecked") - public RegisteredBroadcastBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) { + public RegisteredBroadcastStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) { this( snapshot.getName(), OperatorStateHandle.Mode.valueOf( snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)), - (TypeSerializer) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER), - (TypeSerializer) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)); + (TypeSerializer) Preconditions.checkNotNull( + snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)), + (TypeSerializer) Preconditions.checkNotNull( + snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))); Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType()); } /** * Creates a deep copy of the itself. */ - public RegisteredBroadcastBackendStateMetaInfo deepCopy() { - return new RegisteredBroadcastBackendStateMetaInfo<>(this); + @Nonnull + public RegisteredBroadcastStateBackendMetaInfo deepCopy() { + return new RegisteredBroadcastStateBackendMetaInfo<>(this); } @Nonnull @Override public StateMetaInfoSnapshot snapshot() { - Map optionsMap = Collections.singletonMap( - StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(), - assignmentMode.toString()); - Map> serializerMap = new HashMap<>(2); - Map serializerConfigSnapshotsMap = new HashMap<>(2); - String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(); - String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(); - serializerMap.put(keySerializerKey, keySerializer.duplicate()); - serializerConfigSnapshotsMap.put(keySerializerKey, keySerializer.snapshotConfiguration()); - serializerMap.put(valueSerializerKey, valueSerializer.duplicate()); - serializerConfigSnapshotsMap.put(valueSerializerKey, valueSerializer.snapshotConfiguration()); - - return new StateMetaInfoSnapshot( - name, - StateMetaInfoSnapshot.BackendStateType.BROADCAST, - optionsMap, - serializerConfigSnapshotsMap, - serializerMap); + if (precomputedSnapshot == null) { + precomputedSnapshot = precomputeSnapshot(); + } + return precomputedSnapshot; --- End diff -- As an easy fix, we could remove the `precomputedSnapshot` field and keep it like it was before that the snapshot was computed with every `snapshot` call. ---
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202553524 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java --- @@ -80,13 +82,57 @@ public E peek() { @Override public void bulkPoll(@Nonnull Predicate canConsume, @Nonnull Consumer consumer) { + if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) { + bulkPollRelaxedOrder(canConsume, consumer); + } else { + bulkPollStrictOrder(canConsume, consumer); + } + } + + private void bulkPollRelaxedOrder(@Nonnull Predicate canConsume, @Nonnull Consumer consumer) { + if (orderedCache.isEmpty()) { + bulkPollStore(canConsume, consumer); + } else { + while (!orderedCache.isEmpty() && canConsume.test(orderedCache.peekFirst())) { + final E next = orderedCache.removeFirst(); + orderedStore.remove(next); + consumer.accept(next); + } + + if (orderedCache.isEmpty()) { + bulkPollStore(canConsume, consumer); + } + } + } + + private void bulkPollStrictOrder(@Nonnull Predicate canConsume, @Nonnull Consumer consumer) { E element; while ((element = peek()) != null && canConsume.test(element)) { poll(); consumer.accept(element); } } + private void bulkPollStore(@Nonnull Predicate canConsume, @Nonnull Consumer consumer) { + try (CloseableIterator iterator = orderedStore.orderedIterator()) { + while (iterator.hasNext()) { + final E next = iterator.next(); + if (canConsume.test(next)) { + orderedStore.remove(next); + consumer.accept(next); + } else { + orderedCache.add(next); + while (iterator.hasNext() && !orderedCache.isFull()) { + orderedCache.add(iterator.next()); + } + break; + } + } + } catch (Exception e) { + throw new FlinkRuntimeException("Exception while bulk polling store.", e); --- End diff -- I would prefer throwing a checked exception here. ---
[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state
[ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544667#comment-16544667 ] ASF GitHub Bot commented on FLINK-9489: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202552705 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java --- @@ -24,37 +24,46 @@ import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; -public class RegisteredBroadcastBackendStateMetaInfo extends RegisteredStateMetaInfoBase { +public class RegisteredBroadcastStateBackendMetaInfo extends RegisteredStateMetaInfoBase { /** The mode how elements in this state are assigned to tasks during restore. */ + @Nonnull private final OperatorStateHandle.Mode assignmentMode; /** The type serializer for the keys in the map state. */ + @Nonnull private final TypeSerializer keySerializer; /** The type serializer for the values in the map state. */ + @Nonnull private final TypeSerializer valueSerializer; - public RegisteredBroadcastBackendStateMetaInfo( - final String name, - final OperatorStateHandle.Mode assignmentMode, - final TypeSerializer keySerializer, - final TypeSerializer valueSerializer) { + /** The precomputed immutable snapshot of this state */ + @Nullable + private StateMetaInfoSnapshot precomputedSnapshot; --- End diff -- nit: Maybe rename to `precomputedStateMetaInfoSnapshot` > Checkpoint timers as part of managed keyed state instead of raw keyed state > --- > > Key: FLINK-9489 > URL: https://issues.apache.org/jira/browse/FLINK-9489 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Timer state should now become part of the keyed state backend snapshot, i.e., > stored inside the managed keyed state. This means that we have to connect our > preparation for asynchronous checkpoints with the backend, so that the timers > are written as part of the state for each key-group. This means that we will > also free up the raw keyed state and might expose it to user functions in the > future. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202553958 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java --- @@ -60,36 +61,35 @@ /** Result of partitioning the snapshot by key-group. */ @Nullable - private KeyGroupPartitionedSnapshot partitionedSnapshot; + private StateKeyGroupWriter partitionedSnapshot; --- End diff -- nit: rename field ---
[jira] [Commented] (FLINK-9853) add hex support in table api and sql
[ https://issues.apache.org/jira/browse/FLINK-9853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544542#comment-16544542 ] ASF GitHub Bot commented on FLINK-9853: --- GitHub user xueyumusic opened a pull request: https://github.com/apache/flink/pull/6337 [FLINK-9853][Table API & SQL] add HEX support
## What is the purpose of the change
This PR proposes to add HEX to the Table API and SQL, with MySQL-like syntax; it can take int or string arguments. For an integer argument N, it returns a hexadecimal string representation of the value of N. For a string argument str, it returns a hexadecimal string representation of str where each byte of each character in str is converted to two hexadecimal digits. Syntax: HEX(100) = 64 HEX('This is a test String.') = '546869732069732061207465737420537472696e672e'
## Brief change log
- *The expressionDsl, scalarSqlFunctions and mathExpressions to add hex* - *The FunctionGenerator to support hex generator*
## Verifying this change
This change added tests and can be verified as follows: - *Added ScalaFunctionTests tests for Table API and SQL expressions*
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs)
You can merge this pull request into a Git repository by running: $ git pull https://github.com/xueyumusic/flink hex Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6337.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6337 commit 8d3bef913bee1d4913ef3ae056e3d15d4cda2cec Author: xueyu <278006819@...> Date: 2018-07-15T12:01:15Z hex support > add hex support in table api and sql > > > Key: FLINK-9853 > URL: https://issues.apache.org/jira/browse/FLINK-9853 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: xueyu >Priority: Major > Labels: pull-request-available > > Like in MySQL, HEX can take int or string arguments. For an integer argument > N, it returns a hexadecimal string representation of the value of N. For a > string argument str, it returns a hexadecimal string representation of str > where each byte of each character in str is converted to two hexadecimal > digits. > Syntax: > HEX(100) = 64 > HEX('This is a test String.') = '546869732069732061207465737420537472696e672e' > See more: [link > MySQL|https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_hex] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6337: [FLINK-9853][Table API & SQL] add HEX support
GitHub user xueyumusic opened a pull request: https://github.com/apache/flink/pull/6337 [FLINK-9853][Table API & SQL] add HEX support
## What is the purpose of the change
This PR proposes to add HEX to the Table API and SQL, with MySQL-like syntax; it can take int or string arguments. For an integer argument N, it returns a hexadecimal string representation of the value of N. For a string argument str, it returns a hexadecimal string representation of str where each byte of each character in str is converted to two hexadecimal digits. Syntax: HEX(100) = 64 HEX('This is a test String.') = '546869732069732061207465737420537472696e672e'
## Brief change log
- *The expressionDsl, scalarSqlFunctions and mathExpressions to add hex* - *The FunctionGenerator to support hex generator*
## Verifying this change
This change added tests and can be verified as follows: - *Added ScalaFunctionTests tests for Table API and SQL expressions*
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs)
You can merge this pull request into a Git repository by running: $ git pull https://github.com/xueyumusic/flink hex Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6337.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6337 commit 8d3bef913bee1d4913ef3ae056e3d15d4cda2cec Author: xueyu <278006819@...> Date: 2018-07-15T12:01:15Z hex support ---
[jira] [Updated] (FLINK-9853) add hex support in table api and sql
[ https://issues.apache.org/jira/browse/FLINK-9853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9853: -- Labels: pull-request-available (was: ) > add hex support in table api and sql > > > Key: FLINK-9853 > URL: https://issues.apache.org/jira/browse/FLINK-9853 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: xueyu >Priority: Major > Labels: pull-request-available > > Like in MySQL, HEX can take int or string arguments. For an integer argument > N, it returns a hexadecimal string representation of the value of N. For a > string argument str, it returns a hexadecimal string representation of str > where each byte of each character in str is converted to two hexadecimal > digits. > Syntax: > HEX(100) = 64 > HEX('This is a test String.') = '546869732069732061207465737420537472696e672e' > See more: [link > MySQL|https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_hex] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544541#comment-16544541 ] ASF GitHub Bot commented on FLINK-9630: --- GitHub user ubyyj opened a pull request: https://github.com/apache/flink/pull/6336 [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection … …leak on TopicAuthorizationException **(The sections below can be removed for hotfixes of typos)**
## What is the purpose of the change
Fix the bug that Kafka09PartitionDiscoverer can cause a TCP connection leak if getAllPartitionsForTopics() gets a TopicAuthorizationException.
## Brief change log
Catch TopicAuthorizationException and close the kafkaConsumer in getAllPartitionsForTopics().
## Verifying this change
This change added tests and can be verified as follows: - *Manually verified the change by running a job that consumes from a non-existent Kafka topic, and verified that the number of opened TCP connections and file handles of the task manager process did not increase. The fix has been running in our production for weeks now without problems.*
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running: $ git pull https://github.com/ubyyj/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6336.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6336 commit 0aa8d75af085c2465e8cfd9e5a572770a5d95738 Author: yuanyoujun Date: 2018-07-15T13:07:49Z [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException > Kafka09PartitionDiscoverer cause connection leak on > TopicAuthorizationException > --- > > Key: FLINK-9630 > URL: https://issues.apache.org/jira/browse/FLINK-9630 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.5.0, 1.4.2 > Environment: Linux 2.6, java 8, Kafka broker 0.10.x >Reporter: Youjun Yuan >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2 > > > When the Kafka topic gets deleted, during the task starting process, > Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in > getAllPartitionsForTopics(), and it gets no chance to close the > kafkaConsumer, resulting in a TCP connection leak (to the Kafka broker). > > *This issue can bring down the whole Flink cluster*, because, in a default > setup (fixedDelay with INT.MAX restart attempts), the job manager will randomly > schedule the job to any TaskManager that has a free slot, and each attempt will > cause the TaskManager to leak a TCP connection; eventually almost every > TaskManager will run out of file handles, so no TaskManager can make a > snapshot or accept a new job. This effectively stops the whole cluster.
> > The leak happens when StreamTask.invoke() calls openAllOperators(), then > FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(), > and kafkaConsumer.partitionsFor(topic) in > KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a > *TopicAuthorizationException* that no one catches. > Though StreamTask.open catches Exception and invokes the dispose() method of > each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), > it does not close the kafkaConsumer in partitionDiscoverer, and does not even > invoke partitionDiscoverer.wakeup(), because the discoveryLoopThread was > null. > > Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:
> {code:java}
> public void cancel() {
>     // set ourselves as not running;
>     // this would let the main discovery loop escape as soon as possible
>     running = false;
>
>     if (discoveryLoopThread != null) {
>         if (partitionDiscoverer != null) {
>             // we cannot close the discoverer here, as it is error-prone to concurrent access;
>             // only wakeup the discoverer, the discovery loop will clean itself up after it escapes
>             partitionDiscoverer.wakeup();
>         }
>
>         // the discovery loop may currently be sleeping in-between
>         // consecutive discoveries; interrupt to shutdown faster
>         discoveryLoopThread.interrupt();
>     }
>
>     // abort the fetcher, if there is one
>     if (kafkaFetcher != null) {
>         kafkaFetcher.cancel();
>     }
> }
> {code}
[jira] [Commented] (FLINK-9841) Web UI only show partial taskmanager log
[ https://issues.apache.org/jira/browse/FLINK-9841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544549#comment-16544549 ] ASF GitHub Bot commented on FLINK-9841: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6329 @dawidwys can you review this PR? > Web UI only show partial taskmanager log > - > > Key: FLINK-9841 > URL: https://issues.apache.org/jira/browse/FLINK-9841 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 > Environment: env : Flink on YARN > version : 1.5.0 >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > > In the web UI, we select a task manager and click the "log" tab, but the UI > only shows part of the log (the first part) and never updates, even if we click the > "refresh" button. > However, the job manager is always OK. > The reason is that the resource is closed twice. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8544) JSONKeyValueDeserializationSchema throws NPE when message key is null
[ https://issues.apache.org/jira/browse/FLINK-8544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz resolved FLINK-8544. - Resolution: Fixed Fix Version/s: 1.6.0 > JSONKeyValueDeserializationSchema throws NPE when message key is null > - > > Key: FLINK-8544 > URL: https://issues.apache.org/jira/browse/FLINK-8544 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Bill Lee >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > Original Estimate: 1h > Remaining Estimate: 1h > > JSONKeyValueDeserializationSchema calls Jackson to deserialize the message key > without validation. > If a message with key == null is read, Flink throws an NPE.
> {code:java}
> @Override
> public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
>     if (mapper == null) {
>         mapper = new ObjectMapper();
>     }
>     ObjectNode node = mapper.createObjectNode();
>     node.set("key", mapper.readValue(messageKey, JsonNode.class));
>     // messageKey is not validated against null.
>     node.set("value", mapper.readValue(message, JsonNode.class));
> {code}
> The fix is very straightforward.
> {code:java}
> if (messageKey == null) {
>     node.set("key", null);
> } else {
>     node.set("key", mapper.readValue(messageKey, JsonNode.class));
> }
> {code}
> If it is appreciated, I would send a pull request. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
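Assembled into a self-contained form, the guarded version might look like the sketch below. It uses plain Jackson types (Flink shades Jackson differently), and the null guard on the value side is an extra assumption beyond the quoted fix:

{code:java}
import java.io.IOException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class NullSafeDeserializeSketch {
	private static final ObjectMapper MAPPER = new ObjectMapper();

	// Guard the key (and, as an extra precaution, the value) against null
	// before handing the bytes to Jackson; ObjectNode.set(field, null)
	// stores a JSON null instead of throwing.
	public static ObjectNode deserialize(byte[] messageKey, byte[] message) throws IOException {
		ObjectNode node = MAPPER.createObjectNode();
		node.set("key", messageKey == null ? null : MAPPER.readValue(messageKey, JsonNode.class));
		node.set("value", message == null ? null : MAPPER.readValue(message, JsonNode.class));
		return node;
	}
}
{code}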
[jira] [Resolved] (FLINK-9483) "Building Flink" doc doesn't highlight quick build command
[ https://issues.apache.org/jira/browse/FLINK-9483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz resolved FLINK-9483. - Resolution: Fixed > "Building Flink" doc doesn't highlight quick build command > -- > > Key: FLINK-9483 > URL: https://issues.apache.org/jira/browse/FLINK-9483 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.6.0 > Environment: see difference between red and blue parts >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Labels: pull-request-available > Fix For: 1.6.0 > > Attachments: Screen Shot 2018-05-31 at 4.12.32 PM.png > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544654#comment-16544654 ] ASF GitHub Bot commented on FLINK-8858: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6332 > Add support for INSERT INTO in SQL Client > - > > Key: FLINK-8858 > URL: https://issues.apache.org/jira/browse/FLINK-8858 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.6.0 >Reporter: Renjie Liu >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The current design of SQL Client embedded mode doesn't support long running > queries. It would be useful for simple jobs that can be expressed in a single > sql statement if we can submit sql statements stored in files as long running > queries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9854) Allow passing multi-line input to SQL Client CLI
Timo Walther created FLINK-9854: --- Summary: Allow passing multi-line input to SQL Client CLI Key: FLINK-9854 URL: https://issues.apache.org/jira/browse/FLINK-9854 Project: Flink Issue Type: Sub-task Components: Table API SQL Reporter: Timo Walther We should support {{flink-cli < query01.sql}} or {{echo "INSERT INTO bar SELECT * FROM foo" | flink-cli}} for convenience. I'm not sure how well we support multi-line input and EOF right now. Currently, with the experimental {{-u}} flag the user also gets the correct error code after submission; with {{flink-cli < query01.sql}} the CLI would either stay in interactive mode or always return success. We should also discuss which statements are allowed. Actually, only DDL and {{INSERT INTO}} statements make sense so far. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202553765 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -342,16 +379,20 @@ private void restorePartitionedState(Collection state) throws for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) { restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo); - StateTable stateTable = stateTables.get(restoredMetaInfo.getName()); + StateSnapshotRestore snapshotRestore = registeredStates.get(restoredMetaInfo.getName()); //important: only create a new table we did not already create it previously - if (null == stateTable) { + if (null == snapshotRestore) { - RegisteredKeyedBackendStateMetaInfo registeredKeyedBackendStateMetaInfo = - new RegisteredKeyedBackendStateMetaInfo<>(restoredMetaInfo); + if (restoredMetaInfo.getBackendStateType() == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE) { + RegisteredKeyValueStateBackendMetaInfo registeredKeyedBackendStateMetaInfo = + new RegisteredKeyValueStateBackendMetaInfo<>(restoredMetaInfo); - stateTable = snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo); - stateTables.put(restoredMetaInfo.getName(), stateTable); + snapshotRestore = snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo); + registeredStates.put(restoredMetaInfo.getName(), snapshotRestore); + } else { --- End diff -- Maybe check that `(restoredMetaInfo.getBackendStateType() == PRIORITY_QUEUE` ---
[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state
[ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544668#comment-16544668 ] ASF GitHub Bot commented on FLINK-9489: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202552876 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java --- @@ -63,54 +72,46 @@ public RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMe } @SuppressWarnings("unchecked") - public RegisteredBroadcastBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) { + public RegisteredBroadcastStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) { this( snapshot.getName(), OperatorStateHandle.Mode.valueOf( snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)), - (TypeSerializer) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER), - (TypeSerializer) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)); + (TypeSerializer) Preconditions.checkNotNull( + snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)), + (TypeSerializer) Preconditions.checkNotNull( + snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))); Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType()); } /** * Creates a deep copy of the itself. */ - public RegisteredBroadcastBackendStateMetaInfo deepCopy() { - return new RegisteredBroadcastBackendStateMetaInfo<>(this); + @Nonnull + public RegisteredBroadcastStateBackendMetaInfo deepCopy() { + return new RegisteredBroadcastStateBackendMetaInfo<>(this); } @Nonnull @Override public StateMetaInfoSnapshot snapshot() { - Map optionsMap = Collections.singletonMap( - StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(), - assignmentMode.toString()); - Map> serializerMap = new HashMap<>(2); - Map serializerConfigSnapshotsMap = new HashMap<>(2); - String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(); - String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(); - serializerMap.put(keySerializerKey, keySerializer.duplicate()); - serializerConfigSnapshotsMap.put(keySerializerKey, keySerializer.snapshotConfiguration()); - serializerMap.put(valueSerializerKey, valueSerializer.duplicate()); - serializerConfigSnapshotsMap.put(valueSerializerKey, valueSerializer.snapshotConfiguration()); - - return new StateMetaInfoSnapshot( - name, - StateMetaInfoSnapshot.BackendStateType.BROADCAST, - optionsMap, - serializerConfigSnapshotsMap, - serializerMap); + if (precomputedSnapshot == null) { + precomputedSnapshot = precomputeSnapshot(); + } + return precomputedSnapshot; --- End diff -- As an easy fix, we could remove the `precomputedSnapshot` field and keep it like it was before that the snapshot was computed with every `snapshot` call. 
> Checkpoint timers as part of managed keyed state instead of raw keyed state > --- > > Key: FLINK-9489 > URL: https://issues.apache.org/jira/browse/FLINK-9489 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Timer state should now become part of the keyed state backend snapshot, i.e., > stored inside the managed keyed state. This means that we have to connect our > preparation for asynchronous checkpoints with the backend, so that the timers > are written as part of the state for each key-group. This means that we will > also free up the raw keyed state and might expose it to user functions in the > future. -- This message was sent
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202552616 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java --- @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nonnull; + +/** + * --- End diff -- JavaDocs missing ---
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202552483 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java --- @@ -28,9 +28,22 @@ @FunctionalInterface public interface KeyExtractorFunction { + KeyExtractorFunction> FOR_KEYED_OBJECTS = new KeyExtractorFunction>() { + @Nonnull + @Override + public Object extractKeyFromElement(@Nonnull Keyed element) { + return element.getKey(); + } + }; --- End diff -- Could we move this extractor into its own `KeyedKeyExtractorFunction` singleton? ---
[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state
[ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544676#comment-16544676 ] ASF GitHub Bot commented on FLINK-9489: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202553524 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java --- @@ -80,13 +82,57 @@ public E peek() { @Override public void bulkPoll(@Nonnull Predicate canConsume, @Nonnull Consumer consumer) { + if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) { + bulkPollRelaxedOrder(canConsume, consumer); + } else { + bulkPollStrictOrder(canConsume, consumer); + } + } + + private void bulkPollRelaxedOrder(@Nonnull Predicate canConsume, @Nonnull Consumer consumer) { + if (orderedCache.isEmpty()) { + bulkPollStore(canConsume, consumer); + } else { + while (!orderedCache.isEmpty() && canConsume.test(orderedCache.peekFirst())) { + final E next = orderedCache.removeFirst(); + orderedStore.remove(next); + consumer.accept(next); + } + + if (orderedCache.isEmpty()) { + bulkPollStore(canConsume, consumer); + } + } + } + + private void bulkPollStrictOrder(@Nonnull Predicate canConsume, @Nonnull Consumer consumer) { E element; while ((element = peek()) != null && canConsume.test(element)) { poll(); consumer.accept(element); } } + private void bulkPollStore(@Nonnull Predicate canConsume, @Nonnull Consumer consumer) { + try (CloseableIterator iterator = orderedStore.orderedIterator()) { + while (iterator.hasNext()) { + final E next = iterator.next(); + if (canConsume.test(next)) { + orderedStore.remove(next); + consumer.accept(next); + } else { + orderedCache.add(next); + while (iterator.hasNext() && !orderedCache.isFull()) { + orderedCache.add(iterator.next()); + } + break; + } + } + } catch (Exception e) { + throw new FlinkRuntimeException("Exception while bulk polling store.", e); --- End diff -- I would prefer throwing a checked exception here. > Checkpoint timers as part of managed keyed state instead of raw keyed state > --- > > Key: FLINK-9489 > URL: https://issues.apache.org/jira/browse/FLINK-9489 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Timer state should now become part of the keyed state backend snapshot, i.e., > stored inside the managed keyed state. This means that we have to connect our > preparation for asynchronous checkpoints with the backend, so that the timers > are written as part of the state for each key-group. This means that we will > also free up the raw keyed state and might expose it to user functions in the > future. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state
[ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544670#comment-16544670 ] ASF GitHub Bot commented on FLINK-9489: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202553958 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java --- @@ -60,36 +61,35 @@ /** Result of partitioning the snapshot by key-group. */ @Nullable - private KeyGroupPartitionedSnapshot partitionedSnapshot; + private StateKeyGroupWriter partitionedSnapshot; --- End diff -- nit: rename field > Checkpoint timers as part of managed keyed state instead of raw keyed state > --- > > Key: FLINK-9489 > URL: https://issues.apache.org/jira/browse/FLINK-9489 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Timer state should now become part of the keyed state backend snapshot, i.e., > stored inside the managed keyed state. This means that we have to connect our > preparation for asynchronous checkpoints with the backend, so that the timers > are written as part of the state for each key-group. This means that we will > also free up the raw keyed state and might expose it to user functions in the > future. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state
[ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544674#comment-16544674 ] ASF GitHub Bot commented on FLINK-9489: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202553887 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -446,8 +485,10 @@ public String toString() { @Override public int numStateEntries() { int sum = 0; - for (StateTable stateTable : stateTables.values()) { - sum += stateTable.size(); + for (StateSnapshotRestore stateTable : registeredStates.values()) { + if (stateTable instanceof StateTable) { + sum += ((StateTable) stateTable).size(); + } --- End diff -- Why don't the timers count toward the total number of state entries? > Checkpoint timers as part of managed keyed state instead of raw keyed state > --- > > Key: FLINK-9489 > URL: https://issues.apache.org/jira/browse/FLINK-9489 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Timer state should now become part of the keyed state backend snapshot, i.e., > stored inside the managed keyed state. This means that we have to connect our > preparation for asynchronous checkpoints with the backend, so that the timers > are written as part of the state for each key-group. This means that we will > also free up the raw keyed state and might expose it to user functions in the > future. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-9850: --- Assignee: vinoyang > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > > The print method of {{DataSet}} allows the user to supply a > String to identify the output (see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]), but > {{DataStream}} doesn't support this yet. It would be valuable to add this feature for > {{DataStream}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
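For reference, the DataSet variant from FLINK-1486 takes a sink identifier that prefixes every printed line; the request here is a DataStream counterpart. A usage sketch, where the print(String) overload on DataStream is the feature being asked for, not an existing API:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrintIdentifierDemo {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// Assumed API, mirroring DataSet#print(String sinkIdentifier):
		// each line would carry the identifier, e.g. "ints> 1", which makes
		// interleaved stdout from several streams attributable.
		env.fromElements(1, 2, 3).print("ints");
		env.fromElements("a", "b").print("letters");
		env.execute("print-identifier-demo");
	}
}
```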
[GitHub] flink issue #6260: [FLINK-9758] Fix ContinuousFileProcessingTest failure due...
Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/6260 +1, merging ---
[jira] [Commented] (FLINK-9758) ContinuousFileProcessingTest failed due to not setting runtimeContext
[ https://issues.apache.org/jira/browse/FLINK-9758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544537#comment-16544537 ] ASF GitHub Bot commented on FLINK-9758: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/6260 +1, merging > ContinuousFileProcessingTest failed due to not setting runtimeContext > - > > Key: FLINK-9758 > URL: https://issues.apache.org/jira/browse/FLINK-9758 > Project: Flink > Issue Type: Bug > Components: DataStream API, Tests >Affects Versions: 1.5.0, 1.6.0 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Minor > Labels: pull-request-available > Fix For: 1.5.2 > > > Currently, ContinuousFileMonitoringFunction#open() method will print runtime > context if the log-level is Debug: > {code:java} > if (LOG.isDebugEnabled()) { >LOG.debug("Opened {} (taskIdx= {}) for path: {}", > getClass().getSimpleName(), > getRuntimeContext().getIndexOfThisSubtask(), path); > } > {code} > However, ContinuousFileProcessingTest did not set runtime context for > monitoringFunction, which will result in UT failure due to > IllegalStateException if log level is set to DEBUG > {code:java} > IllegalStateException("The runtime context has not been initialized."); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
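The fix implied by the description is small: the test has to hand the function a runtime context before calling open(), so that the DEBUG-only getRuntimeContext() call cannot hit uninitialized state. A sketch of that idea; the helper and mock names below are assumed, not taken from the actual patch:

```java
// ContinuousFileMonitoringFunction extends RichSourceFunction, so it inherits
// setRuntimeContext(RuntimeContext) from AbstractRichFunction.
ContinuousFileMonitoringFunction<String> monitoringFunction = createMonitoringFunction(); // assumed test helper
monitoringFunction.setRuntimeContext(mockRuntimeContext); // any harness-provided RuntimeContext
monitoringFunction.open(new Configuration());             // now safe even at DEBUG log level
```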
[jira] [Commented] (FLINK-8731) TwoInputStreamTaskTest flaky on travis
[ https://issues.apache.org/jira/browse/FLINK-8731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544632#comment-16544632 ] ASF GitHub Bot commented on FLINK-8731: --- GitHub user dawidwys opened a pull request: https://github.com/apache/flink/pull/6338 [FLINK-8731] Replaced mockito with custom mock in TestInputChannel Replaced questionable usage of Mockito with a custom-written mock. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dawidwys/flink test-mock-lock Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6338.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6338 commit a2c78e6193c57bb69fc6f7d7b629ef0ec47171bb Author: Dawid Wysakowicz Date: 2018-07-12T14:17:48Z [FLINK-8731] Replaced mockito with custom mock in TestInputChannel > TwoInputStreamTaskTest flaky on travis > -- > > Key: FLINK-8731 > URL: https://issues.apache.org/jira/browse/FLINK-8731 > Project: Flink > Issue Type: Bug > Components: Streaming, Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.5.2, 1.6.0 > > > https://travis-ci.org/zentol/flink/builds/344307861 > {code} > Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 2.479 sec <<< > FAILURE! - in org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest > testOpenCloseAndTimestamps(org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest) > Time elapsed: 0.05 sec <<< ERROR! > java.lang.Exception: error in task > at > org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:250) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:233) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.testOpenCloseAndTimestamps(TwoInputStreamTaskTest.java:99) > Caused by: org.mockito.exceptions.misusing.WrongTypeOfReturnValue: > Boolean cannot be returned by getChannelIndex() > getChannelIndex() should return int > *** > If you're unsure why you're getting above error read on. > Due to the nature of the syntax above problem might occur because: > 1. This exception *might* occur in wrongly written multi-threaded tests. >Please refer to Mockito FAQ on limitations of concurrency testing. > 2. A spy is stubbed using when(spy.foo()).then() syntax. It is safer to stub > spies - >- with doReturn|Throw() family of methods. More in javadocs for > Mockito.spy() method. > at > org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:212) > at > org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:158) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:164) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:292) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness$TaskThread.run(StreamTaskTestHarness.java:437) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
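The WrongTypeOfReturnValue failure above is the classic symptom of exercising Mockito stubbing from a concurrently running thread; a hand-written test double avoids it because there is no stubbing machinery left to corrupt. The replacement is along these lines (an illustrative fake, not the actual TestInputChannel code):

```java
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative hand-written double: plain fields and a thread-safe queue,
// so concurrent readers can never observe a half-configured stub.
final class FakeInputChannel {
	private final int channelIndex;
	private final Queue<byte[]> buffers = new ConcurrentLinkedQueue<>();

	FakeInputChannel(int channelIndex) {
		this.channelIndex = channelIndex;
	}

	int getChannelIndex() {
		return channelIndex; // always an int; the mis-stubbed mock returned a Boolean here
	}

	void enqueueBuffer(byte[] buffer) {
		buffers.add(buffer);
	}

	Optional<byte[]> pollBuffer() {
		return Optional.ofNullable(buffers.poll());
	}
}
```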
[jira] [Resolved] (FLINK-9758) ContinuousFileProcessingTest failed due to not setting runtimeContext
[ https://issues.apache.org/jira/browse/FLINK-9758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz resolved FLINK-9758. - Resolution: Fixed Fix Version/s: (was: 1.5.2) 1.6.0 > ContinuousFileProcessingTest failed due to not setting runtimeContext > - > > Key: FLINK-9758 > URL: https://issues.apache.org/jira/browse/FLINK-9758 > Project: Flink > Issue Type: Bug > Components: DataStream API, Tests >Affects Versions: 1.5.0, 1.6.0 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Minor > Labels: pull-request-available > Fix For: 1.6.0 > > > Currently, ContinuousFileMonitoringFunction#open() method will print runtime > context if the log-level is Debug: > {code:java} > if (LOG.isDebugEnabled()) { >LOG.debug("Opened {} (taskIdx= {}) for path: {}", > getClass().getSimpleName(), > getRuntimeContext().getIndexOfThisSubtask(), path); > } > {code} > However, ContinuousFileProcessingTest did not set runtime context for > monitoringFunction, which will result in UT failure due to > IllegalStateException if log level is set to DEBUG > {code:java} > IllegalStateException("The runtime context has not been initialized."); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6120#discussion_r202553410 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java --- @@ -584,21 +581,6 @@ private static void registerFactory(Type t, Class fac // number of parameters the SAM of implemented interface has; the parameter indexing applies to this range final int baseParametersLen = sam.getParameterTypes().length; - // executable references "this" implicitly - if (paramLen <= 0) { - // executable declaring class can also be a super class of the input type - // we only validate if the executable exists in input type - validateInputContainsExecutable(exec, inType); - } - else { - final Type input = TypeExtractionUtils.extractTypeFromLambda( - exec, - lambdaInputTypeArgumentIndices, --- End diff -- Good point. I will remove it. ---
[jira] [Created] (FLINK-9853) add hex support in table api and sql
xueyu created FLINK-9853: Summary: add hex support in table api and sql Key: FLINK-9853 URL: https://issues.apache.org/jira/browse/FLINK-9853 Project: Flink Issue Type: Improvement Components: Table API SQL Reporter: xueyu Like in MySQL, HEX can take an integer or a string argument. For an integer argument N, it returns a hexadecimal string representation of the value of N. For a string argument str, it returns a hexadecimal string representation of str, where each byte of each character in str is converted to two hexadecimal digits. Syntax: HEX(100) = 64 HEX('This is a test String.') = '546869732069732061207465737420537472696e672e' See more: [link MySQL|https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_hex] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
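A direct Java rendering of the semantics described above (a behavior sketch only; the Table API built-in would have its own code generation path):

```java
import java.nio.charset.StandardCharsets;

final class HexDemo {
	// HEX(N) for an integer argument: hexadecimal representation of N.
	static String hex(long n) {
		return Long.toHexString(n); // hex(100) -> "64"
	}

	// HEX(str) for a string argument: two hex digits per byte.
	static String hex(String str) {
		StringBuilder sb = new StringBuilder(str.length() * 2);
		for (byte b : str.getBytes(StandardCharsets.UTF_8)) {
			sb.append(String.format("%02x", b));
		}
		return sb.toString(); // hex("This is a test String.") -> "5468...696e672e"
	}

	public static void main(String[] args) {
		System.out.println(hex(100));
		System.out.println(hex("This is a test String."));
	}
}
```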
[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6335 @tedyu Could you please take a look on this ? Thank you very much. ---
[jira] [Commented] (FLINK-9675) Avoid FileInputStream/FileOutputStream
[ https://issues.apache.org/jira/browse/FLINK-9675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544544#comment-16544544 ] ASF GitHub Bot commented on FLINK-9675: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/6335 @tedyu Could you please take a look on this ? Thank you very much. > Avoid FileInputStream/FileOutputStream > -- > > Key: FLINK-9675 > URL: https://issues.apache.org/jira/browse/FLINK-9675 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: zhangminglei >Priority: Minor > Labels: filesystem, pull-request-available > > They rely on finalizers (before Java 11), which create unnecessary GC load. > The alternatives, Files.newInputStream, are as easy to use and don't have > this issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
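The replacement suggested by the ticket is mechanical; a minimal before/after sketch:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

class FinalizerFreeStreams {
	static void copy(String from, String to) throws IOException {
		// Instead of new FileInputStream(from) / new FileOutputStream(to),
		// whose finalize() methods (pre-Java 11) add GC load:
		try (InputStream in = Files.newInputStream(Paths.get(from));
				OutputStream out = Files.newOutputStream(Paths.get(to))) {
			byte[] buf = new byte[8192];
			int read;
			while ((read = in.read(buf)) != -1) {
				out.write(buf, 0, read);
			}
		}
	}
}
```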
[jira] [Commented] (FLINK-9794) JDBCOutputFormat does not consider idle connection and multithreads synchronization
[ https://issues.apache.org/jira/browse/FLINK-9794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544554#comment-16544554 ] ASF GitHub Bot commented on FLINK-9794: --- Github user jrthe42 commented on the issue: https://github.com/apache/flink/pull/6301 Hi @hequn8128, I agree with you that a connection pool is more effective at using connection resources. I didn't choose a connection pool because it would introduce new dependencies, and I'm not sure that tradeoff is acceptable. I will check ```MiniConnectionPoolManager``` to see if it's a better way. > JDBCOutputFormat does not consider idle connection and multithreads > synchronization > --- > > Key: FLINK-9794 > URL: https://issues.apache.org/jira/browse/FLINK-9794 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.4.0, 1.5.0 >Reporter: wangsan >Assignee: wangsan >Priority: Major > Labels: pull-request-available > > The current implementation of JDBCOutputFormat has two potential problems: > 1. The Connection is established when JDBCOutputFormat is opened and is then > used the whole time. But if this connection lies idle for a long time, the > database will force-close it, and errors may occur. > 2. The flush() method is called when batchCount exceeds the threshold, but it > is also called while snapshotting state. So two threads may modify upload and > batchCount without synchronization. > We need to fix these two problems to make JDBCOutputFormat more reliable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...
Github user jrthe42 commented on the issue: https://github.com/apache/flink/pull/6301 Hi @hequn8128, I agree with you that a connection pool is more effective at using connection resources. I didn't choose a connection pool because it would introduce new dependencies, and I'm not sure that tradeoff is acceptable. I will check ```MiniConnectionPoolManager``` to see if it's a better way. ---
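The ticket's two problems translate into two small changes: check connection validity before writing (re-establishing the connection if the database dropped it while idle), and synchronize flush() so the batching thread and the snapshotting thread cannot race on upload and batchCount. A sketch under those assumptions; establishConnection and the timeout constant are placeholders, not necessarily the merged patch:

```java
// JDBC's Connection.isValid takes a timeout in seconds; the value is a placeholder.
private static final int VALIDITY_CHECK_TIMEOUT_SECONDS = 5;

public synchronized void flush() throws IOException {
	try {
		// Fix 1: an idle connection may have been force-closed by the database.
		if (connection == null || !connection.isValid(VALIDITY_CHECK_TIMEOUT_SECONDS)) {
			establishConnection();                       // placeholder: reopen the connection
			upload = connection.prepareStatement(query); // and rebuild the batched statement
		}
		// Fix 2: the synchronized keyword serializes this against the
		// checkpointing thread, which also calls flush() while snapshotting.
		upload.executeBatch();
		batchCount = 0;
	} catch (SQLException e) {
		throw new IOException("Flushing the JDBC batch failed.", e);
	}
}
```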
[jira] [Commented] (FLINK-9829) The wrapper classes be compared by symbol of '==' directly in BigDecSerializer.java
[ https://issues.apache.org/jira/browse/FLINK-9829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544599#comment-16544599 ] ASF GitHub Bot commented on FLINK-9829: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6321 Is the issue addressed here a bug? If not, and if it seems that the original authors of the code had an intention of writing the code as it is now, I would suggest to leave it as it is. > The wrapper classes be compared by symbol of '==' directly in > BigDecSerializer.java > --- > > Key: FLINK-9829 > URL: https://issues.apache.org/jira/browse/FLINK-9829 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.5.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2 > > > The wrapper classes should be compared by equals method rather than by symbol > of '==' directly in BigDecSerializer.java -- This message was sent by Atlassian JIRA (v7.6.3#76005)
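The pitfall behind this ticket is easy to demonstrate: '==' on boxed types compares object identity, and the JVM only caches small boxed values, so an identity comparison silently works for some inputs and fails for others:

```java
public class WrapperEqualityDemo {
	public static void main(String[] args) {
		Integer a = 127, b = 127;
		System.out.println(a == b);      // true: both come from the Integer cache (-128..127)
		Integer c = 128, d = 128;
		System.out.println(c == d);      // false: two distinct boxed objects
		System.out.println(c.equals(d)); // true: value comparison, always correct
	}
}
```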
[GitHub] flink issue #6321: [FLINK-9829] fix the wrapper classes be compared by symbo...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6321 Is the issue addressed here a bug? If not, and if it seems that the original authors of the code had an intention of writing the code as it is now, I would suggest to leave it as it is. ---
[jira] [Commented] (FLINK-9121) Remove Flip-6 prefixes from code base
[ https://issues.apache.org/jira/browse/FLINK-9121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544602#comment-16544602 ] ASF GitHub Bot commented on FLINK-9121: --- Github user Matrix42 commented on a diff in the pull request: https://github.com/apache/flink/pull/5801#discussion_r202548776 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java --- @@ -49,30 +54,45 @@ public YarnClusterDescriptor( @Override protected String getYarnSessionClusterEntrypoint() { - return YarnApplicationMasterRunner.class.getName(); + return YarnSessionClusterEntrypoint.class.getName(); } @Override protected String getYarnJobClusterEntrypoint() { - throw new UnsupportedOperationException("The old Yarn descriptor does not support proper per-job mode."); + return YarnJobClusterEntrypoint.class.getName(); } @Override - public YarnClusterClient deployJobCluster( - ClusterSpecification clusterSpecification, - JobGraph jobGraph, - boolean detached) { - throw new UnsupportedOperationException("Cannot deploy a per-job yarn cluster yet."); + public ClusterClient deployJobCluster( + ClusterSpecification clusterSpecification, + JobGraph jobGraph, + boolean detached) throws ClusterDeploymentException { + + // this is required because the slots are allocated lazily + jobGraph.setAllowQueuedScheduling(true); + + try { + return deployInternal( + clusterSpecification, + "Flink per-job cluster", + getYarnJobClusterEntrypoint(), + jobGraph, + detached); + } catch (Exception e) { + throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e); + } } @Override - protected ClusterClient createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, Configuration flinkConfiguration, boolean perJobCluster) throws Exception { - return new YarnClusterClient( - descriptor, - numberTaskManagers, - slotsPerTaskManager, - report, + protected ClusterClient createYarnClusterClient( + AbstractYarnClusterDescriptor descriptor, + int numberTaskManagers, + int slotsPerTaskManager, + ApplicationReport report, + Configuration flinkConfiguration, + boolean perJobCluster) throws Exception { + return new RestClusterClient<>( --- End diff -- why don't return a YarnClusterClient here? > Remove Flip-6 prefixes from code base > - > > Key: FLINK-9121 > URL: https://issues.apache.org/jira/browse/FLINK-9121 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > We should remove all Flip-6 prefixes and other references from the code base > since it is not a special case but the new default architecture. Instead we > should prefix old code with legacy. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5801: [FLINK-9121] [flip6] Remove Flip6 prefixes and oth...
Github user Matrix42 commented on a diff in the pull request: https://github.com/apache/flink/pull/5801#discussion_r202548776 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java --- @@ -49,30 +54,45 @@ public YarnClusterDescriptor( @Override protected String getYarnSessionClusterEntrypoint() { - return YarnApplicationMasterRunner.class.getName(); + return YarnSessionClusterEntrypoint.class.getName(); } @Override protected String getYarnJobClusterEntrypoint() { - throw new UnsupportedOperationException("The old Yarn descriptor does not support proper per-job mode."); + return YarnJobClusterEntrypoint.class.getName(); } @Override - public YarnClusterClient deployJobCluster( - ClusterSpecification clusterSpecification, - JobGraph jobGraph, - boolean detached) { - throw new UnsupportedOperationException("Cannot deploy a per-job yarn cluster yet."); + public ClusterClient deployJobCluster( + ClusterSpecification clusterSpecification, + JobGraph jobGraph, + boolean detached) throws ClusterDeploymentException { + + // this is required because the slots are allocated lazily + jobGraph.setAllowQueuedScheduling(true); + + try { + return deployInternal( + clusterSpecification, + "Flink per-job cluster", + getYarnJobClusterEntrypoint(), + jobGraph, + detached); + } catch (Exception e) { + throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e); + } } @Override - protected ClusterClient createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, Configuration flinkConfiguration, boolean perJobCluster) throws Exception { - return new YarnClusterClient( - descriptor, - numberTaskManagers, - slotsPerTaskManager, - report, + protected ClusterClient createYarnClusterClient( + AbstractYarnClusterDescriptor descriptor, + int numberTaskManagers, + int slotsPerTaskManager, + ApplicationReport report, + Configuration flinkConfiguration, + boolean perJobCluster) throws Exception { + return new RestClusterClient<>( --- End diff -- why don't return a YarnClusterClient here? ---
[jira] [Commented] (FLINK-9404) Adapter viewfs in BucketingSink when verify that truncate actually works
[ https://issues.apache.org/jira/browse/FLINK-9404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544611#comment-16544611 ] ASF GitHub Bot commented on FLINK-9404: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6050 > Adapter viewfs in BucketingSink when verify that truncate actually works > > > Key: FLINK-9404 > URL: https://issues.apache.org/jira/browse/FLINK-9404 > Project: Flink > Issue Type: Bug >Reporter: xymaqingxiang >Priority: Major > Labels: pull-request-available > > Hi: > > When the viewfs function is enabled, the following operation reports errors: > "new Path(UUID.randomUUID().toString())" > Hence, I think that we can add the basePath, like this: > "new Path(basePath, UUID.randomUUID().toString())" > > thanks, > Ma Qingxiang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6326: Mutual authentication for internal communication
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6326 Pushed another commit that rebuilds the generated config docs ---
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6120#discussion_r202552984 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java --- @@ -584,21 +581,6 @@ private static void registerFactory(Type t, Class fac // number of parameters the SAM of implemented interface has; the parameter indexing applies to this range final int baseParametersLen = sam.getParameterTypes().length; - // executable references "this" implicitly - if (paramLen <= 0) { --- End diff -- The input validation caused more errors than it solved. Especially with generic types. For lambdas this validation is limited anyway in a JDK compiler. ---
[jira] [Commented] (FLINK-7251) Merge the flink-java8 project into flink-core
[ https://issues.apache.org/jira/browse/FLINK-7251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544657#comment-16544657 ] ASF GitHub Bot commented on FLINK-7251: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6120#discussion_r202552984 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java --- @@ -584,21 +581,6 @@ private static void registerFactory(Type t, Class fac // number of parameters the SAM of implemented interface has; the parameter indexing applies to this range final int baseParametersLen = sam.getParameterTypes().length; - // executable references "this" implicitly - if (paramLen <= 0) { --- End diff -- The input validation caused more errors than it solved. Especially with generic types. For lambdas this validation is limited anyway in a JDK compiler. > Merge the flink-java8 project into flink-core > - > > Key: FLINK-7251 > URL: https://issues.apache.org/jira/browse/FLINK-7251 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9829) The wrapper classes be compared by symbol of '==' directly in BigDecSerializer.java
[ https://issues.apache.org/jira/browse/FLINK-9829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544598#comment-16544598 ] ASF GitHub Bot commented on FLINK-9829: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6321 This would be clearly not a hotfix. As per the pull request template, contributors should use hotfixes mainly for typos and JavaDoc updates. > The wrapper classes be compared by symbol of '==' directly in > BigDecSerializer.java > --- > > Key: FLINK-9829 > URL: https://issues.apache.org/jira/browse/FLINK-9829 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.5.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2 > > > The wrapper classes should be compared by equals method rather than by symbol > of '==' directly in BigDecSerializer.java -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6321: [FLINK-9829] fix the wrapper classes be compared by symbo...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6321 This would be clearly not a hotfix. As per the pull request template, contributors should use hotfixes mainly for typos and JavaDoc updates. ---
[GitHub] flink pull request #6050: [FLINK-9404][flink-connector-filesystem] Adapter v...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6050 ---
[jira] [Commented] (FLINK-8544) JSONKeyValueDeserializationSchema throws NPE when message key is null
[ https://issues.apache.org/jira/browse/FLINK-8544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544629#comment-16544629 ] ASF GitHub Bot commented on FLINK-8544: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5516 > JSONKeyValueDeserializationSchema throws NPE when message key is null > - > > Key: FLINK-8544 > URL: https://issues.apache.org/jira/browse/FLINK-8544 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Bill Lee >Priority: Major > Labels: pull-request-available > Original Estimate: 1h > Remaining Estimate: 1h > > JSONKeyValueDeserializationSchema calls Jackson to deserialize the message key > without validation. > If a message with key == null is read, Flink throws an NPE. > {code:java} > @Override > public ObjectNode deserialize(byte[] messageKey, byte[] message, String > topic, int partition, long offset) throws IOException { > if (mapper == null) { > mapper = new ObjectMapper(); > } > ObjectNode node = mapper.createObjectNode(); > node.set("key", mapper.readValue(messageKey, JsonNode.class)); > // messageKey is not validated against null. > node.set("value", mapper.readValue(message, JsonNode.class)); > {code} > The fix is very straightforward. > {code:java} > if (messageKey == null) { > node.set("key", null); > } else { > node.set("key", mapper.readValue(messageKey, > JsonNode.class)); > } > {code} > If it is appreciated, I would send a pull request. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9758) ContinuousFileProcessingTest failed due to not setting runtimeContext
[ https://issues.apache.org/jira/browse/FLINK-9758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544631#comment-16544631 ] ASF GitHub Bot commented on FLINK-9758: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6260 > ContinuousFileProcessingTest failed due to not setting runtimeContext > - > > Key: FLINK-9758 > URL: https://issues.apache.org/jira/browse/FLINK-9758 > Project: Flink > Issue Type: Bug > Components: DataStream API, Tests >Affects Versions: 1.5.0, 1.6.0 >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Minor > Labels: pull-request-available > Fix For: 1.5.2 > > > Currently, ContinuousFileMonitoringFunction#open() method will print runtime > context if the log-level is Debug: > {code:java} > if (LOG.isDebugEnabled()) { >LOG.debug("Opened {} (taskIdx= {}) for path: {}", > getClass().getSimpleName(), > getRuntimeContext().getIndexOfThisSubtask(), path); > } > {code} > However, ContinuousFileProcessingTest did not set runtime context for > monitoringFunction, which will result in UT failure due to > IllegalStateException if log level is set to DEBUG > {code:java} > IllegalStateException("The runtime context has not been initialized."); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8731) TwoInputStreamTaskTest flaky on travis
[ https://issues.apache.org/jira/browse/FLINK-8731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-8731: -- Labels: pull-request-available test-stability (was: test-stability) > TwoInputStreamTaskTest flaky on travis > -- > > Key: FLINK-8731 > URL: https://issues.apache.org/jira/browse/FLINK-8731 > Project: Flink > Issue Type: Bug > Components: Streaming, Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Priority: Critical > Labels: pull-request-available, test-stability > Fix For: 1.5.2, 1.6.0 > > > https://travis-ci.org/zentol/flink/builds/344307861 > {code} > Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 2.479 sec <<< > FAILURE! - in org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest > testOpenCloseAndTimestamps(org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest) > Time elapsed: 0.05 sec <<< ERROR! > java.lang.Exception: error in task > at > org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:250) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness.waitForTaskCompletion(StreamTaskTestHarness.java:233) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.testOpenCloseAndTimestamps(TwoInputStreamTaskTest.java:99) > Caused by: org.mockito.exceptions.misusing.WrongTypeOfReturnValue: > Boolean cannot be returned by getChannelIndex() > getChannelIndex() should return int > *** > If you're unsure why you're getting above error read on. > Due to the nature of the syntax above problem might occur because: > 1. This exception *might* occur in wrongly written multi-threaded tests. >Please refer to Mockito FAQ on limitations of concurrency testing. > 2. A spy is stubbed using when(spy.foo()).then() syntax. It is safer to stub > spies - >- with doReturn|Throw() family of methods. More in javadocs for > Mockito.spy() method. > at > org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:212) > at > org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:158) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:164) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:292) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness$TaskThread.run(StreamTaskTestHarness.java:437) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6338: [FLINK-8731] Replaced mockito with custom mock in ...
GitHub user dawidwys opened a pull request: https://github.com/apache/flink/pull/6338 [FLINK-8731] Replaced mockito with custom mock in TestInputChannel Replaced questionable usage of Mockito with a custom-written mock. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dawidwys/flink test-mock-lock Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6338.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6338 commit a2c78e6193c57bb69fc6f7d7b629ef0ec47171bb Author: Dawid Wysakowicz Date: 2018-07-12T14:17:48Z [FLINK-8731] Replaced mockito with custom mock in TestInputChannel ---
[jira] [Commented] (FLINK-9675) Avoid FileInputStream/FileOutputStream
[ https://issues.apache.org/jira/browse/FLINK-9675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544617#comment-16544617 ] ASF GitHub Bot commented on FLINK-9675: --- Github user tedyu commented on the issue: https://github.com/apache/flink/pull/6335 From https://travis-ci.org/apache/flink/jobs/404127448 : ``` Failed tests: BlobServerPutTest.testPutBufferFailsIncomingForJob Expected: (an instance of java.io.IOException and exception with message a string containing " (Permission denied)") but: exception with message a string containing " (Permission denied)" message was "/tmp/junit4230160308108538699/junit7824839689112636430/blobStore-94b4189a-4e00-4bca-82c2-5ca72b1fb950/incoming/temp-0001" Stacktrace was: java.nio.file.AccessDeniedException: /tmp/junit4230160308108538699/junit7824839689112636430/blobStore-94b4189a-4e00-4bca-82c2-5ca72b1fb950/incoming/temp-0001 at sun.nio.fs.UnixException.translateToIOException(UnixException.java:84) at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214) at java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:434) at java.nio.file.Files.newOutputStream(Files.java:216) at org.apache.flink.runtime.blob.BlobServer.putBuffer(BlobServer.java:594) at org.apache.flink.runtime.blob.BlobServer.putTransient(BlobServer.java:542) at org.apache.flink.runtime.blob.BlobServerPutTest.put(BlobServerPutTest.java:799) at org.apache.flink.runtime.blob.BlobServerPutTest.testPutBufferFailsIncoming(BlobServerPutTest.java:559) at org.apache.flink.runtime.blob.BlobServerPutTest.testPutBufferFailsIncomingForJob(BlobServerPutTest.java:516) ``` Please check the test failure. Thanks > Avoid FileInputStream/FileOutputStream > -- > > Key: FLINK-9675 > URL: https://issues.apache.org/jira/browse/FLINK-9675 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: zhangminglei >Priority: Minor > Labels: filesystem, pull-request-available > > They rely on finalizers (before Java 11), which create unnecessary GC load. > The alternatives, Files.newInputStream, are as easy to use and don't have > this issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6335: [FLINK-9675] [fs] Avoid FileInputStream/FileOutputStream
Github user tedyu commented on the issue: https://github.com/apache/flink/pull/6335 From https://travis-ci.org/apache/flink/jobs/404127448 : ``` Failed tests: BlobServerPutTest.testPutBufferFailsIncomingForJob Expected: (an instance of java.io.IOException and exception with message a string containing " (Permission denied)") but: exception with message a string containing " (Permission denied)" message was "/tmp/junit4230160308108538699/junit7824839689112636430/blobStore-94b4189a-4e00-4bca-82c2-5ca72b1fb950/incoming/temp-0001" Stacktrace was: java.nio.file.AccessDeniedException: /tmp/junit4230160308108538699/junit7824839689112636430/blobStore-94b4189a-4e00-4bca-82c2-5ca72b1fb950/incoming/temp-0001 at sun.nio.fs.UnixException.translateToIOException(UnixException.java:84) at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214) at java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:434) at java.nio.file.Files.newOutputStream(Files.java:216) at org.apache.flink.runtime.blob.BlobServer.putBuffer(BlobServer.java:594) at org.apache.flink.runtime.blob.BlobServer.putTransient(BlobServer.java:542) at org.apache.flink.runtime.blob.BlobServerPutTest.put(BlobServerPutTest.java:799) at org.apache.flink.runtime.blob.BlobServerPutTest.testPutBufferFailsIncoming(BlobServerPutTest.java:559) at org.apache.flink.runtime.blob.BlobServerPutTest.testPutBufferFailsIncomingForJob(BlobServerPutTest.java:516) ``` Please check the test failure. Thanks ---
[jira] [Resolved] (FLINK-8858) Add support for INSERT INTO in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-8858. - Resolution: Fixed Fix Version/s: 1.6.0 Fixed in 1.6.0: 695bc56a9e20b9d86eea14a02899b400d324a7ea > Add support for INSERT INTO in SQL Client > - > > Key: FLINK-8858 > URL: https://issues.apache.org/jira/browse/FLINK-8858 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.6.0 >Reporter: Renjie Liu >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The current design of SQL Client embedded mode doesn't support long running > queries. It would be useful for simple jobs that can be expressed in a single > sql statement if we can submit sql statements stored in files as long running > queries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6332 ---
[GitHub] flink issue #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified table ...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/6323 Thanks @pnowojski. Merging this... ---
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544464#comment-16544464 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202535180 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass: Class[T],
[jira] [Created] (FLINK-9851) Add documentation for unified table sources/sinks
Timo Walther created FLINK-9851: --- Summary: Add documentation for unified table sources/sinks Key: FLINK-9851 URL: https://issues.apache.org/jira/browse/FLINK-9851 Project: Flink Issue Type: Improvement Components: Documentation, Table API SQL Reporter: Timo Walther Assignee: Timo Walther FLINK-8558 and FLINK-8866 reworked a lot of the existing table sources/sinks and the way they are discovered. We should rework the documentation about: - Built-in sources/sinks/formats and their properties for Table API and SQL Client - How to write custom sources/sinks/formats - Limitations such as {{property-version}}, {{rowtime.timestamps.from-source}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6323: [FLINK-8558] [FLINK-8866] [table] Finalize unified...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202535180 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala --- @@ -18,143 +18,358 @@ package org.apache.flink.table.factories -import java.util.{ServiceConfigurationError, ServiceLoader} +import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.descriptors.{DescriptorProperties, TableDescriptor, TableDescriptorValidator} +import org.apache.flink.table.descriptors._ import org.apache.flink.table.util.Logging +import org.apache.flink.util.Preconditions import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable /** - * Unified interface to search for TableFactoryDiscoverable of provided type and properties. + * Unified interface to search for a [[TableFactory]] of provided type and properties. */ object TableFactoryService extends Logging { private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory]) - def find(clz: Class[_], descriptor: TableDescriptor): TableFactory = { -find(clz, descriptor, null) + /** +* Finds a table factory of the given class and descriptor. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) + +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], descriptor: TableDescriptor, classLoader: ClassLoader) - : TableFactory = { + /** +* Finds a table factory of the given class, descriptor, and classloader. +* +* @param factoryClass desired factory class +* @param descriptor descriptor describing the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(descriptor) +Preconditions.checkNotNull(classLoader) -val properties = new DescriptorProperties() -descriptor.addProperties(properties) -find(clz, properties.asMap.asScala.toMap, classLoader) +val descriptorProperties = new DescriptorProperties() +descriptor.addProperties(descriptorProperties) +findInternal(factoryClass, descriptorProperties.asMap, None) } - def find(clz: Class[_], properties: Map[String, String]): TableFactory = { -find(clz: Class[_], properties, null) + /** +* Finds a table factory of the given class and property map. 
+* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @tparam T factory class type +* @return the matching factory +*/ + def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) + +findInternal(factoryClass, propertyMap, None) } - def find(clz: Class[_], properties: Map[String, String], - classLoader: ClassLoader): TableFactory = { + /** +* Finds a table factory of the given class, property map, and classloader. +* +* @param factoryClass desired factory class +* @param propertyMap properties that describe the factory configuration +* @param classLoader classloader for service loading +* @tparam T factory class type +* @return the matching factory +*/ + def find[T]( + factoryClass: Class[T], + propertyMap: JMap[String, String], + classLoader: ClassLoader) +: T = { +Preconditions.checkNotNull(factoryClass) +Preconditions.checkNotNull(propertyMap) +
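For contributors writing their own factory, the counterpart of the lookup is an implementation that declares its matching context and supported keys. The following is a hypothetical sketch (class name and property keys are invented); the class would additionally have to be registered in a META-INF/services/org.apache.flink.table.factories.TableFactory file so the service loader can discover it:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.factories.TableFactory;

// Hypothetical example; "my-connector" and "connector.path" are invented keys.
public class MyConnectorFactory implements TableFactory {

    @Override
    public Map<String, String> requiredContext() {
        // The context must match the lookup properties exactly for this
        // factory to be selected.
        Map<String, String> context = new HashMap<>();
        context.put("connector.type", "my-connector");
        context.put("connector.property-version", "1");
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        // Every non-context property key that may appear must be listed here,
        // otherwise validation fails with an unsupported-key error.
        List<String> properties = new ArrayList<>();
        properties.add("connector.path");
        return properties;
    }
}
```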
[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544465#comment-16544465 ] ASF GitHub Bot commented on FLINK-8558: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/6323 Thanks @pnowojski. Merging this... > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6332: [FLINK-8858] [sql-client] Add support for INSERT I...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6332#discussion_r202536081 --- Diff: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java --- @@ -173,55 +180,92 @@ public void open() { if (line == null || line.equals("")) { continue; } + parseAndCall(line); + } + } - final SqlCommandCall cmdCall = SqlCommandParser.parse(line); + /** +* Submits a SQL update statement and prints status information and/or errors on the terminal. +* +* @param statement SQL update statement +* @return flag to indicate if the submission was successful or not +*/ + public boolean submitUpdate(String statement) { --- End diff -- I added some unit tests. ---
[jira] [Resolved] (FLINK-8558) Add unified format interfaces and format discovery
[ https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-8558. - Resolution: Fixed Fix Version/s: 1.6.0 Fixed in 1.6.0: ee40335ffa40fb32a692fa6be70946d9a70301b2 > Add unified format interfaces and format discovery > -- > > Key: FLINK-8558 > URL: https://issues.apache.org/jira/browse/FLINK-8558 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > In the last release, we introduced a new module {{flink-formats}}. Currently > only {{flink-avro}} is located there but we will add more formats such as > {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of > concerns we want to decouple connectors from formats: e.g., remove > {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}. > A newly introduced {{FormatFactory}} will use Java service loaders to > discover available formats in the classpath (similar to how file systems are > discovered now). A {{Format}} will provide a method for converting {{byte[]}} > to the target record type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6336: [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause...
Github user ubyyj commented on the issue: https://github.com/apache/flink/pull/6336 Unrelated build failure; closing and reopening to trigger the Travis build again. ---
[GitHub] flink pull request #6336: [FLINK-9630] [connector] Kafka09PartitionDiscovere...
Github user ubyyj closed the pull request at: https://github.com/apache/flink/pull/6336 ---
[GitHub] flink pull request #6336: [FLINK-9630] [connector] Kafka09PartitionDiscovere...
GitHub user ubyyj reopened a pull request: https://github.com/apache/flink/pull/6336 [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection … …leak on TopicAuthorizationException **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change Fix the bug that Kafka09PartitionDiscoverer can cause a TCP connection leak if getAllPartitionsForTopics() gets a TopicAuthorizationException. ## Brief change log Catch TopicAuthorizationException and close the kafkaConsumer in getAllPartitionsForTopics(). ## Verifying this change This change added tests and can be verified as follows: - *Manually verified the change by running a job which consumes from a non-existent Kafka topic, and verified that the number of opened TCP connections and file handles of the task manager process did not increase. The fix has been running in our production for weeks now, without problems.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/ubyyj/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6336.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6336 commit 0aa8d75af085c2465e8cfd9e5a572770a5d95738 Author: yuanyoujun Date: 2018-07-15T13:07:49Z [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException ---
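The change log above boils down to closing the consumer before letting the exception propagate. A minimal sketch of that pattern (not the actual patch; the method shape is simplified and the class name is invented):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TopicAuthorizationException;

public class PartitionDiscoverySketch {
    // Simplified stand-in for Kafka09PartitionDiscoverer.getAllPartitionsForTopics().
    static List<PartitionInfo> getAllPartitionsForTopic(
            KafkaConsumer<byte[], byte[]> kafkaConsumer, String topic) {
        try {
            return new ArrayList<>(kafkaConsumer.partitionsFor(topic));
        } catch (TopicAuthorizationException e) {
            // Before the fix, this exception escaped without closing the consumer,
            // leaking one TCP connection (and file handle) per restart attempt.
            kafkaConsumer.close();
            throw e;
        }
    }
}
```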
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544720#comment-16544720 ] ASF GitHub Bot commented on FLINK-9630: --- Github user ubyyj closed the pull request at: https://github.com/apache/flink/pull/6336 > Kafka09PartitionDiscoverer cause connection leak on > TopicAuthorizationException > --- > > Key: FLINK-9630 > URL: https://issues.apache.org/jira/browse/FLINK-9630 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.5.0, 1.4.2 > Environment: Linux 2.6, java 8, Kafka broker 0.10.x >Reporter: Youjun Yuan >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2 > > > When the Kafka topic gets deleted during the task startup process, > Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in > getAllPartitionsForTopics(), and it gets no chance to close the > kafkaConsumer, resulting in a TCP connection leak (to the Kafka broker). > > *This issue can bring down the whole Flink cluster*, because, in a default > setup (fixedDelay with INT.MAX restart attempts), the job manager will randomly > schedule the job to any TaskManager that has a free slot, and each attempt will > cause that TaskManager to leak a TCP connection; eventually almost every > TaskManager will run out of file handles, so no TaskManager can take a > snapshot or accept new jobs. This effectively stops the whole cluster. > > The leak happens when StreamTask.invoke() calls openAllOperators(), then > FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(); > when kafkaConsumer.partitionsFor(topic) in > KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a > *TopicAuthorizationException,* no one catches it. > Though StreamTask.open catches Exception and invokes the dispose() method of > each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), > this does not close the kafkaConsumer in partitionDiscoverer, and does not even > invoke partitionDiscoverer.wakeup(), because the discoveryLoopThread was > null. > > Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience: > public void cancel() { > // set ourselves as not running; > // this would let the main discovery loop escape as soon as possible > running = false; > if (discoveryLoopThread != null) { > if (partitionDiscoverer != null) > { // we cannot close the discoverer here, as it is error-prone to > concurrent access; // only wake up the discoverer, the discovery > loop will clean itself up after it escapes > partitionDiscoverer.wakeup(); } > // the discovery loop may currently be sleeping in-between > // consecutive discoveries; interrupt to shutdown faster > discoveryLoopThread.interrupt(); > } > // abort the fetcher, if there is one > if (kafkaFetcher != null) > { kafkaFetcher.cancel(); } > } > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6336: [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause...
Github user ubyyj commented on the issue: https://github.com/apache/flink/pull/6336 Trigger Travis build. ---
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544721#comment-16544721 ] ASF GitHub Bot commented on FLINK-9630: --- Github user ubyyj commented on the issue: https://github.com/apache/flink/pull/6336 Unrelated build failure; closing and reopening to trigger the Travis build again. > Kafka09PartitionDiscoverer cause connection leak on > TopicAuthorizationException > --- > > Key: FLINK-9630 > URL: https://issues.apache.org/jira/browse/FLINK-9630 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.5.0, 1.4.2 > Environment: Linux 2.6, java 8, Kafka broker 0.10.x >Reporter: Youjun Yuan >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2 > > > When the Kafka topic gets deleted during the task startup process, > Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in > getAllPartitionsForTopics(), and it gets no chance to close the > kafkaConsumer, resulting in a TCP connection leak (to the Kafka broker). > > *This issue can bring down the whole Flink cluster*, because, in a default > setup (fixedDelay with INT.MAX restart attempts), the job manager will randomly > schedule the job to any TaskManager that has a free slot, and each attempt will > cause that TaskManager to leak a TCP connection; eventually almost every > TaskManager will run out of file handles, so no TaskManager can take a > snapshot or accept new jobs. This effectively stops the whole cluster. > > The leak happens when StreamTask.invoke() calls openAllOperators(), then > FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(); > when kafkaConsumer.partitionsFor(topic) in > KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a > *TopicAuthorizationException,* no one catches it. > Though StreamTask.open catches Exception and invokes the dispose() method of > each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), > this does not close the kafkaConsumer in partitionDiscoverer, and does not even > invoke partitionDiscoverer.wakeup(), because the discoveryLoopThread was > null. > > Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience: > public void cancel() { > // set ourselves as not running; > // this would let the main discovery loop escape as soon as possible > running = false; > if (discoveryLoopThread != null) { > if (partitionDiscoverer != null) > { // we cannot close the discoverer here, as it is error-prone to > concurrent access; // only wake up the discoverer, the discovery > loop will clean itself up after it escapes > partitionDiscoverer.wakeup(); } > // the discovery loop may currently be sleeping in-between > // consecutive discoveries; interrupt to shutdown faster > discoveryLoopThread.interrupt(); > } > // abort the fetcher, if there is one > if (kafkaFetcher != null) > { kafkaFetcher.cancel(); } > } > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544722#comment-16544722 ] ASF GitHub Bot commented on FLINK-9630: --- GitHub user ubyyj reopened a pull request: https://github.com/apache/flink/pull/6336 [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection … …leak on TopicAuthorizationException **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change Fix the bug that Kafka09PartitionDiscoverer can cause a TCP connection leak if getAllPartitionsForTopics() gets a TopicAuthorizationException. ## Brief change log Catch TopicAuthorizationException and close the kafkaConsumer in getAllPartitionsForTopics(). ## Verifying this change This change added tests and can be verified as follows: - *Manually verified the change by running a job which consumes from a non-existent Kafka topic, and verified that the number of opened TCP connections and file handles of the task manager process did not increase. The fix has been running in our production for weeks now, without problems.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/ubyyj/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6336.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6336 commit 0aa8d75af085c2465e8cfd9e5a572770a5d95738 Author: yuanyoujun Date: 2018-07-15T13:07:49Z [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException > Kafka09PartitionDiscoverer cause connection leak on > TopicAuthorizationException > --- > > Key: FLINK-9630 > URL: https://issues.apache.org/jira/browse/FLINK-9630 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.5.0, 1.4.2 > Environment: Linux 2.6, java 8, Kafka broker 0.10.x >Reporter: Youjun Yuan >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2 > > > When the Kafka topic gets deleted during the task startup process, > Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in > getAllPartitionsForTopics(), and it gets no chance to close the > kafkaConsumer, resulting in a TCP connection leak (to the Kafka broker). > > *This issue can bring down the whole Flink cluster*, because, in a default > setup (fixedDelay with INT.MAX restart attempts), the job manager will randomly > schedule the job to any TaskManager that has a free slot, and each attempt will > cause that TaskManager to leak a TCP connection; eventually almost every > TaskManager will run out of file handles, so no TaskManager can take a > snapshot or accept new jobs. This effectively stops the whole cluster. > > The leak happens when StreamTask.invoke() calls openAllOperators(), then > FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(); > when kafkaConsumer.partitionsFor(topic) in > KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a > *TopicAuthorizationException,* no one catches it. > Though StreamTask.open catches Exception and invokes the dispose() method of > each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), > this does not close the kafkaConsumer in partitionDiscoverer, and does not even > invoke partitionDiscoverer.wakeup(), because the discoveryLoopThread was > null. > > Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience: > public void cancel() { > // set ourselves as not running; > // this would let the main discovery loop escape as soon as possible > running = false; > if (discoveryLoopThread != null) { > if (partitionDiscoverer != null) > { // we cannot close the discoverer here, as it is error-prone to > concurrent access; // only wake up the discoverer, the
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544723#comment-16544723 ] ASF GitHub Bot commented on FLINK-9630: --- Github user ubyyj commented on the issue: https://github.com/apache/flink/pull/6336 Trigger Travis build. > Kafka09PartitionDiscoverer cause connection leak on > TopicAuthorizationException > --- > > Key: FLINK-9630 > URL: https://issues.apache.org/jira/browse/FLINK-9630 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.5.0, 1.4.2 > Environment: Linux 2.6, java 8, Kafka broker 0.10.x >Reporter: Youjun Yuan >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2 > > > When the Kafka topic gets deleted during the task startup process, > Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in > getAllPartitionsForTopics(), and it gets no chance to close the > kafkaConsumer, resulting in a TCP connection leak (to the Kafka broker). > > *This issue can bring down the whole Flink cluster*, because, in a default > setup (fixedDelay with INT.MAX restart attempts), the job manager will randomly > schedule the job to any TaskManager that has a free slot, and each attempt will > cause that TaskManager to leak a TCP connection; eventually almost every > TaskManager will run out of file handles, so no TaskManager can take a > snapshot or accept new jobs. This effectively stops the whole cluster. > > The leak happens when StreamTask.invoke() calls openAllOperators(), then > FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(); > when kafkaConsumer.partitionsFor(topic) in > KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a > *TopicAuthorizationException,* no one catches it. > Though StreamTask.open catches Exception and invokes the dispose() method of > each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), > this does not close the kafkaConsumer in partitionDiscoverer, and does not even > invoke partitionDiscoverer.wakeup(), because the discoveryLoopThread was > null. > > Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience: > public void cancel() { > // set ourselves as not running; > // this would let the main discovery loop escape as soon as possible > running = false; > if (discoveryLoopThread != null) { > if (partitionDiscoverer != null) > { // we cannot close the discoverer here, as it is error-prone to > concurrent access; // only wake up the discoverer, the discovery > loop will clean itself up after it escapes > partitionDiscoverer.wakeup(); } > // the discovery loop may currently be sleeping in-between > // consecutive discoveries; interrupt to shutdown faster > discoveryLoopThread.interrupt(); > } > // abort the fetcher, if there is one > if (kafkaFetcher != null) > { kafkaFetcher.cancel(); } > } > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6321: [FLINK-9829] fix the wrapper classes be compared b...
Github user lamber-ken commented on a diff in the pull request: https://github.com/apache/flink/pull/6321#discussion_r202566500 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java --- @@ -69,17 +69,17 @@ public void serialize(BigDecimal record, DataOutputView target) throws IOExcepti } // fast paths for 0, 1, 10 // only reference equality is checked because equals would be too expensive --- End diff -- Right, the second section of code is not exposed anywhere, so only the first section is modified now. ---
[jira] [Commented] (FLINK-9829) The wrapper classes be compared by symbol of '==' directly in BigDecSerializer.java
[ https://issues.apache.org/jira/browse/FLINK-9829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544745#comment-16544745 ] ASF GitHub Bot commented on FLINK-9829: --- Github user lamber-ken commented on a diff in the pull request: https://github.com/apache/flink/pull/6321#discussion_r202566500 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java --- @@ -69,17 +69,17 @@ public void serialize(BigDecimal record, DataOutputView target) throws IOExcepti } // fast paths for 0, 1, 10 // only reference equality is checked because equals would be too expensive --- End diff -- Right, the second section of code is not exposed anywhere, so only the first section is modified now. > The wrapper classes be compared by symbol of '==' directly in > BigDecSerializer.java > --- > > Key: FLINK-9829 > URL: https://issues.apache.org/jira/browse/FLINK-9829 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.5.0 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > Fix For: 1.5.2 > > > The wrapper classes should be compared by the equals method rather than by the > '==' operator directly in BigDecSerializer.java -- This message was sent by Atlassian JIRA (v7.6.3#76005)
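For context on the reference-equality fast path discussed in this diff, the snippet below (a standalone illustration, not the serializer code) shows why == only matches the shared BigDecimal constants, while equal-valued instances fall through to the generic path:

```java
import java.math.BigDecimal;

public class FastPathDemo {
    public static void main(String[] args) {
        BigDecimal a = BigDecimal.ZERO;         // the shared JDK constant
        BigDecimal b = new BigDecimal("0");     // numerically equal, distinct object

        System.out.println(a == BigDecimal.ZERO);          // true  -> fast path applies
        System.out.println(b == BigDecimal.ZERO);          // false -> generic path used
        System.out.println(b.compareTo(BigDecimal.ZERO));  // 0     -> values still equal
    }
}
```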
[GitHub] flink issue #6326: Mutual authentication for internal communication
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6326 Thanks for the review and for merging. @NicoK already has an end-to-end test PR for SSL (#6327), which would be great to rebase on top of this change. ---
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202556091 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -446,8 +485,10 @@ public String toString() { @Override public int numStateEntries() { int sum = 0; - for (StateTable stateTable : stateTables.values()) { - sum += stateTable.size(); + for (StateSnapshotRestore stateTable : registeredStates.values()) { + if (stateTable instanceof StateTable) { + sum += ((StateTable) stateTable).size(); + } --- End diff -- This method is only used by some tests, and to be on the safe side it is probably expected to count only the keyed state and not the timers. ---
[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state
[ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544693#comment-16544693 ] ASF GitHub Bot commented on FLINK-9489: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202556091 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -446,8 +485,10 @@ public String toString() { @Override public int numStateEntries() { int sum = 0; - for (StateTable stateTable : stateTables.values()) { - sum += stateTable.size(); + for (StateSnapshotRestore stateTable : registeredStates.values()) { + if (stateTable instanceof StateTable) { + sum += ((StateTable) stateTable).size(); + } --- End diff -- This method is only used by some tests, and to be on the safe side it is probably expected to count only the keyed state and not the timers. > Checkpoint timers as part of managed keyed state instead of raw keyed state > --- > > Key: FLINK-9489 > URL: https://issues.apache.org/jira/browse/FLINK-9489 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > Timer state should now become part of the keyed state backend snapshot, i.e., > stored inside the managed keyed state. This means that we have to connect our > preparation for asynchronous checkpoints with the backend, so that the timers > are written as part of the state for each key-group. This means that we will > also free up the raw keyed state and might expose it to user functions in the > future. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202556109 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java --- @@ -80,13 +82,57 @@ public E peek() { @Override public void bulkPoll(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) { + if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) { + bulkPollRelaxedOrder(canConsume, consumer); + } else { + bulkPollStrictOrder(canConsume, consumer); + } + } + + private void bulkPollRelaxedOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) { + if (orderedCache.isEmpty()) { + bulkPollStore(canConsume, consumer); + } else { + while (!orderedCache.isEmpty() && canConsume.test(orderedCache.peekFirst())) { + final E next = orderedCache.removeFirst(); + orderedStore.remove(next); + consumer.accept(next); + } + + if (orderedCache.isEmpty()) { + bulkPollStore(canConsume, consumer); + } + } + } + + private void bulkPollStrictOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) { E element; while ((element = peek()) != null && canConsume.test(element)) { poll(); consumer.accept(element); } } + private void bulkPollStore(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) { + try (CloseableIterator<E> iterator = orderedStore.orderedIterator()) { + while (iterator.hasNext()) { + final E next = iterator.next(); + if (canConsume.test(next)) { + orderedStore.remove(next); + consumer.accept(next); + } else { + orderedCache.add(next); + while (iterator.hasNext() && !orderedCache.isFull()) { + orderedCache.add(iterator.next()); + } + break; + } + } + } catch (Exception e) { + throw new FlinkRuntimeException("Exception while bulk polling store.", e); --- End diff -- Why would you prefer it? I think there is no better way for the caller to handle problems in this call than failing the job (RocksDB problems). ---
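To see the cache/store interplay from the diff above in isolation, here is a hypothetical, much-simplified sketch of the relaxed-order bulk poll. TreeSet stands in for both the ordered in-memory cache and the (in reality RocksDB-backed) store, and all names are invented:

```java
import java.util.Iterator;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Invariant: the cache always holds the smallest elements of the store.
final class TwoTierQueue<E extends Comparable<E>> {
    private final TreeSet<E> cache = new TreeSet<>();
    private final TreeSet<E> store = new TreeSet<>(); // stand-in for RocksDB
    private final int cacheCapacity;

    TwoTierQueue(int cacheCapacity) { this.cacheCapacity = cacheCapacity; }

    void add(E element) {
        store.add(element);
        if (cache.size() < cacheCapacity) {
            cache.add(element);
        } else if (element.compareTo(cache.last()) < 0) {
            cache.pollLast(); // evict the largest cached element
            cache.add(element);
        }
    }

    void bulkPoll(Predicate<E> canConsume, Consumer<E> consumer) {
        // Phase 1: drain matching elements from the cheap in-memory cache.
        while (!cache.isEmpty() && canConsume.test(cache.first())) {
            E next = cache.pollFirst();
            store.remove(next);
            consumer.accept(next);
        }
        // Phase 2: only if the cache ran dry, fall back to iterating the store.
        if (cache.isEmpty()) {
            for (Iterator<E> it = store.iterator(); it.hasNext(); ) {
                E next = it.next();
                if (canConsume.test(next)) {
                    it.remove();
                    consumer.accept(next);
                } else {
                    // Refill the cache up to capacity with the remaining head
                    // of the store, then stop.
                    cache.add(next);
                    while (it.hasNext() && cache.size() < cacheCapacity) {
                        cache.add(it.next());
                    }
                    break;
                }
            }
        }
    }
}
```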