[jira] [Commented] (FLINK-10559) Remove LegacyLocalStreamEnvironment
[ https://issues.apache.org/jira/browse/FLINK-10559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651143#comment-16651143 ] ASF GitHub Bot commented on FLINK-10559: TisonKun edited a comment on issue #6851: [FLINK-10559] Remove LegacyLocalStreamEnvironment URL: https://github.com/apache/flink/pull/6851#issuecomment-430105332

Travis fails on

```
05:09:30.233 [ERROR] Failed to execute goal com.github.siom79.japicmp:japicmp-maven-plugin:0.11.0:cmp (default) on project flink-streaming-java_2.11: Breaking the build because there is at least one incompatibility: org.apache.flink.streaming.api.environment.LegacyLocalStreamEnvironment.execute(java.lang.String):METHOD_REMOVED,org.apache.flink.streaming.api.environment.LegacyLocalStreamEnvironment.LegacyLocalStreamEnvironment():CONSTRUCTOR_REMOVED,org.apache.flink.streaming.api.environment.LegacyLocalStreamEnvironment.LegacyLocalStreamEnvironment(org.apache.flink.configuration.Configuration):CONSTRUCTOR_REMOVED,org.apache.flink.streaming.api.environment.LegacyLocalStreamEnvironment:SUPERCLASS_REMOVED,org.apache.flink.streaming.api.environment.LegacyLocalStreamEnvironment:CLASS_REMOVED -> [Help 1]
```

and I wonder whether we have to tolerate the mistake and keep `LegacyLocalStreamEnvironment` until 2.x, or do something to pass this check? @zentol @StephanEwen

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Remove LegacyLocalStreamEnvironment
> ---
>
> Key: FLINK-10559
> URL: https://issues.apache.org/jira/browse/FLINK-10559
> Project: Flink
> Issue Type: Sub-task
> Components: Local Runtime
> Affects Versions: 1.7.0
> Reporter: TisonKun
> Assignee: TisonKun
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> See the corresponding GitHub pull request for the diagnosis; basically, this class is not in use any more.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
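The japicmp failure above is the expected consequence of deleting a class that had been (mistakenly) annotated `@Public`. One conventional way to pass the check without keeping the class is to exclude it from the comparison in the plugin configuration. The following Maven fragment is only an illustrative sketch of that approach; the actual structure of Flink's pom is not shown in this thread, so the placement and surrounding configuration are assumptions:

```xml
<!-- Hypothetical sketch: exclude the intentionally removed class from the
     japicmp binary-compatibility comparison. -->
<plugin>
  <groupId>com.github.siom79.japicmp</groupId>
  <artifactId>japicmp-maven-plugin</artifactId>
  <version>0.11.0</version>
  <configuration>
    <parameter>
      <excludes>
        <exclude>org.apache.flink.streaming.api.environment.LegacyLocalStreamEnvironment</exclude>
      </excludes>
    </parameter>
  </configuration>
</plugin>
```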
[GitHub] TisonKun opened a new pull request #6852: [hotfix] correct document
TisonKun opened a new pull request #6852: [hotfix] correct document URL: https://github.com/apache/flink/pull/6852

## What is the purpose of the change

We don't use `LocalFlinkMiniCluster`; in fact we use `MiniCluster`.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (**no**)
- The public API, i.e., is any changed class annotated with `(Evolving)`: (**no**)
- The serializers: (**no**)
- The runtime per-record code paths (performance sensitive): (**no**)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
- The S3 file system connector: (**no**)

## Documentation

- Does this pull request introduce a new feature? (**no**)
- If yes, how is the feature documented? (**not applicable**)

cc @zentol @StefanRRichter
[jira] [Updated] (FLINK-10559) Remove LegacyLocalStreamEnvironment
[ https://issues.apache.org/jira/browse/FLINK-10559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10559: Labels: pull-request-available (was: ) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10559) Remove LegacyLocalStreamEnvironment
[ https://issues.apache.org/jira/browse/FLINK-10559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651120#comment-16651120 ] ASF GitHub Bot commented on FLINK-10559: TisonKun opened a new pull request #6851: [FLINK-10559] Remove LegacyLocalStreamEnvironment URL: https://github.com/apache/flink/pull/6851

## What is the purpose of the change

Remove `LegacyLocalStreamEnvironment`, which is based on the legacy mode and now has a replacement, `LocalStreamEnvironment`. This is not just a simple removal, because of some earlier omissions and mistakes:

1. `PythonEnvironmentFactory` still uses `LegacyLocalStreamEnvironment`, which I believe is an oversight.
2. `LegacyLocalStreamEnvironment` was annotated `@Public` by [this commit](https://github.com/apache/flink/commit/af5279e9bd3dec18512a54c59982e3dc1f253cd2). Of course `LocalStreamEnvironment` is `@Public`, but `LegacyLocalStreamEnvironment` should never have been.

## Brief change log

- Remove `LegacyLocalStreamEnvironment.java`.
- Replace `LegacyLocalStreamEnvironment` with `LocalStreamEnvironment` in `PythonEnvironmentFactory.java`; I believe this was a missing task.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (**no**)
- The public API, i.e., is any changed class annotated with `(Evolving)`: (**yes**, but I think it is because of a mistake.)
- The serializers: (**no**)
- The runtime per-record code paths (performance sensitive): (**no**)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
- The S3 file system connector: (**no**)

## Documentation

- Does this pull request introduce a new feature? (**no**)
- If yes, how is the feature documented? (**not applicable**)

cc @tillrohrmann @aljoscha

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10559) Remove LegacyLocalStreamEnvironment
TisonKun created FLINK-10559: Summary: Remove LegacyLocalStreamEnvironment Key: FLINK-10559 URL: https://issues.apache.org/jira/browse/FLINK-10559 Project: Flink Issue Type: Sub-task Components: Local Runtime Affects Versions: 1.7.0 Reporter: TisonKun Assignee: TisonKun Fix For: 1.7.0 See the corresponding GitHub pull request for the diagnosis; basically, this class is not in use any more. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10356) Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
[ https://issues.apache.org/jira/browse/FLINK-10356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651114#comment-16651114 ] ASF GitHub Bot commented on FLINK-10356: zhijiangW commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r225396776 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java ## @@ -549,21 +584,53 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in } else { spillingChannel.close(); + spillingChannel = null; - BufferedInputStream inStream = new BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024); + BufferedInputStream inStream = + new BufferedInputStream( + new FileInputStream(checkNotNull(spillFile)), + 2 * 1024 * 1024); this.spillFileReader = new DataInputViewStreamWrapper(inStream); } } } - private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) { + private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) throws IOException { + Optional deserializationError = getDeserializationError(0); + if (deserializationError.isPresent()) { + throw new IOException(deserializationError.get()); + } + deserializer.clear(); if (leftOverData != null) { deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit); } } + private Optional getDeserializationError(int addToReadBytes) { + Optional deserializationError = Optional.empty(); + int remainingSpanningBytes = 0, leftOverDataStart = 0, leftOverDataLimit = 0; + if (this.spillFileReader == null) { + remainingSpanningBytes = this.serializationReadBuffer.available() - addToReadBytes; + } else { + try { + remainingSpanningBytes = this.spillFileReader.available() - addToReadBytes; + } catch (IOException ignored) { + } + } + if 
(this.leftOverData != null) { Review comment: Because the condition `remainingSpanningBytes != 0` that follows is not always true, this check is not always necessary. I am not sure whether it is worth putting this condition inside the one below. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
> ---
>
> Key: FLINK-10356
> URL: https://issues.apache.org/jira/browse/FLINK-10356
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.6.1, 1.7.0, 1.5.4
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Major
> Labels: pull-request-available
>
> {{SpillingAdaptiveSpanningRecordDeserializer}} doesn't have any consistency checks for usage calls or for serializers behaving properly, e.g. reading only as many bytes as available/promised for that record. At least these checks should be added:
> # Check that buffers have not been read from yet before adding them (this is an invariant {{SpillingAdaptiveSpanningRecordDeserializer}} works with, and from what I can see, it is followed now).
> # Check that after deserialization, we actually consumed {{recordLength}} bytes.
> ** If not, in the spanning deserializer, we currently simply skip the remaining bytes.
> ** But in the non-spanning deserializer, we currently continue from the wrong offset.
> # Protect against {{setNextBuffer}} being called before draining all available records

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10398) Add Tanh math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1665#comment-1665 ] ASF GitHub Bot commented on FLINK-10398: yanghua commented on issue #6736: [FLINK-10398][table] Add Tanh math function supported in Table API and SQL URL: https://github.com/apache/flink/pull/6736#issuecomment-430099630 @xccui Does it look good to you? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Tanh math function supported in Table API and SQL > - > > Key: FLINK-10398 > URL: https://issues.apache.org/jira/browse/FLINK-10398 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > refer to : https://www.techonthenet.com/oracle/functions/tanh.php -- This message was sent by Atlassian JIRA (v7.6.3#76005)
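The function this sub-task adds is the standard hyperbolic tangent. As a self-contained sketch (deliberately not Flink code, and with a hypothetical class name), the semantics can be expressed via the identity tanh(x) = (e^{2x} - 1) / (e^{2x} + 1) and checked against the JDK's built-in implementation:

```java
// Hypothetical, self-contained sketch of the tanh semantics being added to
// Table API / SQL; not the Flink implementation.
public class TanhSketch {

    /** tanh(x) = (e^{2x} - 1) / (e^{2x} + 1), valid for moderate |x|. */
    public static double tanh(double x) {
        double e2x = Math.exp(2 * x);
        return (e2x - 1) / (e2x + 1);
    }

    public static void main(String[] args) {
        for (double x : new double[] {-1.5, 0.0, 0.5, 2.0}) {
            // Should agree with the JDK's Math.tanh to double precision.
            System.out.println(x + " -> " + tanh(x) + " vs " + Math.tanh(x));
        }
    }
}
```

In SQL this would surface as a scalar function applied per row, mirroring the existing hyperbolic functions.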
[jira] [Commented] (FLINK-10356) Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
[ https://issues.apache.org/jira/browse/FLINK-10356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651110#comment-16651110 ] ASF GitHub Bot commented on FLINK-10356: zhijiangW commented on issue #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#issuecomment-430099420

@NicoK, thanks for this improvement. I think it is indeed necessary to add some checks during deserialization; otherwise hidden problems are difficult to find and to debug. I reviewed the whole process except for the tests. The only part I do not fully understand is the usage of `getDeserializationError` in one place; it may need further explanation from you for clarification. :)

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
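The second sanity check proposed in the issue (verify that deserialization consumed exactly {{recordLength}} bytes) can be sketched in a self-contained form. The class and method names below are hypothetical, and a simple `readInt` stands in for the real record deserializer:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical sketch of the consumed-bytes sanity check: after a record is
// deserialized, the reader must have consumed exactly `recordLength` bytes.
public class RecordLengthCheck {

    /** Reads one record and fails fast if the deserializer under- or over-consumed. */
    public static int readChecked(DataInputStream in, int recordLength) throws IOException {
        int before = in.available();
        int value = in.readInt();            // stand-in for the real record deserializer
        int consumed = before - in.available();
        if (consumed != recordLength) {
            throw new IOException("Deserializer consumed " + consumed
                    + " bytes, but the record length was " + recordLength);
        }
        return value;
    }

    public static void main(String[] args) throws IOException {
        byte[] buf = {0, 0, 0, 42, 7};       // a 4-byte int record plus one trailing byte
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf));
        System.out.println(readChecked(in, 4));  // consumes exactly 4 bytes -> prints 42
    }
}
```

Without such a check, an under-consuming deserializer would silently leave the stream at the wrong offset for the next record, which is exactly the non-spanning failure mode described above.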
[jira] [Commented] (FLINK-10356) Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
[ https://issues.apache.org/jira/browse/FLINK-10356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651108#comment-16651108 ] ASF GitHub Bot commented on FLINK-10356: zhijiangW commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r225395730

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java

```
@@ -137,7 +162,16 @@ else if (remaining == 0) {
 	// spanning record case
 	if (this.spanningWrapper.hasFullRecord()) {
 		// get the full record
-		target.read(this.spanningWrapper.getInputView());
+		try {
+			target.read(this.spanningWrapper.getInputView());
+		} catch (EOFException e) {
+			Optional deserializationError = this.spanningWrapper.getDeserializationError(1);
```

Review comment: I do not quite understand why we set `addToReadBytes` to 1 here. If `target.read` succeeds, we then call `spanningWrapper.getDeserializationError(0)` in the following `moveRemainderToNonSpanningDeserializer`, which makes sense; otherwise we call `spanningWrapper.getDeserializationError(1)`. Might `spanningWrapper.getDeserializationError(0)` also be suitable for the exception case? Since we only want to show some internal information during exceptions for debugging, we would then need just one check.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10356) Add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
[ https://issues.apache.org/jira/browse/FLINK-10356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651107#comment-16651107 ] ASF GitHub Bot commented on FLINK-10356: zhijiangW commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r225389817

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java

```
@@ -549,21 +584,53 @@ private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, in
 	} else {
 		spillingChannel.close();
+		spillingChannel = null;

-		BufferedInputStream inStream = new BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024);
+		BufferedInputStream inStream =
+			new BufferedInputStream(
+				new FileInputStream(checkNotNull(spillFile)),
+				2 * 1024 * 1024);
 		this.spillFileReader = new DataInputViewStreamWrapper(inStream);
 	}
 }

-private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) {
+private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) throws IOException {
+	Optional deserializationError = getDeserializationError(0);
+	if (deserializationError.isPresent()) {
+		throw new IOException(deserializationError.get());
+	}

 	deserializer.clear();

 	if (leftOverData != null) {
 		deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
 	}
 }

+private Optional getDeserializationError(int addToReadBytes) {
```

Review comment: I think it would be better to add a comment to this method for easier understanding, especially regarding the meaning of `addToReadBytes`.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
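The `getDeserializationError` helper under discussion can be illustrated with a self-contained sketch. The class name, the `available()`-based bookkeeping, and the role of `addToReadBytes` (an adjustment for bytes known to be consumed but not yet reflected in the remaining count) are assumptions for illustration, not the Flink implementation:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;

// Hypothetical sketch of an available()-based sanity check in the spirit of
// getDeserializationError: report an error message when the stream still
// holds unconsumed bytes after a record was deserialized.
public class DeserializationErrorCheck {

    public static Optional<String> deserializationError(InputStream in, int addToReadBytes)
            throws IOException {
        // addToReadBytes adjusts for bytes already accounted as read,
        // mirroring the 0-vs-1 distinction questioned in the review above.
        int remaining = in.available() - addToReadBytes;
        return remaining == 0
                ? Optional.empty()
                : Optional.of(remaining + " unconsumed byte(s) left after record deserialization");
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[4]);
        in.read(new byte[4]);                            // consume the whole "record"
        System.out.println(deserializationError(in, 0)); // prints Optional.empty
    }
}
```

Returning an `Optional<String>` rather than throwing keeps the policy decision (throw vs. log) at the call site, which matches how `moveRemainderToNonSpanningDeserializer` wraps the message in an `IOException` in the diff above.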
[GitHub] zhijiangW commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer
zhijiangW commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer URL: https://github.com/apache/flink/pull/6705#discussion_r225395730 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java ## @@ -137,7 +162,16 @@ else if (remaining == 0) { // spanning record case if (this.spanningWrapper.hasFullRecord()) { // get the full record - target.read(this.spanningWrapper.getInputView()); + try { + target.read(this.spanningWrapper.getInputView()); + } catch (EOFException e) { + Optional deserializationError = this.spanningWrapper.getDeserializationError(1); Review comment: I do not quite understand why we set `addToReadBytes` to 1 here. If `target.read` succeeds, we call `spanningWrapper.getDeserializationError(0)` in the following `moveRemainderToNonSpanningDeserializer`, which makes sense; otherwise we call `spanningWrapper.getDeserializationError(1)`. Might `spanningWrapper.getDeserializationError(0)` also be suitable for the exception case? Since we only want to surface some internal information during exceptions for debugging, we would then need just one check.
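The `getDeserializationError(addToReadBytes)` check being discussed can be sketched as follows. This is a hypothetical, simplified illustration, not Flink's actual method: the signature and the `recordLength`/`bytesRead` bookkeeping are assumptions made for the sketch, and `addToReadBytes` is modeled on the review thread's reading of it (a correction for bytes already consumed before the check runs, e.g. the read that triggered an `EOFException`).

```java
import java.util.Optional;

// Hypothetical sketch of a post-deserialization sanity check: verify that the
// serializer consumed exactly as many bytes as the record header promised.
class DeserializationCheck {

    // addToReadBytes corrects the consumed-byte count for bytes already pulled
    // from the input view before this check runs.
    static Optional<String> getDeserializationError(
            int recordLength, int bytesRead, int addToReadBytes) {
        int consumed = bytesRead + addToReadBytes;
        if (consumed == recordLength) {
            return Optional.empty();  // serializer behaved properly
        }
        return Optional.of(String.format(
                "Serializer consumed %d bytes, but the record is %d bytes long.",
                consumed, recordLength));
    }
}
```

Under this reading, passing `1` in the `EOFException` branch accounts for the byte whose read failed, which is why the two call sites use different arguments.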
[jira] [Commented] (FLINK-10246) Harden and separate MetricQueryService
[ https://issues.apache.org/jira/browse/FLINK-10246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651085#comment-16651085 ] vinoyang commented on FLINK-10246: -- [~till.rohrmann] The two remaining subtasks of this issue now have PRs; I hope we can get them merged into 1.7. > Harden and separate MetricQueryService > -- > > Key: FLINK-10246 > URL: https://issues.apache.org/jira/browse/FLINK-10246 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, Metrics >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Priority: Critical > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > This is an umbrella issue to track the effort to harden Flink's > {{MetricQueryService}} and to separate it from the rest of the system. > The idea is to set up the {{MetricQueryService}} and the metric system in > general in such a way that it cannot interfere with, or even bring down, the main > Flink components. Moreover, the metric system should not degrade > performance: it should use only free CPU cycles, and no more. Ideally, the > user does not see a difference between running Flink with the metric query > service turned on or off. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10252) Handle oversized metric messages
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651083#comment-16651083 ] ASF GitHub Bot commented on FLINK-10252: yanghua opened a new pull request #6850: [FLINK-10252] Handle oversized metric messages URL: https://github.com/apache/flink/pull/6850 ## What is the purpose of the change *This pull request handles oversized metric messages* ## Brief change log - *Handle oversized metric messages* ## Verifying this change This change is already covered by existing tests, such as *MetricQueryServiceTest#testHandleOversizedMetricMessage*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Handle oversized metric messages > --- > > Key: FLINK-10252 > URL: https://issues.apache.org/jira/browse/FLINK-10252 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > Since the {{MetricQueryService}} is implemented as an Akka actor, it can only > send messages of a smaller size than the current {{akka.framesize}}. We > should check, similarly to FLINK-10251, whether the payload exceeds the maximum > framesize and fail fast if it does. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
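The fail-fast idea from the issue description can be sketched as below. This is a hedged illustration, not the actual `MetricQueryService` code: the class and method names are hypothetical, and in practice the limit would come from the `akka.framesize` configuration rather than a plain parameter.

```java
// Hypothetical sketch of a fail-fast size check before handing a serialized
// metric dump to the actor system: if the payload exceeds the maximum frame
// size, fail immediately instead of letting the send exceed the transport limit.
class MetricDumpGuard {

    static byte[] checkDumpSize(byte[] serializedDump, long maximumFramesize) {
        if (serializedDump.length > maximumFramesize) {
            throw new IllegalStateException(String.format(
                    "Metric dump of %d bytes exceeds the maximum frame size of %d bytes.",
                    serializedDump.length, maximumFramesize));
        }
        return serializedDump;
    }
}
```

Failing fast here surfaces the configuration problem at the sender, rather than as a silently dropped or truncated actor message.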
[jira] [Updated] (FLINK-10252) Handle oversized metric messages
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10252: --- Labels: pull-request-available (was: ) > Handle oversized metric messages > --- > > Key: FLINK-10252 > URL: https://issues.apache.org/jira/browse/FLINK-10252 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > Since the {{MetricQueryService}} is implemented as an Akka actor, it can only > send messages of a smaller size than the current {{akka.framesize}}. We > should check, similarly to FLINK-10251, whether the payload exceeds the maximum > framesize and fail fast if it does. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information
[ https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651052#comment-16651052 ] ASF GitHub Bot commented on FLINK-4816: --- tony810430 closed pull request #4828: [FLINK-4816] [checkpoints] Executions failed from "DEPLOYING" should retain restored checkpoint information URL: https://github.com/apache/flink/pull/4828 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 9a4456ef7d7..ae74ce3b5a5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -181,6 +181,8 @@ /** Registry that tracks state which is shared across (incremental) checkpoints */ private SharedStateRegistry sharedStateRegistry; + private volatile long restoredCheckpointID = -1; + // public CheckpointCoordinator( @@ -1109,7 +,8 @@ public boolean restoreLatestCheckpointedState( statsTracker.reportRestoredCheckpoint(restored); } - + // set it inside lock + restoredCheckpointID = latest.getCheckpointID(); return true; } } @@ -1152,6 +1155,12 @@ public boolean restoreSavepoint( return restoreLatestCheckpointedState(tasks, true, allowNonRestored); } + public long getRestoredCheckpointID() { + synchronized (lock) { + return this.restoredCheckpointID; + } + } + // // Accessors // diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/DeployTaskException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/DeployTaskException.java new file mode 100644 
index 000..278a2ec526a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/DeployTaskException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.execution; + +/** + * Thrown to indicate that a task failed while in the {@link ExecutionState#DEPLOYING} + * and there was no checkpoint restoration + */ +public class DeployTaskException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public DeployTaskException(Throwable cause) { + super(cause); + } + + public DeployTaskException(String msg) { + super(msg); + } + + public DeployTaskException() { + super(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RestoreTaskException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RestoreTaskException.java new file mode 100644 index 000..f7118364885 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RestoreTaskException.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *
[jira] [Updated] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information
[ https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-4816: -- Labels: pull-request-available (was: ) > Executions failed from "DEPLOYING" should retain restored checkpoint > information > > > Key: FLINK-4816 > URL: https://issues.apache.org/jira/browse/FLINK-4816 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Stephan Ewen >Priority: Major > Labels: pull-request-available > > When an execution fails from state {{DEPLOYING}}, it should wrap the failure > to better report the failure cause: > - If no checkpoint was restored, it should wrap the exception in a > {{DeployTaskException}} > - If a checkpoint was restored, it should wrap the exception in a > {{RestoreTaskException}} and record the id of the checkpoint that was > attempted to be restored. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
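The wrapping described in FLINK-4816 can be sketched as follows. The exception classes here are local stand-ins mirroring the `DeployTaskException` / `RestoreTaskException` shown in the PR diff; the `wrap` helper and the negative-id convention for "no checkpoint restored" (as suggested by the diff's `restoredCheckpointID = -1` default) are assumptions for illustration, not the PR's actual code.

```java
// Illustrative sketch: pick the wrapper exception based on whether a
// checkpoint was restored before the task failed while DEPLOYING.
class DeployFailureWrapper {

    static class DeployTaskException extends RuntimeException {
        DeployTaskException(Throwable cause) { super(cause); }
    }

    static class RestoreTaskException extends RuntimeException {
        final long restoredCheckpointId;  // id of the checkpoint whose restore was attempted
        RestoreTaskException(Throwable cause, long restoredCheckpointId) {
            super("Task failed after restoring checkpoint " + restoredCheckpointId, cause);
            this.restoredCheckpointId = restoredCheckpointId;
        }
    }

    // A negative id means "no checkpoint was restored".
    static RuntimeException wrap(Throwable cause, long restoredCheckpointId) {
        return restoredCheckpointId < 0
                ? new DeployTaskException(cause)
                : new RestoreTaskException(cause, restoredCheckpointId);
    }
}
```

Recording the checkpoint id in the restore case is what lets the failure report distinguish "deployment broke" from "restoring checkpoint N broke".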
[jira] [Assigned] (FLINK-4574) Strengthen fetch interval implementation in Kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-4574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei-Che Wei reassigned FLINK-4574: -- Assignee: (was: Wei-Che Wei) > Strengthen fetch interval implementation in Kinesis consumer > > > Key: FLINK-4574 > URL: https://issues.apache.org/jira/browse/FLINK-4574 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.1.0 >Reporter: Tzu-Li (Gordon) Tai >Priority: Major > Labels: pull-request-available > > As pointed out by [~rmetzger], the current fetch interval implementation in > the {{ShardConsumer}} class of the Kinesis consumer can lead to much longer > interval times than specified by the user: e.g. if the specified fetch > interval is {{f}}, it takes {{x}} to complete a {{getRecords()}} call, and > {{y}} to complete processing the fetched records for emitting, then the > actual interval between fetches is {{f+x+y}}. > The main problem with this is that we can never guarantee how much time has > passed since the last {{getRecords}} call, and thus cannot guarantee that returned > shard iterators will not have expired the next time we use them, even if we > limit the user-given value for {{f}} to not be longer than the iterator > expiry time. > I propose to improve this by using, per {{ShardConsumer}}, a > {{ScheduledExecutorService}} / {{Timer}} to do the fixed-interval fetching, > and a separate blocking queue that collects the fetched records for emitting. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
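The proposal in the issue description — a fixed-interval scheduler plus a blocking queue decoupling fetching from emitting — can be sketched roughly as below. All names are illustrative rather than the connector's actual API, and `getRecords()` is a trivial stand-in for the Kinesis call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of fixed-interval fetching: scheduleAtFixedRate keeps the cadence at
// roughly f, instead of f + (getRecords time) + (emit time), and a bounded
// queue hands fetched records to the emitting thread.
class FixedIntervalFetcher {

    private final BlockingQueue<String> fetchedRecords = new ArrayBlockingQueue<>(1000);
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    void start(long fetchIntervalMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            for (String record : getRecords()) {
                // drop on overflow here; a real implementation would back-pressure
                fetchedRecords.offer(record);
            }
        }, 0, fetchIntervalMillis, TimeUnit.MILLISECONDS);
    }

    // Called by the emitting thread; blocks until a record is available.
    String takeNext() {
        try {
            return fetchedRecords.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for records", e);
        }
    }

    // Stand-in for the Kinesis getRecords() call.
    private List<String> getRecords() {
        List<String> batch = new ArrayList<>();
        batch.add("record");
        return batch;
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

Because the scheduler fires on a fixed rate regardless of how long fetching and emitting take, the time between `getRecords` calls stays close to `f`, which addresses the shard-iterator expiry concern.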
[jira] [Assigned] (FLINK-4816) Executions failed from "DEPLOYING" should retain restored checkpoint information
[ https://issues.apache.org/jira/browse/FLINK-4816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei-Che Wei reassigned FLINK-4816: -- Assignee: (was: Wei-Che Wei) > Executions failed from "DEPLOYING" should retain restored checkpoint > information > > > Key: FLINK-4816 > URL: https://issues.apache.org/jira/browse/FLINK-4816 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Stephan Ewen >Priority: Major > > When an execution fails from state {{DEPLOYING}}, it should wrap the failure > to better report the failure cause: > - If no checkpoint was restored, it should wrap the exception in a > {{DeployTaskException}} > - If a checkpoint was restored, it should wrap the exception in a > {{RestoreTaskException}} and record the id of the checkpoint that was > attempted to be restored. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-4574) Strengthen fetch interval implementation in Kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-4574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651050#comment-16651050 ] ASF GitHub Bot commented on FLINK-4574: --- tony810430 closed pull request #2925: [FLINK-4574] [kinesis] Strengthen fetch interval implementation in Kinesis consumer URL: https://github.com/apache/flink/pull/2925 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index 612a4a7b273..2da0c912771 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -38,6 +38,10 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Properties; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -64,6 +68,9 @@ private SequenceNumber lastSequenceNum; + /** Reference to the first error thrown by the {@link ShardConsumerFetcher} threads */ + private final AtomicReference error; + /** * Creates a shard consumer. 
* @@ -81,7 +88,7 @@ public ShardConsumer(KinesisDataFetcher fetcherRef, subscribedShard, lastSequenceNum, KinesisProxy.create(fetcherRef.getConsumerConfiguration())); - } + } /** This constructor is exposed for testing purposes */ protected ShardConsumer(KinesisDataFetcher fetcherRef, @@ -107,27 +114,30 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef, this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty( ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS))); + + this.error = new AtomicReference(); } @SuppressWarnings("unchecked") @Override public void run() { - String nextShardItr; + String startShardItr; + Timer shardConsumerFetcherScheduler = new Timer(); try { - // before infinitely looping, we set the initial nextShardItr appropriately + // before infinitely looping, we set the initial startShardItr appropriately if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) { // if the shard is already closed, there will be no latest next record to get for this shard if (subscribedShard.isClosed()) { - nextShardItr = null; + startShardItr = null; } else { - nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null); + startShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null); } } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) { - nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null); + startShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null); } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) { - nextShardItr = null; + startShardItr = null; } else { // we will be starting from an actual sequence number (due to restore from failure). 
// if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records @@ -154,42 +164,115 @@ public void run() {
[jira] [Updated] (FLINK-4574) Strengthen fetch interval implementation in Kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-4574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-4574: -- Labels: pull-request-available (was: ) > Strengthen fetch interval implementation in Kinesis consumer > > > Key: FLINK-4574 > URL: https://issues.apache.org/jira/browse/FLINK-4574 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.1.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Wei-Che Wei >Priority: Major > Labels: pull-request-available > > As pointed out by [~rmetzger], the current fetch interval implementation in > the {{ShardConsumer}} class of the Kinesis consumer can lead to much longer > interval times than specified by the user: e.g. if the specified fetch > interval is {{f}}, it takes {{x}} to complete a {{getRecords()}} call, and > {{y}} to complete processing the fetched records for emitting, then the > actual interval between fetches is {{f+x+y}}. > The main problem with this is that we can never guarantee how much time has > passed since the last {{getRecords}} call, and thus cannot guarantee that returned > shard iterators will not have expired the next time we use them, even if we > limit the user-given value for {{f}} to not be longer than the iterator > expiry time. > I propose to improve this by using, per {{ShardConsumer}}, a > {{ScheduledExecutorService}} / {{Timer}} to do the fixed-interval fetching, > and a separate blocking queue that collects the fetched records for emitting. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651048#comment-16651048 ] ASF GitHub Bot commented on FLINK-9970: --- yanghua commented on issue #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#issuecomment-430076195 @xccui I have accepted some of the suggestions and left new comments on the others. If there is any problem, please let me know. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ASCII/CHR function for table/sql API > > > Key: FLINK-9970 > URL: https://issues.apache.org/jira/browse/FLINK-9970 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > for ASCII function : > refer to : > [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii] > for CHR function : > This function converts an ASCII code to a character; > refer to : [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html] > Since "CHAR" is a reserved keyword in many databases, we use the "CHR" > keyword instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10253) Run MetricQueryService with lower priority
[ https://issues.apache.org/jira/browse/FLINK-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651046#comment-16651046 ] ASF GitHub Bot commented on FLINK-10253: yanghua commented on issue #6839: [FLINK-10253] Run MetricQueryService with lower priority URL: https://github.com/apache/flink/pull/6839#issuecomment-430075883 @tillrohrmann Following your suggestions, I have updated this PR; please review it again. Thank you. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Run MetricQueryService with lower priority > -- > > Key: FLINK-10253 > URL: https://issues.apache.org/jira/browse/FLINK-10253 > Project: Flink > Issue Type: Sub-task > Components: Metrics >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > We should run the {{MetricQueryService}} with a lower priority than the main > Flink components. An idea would be to start the underlying threads with a > lower priority. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10474) Don't translate IN with Literals to JOIN with VALUES for streaming queries
[ https://issues.apache.org/jira/browse/FLINK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651037#comment-16651037 ] ASF GitHub Bot commented on FLINK-10474: hequn8128 commented on issue #6792: [FLINK-10474][table] Don't translate IN/NOT_IN to JOIN with VALUES URL: https://github.com/apache/flink/pull/6792#issuecomment-430073199 Thanks a lot for your review! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Don't translate IN with Literals to JOIN with VALUES for streaming queries > -- > > Key: FLINK-10474 > URL: https://issues.apache.org/jira/browse/FLINK-10474 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.1, 1.7.0 >Reporter: Fabian Hueske >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > > IN predicates with literals are translated to JOIN with VALUES if the number > of elements in the IN clause exceeds a certain threshold. This should not be > done, because a streaming join is very heavy and materializes both inputs > (which is fine for the VALUES input but not for the other). > There are two ways to solve this: > # don't translate IN to a JOIN at all > # translate it to a JOIN but have a special join strategy if one input is > bounded and final (non-updating) > Option 1. should be easy to do, option 2. requires much more effort. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651034#comment-16651034 ] ASF GitHub Bot commented on FLINK-9970: --- yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r225370746 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala ## @@ -65,6 +65,9 @@ class ScalarTypesTestBase extends ExpressionTestBase { testData.setField(33, null) testData.setField(34, 256) testData.setField(35, "aGVsbG8gd29ybGQ=") +testData.setField(36, 65) +testData.setField(37, 97) Review comment: agree. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ASCII/CHR function for table/sql API > > > Key: FLINK-9970 > URL: https://issues.apache.org/jira/browse/FLINK-9970 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > for ASCII function : > refer to : > [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii] > for CHR function : > This function convert ASCII code to a character, > refer to : [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html] > Considering "CHAR" always is a keyword in many database, so we use "CHR" > keyword. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651031#comment-16651031 ] ASF GitHub Bot commented on FLINK-9970: --- yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r225370366 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ## @@ -889,6 +916,69 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "") } + @Test + def testChr(): Unit = { +testAllApis( + 'f14.chr(), + "f14.chr()", + "CHR(f14)", + "null") + +testAllApis( + 'f34.chr(), + "f34.chr()", + "CHR(f34)", + "null") + +testAllApis( + 'f34.chr(), + "f34.chr()", + "CHR(f34)", + "null") + +testAllApis( + 'f36.chr(), + "f36.chr()", + "CHR(f36)", + "A") + +testAllApis( + 'f37.chr(), + "f37.chr()", + "CHR(f37)", + "a") + +testAllApis( + 'f38.chr(), + "f38.chr()", + "CHR(f38)", + "ÿ") + +testAllApis( Review comment: There are three test cases that seem to be duplicates, but their SQL functions pass different input types for verification purposes. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ASCII/CHR function for table/sql API > > > Key: FLINK-9970 > URL: https://issues.apache.org/jira/browse/FLINK-9970 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > for ASCII function : > refer to : > [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii] > for CHR function : > This function convert ASCII code to a character, > refer to : [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html] > Considering "CHAR" always is a keyword in many database, so we use "CHR" > keyword. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651022#comment-16651022 ] ASF GitHub Bot commented on FLINK-9970: --- yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r225367314 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ## @@ -252,6 +252,17 @@ object ScalarFunctions { regexp_extract(str, regex, 0) } + /** +* Returns a numeric value of the leftmost character of the string str. +*/ + def ascii(str: String): Integer = { +if (str == null || str.equals("")) { + 0 +} else { + str.charAt(0).toByte.toInt Review comment: I think it's OK. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ASCII/CHR function for table/sql API > > > Key: FLINK-9970 > URL: https://issues.apache.org/jira/browse/FLINK-9970 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > for ASCII function : > refer to : > [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii] > for CHR function : > This function convert ASCII code to a character, > refer to : [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html] > Considering "CHAR" always is a keyword in many database, so we use "CHR" > keyword. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651021#comment-16651021 ] ASF GitHub Bot commented on FLINK-9970: --- yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r225367094 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ## @@ -252,6 +252,17 @@ object ScalarFunctions { regexp_extract(str, regex, 0) } + /** +* Returns a numeric value of the leftmost character of the string str. +*/ + def ascii(str: String): Integer = { +if (str == null || str.equals("")) { + 0 Review comment: hi @xccui , [ASCII code '0' means NULL](http://ee.hawaii.edu/~tep/EE160/Book/chap4/subsection2.1.1.1.html): ``` The last example in Table 4.3, '0', is called the NULL character, whose ASCII value is zero. Once again, this is NOT the same character as the printable digit character, '0', whose ASCII value is 48. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ASCII/CHR function for table/sql API > > > Key: FLINK-9970 > URL: https://issues.apache.org/jira/browse/FLINK-9970 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > for ASCII function : > refer to : > [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii] > for CHR function : > This function convert ASCII code to a character, > refer to : [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html] > Considering "CHAR" always is a keyword in many database, so we use "CHR" > keyword. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
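The review exchange above pins down the ASCII() semantics: for null or empty input the function returns 0 (the ASCII NUL code, not the printable digit '0', whose code is 48), otherwise the code of the leftmost character. A minimal Java sketch of the quoted Scala implementation (the class name here is hypothetical):

```java
public class AsciiSketch {
    // Returns the ASCII code of the leftmost character of str;
    // 0 (NUL, not the digit '0' whose code is 48) for null or empty input.
    static int ascii(String str) {
        if (str == null || str.isEmpty()) {
            return 0;
        }
        // Mirrors the reviewed Scala: str.charAt(0).toByte.toInt
        return (byte) str.charAt(0);
    }

    public static void main(String[] args) {
        System.out.println(ascii("Abc")); // 65
        System.out.println(ascii(""));    // 0
    }
}
```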
[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase
[ https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651019#comment-16651019 ] ASF GitHub Bot commented on FLINK-10527: yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase URL: https://github.com/apache/flink/pull/6816#issuecomment-430066720 Hi @tillrohrmann and @TisonKun, what about this change? In addition, I created a separate JIRA issue [FLINK-10558](https://issues.apache.org/jira/browse/FLINK-10558) to track the porting. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Cleanup constant isNewMode in YarnTestBase > -- > > Key: FLINK-10527 > URL: https://issues.apache.org/jira/browse/FLINK-10527 > Project: Flink > Issue Type: Sub-task > Components: YARN >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > This seems to be a residual problem with FLINK-10396. It was set to true in > that PR. Currently it has three usage scenarios: > 1. an assertion, which causes an error > {code:java} > assumeTrue("The new mode does not start TMs upfront.", !isNewMode); > {code} > 2. if (!isNewMode): the logic in the block is never invoked, so the if > block can be removed > 3. if (isNewMode): always invoked, so the if statement can be removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
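The FLINK-10253 ticket above suggests starting the MetricQueryService's underlying threads with a lower priority than the main Flink components. One common way to do that is a custom `ThreadFactory`; this is an illustrative sketch only (the factory name is hypothetical, not Flink's actual implementation):

```java
import java.util.concurrent.ThreadFactory;

public class LowPriorityThreads {
    // Creates daemon threads at MIN_PRIORITY so metric queries yield
    // CPU time to the main Flink components (which run at NORM_PRIORITY, 5).
    static ThreadFactory lowPriorityFactory(String name) {
        return runnable -> {
            Thread t = new Thread(runnable, name);
            t.setPriority(Thread.MIN_PRIORITY); // 1, the lowest allowed
            t.setDaemon(true);
            return t;
        };
    }

    public static void main(String[] args) {
        Thread t = lowPriorityFactory("metric-query-service").newThread(() -> {});
        System.out.println(t.getPriority()); // 1
    }
}
```

Note that thread priority is only a scheduling hint; how strongly the OS honors it is platform-dependent.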
[jira] [Created] (FLINK-10558) Port YARNHighAvailabilityITCase and YARNSessionCapacitySchedulerITCase to new code base
vinoyang created FLINK-10558: Summary: Port YARNHighAvailabilityITCase and YARNSessionCapacitySchedulerITCase to new code base Key: FLINK-10558 URL: https://issues.apache.org/jira/browse/FLINK-10558 Project: Flink Issue Type: Sub-task Reporter: vinoyang {{YARNHighAvailabilityITCase}}, {{YARNSessionCapacitySchedulerITCase#testClientStartup}}, {{YARNSessionCapacitySchedulerITCase#testTaskManagerFailure}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650851#comment-16650851 ] ASF GitHub Bot commented on FLINK-9970: --- xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r225311291 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ## @@ -285,4 +296,15 @@ object ScalarFunctions { */ def repeat(base: String, n: Int): String = StringUtils.repeat(base, n) + /** +* Returns a character corresponding to the input integer ASCII code. Review comment: The Java doc says the method takes an integer ASCII code, but the actual parameter type is `Long`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ASCII/CHR function for table/sql API > > > Key: FLINK-9970 > URL: https://issues.apache.org/jira/browse/FLINK-9970 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > for ASCII function : > refer to : > [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii] > for CHR function : > This function convert ASCII code to a character, > refer to : [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html] > Considering "CHAR" always is a keyword in many database, so we use "CHR" > keyword. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650847#comment-16650847 ] ASF GitHub Bot commented on FLINK-9970: --- xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r225322607 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ## @@ -889,6 +916,69 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "") } + @Test + def testChr(): Unit = { +testAllApis( + 'f14.chr(), + "f14.chr()", + "CHR(f14)", + "null") + +testAllApis( + 'f34.chr(), + "f34.chr()", + "CHR(f34)", + "null") + +testAllApis( Review comment: Duplicated test case. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ASCII/CHR function for table/sql API > > > Key: FLINK-9970 > URL: https://issues.apache.org/jira/browse/FLINK-9970 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > for ASCII function : > refer to : > [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii] > for CHR function : > This function convert ASCII code to a character, > refer to : [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html] > Considering "CHAR" always is a keyword in many database, so we use "CHR" > keyword. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650844#comment-16650844 ] ASF GitHub Bot commented on FLINK-9970: --- xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r225324041 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala ## @@ -65,6 +65,9 @@ class ScalarTypesTestBase extends ExpressionTestBase { testData.setField(33, null) testData.setField(34, 256) testData.setField(35, "aGVsbG8gd29ybGQ=") +testData.setField(36, 65) +testData.setField(37, 97) Review comment: `f36` and `f37`, I think we can only keep one of them. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ASCII/CHR function for table/sql API > > > Key: FLINK-9970 > URL: https://issues.apache.org/jira/browse/FLINK-9970 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > for ASCII function : > refer to : > [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii] > for CHR function : > This function convert ASCII code to a character, > refer to : [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html] > Considering "CHAR" always is a keyword in many database, so we use "CHR" > keyword. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650843#comment-16650843 ] ASF GitHub Bot commented on FLINK-9970: --- xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r225322741 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ## @@ -889,6 +916,69 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "") } + @Test + def testChr(): Unit = { +testAllApis( + 'f14.chr(), + "f14.chr()", + "CHR(f14)", + "null") + +testAllApis( + 'f34.chr(), + "f34.chr()", + "CHR(f34)", + "null") + +testAllApis( + 'f34.chr(), + "f34.chr()", + "CHR(f34)", + "null") + +testAllApis( + 'f36.chr(), + "f36.chr()", + "CHR(f36)", + "A") + +testAllApis( + 'f37.chr(), + "f37.chr()", + "CHR(f37)", + "a") + +testAllApis( + 'f38.chr(), + "f38.chr()", + "CHR(f38)", + "ÿ") + +testAllApis( Review comment: Duplicated This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ASCII/CHR function for table/sql API > > > Key: FLINK-9970 > URL: https://issues.apache.org/jira/browse/FLINK-9970 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > for ASCII function : > refer to : > [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii] > for CHR function : > This function convert ASCII code to a character, > refer to : [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html] > Considering "CHAR" always is a keyword in many database, so we use "CHR" > keyword. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650848#comment-16650848 ] ASF GitHub Bot commented on FLINK-9970: --- xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r225320391 ## File path: docs/dev/table/functions.md ## @@ -2449,6 +2449,30 @@ REPEAT(string, integer) + + +{% highlight text %} +ASCII(string) +{% endhighlight %} + + +Returns a numeric value of the leftmost character of the string. Review comment: Returns the ASCII code value of... (please also update the Javadoc correspondingly) As we've reached an agreement on adding the return type, it's better to mention it here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ASCII/CHR function for table/sql API > > > Key: FLINK-9970 > URL: https://issues.apache.org/jira/browse/FLINK-9970 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > for ASCII function : > refer to : > [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii] > for CHR function : > This function convert ASCII code to a character, > refer to : [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html] > Considering "CHAR" always is a keyword in many database, so we use "CHR" > keyword. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650842#comment-16650842 ] ASF GitHub Bot commented on FLINK-9970: --- xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r225319289 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ## @@ -285,4 +296,15 @@ object ScalarFunctions { */ def repeat(base: String, n: Int): String = StringUtils.repeat(base, n) + /** +* Returns a character corresponding to the input integer ASCII code. +*/ + def chr(ascii: Long): String = { +if (ascii == null || ascii < 0 || ascii > 255) { Review comment: As far as I know, when the input exceeds the range (1 - 255), the behavior of the `chr()` function differs across databases. It's fine to return a `null` value, but maybe we should mention that in the documentation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
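The null-on-out-of-range semantics discussed in this review can be sketched as a standalone method. This is a hypothetical illustration of the PR's behavior, not Flink's actual code; the class name `ChrSketch` is invented:

```java
public class ChrSketch {
    // Return null when the code is null or outside 0-255 (out-of-range
    // behavior varies across databases; the PR under review chooses null),
    // otherwise the single-character string for that code.
    static String chr(Long ascii) {
        if (ascii == null || ascii < 0 || ascii > 255) {
            return null;
        }
        return String.valueOf((char) ascii.longValue());
    }
}
```

For example, `ChrSketch.chr(65L)` yields `"A"`, while `ChrSketch.chr(256L)` yields `null`.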
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650846#comment-16650846 ] ASF GitHub Bot commented on FLINK-9970: --- xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r225307607 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ## @@ -252,6 +252,17 @@ object ScalarFunctions { regexp_extract(str, regex, 0) } + /** +* Returns a numeric value of the leftmost character of the string str. +*/ + def ascii(str: String): Integer = { +if (str == null || str.equals("")) { + 0 +} else { + str.charAt(0).toByte.toInt Review comment: This may work for ASCII characters, but I'm not sure it will return the "expected results" for non-ASCII characters. Or does that not matter? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
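The reviewer's concern about `toByte` can be demonstrated outside Flink. A minimal sketch (class and method names are invented for illustration): narrowing the leading char to a byte wraps code points above 127, so non-ASCII input yields negative values, while keeping the char returns the full UTF-16 code unit.

```java
public class AsciiByteSketch {
    // Mirrors the Scala expression str.charAt(0).toByte.toInt under review:
    // char -> byte narrows (and wraps above 127), then widens back to int.
    static int asciiViaByte(String s) {
        return (byte) s.charAt(0);
    }

    // Returns the leftmost UTF-16 code unit without narrowing.
    static int asciiViaChar(String s) {
        return s.charAt(0);
    }
}
```

For ASCII input both agree (`asciiViaByte("a")` is 97), but for "é" (U+00E9, 233) the byte-narrowed variant wraps to -23.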
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650841#comment-16650841 ] ASF GitHub Bot commented on FLINK-9970: --- xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r225311291 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ## @@ -285,4 +296,15 @@ object ScalarFunctions { */ def repeat(base: String, n: Int): String = StringUtils.repeat(base, n) + /** +* Returns a character corresponding to the input integer ASCII code. Review comment: The Java doc says the method takes an integer ASCII code, but the actual parameter type is `Long`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650845#comment-16650845 ] ASF GitHub Bot commented on FLINK-9970: --- xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r225302431 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ## @@ -252,6 +252,17 @@ object ScalarFunctions { regexp_extract(str, regex, 0) } + /** +* Returns a numeric value of the leftmost character of the string str. +*/ + def ascii(str: String): Integer = { +if (str == null || str.equals("")) { + 0 Review comment: Should return `null` when `str == null`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
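A hypothetical revision incorporating this feedback might look like the following sketch (not the actual patch; the class name is invented): propagate SQL NULL for a null input, and keep MySQL's convention that ASCII('') returns 0.

```java
public class AsciiNullSketch {
    // Boxed return type so that a SQL NULL input maps to a null result,
    // as the review suggests, instead of collapsing null and "" to 0.
    static Integer ascii(String str) {
        if (str == null) {
            return null; // SQL NULL in, SQL NULL out
        }
        if (str.isEmpty()) {
            return 0; // matches MySQL: ASCII('') returns 0
        }
        return (int) str.charAt(0);
    }
}
```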
[GitHub] xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API
xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r225322607 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ## @@ -889,6 +916,69 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "") } + @Test + def testChr(): Unit = { +testAllApis( + 'f14.chr(), + "f14.chr()", + "CHR(f14)", + "null") + +testAllApis( + 'f34.chr(), + "f34.chr()", + "CHR(f34)", + "null") + +testAllApis( Review comment: Duplicated test case.
[GitHub] xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API
xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#discussion_r225322741 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ## @@ -889,6 +916,69 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "") } + @Test + def testChr(): Unit = { +testAllApis( + 'f14.chr(), + "f14.chr()", + "CHR(f14)", + "null") + +testAllApis( + 'f34.chr(), + "f34.chr()", + "CHR(f34)", + "null") + +testAllApis( + 'f34.chr(), + "f34.chr()", + "CHR(f34)", + "null") + +testAllApis( + 'f36.chr(), + "f36.chr()", + "CHR(f36)", + "A") + +testAllApis( + 'f37.chr(), + "f37.chr()", + "CHR(f37)", + "a") + +testAllApis( + 'f38.chr(), + "f38.chr()", + "CHR(f38)", + "ÿ") + +testAllApis( Review comment: Duplicated
[jira] [Created] (FLINK-10557) Checkpoint size metric incorrectly reports the same value until restart
Ruby Andrews created FLINK-10557: Summary: Checkpoint size metric incorrectly reports the same value until restart Key: FLINK-10557 URL: https://issues.apache.org/jira/browse/FLINK-10557 Project: Flink Issue Type: Bug Components: Metrics Affects Versions: 1.4.0 Reporter: Ruby Andrews We have seen the following several times but have not found the root cause: the checkpoint size metric sometimes reports the same value over and over, even though the checkpoint size is changing. The last time we saw this, it went on for 4 days, until we restarted the Flink cluster. In that period, the application flushes all data each day, so we would expect the checkpoint size to grow until UTC midnight, drop to about 0, and begin growing again. The metrics appear to still be gathered, because we see them in the data repository where we report them. However, the size does not change. Is there more information we can gather to root-cause this if it happens again? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10553) Unified sink and source table name in SQL statement
[ https://issues.apache.org/jira/browse/FLINK-10553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650824#comment-16650824 ] ASF GitHub Bot commented on FLINK-10553: xuefuz commented on issue #6847: [FLINK-10553] [sql] Unified sink and source table name in SQL statement URL: https://github.com/apache/flink/pull/6847#issuecomment-430020997 Hi Jerry, Thanks for fixing this. Your changes seem good. However, I have a side question: do we expect the external catalog to understand the "catalog.db.table" notation, that is, a three-level hierarchy, or does it just take the table name literally? From InMemoryExternalCatalog, the catalog doesn't seem to do any interpretation of the given table name. I'm not sure if I missed anything. (Nothing is wrong with your changes; this is just about my understanding.) Thanks, Xuefu > Unified sink and source table name in SQL statement > --- > > Key: FLINK-10553 > URL: https://issues.apache.org/jira/browse/FLINK-10553 > Project: Flink > Issue Type: Bug > Components: Table API SQL > Affects Versions: 1.6.0, 1.7.0 > Reporter: wangsan > Assignee: wangsan > Priority: Major > Labels: pull-request-available > > Since sink tables can now be registered using ExternalCatalog, just as source tables are, source and sink names in SQL statements should also be treated equally. Currently we can only use `catalog.database.table` for a sink table (enclosed in back-ticks as an identifier), which is not consistent with source table names (where the whole name is not treated as a single identifier). > *INSERT INTO catalog.database.sinktable SELECT ... FROM catalog.database.sourcetable* should be supported. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10556) Integration with Apache Hive
[ https://issues.apache.org/jira/browse/FLINK-10556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-10556: -- Description: This is an umbrella JIRA tracking all enhancement and issues related to integrating Flink with Hive ecosystem. This is an outcome of a discussion in the community, and thanks go to everyone that provided feedback and interest. Specifically, we'd like to see the following features and capabilities immediately in Flink: # Metadata interoperability # Data interoperability # Data type compatibility # Hive UDF support # DDL/DML/Query language compatibility For a longer term, we'd also like to add or improve: # Compatible SQL service, client tools, JDBC/ODBC drivers # Better task failure tolerance and task scheduling # Support other user customizations in Hive (storage handlers, serdes, etc). I will provide more details regarding the proposal in a doc shortly. Design doc, if deemed necessary, will be provided in each related sub tasks under this JIRA. Feedback and contributions are greatly welcome! was: This is an umbrella JIRA tracking all enhancement and issues related to integrating Flink with Hive ecosystem. This is an outcome of a discussion in the community, and thanks go to everyone that provided feedback and interest. Specifically, we'd like to see the following features and capabilities immediately in Flink: # Detadata interoperability # Data interoperability # Data type compatibility # Hive UDF support # DDL/DML/Query language compatibility For a longer term, we'd also like to add or improve: # Compatible SQL service, client tools, JDBC/ODBC drivers # Better task failure tolerance and task scheduling # Support other user customizations in Hive (storage handlers, serdes, etc). I will provide more details regarding the proposal in a doc shortly. Design doc, if deemed necessary, will be provided in each related sub tasks under this JIRA. Feedback and contributions are greatly welcome! 
> Integration with Apache Hive > > > Key: FLINK-10556 > URL: https://issues.apache.org/jira/browse/FLINK-10556 > Project: Flink > Issue Type: New Feature > Components: Batch Connectors and Input/Output Formats, SQL Client, > Table API SQL >Affects Versions: 1.6.0 >Reporter: Xuefu Zhang >Assignee: Xuefu Zhang >Priority: Major > > This is an umbrella JIRA tracking all enhancement and issues related to > integrating Flink with Hive ecosystem. This is an outcome of a discussion in > the community, and thanks go to everyone that provided feedback and interest. > Specifically, we'd like to see the following features and capabilities > immediately in Flink: > # Metadata interoperability > # Data interoperability > # Data type compatibility > # Hive UDF support > # DDL/DML/Query language compatibility > For a longer term, we'd also like to add or improve: > # Compatible SQL service, client tools, JDBC/ODBC drivers > # Better task failure tolerance and task scheduling > # Support other user customizations in Hive (storage handlers, serdes, etc). > I will provide more details regarding the proposal in a doc shortly. Design > doc, if deemed necessary, will be provided in each related sub tasks under > this JIRA. > Feedback and contributions are greatly welcome! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-2592) Rework of FlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-2592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TisonKun closed FLINK-2592. --- Resolution: Won't Fix In fact, we want to remove {{FlinkMiniCluster}} since it is based on deprecated legacy mode. > Rework of FlinkMiniCluster > -- > > Key: FLINK-2592 > URL: https://issues.apache.org/jira/browse/FLINK-2592 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Reporter: Till Rohrmann >Priority: Minor > > Over the time, the {{FlinkMiniCluster}} has become quite complex to support > all different execution modes (batch vs. streaming, standalone vs. ha with > ZooKeeper, single {{ActorSystem}} vs. multiple {{ActorSystems}}, etc.). There > is no consistent way of configuring all these options. Therefore it would be > good to rework the {{FlinkMiniCluster}} to avoid configuring it via the > {{Configuration}} object and instead to use explicit options which can be > turned on and off. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9932) If task executor offer slot to job master timeout the first time, the slot will leak
[ https://issues.apache.org/jira/browse/FLINK-9932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9932. -- Resolution: Fixed Fixed via 1.7.0: 438f650b180d1b47a4fa79c9c8314787b3d164e3 d7740bd00c8752303beef120ab55652cc7ed7ab5 1.6.2: b64cbd6dfe4445aa45f9f8982de1ec83cea9c040 c60fdc4e0d1885a25145995fdc51b3a4acce7500 1.5.5: 9892717a856c3ef30de922f6e452e943cab66d34 28bf20c49f5323b57c23c2b27249ff2c84b69d59 > If task executor offer slot to job master timeout the first time, the slot will leak > > > Key: FLINK-9932 > URL: https://issues.apache.org/jira/browse/FLINK-9932 > Project: Flink > Issue Type: Bug > Components: Cluster Management > Affects Versions: 1.5.0 > Reporter: shuai.xu > Assignee: shuai.xu > Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > When the task executor offers a slot to the job master, it first marks the slot as active. > If the offer call times out, the task executor tries to call offerSlotsToJobManager again, > but it only offers slots in the ALLOCATED state. As the slot has already been marked ACTIVE, it will never be offered again, which causes a slot leak. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9932) If task executor offer slot to job master timeout the first time, the slot will leak
[ https://issues.apache.org/jira/browse/FLINK-9932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650716#comment-16650716 ] ASF GitHub Bot commented on FLINK-9932: --- asfgit closed pull request #6780: [FLINK-9932] [runtime] fix slot leak when task executor offer slot to job master timeout URL: https://github.com/apache/flink/pull/6780 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index ae69e561bc7..599bee99da4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -444,11 +444,16 @@ public void start() throws Exception { throw new TaskSubmissionException(message); } - if (!taskSlotTable.existsActiveSlot(jobId, tdd.getAllocationId())) { - final String message = "No task slot allocated for job ID " + jobId + - " and allocation ID " + tdd.getAllocationId() + '.'; - log.debug(message); - throw new TaskSubmissionException(message); + try { + if (!taskSlotTable.markSlotActive(tdd.getAllocationId()) && + !taskSlotTable.isActive(tdd.getTargetSlotNumber(), tdd.getJobId(), tdd.getAllocationId())) { + final String message = "No task slot allocated for job ID " + jobId + + " and allocation ID " + tdd.getAllocationId() + '.'; + log.debug(message); + throw new TaskSubmissionException(message); + } + } catch (SlotNotFoundException e) { + throw new TaskSubmissionException(e); } // re-integrate offloaded data: @@ -1050,18 +1055,6 @@ private void offerSlotsToJobManager(final JobID jobId) { while (reservedSlotsIterator.hasNext()) 
{ SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer(); - try { - if (!taskSlotTable.markSlotActive(offer.getAllocationId())) { - // the slot is either free or releasing at the moment - final String message = "Could not mark slot " + jobId + " active."; - log.debug(message); - jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new Exception(message)); - } - } catch (SlotNotFoundException e) { - final String message = "Could not mark slot " + jobId + " active."; - jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new Exception(message)); - continue; - } reservedSlots.add(offer); } @@ -1091,7 +1084,20 @@ private void offerSlotsToJobManager(final JobID jobId) { if (isJobManagerConnectionValid(jobId, jobMasterId)) { // mark accepted slots active for (SlotOffer acceptedSlot : acceptedSlots) { - reservedSlots.remove(acceptedSlot); + try { + if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId()) && + !taskSlotTable.isActive(acceptedSlot.getSlotIndex(), jobId, acceptedSlot.getAllocationId())) { +
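The gist of the patch above can be sketched as a tiny standalone state machine (a hypothetical illustration, not Flink's actual TaskSlotTable, which also handles releasing slots and throws SlotNotFoundException): activation is attempted when the job master accepts the offer, and finding the slot already ACTIVE is tolerated rather than treated as a failure, so a retried offer after a timeout cannot strand the slot.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the fix's idea: marking a slot active happens on
// offer acceptance, and re-checking an already-active slot is not an error,
// so a retried offer after a timeout does not leak the slot.
public class SlotTableSketch {
    enum State { ALLOCATED, ACTIVE }

    private final Map<String, State> slots = new HashMap<>();

    void allocate(String allocationId) {
        slots.put(allocationId, State.ALLOCATED);
    }

    /** True on the ALLOCATED -> ACTIVE transition; false if already ACTIVE or unknown. */
    boolean markSlotActive(String allocationId) {
        State prev = slots.get(allocationId);
        if (prev == null) {
            return false; // unknown allocation: nothing to activate
        }
        slots.put(allocationId, State.ACTIVE);
        return prev == State.ALLOCATED;
    }

    boolean isActive(String allocationId) {
        return slots.get(allocationId) == State.ACTIVE;
    }
}
```

A retried offer then succeeds under a check like `markSlotActive(id) || isActive(id)` instead of failing the slot, which mirrors the `!markSlotActive(...) && !isActive(...)` failure condition in the diff.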
[GitHub] asfgit closed pull request #6780: [FLINK-9932] [runtime] fix slot leak when task executor offer slot to job master timeout
asfgit closed pull request #6780: [FLINK-9932] [runtime] fix slot leak when task executor offer slot to job master timeout URL: https://github.com/apache/flink/pull/6780 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index ae69e561bc7..599bee99da4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -444,11 +444,16 @@ public void start() throws Exception { throw new TaskSubmissionException(message); } - if (!taskSlotTable.existsActiveSlot(jobId, tdd.getAllocationId())) { - final String message = "No task slot allocated for job ID " + jobId + - " and allocation ID " + tdd.getAllocationId() + '.'; - log.debug(message); - throw new TaskSubmissionException(message); + try { + if (!taskSlotTable.markSlotActive(tdd.getAllocationId()) && + !taskSlotTable.isActive(tdd.getTargetSlotNumber(), tdd.getJobId(), tdd.getAllocationId())) { + final String message = "No task slot allocated for job ID " + jobId + + " and allocation ID " + tdd.getAllocationId() + '.'; + log.debug(message); + throw new TaskSubmissionException(message); + } + } catch (SlotNotFoundException e) { + throw new TaskSubmissionException(e); } // re-integrate offloaded data: @@ -1050,18 +1055,6 @@ private void offerSlotsToJobManager(final JobID jobId) { while (reservedSlotsIterator.hasNext()) { SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer(); - try { - if (!taskSlotTable.markSlotActive(offer.getAllocationId())) { - // the slot is either free or releasing at the 
moment - final String message = "Could not mark slot " + jobId + " active."; - log.debug(message); - jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new Exception(message)); - } - } catch (SlotNotFoundException e) { - final String message = "Could not mark slot " + jobId + " active."; - jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new Exception(message)); - continue; - } reservedSlots.add(offer); } @@ -1091,7 +1084,20 @@ private void offerSlotsToJobManager(final JobID jobId) { if (isJobManagerConnectionValid(jobId, jobMasterId)) { // mark accepted slots active for (SlotOffer acceptedSlot : acceptedSlots) { - reservedSlots.remove(acceptedSlot); + try { + if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId()) && + !taskSlotTable.isActive(acceptedSlot.getSlotIndex(), jobId, acceptedSlot.getAllocationId())) { + // the slot is either free or releasing at the moment + final String message = "Could not mark slot "
[jira] [Commented] (FLINK-9697) Provide connector for modern Kafka
[ https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650696#comment-16650696 ] ASF GitHub Bot commented on FLINK-9697: --- yanghua commented on issue #6703: [FLINK-9697] Provide connector for modern Kafka URL: https://github.com/apache/flink/pull/6703#issuecomment-429991327 cc @aljoscha > Provide connector for modern Kafka > -- > > Key: FLINK-9697 > URL: https://issues.apache.org/jira/browse/FLINK-9697 > Project: Flink > Issue Type: Improvement > Reporter: Ted Yu > Assignee: vinoyang > Priority: Major > Labels: pull-request-available > > Kafka 2.0.0 will be released soon. > Here is the vote thread: > [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1] > We should provide a connector for Kafka 2.0.0 once it is released. > Upgrade to 2.0 documentation: > http://kafka.apache.org/20/documentation.html#upgrade_2_0_0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10542) Register Hive metastore as an external catalog in TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-10542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xuefu Zhang updated FLINK-10542: Issue Type: Sub-task (was: New Feature) Parent: FLINK-10556 > Register Hive metastore as an external catalog in TableEnvironment > -- > > Key: FLINK-10542 > URL: https://issues.apache.org/jira/browse/FLINK-10542 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.6.0 >Reporter: Xuefu Zhang >Assignee: Xuefu Zhang >Priority: Major > > Similar to FLINK-2167, but rather register the Hive metastore as an external > catalog in the {{TableEnvironment}}. After registration, Table API and SQL > queries should be able to access all Hive tables. > This might supersede the need for FLINK-2167, because the Hive metastore stores a > superset of the tables available via hCat without an indirection. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10556) Integration with Apache Hive
Xuefu Zhang created FLINK-10556: --- Summary: Integration with Apache Hive Key: FLINK-10556 URL: https://issues.apache.org/jira/browse/FLINK-10556 Project: Flink Issue Type: New Feature Components: Batch Connectors and Input/Output Formats, SQL Client, Table API SQL Affects Versions: 1.6.0 Reporter: Xuefu Zhang Assignee: Xuefu Zhang This is an umbrella JIRA tracking all enhancements and issues related to integrating Flink with the Hive ecosystem. This is an outcome of a discussion in the community, and thanks go to everyone who provided feedback and interest. Specifically, we'd like to see the following features and capabilities immediately in Flink: # Metadata interoperability # Data interoperability # Data type compatibility # Hive UDF support # DDL/DML/Query language compatibility For the longer term, we'd also like to add or improve: # Compatible SQL service, client tools, JDBC/ODBC drivers # Better task failure tolerance and task scheduling # Support for other user customizations in Hive (storage handlers, serdes, etc.). I will provide more details regarding the proposal in a doc shortly. A design doc, if deemed necessary, will be provided in each related sub-task under this JIRA. Feedback and contributions are greatly welcome! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10525) Deserialization schema, skip data, that couldn't be properly deserialized
[ https://issues.apache.org/jira/browse/FLINK-10525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rafi Aroch reassigned FLINK-10525: -- Assignee: (was: Rafi Aroch) > Deserialization schema, skip data, that couldn't be properly deserialized > - > > Key: FLINK-10525 > URL: https://issues.apache.org/jira/browse/FLINK-10525 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Rinat Sharipov >Priority: Minor > > Hi mates, in accordance with the contract of > *org.apache.flink.api.common.serialization.DeserializationSchema*, it should > return a *null* value when content couldn’t be deserialized. > But in most cases (e.g. > *org.apache.flink.formats.avro.AvroDeserializationSchema*) the method fails if > data doesn't satisfy the expected schema. > > We’ve implemented our own SerDe class that returns *null* if data doesn’t > satisfy the schema, but it’s rather hard to maintain this functionality during > migration to the latest Flink version. > I think it would be a useful feature if Flink supported optionally skipping > failed records in Avro and other deserializers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
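The workaround the reporter describes — a SerDe that returns null on undeserializable input so the record is skipped — can be sketched as a decorator. The code below is a minimal self-contained illustration: `SimpleDeserializationSchema` is a simplified stand-in for Flink's `org.apache.flink.api.common.serialization.DeserializationSchema`, not the real interface.

```java
import java.nio.charset.StandardCharsets;

/** Simplified stand-in for Flink's DeserializationSchema contract (not the real interface). */
interface SimpleDeserializationSchema<T> {
    /** Returns the deserialized record, or null if the message should be skipped. */
    T deserialize(byte[] message) throws Exception;
}

/** Decorator that turns deserialization failures into null (skip) instead of failing the job. */
class SkipOnFailureSchema<T> implements SimpleDeserializationSchema<T> {
    private final SimpleDeserializationSchema<T> inner;

    SkipOnFailureSchema(SimpleDeserializationSchema<T> inner) {
        this.inner = inner;
    }

    @Override
    public T deserialize(byte[] message) {
        try {
            return inner.deserialize(message);
        } catch (Exception e) {
            // Record does not satisfy the expected schema: return null to skip it,
            // per the "return null when content couldn't be deserialized" contract.
            return null;
        }
    }
}

public class SkipDemo {
    public static void main(String[] args) {
        SkipOnFailureSchema<Integer> lenient = new SkipOnFailureSchema<>(
            bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)));

        System.out.println(lenient.deserialize("42".getBytes(StandardCharsets.UTF_8)));   // 42
        System.out.println(lenient.deserialize("oops".getBytes(StandardCharsets.UTF_8))); // null
    }
}
```

Because the decorator owns its own schema instance, this approach survives Flink upgrades without copy-pasting the built-in deserializers, which is exactly the maintenance pain the ticket mentions.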
[GitHub] TisonKun opened a new pull request #6849: [FLINK-10555] [test] Port AkkaSslITCase to new code base
TisonKun opened a new pull request #6849: [FLINK-10555] [test] Port AkkaSslITCase to new code base URL: https://github.com/apache/flink/pull/6849 ## What is the purpose of the change Port `AkkaSslITCase` to new code base, as `MiniClusterSslITCase`. ## Brief change log Tests the Flink cluster using SSL transport for Akka remoting, using `MiniCluster` as the "flink cluster". ## Verifying this change This change is a trivial rework and is itself a test. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**) - The serializers: (**no**) - The runtime per-record code paths (performance sensitive): (**no**) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**) - The S3 file system connector: (**no**) ## Documentation - Does this pull request introduce a new feature? (**no**) - If yes, how is the feature documented? (**not applicable**) cc @tillrohrmann @StephanEwen This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-10555) Port AkkaSslITCase to new code base
[ https://issues.apache.org/jira/browse/FLINK-10555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10555: --- Labels: pull-request-available (was: ) > Port AkkaSslITCase to new code base > --- > > Key: FLINK-10555 > URL: https://issues.apache.org/jira/browse/FLINK-10555 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{AkkaSslITCase}} to new code base, as {{MiniClusterSslITCase}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10555) Port AkkaSslITCase to new code base
TisonKun created FLINK-10555: Summary: Port AkkaSslITCase to new code base Key: FLINK-10555 URL: https://issues.apache.org/jira/browse/FLINK-10555 Project: Flink Issue Type: Sub-task Components: Tests Affects Versions: 1.7.0 Reporter: TisonKun Assignee: TisonKun Fix For: 1.7.0 Port {{AkkaSslITCase}} to new code base, as {{MiniClusterSslITCase}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10474) Don't translate IN with Literals to JOIN with VALUES for streaming queries
[ https://issues.apache.org/jira/browse/FLINK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650498#comment-16650498 ] ASF GitHub Bot commented on FLINK-10474: fhueske commented on issue #6792: [FLINK-10474][table] Don't translate IN/NOT_IN to JOIN with VALUES URL: https://github.com/apache/flink/pull/6792#issuecomment-429937833 Thanks for the update @hequn8128. The changes look good. I'll merge this PR. Thanks, Fabian This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Don't translate IN with Literals to JOIN with VALUES for streaming queries > -- > > Key: FLINK-10474 > URL: https://issues.apache.org/jira/browse/FLINK-10474 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.1, 1.7.0 >Reporter: Fabian Hueske >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > > IN predicates with literals are translated to JOIN with VALUES if the number > of elements in the IN clause exceeds a certain threshold. This should not be > done, because a streaming join is very heavy and materializes both inputs > (which is fine for the VALUES input but not for the other). > There are two ways to solve this: > # don't translate IN to a JOIN at all > # translate it to a JOIN but have a special join strategy if one input is > bound and final (non-updating) > Option 1. should be easy to do, option 2. requires much more effort. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10554) Bump flink-shaded dependency version
[ https://issues.apache.org/jira/browse/FLINK-10554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650482#comment-16650482 ] ASF GitHub Bot commented on FLINK-10554: zentol opened a new pull request #6848: [FLINK-10554][build] Bump flink-shaded to 5.0 URL: https://github.com/apache/flink/pull/6848 This PR bumps the flink-shaded version to 5.0. This shouldn't affect anything except giving the table API access to `jackson-dataformat-csv`. /cc @twalthr This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Bump flink-shaded dependency version > > > Key: FLINK-10554 > URL: https://issues.apache.org/jira/browse/FLINK-10554 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10554) Bump flink-shaded dependency version
[ https://issues.apache.org/jira/browse/FLINK-10554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10554: --- Labels: pull-request-available (was: ) > Bump flink-shaded dependency version > > > Key: FLINK-10554 > URL: https://issues.apache.org/jira/browse/FLINK-10554 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10554) Bump flink-shaded dependency version
Chesnay Schepler created FLINK-10554: Summary: Bump flink-shaded dependency version Key: FLINK-10554 URL: https://issues.apache.org/jira/browse/FLINK-10554 Project: Flink Issue Type: Improvement Components: Build System Affects Versions: 1.7.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.7.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8578) Implement rowtime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650471#comment-16650471 ] Fabian Hueske commented on FLINK-8578: -- bq. If a user wants to upsert under proctime, he doesn't need to define a rowtime attribute field in a table schema. The rowtime attribute will be converted to a regular TIMESTAMP attribute after the upsert conversion. bq. The problem is that rowtime attributes are always read from the internal StreamRecord timestamp field. So unless there is a copy of the field in the data type of the DataStream, a user would not have access to that. But I agree, that seems to be a not very common special case. > Implement rowtime DataStream to Table upsert conversion. > > > Key: FLINK-8578 > URL: https://issues.apache.org/jira/browse/FLINK-8578 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > FLINK-8577 implements upsert from a stream under proctime. This task is going > to solve the order problem introduced by proctime. As proposed by Fabian in > FLINK-8545, it would be good to be able to declare a time attribute that > decides whether an upsert is performed or not. > {code:java} > Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key) > {code} > This is a good way to solve the order problem using rowtime. And an idea > comes to my mind that we can even remove the `.upsertOrder`, because the > rowtime attribute can only be defined once in a table schema. Removing > `.upsertOrder` also makes it easier to design the api for TableSource and sql, > i.e., we don't need to add another new feature for the api. > Any suggestions are welcomed! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650448#comment-16650448 ] ASF GitHub Bot commented on FLINK-9592: --- kl0u commented on issue #6824: [FLINK-9592][flink-connector-filesystem] added ability to hook file state changing URL: https://github.com/apache/flink/pull/6824#issuecomment-429922404 Perfect @kent2171 ! Looking forward to your design proposal. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Notify on moving file into pending/ final state > --- > > Key: FLINK-9592 > URL: https://issues.apache.org/jira/browse/FLINK-9592 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: Rinat Sharipov >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > > Hi mates, I got a proposal about functionality of BucketingSink. > > During implementation of one of our tasks we got the following need - create > a meta-file, with the path and additional information about the file, created > by BucketingSink, when it’s been moved into final place. > Unfortunately such behaviour is currently not available for us. > > We’ve implemented our own Sink, that provides an opportunity to register > notifiers, that will be called, when file state is changing, but current API > doesn’t allow us to add such behaviour using inheritance ... > > It seems, that such functionality could be useful, and could be a part of > BucketingSink API > What do you sink, should I make a PR ? > Sincerely yours, > *Rinat Sharipov* > Software Engineer at 1DMP CORE Team > > email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru] > mobile: +7 (925) 416-37-26 > Clever{color:#4f8f00}DATA{color} > make your data clever > > > > Hi, > I see that could be a useful feature. 
What exactly now is preventing you from > inheriting from BucketingSink? Maybe it would be just enough to make the > BucketingSink easier extendable. > One thing now that could collide with such feature is that Kostas is now > working on larger BucketingSink rework/refactor. > Piotrek > > > Hi guys, thx for your reply. > The following code info is actual for *release-1.5.0 tag, > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class* > > For now, BucketingSink has the following lifecycle of files > > When moving files from opened to pending state: > # on each item (*method* *invoke:434* *line*), we check that suitable bucket > exist, and contain opened file, in case, when opened file doesn’t exist, we > create one, and write item to it > # on each item (*method* *invoke:434* *line*), we check that suitable opened > file doesn’t exceed the limits, and if limits are exceeded, we close it and > move into pending state using *closeCurrentPartFile:568 line - private method* > # on each timer request (*onProcessingTime:482 line*), we check, if items > haven't been added to the opened file longer, than specified period of time, > we close it, using the same private method *closeCurrentPartFile:588 line* > > So, the only way, that we have, is to call our hook from > *closeCurrentPartFile*, that is private, so we copy-pasted the current impl > and injected our logic there > > > Files are moving from pending state into final, during checkpointing > lifecycle, in *notifyCheckpointComplete:657 line*, that is public, and > contains a lot of logic, including discovery of files in pending states, > synchronization of state access and it’s modification, etc … > > So we couldn’t override it, or call super method and add some logic, because > when current impl changes the state of files, it removes them from state, and > we don’t have any opportunity to know, > for which files state have been changed. 
> > To solve such problem, we've created the following interface > > /** > * The \{@code FileStateChangeCallback}is used to perform any additional > operations, when > {@link BucketingSink} > * moves file from one state to another. For more information about state > management of \{@code BucketingSink}, look > * through it's official documentation. > */ > public interface FileStateChangeCallback extends Serializable \{ /** * Used > to perform any additional operations, related with moving of file into next > state. * *
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650447#comment-16650447 ] ASF GitHub Bot commented on FLINK-9592: --- kent2171 closed pull request #6824: [FLINK-9592][flink-connector-filesystem] added ability to hook file state changing URL: https://github.com/apache/flink/pull/6824 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index 4f85e3cf8d5..67500ed549e 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -58,6 +58,7 @@ import java.lang.reflect.Method; import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -113,6 +114,9 @@ * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently * pending files will be moved to {@code finished}. * + * If it's necessary to perform any additional actions, when state of the file is changed, you need to register + * the list of callbacks using {@link #registerFileStateChangedCallback(FileStateChangedCallback...)} method. + * All of them will be called in accordance with the specified order. 
* * If case of a failure, and in order to guarantee exactly-once semantics, the sink should roll back to the state it * had when that last successful checkpoint occurred. To this end, when restoring, the restored files in {@code pending} @@ -323,6 +327,11 @@ private transient ProcessingTimeService processingTimeService; + /** +* The list of callbacks, that should be called, when state of the file is changed. +*/ + private List fileStateChangedCallbacks = new ArrayList<>(); + /** * Creates a new {@code BucketingSink} that writes files to the given base directory. * @@ -368,6 +377,11 @@ public void setInputType(TypeInformation type, ExecutionConfig executionConfi } } + public BucketingSink registerFileStateChangedCallback(FileStateChangedCallback... callbacks) { + fileStateChangedCallbacks.addAll(Arrays.asList(callbacks)); + return this; + } + @Override public void initializeState(FunctionInitializationContext context) throws Exception { Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized."); @@ -598,10 +612,14 @@ private void closeCurrentPartFile(BucketState bucketState) throws Exception { Path inProgressPath = getInProgressPathFor(currentPartPath); Path pendingPath = getPendingPathFor(currentPartPath); + LOG.debug("Moving in-progress bucket {} to pending file {}", inProgressPath, pendingPath); fs.rename(inProgressPath, pendingPath); - LOG.debug("Moving in-progress bucket {} to pending file {}", - inProgressPath, - pendingPath); + for (FileStateChangedCallback callback : fileStateChangedCallbacks) { + callback.onInProgressToPending(fs, pendingPath); + } + + LOG.debug("In-progress bucket {} successfully moved to pending file {}", inProgressPath, pendingPath); + bucketState.pendingFiles.add(currentPartPath.toString()); bucketState.currentFile = null; } @@ -702,11 +720,18 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { Path finalPath = new Path(filename); Path pendingPath = 
getPendingPathFor(finalPath); - fs.rename(pendingPath, finalPath); LOG.debug( "Moving pending file {} to final location having completed checkpoint {}.", -
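The callback mechanism proposed in the (now closed) PR above can be illustrated with a minimal self-contained sketch. `FileStateChangedCallback` and `CallbackSink` below are simplified stand-ins for the types the PR adds to `BucketingSink`, not real Flink API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in for the FileStateChangedCallback interface proposed in the PR. */
interface FileStateChangedCallback {
    void onInProgressToPending(String pendingPath);
}

/** Toy sink demonstrating the callback-registration pattern from the PR diff. */
class CallbackSink {
    private final List<FileStateChangedCallback> callbacks = new ArrayList<>();

    /** Fluent registration, mirroring registerFileStateChangedCallback(...) in the diff. */
    CallbackSink registerFileStateChangedCallback(FileStateChangedCallback... cbs) {
        callbacks.addAll(Arrays.asList(cbs));
        return this;
    }

    /** Simulates closeCurrentPartFile(): rename first, then notify callbacks in order. */
    void closeCurrentPartFile(String inProgressPath) {
        String pendingPath = inProgressPath.replace(".in-progress", ".pending");
        // fs.rename(inProgressPath, pendingPath) would happen here in the real sink
        for (FileStateChangedCallback cb : callbacks) {
            cb.onInProgressToPending(pendingPath);
        }
    }
}

public class CallbackDemo {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        CallbackSink sink = new CallbackSink()
            .registerFileStateChangedCallback(seen::add);
        sink.closeCurrentPartFile("part-0-0.in-progress");
        System.out.println(seen); // [part-0-0.pending]
    }
}
```

The notification loop mirrors the for-loop the diff inserts into `closeCurrentPartFile`: the rename happens first, and callbacks run only after the file has actually reached the pending state.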
[jira] [Commented] (FLINK-8578) Implement rowtime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650443#comment-16650443 ] Hequn Cheng commented on FLINK-8578: > Upsert Order Yes, a non-timestamp order field can cover more cases; however, I agree that we can start from the time-attributes. Can the first version support upsert without {{.upsertOrder}}? If a user wants to upsert under proctime, he doesn't need to define a rowtime attribute field in a table schema. The rowtime attribute will be converted to a regular TIMESTAMP attribute after the upsert conversion. > Should we also convert rowtime attributes into regular TIMESTAMP attributes, > i.e., remove the time-attribute property. Yes, I agree. > Implement rowtime DataStream to Table upsert conversion. > > > Key: FLINK-8578 > URL: https://issues.apache.org/jira/browse/FLINK-8578 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > FLINK-8577 implements upsert from a stream under proctime. This task is going > to solve the order problem introduced by proctime. As proposed by Fabian in > FLINK-8545, it would be good to be able to declare a time attribute that > decides whether an upsert is performed or not. > {code:java} > Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key) > {code} > This is a good way to solve the order problem using rowtime. And an idea > comes to my mind that we can even remove the `.upsertOrder`, because the > rowtime attribute can only be defined once in a table schema. Removing > `.upsertOrder` also makes it easier to design the api for TableSource and sql, > i.e., we don't need to add another new feature for the api. > Any suggestions are welcomed! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10135) Certain cluster-level metrics are no longer exposed
[ https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10135. Resolution: Fixed 1.6: 5b5d95334a91bf7d51a8c02c0d168a6d4ad006e9 1.5: d09d06a0b53fec0d681eaa8e788f7984f5318191 > Certain cluster-level metrics are no longer exposed > --- > > Key: FLINK-10135 > URL: https://issues.apache.org/jira/browse/FLINK-10135 > Project: Flink > Issue Type: Bug > Components: JobManager, Metrics >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Joey Echeverria >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > In [the documentation for > metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster] > in the Flink 1.5.0 release, it says that the following metrics are reported > by the JobManager: > {noformat} > numRegisteredTaskManagers > numRunningJobs > taskSlotsAvailable > taskSlotsTotal > {noformat} > In the job manager REST endpoint > ({{http://:8081/jobmanager/metrics}}), those metrics don't > appear. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650442#comment-16650442 ] ASF GitHub Bot commented on FLINK-9592: --- kent2171 commented on issue #6824: [FLINK-9592][flink-connector-filesystem] added ability to hook file state changing URL: https://github.com/apache/flink/pull/6824#issuecomment-429921703 ok, will return with the proposal, thx @kl0u This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Notify on moving file into pending/ final state > --- > > Key: FLINK-9592 > URL: https://issues.apache.org/jira/browse/FLINK-9592 > Project: Flink > Issue Type: New Feature > Components: filesystem-connector >Reporter: Rinat Sharipov >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > > Hi mates, I got a proposal about functionality of BucketingSink. > > During implementation of one of our tasks we got the following need - create > a meta-file, with the path and additional information about the file, created > by BucketingSink, when it’s been moved into final place. > Unfortunately such behaviour is currently not available for us. > > We’ve implemented our own Sink, that provides an opportunity to register > notifiers, that will be called, when file state is changing, but current API > doesn’t allow us to add such behaviour using inheritance ... > > It seems, that such functionality could be useful, and could be a part of > BucketingSink API > What do you think, should I make a PR? > Sincerely yours, > *Rinat Sharipov* > Software Engineer at 1DMP CORE Team > > email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru] > mobile: +7 (925) 416-37-26 > Clever{color:#4f8f00}DATA{color} > make your data clever > > > > Hi, > I see that could be a useful feature. 
What exactly is now preventing you from > inheriting from BucketingSink? Maybe it would be enough to make the > BucketingSink easier to extend. > One thing that could collide with such a feature is that Kostas is now > working on a larger BucketingSink rework/refactor. > Piotrek > > > Hi guys, thx for your reply. > The following code info applies to the *release-1.5.0 tag, > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class* > > Currently, BucketingSink has the following lifecycle of files > > When moving files from opened to pending state: > # on each item (*method* *invoke:434* *line*), we check that a suitable bucket > exists and contains an opened file; when an opened file doesn’t exist, we > create one and write the item to it > # on each item (*method* *invoke:434* *line*), we check that the opened > file doesn’t exceed the limits, and if the limits are exceeded, we close it and > move it into the pending state using *closeCurrentPartFile:568 line - private method* > # on each timer request (*onProcessingTime:482 line*), we check whether items > haven't been added to the opened file for longer than the specified period of time; if so, > we close it using the same private method *closeCurrentPartFile:588 line* > > So the only way we have is to call our hook from > *closeCurrentPartFile*, which is private, so we copy-pasted the current impl > and injected our logic there > > > Files move from the pending state into the final state during the checkpointing > lifecycle, in *notifyCheckpointComplete:657 line*, which is public and > contains a lot of logic, including discovery of files in pending states, > synchronization of state access and its modification, etc … > > So we couldn’t override it, or call the super method and add some logic, because > when the current impl changes the state of files it removes them from state, and > we don’t have any way to know > for which files the state has been changed. 
> > To solve such a problem, we've created the following interface
> {code:java}
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional operations, when
>  * {@link BucketingSink} moves a file from one state to another. For more information about state
>  * management of {@code BucketingSink}, look through its official documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
>     /**
>      * Used to perform any additional operations, related with moving of a file into the next state.
>      *
>      * @param fs
> {code}
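The callback idea quoted above can be sketched independently of BucketingSink. All names below are hypothetical illustrations, not the Flink API: a sink keeps a list of registered callbacks and notifies each one whenever a part file transitions to a new state.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the proposed file-state-change notification hook. */
public class HookedSink {

    /** The states a part file moves through in a bucketing-style sink. */
    enum FileState { OPENED, PENDING, FINAL }

    /** Callback invoked when a file moves into a new state. */
    interface FileStateChangeCallback {
        void onStateChange(String path, FileState newState);
    }

    private final List<FileStateChangeCallback> callbacks = new ArrayList<>();

    public void register(FileStateChangeCallback cb) {
        callbacks.add(cb);
    }

    /** Called by the sink whenever it transitions a file; notifies all registered hooks. */
    void transition(String path, FileState newState) {
        for (FileStateChangeCallback cb : callbacks) {
            cb.onStateChange(path, newState);
        }
    }

    public static void main(String[] args) {
        HookedSink sink = new HookedSink();
        sink.register((path, state) -> System.out.println(path + " -> " + state));
        sink.transition("/bucket/part-0-0", FileState.PENDING);
        sink.transition("/bucket/part-0-0", FileState.FINAL);
    }
}
```

A hook like this would let users create their meta-files on the PENDING to FINAL transition without copy-pasting private sink internals.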
[jira] [Commented] (FLINK-9592) Notify on moving file into pending/ final state
[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650438#comment-16650438 ] ASF GitHub Bot commented on FLINK-9592: --- kl0u commented on issue #6824: [FLINK-9592][flink-connector-filesystem] added ability to hook file state changing URL: https://github.com/apache/flink/pull/6824#issuecomment-429920387 Hi @kent2171 ! I am not so sure if the `BucketingSink` is going to be deprecated soon. The reason is that the new `StreamingFileSink` for now requires newer Hadoop versions. But specifically for this new feature, I would say to implement it on top of the new `StreamingFileSink`, as this is definitely going to be the main filesystem sink in the future. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org