[jira] [Assigned] (FLINK-14055) Add advanced function DDL syntax "USING JAR/FILE/ARCHIVE"
[ https://issues.apache.org/jira/browse/FLINK-14055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen reassigned FLINK-14055: -- Assignee: Zhenqiu Huang > Add advanced function DDL syntax "USING JAR/FILE/ARCHIVE" > > > Key: FLINK-14055 > URL: https://issues.apache.org/jira/browse/FLINK-14055 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Bowen Li >Assignee: Zhenqiu Huang >Priority: Major > > As FLINK-7151 adds basic function DDL to Flink, this ticket is to support > dynamically loading functions from external sources in function DDL with > advanced syntax such as > > {code:java} > CREATE FUNCTION func_name AS class_name USING JAR/FILE/ARCHIVE 'xxx' [, > JAR/FILE/ARCHIVE 'yyy'] ; > {code} -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (FLINK-13372) Timestamp conversion bug in non-blink Table/SQL runtime
[ https://issues.apache.org/jira/browse/FLINK-13372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16894932#comment-16894932 ] Shuyi Chen commented on FLINK-13372: I'll provide a fix for 1.9.0. > Timestamp conversion bug in non-blink Table/SQL runtime > --- > > Key: FLINK-13372 > URL: https://issues.apache.org/jira/browse/FLINK-13372 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0, 1.8.1, 1.9.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Critical > > Currently, in the non-blink table/SQL runtime, Flink uses > SqlFunctions.internalToTimestamp(long v) from Calcite to convert event time > (a long) to java.sql.Timestamp. > {code:java} > public static Timestamp internalToTimestamp(long v) { return new Timestamp(v > - (long)LOCAL_TZ.getOffset(v)); } {code} > However, as discussed on the recent Calcite mailing list, > SqlFunctions.internalToTimestamp() assumes the input timestamp value is in > the current JVM’s default timezone (which is unusual), NOT milliseconds since > epoch, and it is used to convert a timestamp value in the current JVM’s > default timezone to milliseconds since epoch, which the java.sql.Timestamp > constructor takes. Therefore, the results will not only be wrong, but will > also change if the job runs on machines in different timezones. (The only > exception is when all production machines use the UTC timezone.) 
> Here is an example: if the user input value is 0 (00:00:00 UTC on 1 January > 1970), and the table/SQL runtime runs on a machine in PST (UTC-8), the output > sql.Timestamp after SqlFunctions.internalToTimestamp() will become 28800000 > milliseconds since epoch (08:00:00 UTC on 1 January 1970); and with the same > input, if the table/SQL runtime runs again on a different machine in EST > (UTC-5), the output sql.Timestamp after SqlFunctions.internalToTimestamp() > will become 18000000 milliseconds since epoch (05:00:00 UTC on 1 January 1970). > Currently, there are unit tests for the table/SQL API event time > input/output (e.g., GroupWindowITCase.testEventTimeTumblingWindow() and > SqlITCase.testDistinctAggWithMergeOnEventTimeSessionGroupWindow()). They > currently all pass because we compare the string format of the time, which > ignores the timezone. If you step into the code, the actual java.sql.Timestamp > value is wrong and changes as the tests run in different timezones (e.g., one > can use -Duser.timezone=PST to change the current JVM’s default timezone) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
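The timezone-dependent shift described above can be reproduced with a short self-contained sketch. Note this is an illustrative reconstruction, not Calcite's actual code: the timezone is passed as an explicit parameter here, whereas Calcite's SqlFunctions reads a static LOCAL_TZ field.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

class TimestampBugDemo {
    // Same arithmetic as Calcite's SqlFunctions.internalToTimestamp(long v),
    // with the timezone made an explicit parameter so the shift is visible.
    static Timestamp internalToTimestamp(long v, TimeZone tz) {
        return new Timestamp(v - (long) tz.getOffset(v));
    }

    public static void main(String[] args) {
        long epoch = 0L; // 00:00:00 UTC on 1 January 1970
        // On 1 January 1970 these zones are on standard time: UTC-8 and UTC-5.
        TimeZone pst = TimeZone.getTimeZone("America/Los_Angeles");
        TimeZone est = TimeZone.getTimeZone("America/New_York");
        // Same input, different machine timezone, different result:
        System.out.println(internalToTimestamp(epoch, pst).getTime()); // 28800000
        System.out.println(internalToTimestamp(epoch, est).getTime()); // 18000000
    }
}
```

Running this shows the same epoch input mapping to two different Timestamp values depending solely on the JVM's zone, which is exactly why the job output changes across hosts.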
[jira] [Updated] (FLINK-13372) Timestamp conversion bug in non-blink Table/SQL runtime
[ https://issues.apache.org/jira/browse/FLINK-13372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-13372: --- Description: Currently, in the non-blink table/SQL runtime, Flink uses SqlFunctions.internalToTimestamp(long v) from Calcite to convert event time (a long) to java.sql.Timestamp. {code:java} public static Timestamp internalToTimestamp(long v) { return new Timestamp(v - (long)LOCAL_TZ.getOffset(v)); } {code} However, as discussed on the recent Calcite mailing list, SqlFunctions.internalToTimestamp() assumes the input timestamp value is in the current JVM’s default timezone (which is unusual), NOT milliseconds since epoch, and it is used to convert a timestamp value in the current JVM’s default timezone to milliseconds since epoch, which the java.sql.Timestamp constructor takes. Therefore, the results will not only be wrong, but will also change if the job runs on machines in different timezones. (The only exception is when all production machines use the UTC timezone.) Here is an example: if the user input value is 0 (00:00:00 UTC on 1 January 1970), and the table/SQL runtime runs on a machine in PST (UTC-8), the output sql.Timestamp after SqlFunctions.internalToTimestamp() will become 28800000 milliseconds since epoch (08:00:00 UTC on 1 January 1970); and with the same input, if the table/SQL runtime runs again on a different machine in EST (UTC-5), the output sql.Timestamp after SqlFunctions.internalToTimestamp() will become 18000000 milliseconds since epoch (05:00:00 UTC on 1 January 1970). Currently, there are unit tests for the table/SQL API event time input/output (e.g., GroupWindowITCase.testEventTimeTumblingWindow() and SqlITCase.testDistinctAggWithMergeOnEventTimeSessionGroupWindow()). They currently all pass because we compare the string format of the time, which ignores the timezone. 
If you step into the code, the actual java.sql.Timestamp value is wrong and changes as the tests run in different timezones (e.g., one can use -Duser.timezone=PST to change the current JVM’s default timezone) was: Currently, in the non-blink table/SQL runtime, Flink used SqlFunctions.internalToTimestamp(long v) from Calcite to convert event time (in long) to java.sql.Timestamp. {code:java} public static Timestamp internalToTimestamp(long v) { return new Timestamp(v - (long)LOCAL_TZ.getOffset(v)); } {code} However, as discussed in the recent Calcite mailing list, SqlFunctions.internalToTimestamp() assumes the input timestamp value is in the current JVM’s default timezone (which is unusual), NOT milliseconds since epoch. And SqlFunctions.internalToTimestamp() is used to convert timestamp value in the current JVM’s default timezone to milliseconds since epoch, which java.sql.Timestamp constructor takes. Therefore, the results will not only be wrong, but change if the job runs in machines on different timezones as well. Here is an example, if the user input value is 0 (00:00:00 UTC on 1 January 1970), and the table/SQL runtime runs in a machine with in PST (UTC-8), the output sql.Timestamp after SqlFunctions.internalToTimestamp() will become 2880 millisec since epoch (08:00:00 UTC on 1 January 1970); And if the table/SQL runtime runs in a machine with in EST (UTC-5), the output sql.Timestamp after SqlFunctions.internalToTimestamp() will become 1800 millisec since epoch (05:00:00 UTC on 1 January 1970). Currently, there are unittests to test the table/SQL API event time input/output (e.g., GroupWindowITCase.testEventTimeTumblingWindow() and SqlITCase.testDistinctAggWithMergeOnEventTimeSessionGroupWindow()). They now all passed because we are comparing the string format of the time which ignores timezone. 
If you step into the code, the actual java.sql.Timestamp value is wrong and change as the tests run in different timezone (e.g., one can use -Duser.timezone=PST to change the current JVM’s default timezone) > Timestamp conversion bug in non-blink Table/SQL runtime > --- > > Key: FLINK-13372 > URL: https://issues.apache.org/jira/browse/FLINK-13372 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0, 1.8.1 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Critical > > Currently, in the non-blink table/SQL runtime, Flink used > SqlFunctions.internalToTimestamp(long v) from Calcite to convert event time > (in long) to java.sql.Timestamp. > {code:java} > public static Timestamp internalToTimestamp(long v) { return new Timestamp(v > - (long)LOCAL_TZ.getOffset(v)); } {code} > However, as discussed in the recent Calcite mailing list, > SqlFunctions.internalToTimestamp() assumes the input timestamp value is in > the current JVM’s
[jira] [Created] (FLINK-13372) Timestamp conversion bug in non-blink Table/SQL runtime
Shuyi Chen created FLINK-13372: -- Summary: Timestamp conversion bug in non-blink Table/SQL runtime Key: FLINK-13372 URL: https://issues.apache.org/jira/browse/FLINK-13372 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.8.1, 1.8.0, 1.7.2, 1.6.4, 1.6.3 Reporter: Shuyi Chen Assignee: Shuyi Chen Currently, in the non-blink table/SQL runtime, Flink uses SqlFunctions.internalToTimestamp(long v) from Calcite to convert event time (a long) to java.sql.Timestamp. {code:java} public static Timestamp internalToTimestamp(long v) { return new Timestamp(v - (long)LOCAL_TZ.getOffset(v)); } {code} However, as discussed on the recent Calcite mailing list, SqlFunctions.internalToTimestamp() assumes the input timestamp value is in the current JVM’s default timezone (which is unusual), NOT milliseconds since epoch, and it is used to convert a timestamp value in the current JVM’s default timezone to milliseconds since epoch, which the java.sql.Timestamp constructor takes. Therefore, the results will not only be wrong, but will also change if the job runs on machines in different timezones. Here is an example: if the user input value is 0 (00:00:00 UTC on 1 January 1970), and the table/SQL runtime runs on a machine in PST (UTC-8), the output sql.Timestamp after SqlFunctions.internalToTimestamp() will become 28800000 milliseconds since epoch (08:00:00 UTC on 1 January 1970); and if the table/SQL runtime runs on a machine in EST (UTC-5), the output sql.Timestamp after SqlFunctions.internalToTimestamp() will become 18000000 milliseconds since epoch (05:00:00 UTC on 1 January 1970). Currently, there are unit tests for the table/SQL API event time input/output (e.g., GroupWindowITCase.testEventTimeTumblingWindow() and SqlITCase.testDistinctAggWithMergeOnEventTimeSessionGroupWindow()). They currently all pass because we compare the string format of the time, which ignores the timezone. 
If you step into the code, the actual java.sql.Timestamp value is wrong and changes as the tests run in different timezones (e.g., one can use -Duser.timezone=PST to change the current JVM’s default timezone) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-13132) Allow ClusterEntrypoints use user main method to generate job graph
[ https://issues.apache.org/jira/browse/FLINK-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880947#comment-16880947 ] Shuyi Chen commented on FLINK-13132: Hi [~till.rohrmann], as [~ZhenqiuHuang] mentioned, we have a central deployer service managing 1000+ production jobs. Normally, when the YARN cluster is healthy, the deployment request rate to the deployer service is relatively low, <1qps. However, during cluster maintenance or cluster failures, the deployer service will be overwhelmed with deployment requests for, in the worst case, all 1000+ production jobs in a short period of time. If we generate the job graph on the client side, which means that job graph generation happens in the deployer service in our case, we need to download the job jar to the deployer service in order to generate the job graph. In the case of the entire-YARN-cluster failure mentioned above, the deployer service, which normally has only a few instances, will become a bottleneck in both network and CPU/memory, because it needs to download all 1000+ job jars and generate all job graphs in a short time, which is about 100GB (say 100MB per job jar on average). This will violate our SLA, or we would have to heavily overprovision the deployer service to spread out both the network and the CPU/memory load. However, if we generate the job graph in the Job Entrypoint, the load will naturally be distributed across all nodes of the YARN cluster (i.e., a few hundred worker nodes on tens of racks), which is more scalable and resource-efficient. As for the non-determinism problem, could we generate the job graph once when the job is first started, save the generated job graph in ZooKeeper, and, when a master failover happens, simply use the job graph stored in ZooKeeper to restart the job? Thanks. 
> Allow ClusterEntrypoints use user main method to generate job graph > --- > > Key: FLINK-13132 > URL: https://issues.apache.org/jira/browse/FLINK-13132 > Project: Flink > Issue Type: Improvement > Components: Deployment / YARN >Affects Versions: 1.8.0, 1.8.1 >Reporter: Zhenqiu Huang >Assignee: Zhenqiu Huang >Priority: Minor > > We are building a service that can transparently deploy a job to different > cluster management systems, such as YARN and another internal system. It is > very costly to download the jar and generate the JobGraph on the client side. > Thus, I want to propose an improvement to make the YARN entrypoints > configurable to use either FileJobGraphRetriever or ClassPathJobGraphRetriever. > It is actually a long-standing TODO in AbstractYarnClusterDescriptor at line 834. > https://github.com/apache/flink/blob/21468e0050dc5f97de5cfe39885e0d3fd648e399/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L834 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11914) Expose a REST endpoint in JobManager to kill specific TaskManager
[ https://issues.apache.org/jira/browse/FLINK-11914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16810193#comment-16810193 ] Shuyi Chen commented on FLINK-11914: Hi [~feng.xu], I don't think we should expose an Akka endpoint, because Akka is an internal implementation detail, and AFAIK the community is trying to deprecate the use of Akka in Flink. Thanks. > Expose a REST endpoint in JobManager to kill specific TaskManager > - > > Key: FLINK-11914 > URL: https://issues.apache.org/jira/browse/FLINK-11914 > Project: Flink > Issue Type: New Feature > Components: Runtime / REST >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > We want to add the capability in the Flink web UI to kill each individual TM > by clicking a button; this would require first exposing the capability from > the REST API endpoint. The reason is that a TM might be running on a heavily > loaded YARN host over time, and we want to kill just that TM and have the > Flink JM reallocate a TM to restart the job graph. The alternative would be > to restart the entire YARN job, which is heavy-weight. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11912) Expose per partition Kafka lag metric in Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-11912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16809002#comment-16809002 ] Shuyi Chen commented on FLINK-11912: Hi [~aitozi], the current approach does the following: 1) as the KafkaConsumer discovers new partitions, it adds the partition information to _manualRegisteredMetricSet_. 2) in the consumer polling run loop, for every iteration/poll, it checks if there is any partition not yet registered in _manualRegisteredMetricSet_. If there are still partitions left, it checks if the KafkaConsumer has already exposed the metric for those partitions, and registers them with Flink. In short, the current approach will keep trying to register the partition metric, once a new partition is discovered, until the KafkaConsumer exposes it. Therefore, I don't think we will lose partition lag metrics unless there are bugs in the new-partition discovery mechanism. What do you think? > Expose per partition Kafka lag metric in Flink Kafka connector > -- > > Key: FLINK-11912 > URL: https://issues.apache.org/jira/browse/FLINK-11912 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kafka >Affects Versions: 1.6.4, 1.7.2 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > In production, it's important that we expose the Kafka lag-by-partition > metric in order for users to diagnose which Kafka partition is lagging. > However, although the Kafka lag-by-partition metrics are available in the > KafkaConsumer after 0.10.2, Flink was not able to properly register them > because the metrics are only available after the consumer starts polling data > from partitions. I would suggest the following fix: > 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet. > 2) In the fetch loop, as the KafkaConsumer discovers new partitions, manually > add the MetricName for those partitions that we want to register into > manualRegisteredMetricSet. 
> 3) In the fetch loop, check if manualRegisteredMetricSet is empty. If not, > try to search for the metrics available in the KafkaConsumer and, if found, > register them and remove the entry from manualRegisteredMetricSet. > The overhead of the above approach is bounded and is only incurred when > discovering new partitions, and registration is done once the KafkaConsumer > has the metrics exposed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
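The three steps above can be sketched as follows. This is an illustrative outline only: names such as manualRegisteredMetricSet, onPartitionDiscovered, and registerWithFlink are hypothetical stand-ins, not actual Flink connector or Kafka client APIs.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the bounded-overhead registration loop described in steps 1)-3).
class LagMetricRegistration {
    // Step 1: the set of partitions whose lag metric is pending registration.
    static final Set<String> manualRegisteredMetricSet = new HashSet<>();

    // Step 2: on partition discovery, remember that a metric should appear soon.
    static void onPartitionDiscovered(String partition) {
        manualRegisteredMetricSet.add(partition);
    }

    // Step 3: called once per poll; cheap no-op when nothing is pending.
    static void pollLoopIteration(Set<String> metricsExposedByConsumer) {
        if (manualRegisteredMetricSet.isEmpty()) {
            return; // no pending metrics, no overhead in the hot loop
        }
        // Register each metric the consumer now exposes, then drop it from
        // the pending set so it is never registered twice.
        manualRegisteredMetricSet.removeIf(partition -> {
            if (metricsExposedByConsumer.contains(partition)) {
                registerWithFlink(partition);
                return true;
            }
            return false; // not exposed yet; retry on the next poll
        });
    }

    static void registerWithFlink(String partition) {
        System.out.println("registered lag metric for " + partition);
    }
}
```

The pending set is what keeps the overhead bounded: work happens only between a partition's discovery and the first poll in which the consumer exposes its metric.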
[jira] [Commented] (FLINK-11914) Expose a REST endpoint in JobManager to kill specific TaskManager
[ https://issues.apache.org/jira/browse/FLINK-11914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16803209#comment-16803209 ] Shuyi Chen commented on FLINK-11914: Hi [~Zentol], thanks a lot for the comments. cc [~till.rohrmann], since we had some offline discussion as well. Currently, the YARN resource scheduler does not take dynamic resource usage into account. Also, over time, the resource usage of some containers might increase, or some containers might use more than they asked for, thus oversubscribing host resources. The resource causing lags might be CPU, memory, FDs, disk, or network, or even something application-specific. This commonly happens in a shared cluster, and it’s not possible for the resource scheduler to predict and regulate runtime resource usage effectively. As in other frameworks such as MapReduce or Spark, if there is a straggler task, it should be the responsibility of the framework, not the resource scheduler, to restart the straggler on a different node, since the resource scheduler has no idea what it means for a container to be slow. I think exposing an endpoint to disconnect a TM will enable us to build an external monitor/controller to recover the Flink job by relocating the straggling TM. The external controller will synthesize information from Flink metrics, application metrics and host metrics to determine whether a TM is straggling and relocate it. This will greatly help scale our platform to manage more Flink jobs. Also, you are correct that it's possible that the same slow host gets allocated again after the kill. To mitigate that, I propose we add a reason parameter to the API and let the Flink resource scheduler blacklist that host when acquiring resources from YARN/Mesos. With regards to adding a UI button for this, I understand your concern and we can discuss the need in a follow-up. 
> Expose a REST endpoint in JobManager to kill specific TaskManager > - > > Key: FLINK-11914 > URL: https://issues.apache.org/jira/browse/FLINK-11914 > Project: Flink > Issue Type: New Feature > Components: Runtime / REST >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > We want to add the capability in the Flink web UI to kill each individual TM > by clicking a button; this would require first exposing the capability from > the REST API endpoint. The reason is that a TM might be running on a heavily > loaded YARN host over time, and we want to kill just that TM and have the > Flink JM reallocate a TM to restart the job graph. The alternative would be > to restart the entire YARN job, which is heavy-weight. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11912) Expose per partition Kafka lag metric in Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-11912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16802858#comment-16802858 ] Shuyi Chen commented on FLINK-11912: Hi [~becket_qin], thanks a lot for your comments. The use of manualRegisteredMetricSet is not to remove the MetricGroup, but to prevent registering the same MetricGroup multiple times. The reason is that the per-partition metrics are only made available in the KafkaConsumer after it has been assigned the partitions and starts polling, so we can only register those metrics in the consumer polling run loop. However, since it's a loop, we need to prevent the same metric from being registered again and again. Therefore, we add new entries to manualRegisteredMetricSet when new partitions are discovered, to signal that new metrics should be available soon for registration, and once they are successfully registered, we remove those entries so they won't be re-registered in the loop, which would hog the consumer thread. Let me know otherwise. > Expose per partition Kafka lag metric in Flink Kafka connector > -- > > Key: FLINK-11912 > URL: https://issues.apache.org/jira/browse/FLINK-11912 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kafka >Affects Versions: 1.6.4, 1.7.2 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > In production, it's important that we expose the Kafka lag-by-partition > metric in order for users to diagnose which Kafka partition is lagging. > However, although the Kafka lag-by-partition metrics are available in the > KafkaConsumer after 0.10.2, Flink was not able to properly register them > because the metrics are only available after the consumer starts polling data > from partitions. I would suggest the following fix: > 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet. 
> 2) In the fetch loop, as the KafkaConsumer discovers new partitions, manually > add the MetricName for those partitions that we want to register into > manualRegisteredMetricSet. > 3) In the fetch loop, check if manualRegisteredMetricSet is empty. If not, > try to search for the metrics available in the KafkaConsumer and, if found, > register them and remove the entry from manualRegisteredMetricSet. > The overhead of the above approach is bounded and is only incurred when > discovering new partitions, and registration is done once the KafkaConsumer > has the metrics exposed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9477) Support SQL 2016 JSON functions in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-9477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16802008#comment-16802008 ] Shuyi Chen commented on FLINK-9477: --- That sounds like a good idea. I will split the tasks according to [CALCITE-2867|https://issues.apache.org/jira/browse/CALCITE-2867]. > Support SQL 2016 JSON functions in Flink SQL > > > Key: FLINK-9477 > URL: https://issues.apache.org/jira/browse/FLINK-9477 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11914) Expose a REST endpoint in JobManager to kill specific TaskManager
[ https://issues.apache.org/jira/browse/FLINK-11914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16793176#comment-16793176 ] Shuyi Chen commented on FLINK-11914: [~gjy], thanks a lot for the quick reply. Yes, killing the TM process on a host would require sudo permission, and we don't allow individual job owners to have this privilege for security reasons, as they might accidentally kill other users' jobs colocated on the same host. Also, exposing the API will allow our external monitoring service (called watchdog) to monitor TM health and programmatically disconnect a TM if it experiences issues. I see the JobMasterGateway already has a disconnectTaskManager() interface, so it won't be too much effort to add a REST endpoint to expose the capability. What do you think? > Expose a REST endpoint in JobManager to kill specific TaskManager > - > > Key: FLINK-11914 > URL: https://issues.apache.org/jira/browse/FLINK-11914 > Project: Flink > Issue Type: New Feature > Components: Runtime / REST >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > We want to add the capability in the Flink web UI to kill each individual TM > by clicking a button; this would require first exposing the capability from > the REST API endpoint. The reason is that a TM might be running on a heavily > loaded YARN host over time, and we want to kill just that TM and have the > Flink JM reallocate a TM to restart the job graph. The alternative would be > to restart the entire YARN job, which is heavy-weight. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
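Since a disconnect method already exists on the gateway, the proposed REST endpoint would mostly be glue code. A minimal hypothetical sketch follows; the TaskManagerGateway interface, the handler class, and the route are simplified illustrations, not Flink's actual disconnectTaskManager() signature or REST handler machinery.

```java
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for the gateway capability the comment refers to.
interface TaskManagerGateway {
    CompletableFuture<Void> disconnectTaskManager(String resourceId, Exception cause);
}

// A REST handler would delegate to the gateway, e.g. for a hypothetical
// DELETE /taskmanagers/:taskManagerId?reason=... route on the JobManager.
class KillTaskManagerHandler {
    private final TaskManagerGateway gateway;

    KillTaskManagerHandler(TaskManagerGateway gateway) {
        this.gateway = gateway;
    }

    // The reason parameter supports the blacklisting idea discussed above.
    CompletableFuture<String> handleRequest(String taskManagerId, String reason) {
        return gateway
            .disconnectTaskManager(taskManagerId, new Exception("Killed via REST: " + reason))
            .thenApply(ignored -> "disconnected " + taskManagerId);
    }
}
```

The design point is that the REST layer carries no new runtime logic; it only forwards an identifier and a human-readable cause to the existing gateway call.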
[jira] [Commented] (FLINK-11914) Expose a REST endpoint in JobManager to disconnect specific TaskManager
[ https://issues.apache.org/jira/browse/FLINK-11914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16792207#comment-16792207 ] Shuyi Chen commented on FLINK-11914: Hi [~Zentol], [~trohrm...@apache.org], [~gyao], what do you think? Thanks a lot. > Expose a REST endpoint in JobManager to disconnect specific TaskManager > --- > > Key: FLINK-11914 > URL: https://issues.apache.org/jira/browse/FLINK-11914 > Project: Flink > Issue Type: New Feature > Components: Runtime / REST >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > We want to add the capability in the Flink web UI to kill each individual TM > by clicking a button; this would require first exposing the capability from > the REST API endpoint. The reason is that a TM might be running on a heavily > loaded YARN host over time, and we want to kill just that TM and have the > Flink JM reallocate a TM to restart the job graph. The alternative would be > to restart the entire YARN job, which is heavy-weight. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11912) Expose per partition Kafka lag metric in Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-11912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16792155#comment-16792155 ] Shuyi Chen edited comment on FLINK-11912 at 3/14/19 12:24 AM: -- Hi [~tzulitai], I've attached a proposed tentative change (experimental) [here|https://github.com/apache/flink/commit/acaa46fdae6d1b3ba89caaef94ab6547be3688ea], could you please take a look and let me know if this is the right approach? Thanks a lot. was (Author: suez1224): Hi [~tzulitai], I've attached a proposed tentative change (experimental) [here|https://github.com/apache/flink/commit/c37394acc01ea5a0c4e2681319ecbfaa63beead3], could you please take a look and let me know if this is the right approach? Thanks a lot. > Expose per partition Kafka lag metric in Flink Kafka connector > -- > > Key: FLINK-11912 > URL: https://issues.apache.org/jira/browse/FLINK-11912 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kafka >Affects Versions: 1.6.4, 1.7.2 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > In production, it's important that we expose the Kafka lag by partition > metric in order for users to diagnose which Kafka partition is lagging. > However, although the Kafka lag by partition metrics are available in > KafkaConsumer after 0.10.2, Flink was not able to properly register it > because the metrics are only available after the consumer start polling data > from partitions. I would suggest the following fix: > 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet. > 2) in the fetch loop, as KafkaConsumer discovers new partitions, manually add > MetricName for those partitions that we want to register into > manualRegisteredMetricSet. > 3) in the fetch loop, check if manualRegisteredMetricSet is empty. If not, > try to search for the metrics available in KafkaConsumer, and if found, > register it and remove the entry from manualRegisteredMetricSet. 
> The overhead of the above approach is bounded and is only incurred when > discovering new partitions, and registration is done once the KafkaConsumer > has the metrics exposed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11912) Expose per partition Kafka lag metric in Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-11912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16792155#comment-16792155 ] Shuyi Chen edited comment on FLINK-11912 at 3/14/19 12:22 AM: -- Hi [~tzulitai], I've attached a proposed tentative change (experimental) [here|https://github.com/apache/flink/commit/c37394acc01ea5a0c4e2681319ecbfaa63beead3], could you please take a look and let me know if this is the right approach? Thanks a lot. was (Author: suez1224): Hi [~tzulitai], I've attached a proposed tentative change (experimental) [here|https://github.com/apache/flink/commit/094135efcadf5c0ddb47eabd66091e20d26d1417], could you please take a look and let me know if this is the right approach? Thanks a lot. > Expose per partition Kafka lag metric in Flink Kafka connector > -- > > Key: FLINK-11912 > URL: https://issues.apache.org/jira/browse/FLINK-11912 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kafka >Affects Versions: 1.6.4, 1.7.2 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > In production, it's important that we expose the Kafka lag by partition > metric in order for users to diagnose which Kafka partition is lagging. > However, although the Kafka lag by partition metrics are available in > KafkaConsumer after 0.10.2, Flink was not able to properly register it > because the metrics are only available after the consumer start polling data > from partitions. I would suggest the following fix: > 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet. > 2) in the fetch loop, as KafkaConsumer discovers new partitions, manually add > MetricName for those partitions that we want to register into > manualRegisteredMetricSet. > 3) in the fetch loop, check if manualRegisteredMetricSet is empty. If not, > try to search for the metrics available in KafkaConsumer, and if found, > register it and remove the entry from manualRegisteredMetricSet. 
> The overhead of the above approach is bounded and is only incurred when > discovering new partitions, and registration is done once the KafkaConsumer > has the metrics exposed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11914) Expose a REST endpoint in JobManager to disconnect specific TaskManager
Shuyi Chen created FLINK-11914: -- Summary: Expose a REST endpoint in JobManager to disconnect specific TaskManager Key: FLINK-11914 URL: https://issues.apache.org/jira/browse/FLINK-11914 Project: Flink Issue Type: New Feature Components: Runtime / REST Reporter: Shuyi Chen Assignee: Shuyi Chen We want to add the capability in the Flink web UI to kill each individual TM by clicking a button; this would require first exposing the capability from the REST API endpoint. The reason is that a TM might be running on a heavily loaded YARN host over time, and we want to kill just that TM and have the Flink JM reallocate a TM to restart the job graph. The alternative would be to restart the entire YARN job, which is heavy-weight. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11912) Expose per partition Kafka lag metric in Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-11912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16792155#comment-16792155 ] Shuyi Chen edited comment on FLINK-11912 at 3/13/19 10:36 PM: -- Hi [~tzulitai], I've attached a proposed tentative change (experimental) [here|https://github.com/apache/flink/commit/094135efcadf5c0ddb47eabd66091e20d26d1417], could you please take a look and let me know if this is the right approach? Thanks a lot. was (Author: suez1224): Hi [~tzulitai], I've attached a proposed tentative change [here|https://github.com/apache/flink/commit/094135efcadf5c0ddb47eabd66091e20d26d1417], could you please take a look and let me know what you think? Thanks a lot. > Expose per partition Kafka lag metric in Flink Kafka connector > -- > > Key: FLINK-11912 > URL: https://issues.apache.org/jira/browse/FLINK-11912 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kafka >Affects Versions: 1.6.4, 1.7.2 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > In production, it's important that we expose the Kafka lag by partition > metric in order for users to diagnose which Kafka partition is lagging. > However, although the Kafka lag by partition metrics are available in > KafkaConsumer after 0.10.2, Flink was not able to properly register it > because the metrics are only available after the consumer start polling data > from partitions. I would suggest the following fix: > 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet. > 2) in the fetch loop, as KafkaConsumer discovers new partitions, manually add > MetricName for those partitions that we want to register into > manualRegisteredMetricSet. > 3) in the fetch loop, check if manualRegisteredMetricSet is empty. If not, > try to search for the metrics available in KafkaConsumer, and if found, > register it and remove the entry from manualRegisteredMetricSet. 
> The overhead of the above approach is bounded and is only incurred when discovering > new partitions, and registration is done once the KafkaConsumer has the > metrics exposed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11912) Expose per partition Kafka lag metric in Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-11912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16792155#comment-16792155 ] Shuyi Chen edited comment on FLINK-11912 at 3/13/19 10:29 PM: -- Hi [~tzulitai], I've attached a proposed tentative change [here|https://github.com/apache/flink/commit/094135efcadf5c0ddb47eabd66091e20d26d1417], could you please take a look and let me know what you think? Thanks a lot. was (Author: suez1224): Hi [~tzulitai], I've attached a proposed change [here|https://github.com/apache/flink/commit/094135efcadf5c0ddb47eabd66091e20d26d1417], could you please take a look and let me know what you think? Thanks a lot. > Expose per partition Kafka lag metric in Flink Kafka connector > -- > > Key: FLINK-11912 > URL: https://issues.apache.org/jira/browse/FLINK-11912 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kafka >Affects Versions: 1.6.4, 1.7.2 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > In production, it's important that we expose the Kafka lag by partition > metric in order for users to diagnose which Kafka partition is lagging. > However, although the Kafka lag by partition metrics are available in > KafkaConsumer after 0.10.2, Flink was not able to properly register it > because the metrics are only available after the consumer start polling data > from partitions. I would suggest the following fix: > 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet. > 2) in the fetch loop, as KafkaConsumer discovers new partitions, manually add > MetricName for those partitions that we want to register into > manualRegisteredMetricSet. > 3) in the fetch loop, check if manualRegisteredMetricSet is empty. If not, > try to search for the metrics available in KafkaConsumer, and if found, > register it and remove the entry from manualRegisteredMetricSet. 
> The overhead of the above approach is bounded and is only incurred when discovering > new partitions, and registration is done once the KafkaConsumer has the > metrics exposed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11912) Expose per partition Kafka lag metric in Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-11912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16792155#comment-16792155 ] Shuyi Chen commented on FLINK-11912: Hi [~tzulitai], I've attached a proposed change [here|https://github.com/apache/flink/commit/094135efcadf5c0ddb47eabd66091e20d26d1417], could you please take a look and let me know what you think? Thanks a lot. > Expose per partition Kafka lag metric in Flink Kafka connector > -- > > Key: FLINK-11912 > URL: https://issues.apache.org/jira/browse/FLINK-11912 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kafka >Affects Versions: 1.6.4, 1.7.2 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > In production, it's important that we expose the Kafka lag by partition > metric in order for users to diagnose which Kafka partition is lagging. > However, although the Kafka lag by partition metrics are available in > KafkaConsumer after 0.10.2, Flink was not able to properly register it > because the metrics are only available after the consumer start polling data > from partitions. I would suggest the following fix: > 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet. > 2) in the fetch loop, as KafkaConsumer discovers new partitions, manually add > MetricName for those partitions that we want to register into > manualRegisteredMetricSet. > 3) in the fetch loop, check if manualRegisteredMetricSet is empty. If not, > try to search for the metrics available in KafkaConsumer, and if found, > register it and remove the entry from manualRegisteredMetricSet. > The overhead of the above approach is bounded and only incur when discovering > new partitions, and registration is done once the KafkaConsumer have the > metrics exposed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11912) Expose per partition Kafka lag metric in Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-11912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-11912: --- Summary: Expose per partition Kafka lag metric in Flink Kafka connector (was: Expose per partition Kafka lag metric in Flink Kafka consumer) > Expose per partition Kafka lag metric in Flink Kafka connector > -- > > Key: FLINK-11912 > URL: https://issues.apache.org/jira/browse/FLINK-11912 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kafka >Affects Versions: 1.6.4, 1.7.2 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > In production, it's important that we expose the Kafka lag by partition > metric in order for users to diagnose which Kafka partition is lagging. > However, although the Kafka lag by partition metrics are available in > KafkaConsumer, Flink was not able to properly register it because the metrics > are only available after the consumer start polling data from partitions. I > would suggest the following fix: > 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet. > 2) in the fetch loop, as KafkaConsumer discovers new partitions, manually add > MetricName for those partitions that we want to register into > manualRegisteredMetricSet. > 3) in the fetch loop, check if manualRegisteredMetricSet is empty. If not, > try to search for the metrics available in KafkaConsumer, and if found, > register it and remove the entry from manualRegisteredMetricSet. > The overhead of the above approach is bounded and only incur when discovering > new partitions, and registration is done once the KafkaConsumer have the > metrics exposed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11912) Expose per partition Kafka lag metric in Flink Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-11912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-11912: --- Description: In production, it's important that we expose the Kafka lag by partition metric in order for users to diagnose which Kafka partition is lagging. However, although the Kafka lag by partition metrics are available in KafkaConsumer after 0.10.2, Flink was not able to properly register it because the metrics are only available after the consumer start polling data from partitions. I would suggest the following fix: 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet. 2) in the fetch loop, as KafkaConsumer discovers new partitions, manually add MetricName for those partitions that we want to register into manualRegisteredMetricSet. 3) in the fetch loop, check if manualRegisteredMetricSet is empty. If not, try to search for the metrics available in KafkaConsumer, and if found, register it and remove the entry from manualRegisteredMetricSet. The overhead of the above approach is bounded and only incur when discovering new partitions, and registration is done once the KafkaConsumer have the metrics exposed. was: In production, it's important that we expose the Kafka lag by partition metric in order for users to diagnose which Kafka partition is lagging. However, although the Kafka lag by partition metrics are available in KafkaConsumer, Flink was not able to properly register it because the metrics are only available after the consumer start polling data from partitions. I would suggest the following fix: 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet. 2) in the fetch loop, as KafkaConsumer discovers new partitions, manually add MetricName for those partitions that we want to register into manualRegisteredMetricSet. 3) in the fetch loop, check if manualRegisteredMetricSet is empty. 
If not, try to search for the metrics available in KafkaConsumer, and if found, register them and remove the entries from manualRegisteredMetricSet. The overhead of the above approach is bounded and is only incurred when discovering new partitions, and registration is done once the KafkaConsumer has the metrics exposed. > Expose per partition Kafka lag metric in Flink Kafka connector > -- > > Key: FLINK-11912 > URL: https://issues.apache.org/jira/browse/FLINK-11912 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kafka >Affects Versions: 1.6.4, 1.7.2 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > In production, it's important that we expose the Kafka lag by partition > metric in order for users to diagnose which Kafka partition is lagging. > However, although the Kafka lag by partition metrics are available in > KafkaConsumer after 0.10.2, Flink was not able to properly register them > because the metrics are only available after the consumer starts polling data > from partitions. I would suggest the following fix: > 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet. > 2) In the fetch loop, as KafkaConsumer discovers new partitions, manually add > the MetricName for each partition that we want to register into > manualRegisteredMetricSet. > 3) In the fetch loop, check whether manualRegisteredMetricSet is empty. If not, > try to search for the metrics available in KafkaConsumer, and if found, > register them and remove the entries from manualRegisteredMetricSet. > The overhead of the above approach is bounded and is only incurred when discovering > new partitions, and registration is done once the KafkaConsumer has the > metrics exposed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11912) Expose per partition Kafka lag metric in Flink Kafka consumer
Shuyi Chen created FLINK-11912: -- Summary: Expose per partition Kafka lag metric in Flink Kafka consumer Key: FLINK-11912 URL: https://issues.apache.org/jira/browse/FLINK-11912 Project: Flink Issue Type: New Feature Components: Connectors / Kafka Affects Versions: 1.7.2, 1.6.4 Reporter: Shuyi Chen Assignee: Shuyi Chen In production, it's important that we expose the Kafka lag by partition metric in order for users to diagnose which Kafka partition is lagging. However, although the Kafka lag by partition metrics are available in KafkaConsumer, Flink was not able to properly register them because the metrics are only available after the consumer starts polling data from partitions. I would suggest the following fix: 1) In KafkaConsumerThread.run(), allocate a manualRegisteredMetricSet. 2) In the fetch loop, as KafkaConsumer discovers new partitions, manually add the MetricName for each partition that we want to register into manualRegisteredMetricSet. 3) In the fetch loop, check whether manualRegisteredMetricSet is empty. If not, try to search for the metrics available in KafkaConsumer, and if found, register them and remove the entries from manualRegisteredMetricSet. The overhead of the above approach is bounded and is only incurred when discovering new partitions, and registration is done once the KafkaConsumer has the metrics exposed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric
[ https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16751269#comment-16751269 ] Shuyi Chen commented on FLINK-7608: --- [~Zentol], Thanks a lot, I found it. > LatencyGauge change to histogram metric > > > Key: FLINK-7608 > URL: https://issues.apache.org/jira/browse/FLINK-7608 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: Hai Zhou >Assignee: Hai Zhou >Priority: Major > Fix For: 1.5.0 > > > I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831] to > export metrics the log file. > I found: > {noformat} > -- Gauges > - > .. > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Map.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, > p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}} > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Sink- Unnamed.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, > p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}} > .. > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric
[ https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16750675#comment-16750675 ] Shuyi Chen commented on FLINK-7608: --- [~Zentol], [~aljoscha], it appears that after this PR, I can no longer find the latency histogram from the web UI or the rest API. Is it because now it's grouped by operator id, and not vertex id? Is there a way that I can find this metric through the web UI or rest API? Thanks a lot. > LatencyGauge change to histogram metric > > > Key: FLINK-7608 > URL: https://issues.apache.org/jira/browse/FLINK-7608 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: Hai Zhou >Assignee: Hai Zhou >Priority: Major > Fix For: 1.5.0 > > > I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831] to > export metrics the log file. > I found: > {noformat} > -- Gauges > - > .. > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Map.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, > p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}} > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Sink- Unnamed.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, > p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}} > .. > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10848) Flink's Yarn ResourceManager can allocate too many excess containers
[ https://issues.apache.org/jira/browse/FLINK-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16740797#comment-16740797 ] Shuyi Chen commented on FLINK-10848: Also, can you please also help me take a look at [FLINK-10868|https://issues.apache.org/jira/browse/FLINK-10868]? It's causing Flink job on YARN to keep retrying container allocation but not fail. Thanks a lot. > Flink's Yarn ResourceManager can allocate too many excess containers > > > Key: FLINK-10848 > URL: https://issues.apache.org/jira/browse/FLINK-10848 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2 >Reporter: Shuyi Chen >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.4, 1.7.2, 1.8.0 > > Time Spent: 0.5h > Remaining Estimate: 0h > > Currently, both the YarnFlinkResourceManager and YarnResourceManager do not > call removeContainerRequest() on container allocation success. Because the > YARN AM-RM protocol is not a delta protocol (please see YARN-1902), > AMRMClient will keep all ContainerRequests that are added and send them to RM. > In production, we observe the following that verifies the theory: 16 > containers are allocated and used upon cluster startup; when a TM is killed, > 17 containers are allocated, 1 container is used, and 16 excess containers > are returned; when another TM is killed, 18 containers are allocated, 1 > container is used, and 17 excess containers are returned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
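The 16/17/18 allocation pattern reported in the issue above follows directly from the non-delta AM-RM protocol. The toy model below (illustrative only, not the real `AMRMClient` API) shows why: every ContainerRequest still held by the client is re-sent to the RM on each heartbeat, so without the fix the requests accumulate; the actual fix is to call `AMRMClient.removeContainerRequest()` once a request has been satisfied.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of why excess containers pile up: the YARN AM-RM protocol
// is not a delta protocol (YARN-1902), so every ContainerRequest still held
// by AMRMClient is re-sent to the RM on each heartbeat.
class AmRmModel {
    private final List<String> outstandingRequests = new ArrayList<>();
    private final boolean removeOnAllocation; // the fix under discussion

    public AmRmModel(boolean removeOnAllocation) {
        this.removeOnAllocation = removeOnAllocation;
    }

    public void addContainerRequest(String request) {
        outstandingRequests.add(request);
    }

    // Heartbeat: the RM sees every outstanding request and allocates one
    // container per request it sees; returns the number allocated.
    public int heartbeatAndAllocate() {
        int allocated = outstandingRequests.size();
        if (removeOnAllocation) {
            // Fixed behavior: drop each request once it has been satisfied.
            outstandingRequests.clear();
        }
        return allocated;
    }
}
```

With the buggy behavior, 16 startup requests followed by one replacement request per killed TM yields 16, then 17, then 18 allocations, matching the production observation; with the fix, only the single new request is re-sent.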
[jira] [Commented] (FLINK-10848) Flink's Yarn ResourceManager can allocate too many excess containers
[ https://issues.apache.org/jira/browse/FLINK-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16740796#comment-16740796 ] Shuyi Chen commented on FLINK-10848: Thanks a lot for fixing the issue, [~till.rohrmann]. One concern I have is that, in case of a mismatch between the container request and the allocated container (we ask for 2 vcores and YARN returns a container with 1 vcore), should we at least print a WARNING in the log to let the user know we are using containers with 1 vcore, and ask them not to use DefaultResourceCalculator if they need fine-grained CPU allocation? > Flink's Yarn ResourceManager can allocate too many excess containers > > > Key: FLINK-10848 > URL: https://issues.apache.org/jira/browse/FLINK-10848 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2 >Reporter: Shuyi Chen >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.6.4, 1.7.2, 1.8.0 > > Time Spent: 0.5h > Remaining Estimate: 0h > > Currently, both the YarnFlinkResourceManager and YarnResourceManager do not > call removeContainerRequest() on container allocation success. Because the > YARN AM-RM protocol is not a delta protocol (please see YARN-1902), > AMRMClient will keep all ContainerRequests that are added and send them to RM. > In production, we observe the following that verifies the theory: 16 > containers are allocated and used upon cluster startup; when a TM is killed, > 17 containers are allocated, 1 container is used, and 16 excess containers > are returned; when another TM is killed, 18 containers are allocated, 1 > container is used, and 17 excess containers are returned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10848) Flink's Yarn ResourceManager can allocate too many excess containers
[ https://issues.apache.org/jira/browse/FLINK-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16735050#comment-16735050 ] Shuyi Chen commented on FLINK-10848: Hi [~gjy], we are running on 2.7.2. Can you share the steps to reproduce the problem? Thanks a lot. > Flink's Yarn ResourceManager can allocate too many excess containers > > > Key: FLINK-10848 > URL: https://issues.apache.org/jira/browse/FLINK-10848 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, both the YarnFlinkResourceManager and YarnResourceManager do not > call removeContainerRequest() on container allocation success. Because the > YARN AM-RM protocol is not a delta protocol (please see YARN-1902), > AMRMClient will keep all ContainerRequests that are added and send them to RM. > In production, we observe the following that verifies the theory: 16 > containers are allocated and used upon cluster startup; when a TM is killed, > 17 containers are allocated, 1 container is used, and 16 excess containers > are returned; when another TM is killed, 18 containers are allocated, 1 > container is used, and 17 excess containers are returned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10848) Flink's Yarn ResourceManager can allocate too many excess containers
[ https://issues.apache.org/jira/browse/FLINK-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16734963#comment-16734963 ] Shuyi Chen commented on FLINK-10848: Fixed in 1.6: 7cc4c6f3e5e84efc067f2f2179648e31e5defa27 Fixed in 1.7: 2576076f36e75fa81896a7cc275315bd8cd848da > Flink's Yarn ResourceManager can allocate too many excess containers > > > Key: FLINK-10848 > URL: https://issues.apache.org/jira/browse/FLINK-10848 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, both the YarnFlinkResourceManager and YarnResourceManager do not > call removeContainerRequest() on container allocation success. Because the > YARN AM-RM protocol is not a delta protocol (please see YARN-1902), > AMRMClient will keep all ContainerRequests that are added and send them to RM. > In production, we observe the following that verifies the theory: 16 > containers are allocated and used upon cluster startup; when a TM is killed, > 17 containers are allocated, 1 container is used, and 16 excess containers > are returned; when another TM is killed, 18 containers are allocated, 1 > container is used, and 17 excess containers are returned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-10848) Flink's Yarn ResourceManager can allocate too many excess containers
[ https://issues.apache.org/jira/browse/FLINK-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen resolved FLINK-10848. Resolution: Fixed > Flink's Yarn ResourceManager can allocate too many excess containers > > > Key: FLINK-10848 > URL: https://issues.apache.org/jira/browse/FLINK-10848 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, both the YarnFlinkResourceManager and YarnResourceManager do not > call removeContainerRequest() on container allocation success. Because the > YARN AM-RM protocol is not a delta protocol (please see YARN-1902), > AMRMClient will keep all ContainerRequests that are added and send them to RM. > In production, we observe the following that verifies the theory: 16 > containers are allocated and used upon cluster startup; when a TM is killed, > 17 containers are allocated, 1 container is used, and 16 excess containers > are returned; when another TM is killed, 18 containers are allocated, 1 > container is used, and 17 excess containers are returned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10848) Flink's Yarn ResourceManager can allocate too many excess containers
[ https://issues.apache.org/jira/browse/FLINK-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16734322#comment-16734322 ] Shuyi Chen commented on FLINK-10848: Fixed in master: e26d90fc86b266978b4bac84fe02ca34b62983fe. I'll patch the change to 1.6 and 1.7 later. > Flink's Yarn ResourceManager can allocate too many excess containers > > > Key: FLINK-10848 > URL: https://issues.apache.org/jira/browse/FLINK-10848 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently, both the YarnFlinkResourceManager and YarnResourceManager do not > call removeContainerRequest() on container allocation success. Because the > YARN AM-RM protocol is not a delta protocol (please see YARN-1902), > AMRMClient will keep all ContainerRequests that are added and send them to RM. > In production, we observe the following that verifies the theory: 16 > containers are allocated and used upon cluster startup; when a TM is killed, > 17 containers are allocated, 1 container is used, and 16 excess containers > are returned; when another TM is killed, 18 containers are allocated, 1 > container is used, and 17 excess containers are returned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10848) Flink's Yarn ResourceManager can allocate too many excess containers
[ https://issues.apache.org/jira/browse/FLINK-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16733245#comment-16733245 ] Shuyi Chen commented on FLINK-10848: [~xinpu], thanks for sharing your thoughts. You are right, ideally, we should prevent step 2. However, in AMRMClientAsync.CallbackHandler, AFAIK, we can only know that the previous n container requests has been sent to RM through the onContainersAllocated callback, so step 2 might be difficult to prevent. > Flink's Yarn ResourceManager can allocate too many excess containers > > > Key: FLINK-10848 > URL: https://issues.apache.org/jira/browse/FLINK-10848 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > > Currently, both the YarnFlinkResourceManager and YarnResourceManager do not > call removeContainerRequest() on container allocation success. Because the > YARN AM-RM protocol is not a delta protocol (please see YARN-1902), > AMRMClient will keep all ContainerRequests that are added and send them to RM. > In production, we observe the following that verifies the theory: 16 > containers are allocated and used upon cluster startup; when a TM is killed, > 17 containers are allocated, 1 container is used, and 16 excess containers > are returned; when another TM is killed, 18 containers are allocated, 1 > container is used, and 17 excess containers are returned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10848) Flink's Yarn ResourceManager can allocate too many excess containers
[ https://issues.apache.org/jira/browse/FLINK-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16726502#comment-16726502 ] Shuyi Chen commented on FLINK-10848: [~xinpu], the ContainerRequests are sent during the heartbeat with the RM; I am not aware of a callback that allows clients to remove a sent ContainerRequest right after the heartbeat. Also, can you explain a bit how container-completed can happen between the container request and the container allocation for the same container? > Flink's Yarn ResourceManager can allocate too many excess containers > > > Key: FLINK-10848 > URL: https://issues.apache.org/jira/browse/FLINK-10848 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > > Currently, both the YarnFlinkResourceManager and YarnResourceManager do not > call removeContainerRequest() on container allocation success. Because the > YARN AM-RM protocol is not a delta protocol (please see YARN-1902), > AMRMClient will keep all ContainerRequests that are added and send them to RM. > In production, we observe the following that verifies the theory: 16 > containers are allocated and used upon cluster startup; when a TM is killed, > 17 containers are allocated, 1 container is used, and 16 excess containers > are returned; when another TM is killed, 18 containers are allocated, 1 > container is used, and 17 excess containers are returned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11006) Update Calcite dependency to 1.18
[ https://issues.apache.org/jira/browse/FLINK-11006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen closed FLINK-11006. -- Resolution: Duplicate > Update Calcite dependency to 1.18 > - > > Key: FLINK-11006 > URL: https://issues.apache.org/jira/browse/FLINK-11006 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Rong Rong >Priority: Major > > Umbrella task to track all dependencies and tasks needs to be done for > upgrading to Calcite 1.18 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11006) Update Calcite dependency to 1.18
[ https://issues.apache.org/jira/browse/FLINK-11006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16699379#comment-16699379 ] Shuyi Chen commented on FLINK-11006: This is a duplicate of [FLINK-10076|https://issues.apache.org/jira/browse/FLINK-10076]. > Update Calcite dependency to 1.18 > - > > Key: FLINK-11006 > URL: https://issues.apache.org/jira/browse/FLINK-11006 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Rong Rong >Priority: Major > > Umbrella task to track all dependencies and tasks needs to be done for > upgrading to Calcite 1.18 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9477) Support SQL 2016 JSON functions in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-9477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696440#comment-16696440 ] Shuyi Chen commented on FLINK-9477: --- Thanks [~x1q1j1]. Once Calcite 1.18 is released and we upgrade to it, I'll create subtasks to track the implementation of the new JSON functions in Flink. > Support SQL 2016 JSON functions in Flink SQL > > > Key: FLINK-9477 > URL: https://issues.apache.org/jira/browse/FLINK-9477 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10848) Flink's Yarn ResourceManager can allocate too many excess containers
[ https://issues.apache.org/jira/browse/FLINK-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16693835#comment-16693835 ] Shuyi Chen commented on FLINK-10848: [~till.rohrmann], can you help take a look at this PR? Thanks a lot. > Flink's Yarn ResourceManager can allocate too many excess containers > > > Key: FLINK-10848 > URL: https://issues.apache.org/jira/browse/FLINK-10848 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > > Currently, both the YarnFlinkResourceManager and YarnResourceManager do not > call removeContainerRequest() on container allocation success. Because the > YARN AM-RM protocol is not a delta protocol (please see YARN-1902), > AMRMClient will keep all ContainerRequests that are added and send them to RM. > In production, we observe the following that verifies the theory: 16 > containers are allocated and used upon cluster startup; when a TM is killed, > 17 containers are allocated, 1 container is used, and 16 excess containers > are returned; when another TM is killed, 18 containers are allocated, 1 > container is used, and 17 excess containers are returned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10868) Flink's Yarn ResourceManager doesn't use yarn.maximum-failed-containers as limit of resource acquirement
[ https://issues.apache.org/jira/browse/FLINK-10868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16693833#comment-16693833 ] Shuyi Chen commented on FLINK-10868: Hi [~till.rohrmann], the following describes the problem that we saw: 1) In YarnResourceManager, after a container is allocated, it will start the container in onContainerAllocated(). 2) In createTaskExecutorLaunchContext, it will call fs.getFileStatus in registerLocalResource, which accesses the file status on HDFS. 3) In the rare scenario where some of the above files in HDFS are not accessible due to HDFS issues, createTaskExecutorLaunchContext will throw an exception and cause YarnResourceManager to keep reacquiring resources, because every container start fails while the files remain inaccessible. In that case, the job will be stuck in a loop of acquiring new resources; since the files are already broken/missing, there is no way for Flink to recover by itself, and we need to fail the job and fall back to the client side to fix the files and resubmit entirely. Together with [FLINK-10848|https://issues.apache.org/jira/browse/FLINK-10848], this exacerbates the problem and can deplete the entire YARN queue's resources. I've submitted a PR to fix [FLINK-10848|https://issues.apache.org/jira/browse/FLINK-10848], could you please also help take a look? I am wondering if we could separate this JIRA into 2 parts, one for PerJobCluster and one for session cluster. For this JIRA, we could 1) apply yarn.maximum-failed-containers for PerJobCluster mode, 2) log a warning saying that yarn.maximum-failed-containers is not supported for session clusters, and 3) update the documentation on yarn.maximum-failed-containers on the website. What do you think? 
> Flink's Yarn ResourceManager doesn't use yarn.maximum-failed-containers as > limit of resource acquirement > > > Key: FLINK-10868 > URL: https://issues.apache.org/jira/browse/FLINK-10868 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.6.2, 1.7.0 >Reporter: Zhenqiu Huang >Assignee: Zhenqiu Huang >Priority: Major > > Currently, YarnResourceManager does not use yarn.maximum-failed-containers as > a limit on resource acquisition. In the worst case, when newly started containers > consistently fail, YarnResourceManager will go into an infinite resource > acquisition loop without failing the job. Together with > https://issues.apache.org/jira/browse/FLINK-10848, it will quickly occupy all > resources of the YARN queue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
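A minimal sketch of the proposed guard, using hypothetical names rather than the actual YarnResourceManager code: once the failure count exceeds yarn.maximum-failed-containers, the job fails instead of requesting yet another container:

```java
// Self-contained sketch (hypothetical names, not real Flink code) of the
// proposed behavior: stop re-acquiring containers and fail the job once
// yarn.maximum-failed-containers is exceeded in per-job mode.
public class FailedContainerLimitDemo {
    static final int MAX_FAILED_CONTAINERS = 5; // yarn.maximum-failed-containers

    static int failedContainers = 0;

    // Called whenever starting a container fails (e.g. HDFS files missing).
    // Returns true if a replacement container should be requested.
    static boolean onContainerStartFailed() {
        failedContainers++;
        if (failedContainers > MAX_FAILED_CONTAINERS) {
            // Fail the job instead of requesting another container; the
            // client must fix the broken files and resubmit the job.
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        int replacementRequests = 0;
        // Without the limit, this loop would never terminate -- which is
        // exactly the infinite acquisition loop described in the issue.
        while (onContainerStartFailed()) replacementRequests++;
        System.out.println(replacementRequests); // 5 requests, then fail
    }
}
```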
[jira] [Created] (FLINK-10848) Flink's Yarn ResourceManager can allocate too many excess containers
Shuyi Chen created FLINK-10848: -- Summary: Flink's Yarn ResourceManager can allocate too many excess containers Key: FLINK-10848 URL: https://issues.apache.org/jira/browse/FLINK-10848 Project: Flink Issue Type: Bug Components: YARN Affects Versions: 1.6.2, 1.5.5, 1.4.2, 1.3.3 Reporter: Shuyi Chen Assignee: Shuyi Chen Currently, both the YarnFlinkResourceManager and YarnResourceManager do not call removeContainerRequest() on container allocation success. Because the YARN AM-RM protocol is not a delta protocol (please see YARN-1902), AMRMClient will keep all ContainerRequests that are added and send them to RM. In production, we observe the following that verifies the theory: 16 containers are allocated and used upon cluster startup; when a TM is killed, 17 containers are allocated, 1 container is used, and 16 excess containers are returned; when another TM is killed, 18 containers are allocated, 1 container is used, and 17 excess containers are returned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6962) Add a table SQL DDL
[ https://issues.apache.org/jira/browse/FLINK-6962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672624#comment-16672624 ] Shuyi Chen commented on FLINK-6962: --- Design doc is attached. > Add a table SQL DDL > --- > > Key: FLINK-6962 > URL: https://issues.apache.org/jira/browse/FLINK-6962 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shuyi Chen >Priority: Major > > This Jira adds support to allow user define the DDL for source and sink > tables, including the waterMark(on source table) and emit SLA (on result > table). The detailed design doc will be attached soon. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10232) Add a SQL DDL
[ https://issues.apache.org/jira/browse/FLINK-10232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672623#comment-16672623 ] Shuyi Chen commented on FLINK-10232: Design doc is attached. > Add a SQL DDL > - > > Key: FLINK-10232 > URL: https://issues.apache.org/jira/browse/FLINK-10232 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > > This is an umbrella issue for all efforts related to supporting a SQL Data > Definition Language (DDL) in Flink's Table & SQL API. > Such a DDL includes creating, deleting, replacing: > - tables > - views > - functions > - types > - libraries > - catalogs > If possible, the parsing/validating/logical part should be done using > Calcite. Related issues are CALCITE-707, CALCITE-2045, CALCITE-2046, > CALCITE-2214, and others. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10231) Add a view SQL DDL
[ https://issues.apache.org/jira/browse/FLINK-10231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671204#comment-16671204 ] Shuyi Chen commented on FLINK-10231: I somewhat agree that registerTable should be replaced with registerView, but registerTableInternal is used for registering both virtual tables and real tables, so I think it can stay. What do you think, [~fhueske]? Also, [~winipanda], the community is finalizing the DDL design doc and will share it soon. You are welcome to comment on it :) > Add a view SQL DDL > -- > > Key: FLINK-10231 > URL: https://issues.apache.org/jira/browse/FLINK-10231 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: winifredtang >Priority: Major > > FLINK-10163 added initial view support for the SQL Client. However, for > supporting the [full definition of > views|https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Create/Drop/AlterView] > (with schema, comments, etc.) we need native support for views in > the Table API. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648665#comment-16648665 ] Shuyi Chen commented on FLINK-10516: [~till.rohrmann], [~aljoscha], do you want me to cherry pick the change onto branch release-1.5 and release-1.6? Thanks. > YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink > Configuration during setup > --- > > Key: FLINK-10516 > URL: https://issues.apache.org/jira/browse/FLINK-10516 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.4.0, 1.5.0, 1.6.0, 1.7.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > Will add a fix, and refactor YarnApplicationMasterRunner to add a unittest to > prevent future regression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648664#comment-16648664 ] Shuyi Chen commented on FLINK-10516: This is fixed in 1.7.0 with 5e90ed95a580aefd84b72f593954d01f4eb67f68. > YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink > Configuration during setup > --- > > Key: FLINK-10516 > URL: https://issues.apache.org/jira/browse/FLINK-10516 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.4.0, 1.5.0, 1.6.0, 1.7.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > Will add a fix, and refactor YarnApplicationMasterRunner to add a unittest to > prevent future regression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644150#comment-16644150 ] Shuyi Chen commented on FLINK-10516: Hi [~till.rohrmann], how do you want to proceed? I can either send a PR to fix it at current master, and then we cherry pick for release 1.5.5 and 1.6.2, or do you have other suggestions? Thanks a lot. > YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink > Configuration during setup > --- > > Key: FLINK-10516 > URL: https://issues.apache.org/jira/browse/FLINK-10516 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.4.0, 1.5.0, 1.6.0, 1.7.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > Fix For: 1.7.0, 1.6.2, 1.5.5 > > > Will add a fix, and refactor YarnApplicationMasterRunner to add a unittest to > prevent future regression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
Shuyi Chen created FLINK-10516: -- Summary: YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup Key: FLINK-10516 URL: https://issues.apache.org/jira/browse/FLINK-10516 Project: Flink Issue Type: Bug Components: YARN Affects Versions: 1.6.0, 1.5.0, 1.4.0, 1.7.0 Reporter: Shuyi Chen Assignee: Shuyi Chen Fix For: 1.7.0 Will add a fix, and refactor YarnApplicationMasterRunner to add a unittest to prevent future regression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10232) Add a SQL DDL
[ https://issues.apache.org/jira/browse/FLINK-10232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen reassigned FLINK-10232: -- Assignee: Shuyi Chen > Add a SQL DDL > - > > Key: FLINK-10232 > URL: https://issues.apache.org/jira/browse/FLINK-10232 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > > This is an umbrella issue for all efforts related to supporting a SQL Data > Definition Language (DDL) in Flink's Table & SQL API. > Such a DDL includes creating, deleting, replacing: > - tables > - views > - functions > - types > - libraries > - catalogs > If possible, the parsing/validating/logical part should be done using > Calcite. Related issues are CALCITE-707, CALCITE-2045, CALCITE-2046, > CALCITE-2214, and others. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-6962) Add a table SQL DDL
[ https://issues.apache.org/jira/browse/FLINK-6962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen reassigned FLINK-6962: - Assignee: Shuyi Chen (was: lincoln.lee) > Add a table SQL DDL > --- > > Key: FLINK-6962 > URL: https://issues.apache.org/jira/browse/FLINK-6962 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shuyi Chen >Priority: Major > > This Jira adds support to allow user define the DDL for source and sink > tables, including the waterMark(on source table) and emit SLA (on result > table). The detailed design doc will be attached soon. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10187) Fix LogicalUnnestRule to match Correlate/Uncollect correctly
Shuyi Chen created FLINK-10187: -- Summary: Fix LogicalUnnestRule to match Correlate/Uncollect correctly Key: FLINK-10187 URL: https://issues.apache.org/jira/browse/FLINK-10187 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.6.0 Reporter: Shuyi Chen Assignee: Shuyi Chen -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16584645#comment-16584645 ] Shuyi Chen commented on FLINK-10163: I think we can add this ticket as part of the DDL Flip, what do you think, [~twalthr]? > Support CREATE VIEW in SQL Client > - > > Key: FLINK-10163 > URL: https://issues.apache.org/jira/browse/FLINK-10163 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > The possibility to define a name for a subquery would improve the usability > of the SQL Client. The SQL standard defines \{{CREATE VIEW}} for defining a > virtual table. > > Example: > {code} > CREATE VIEW viewName > [ '(' columnName [, columnName ]* ')' ] > AS Query > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10076) Upgrade Calcite dependency to 1.18
Shuyi Chen created FLINK-10076: -- Summary: Upgrade Calcite dependency to 1.18 Key: FLINK-10076 URL: https://issues.apache.org/jira/browse/FLINK-10076 Project: Flink Issue Type: Task Components: Table API & SQL Reporter: Shuyi Chen Assignee: Shuyi Chen -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9134) Update Calcite dependency to 1.17
[ https://issues.apache.org/jira/browse/FLINK-9134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen closed FLINK-9134. - Resolution: Fixed > Update Calcite dependency to 1.17 > - > > Key: FLINK-9134 > URL: https://issues.apache.org/jira/browse/FLINK-9134 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > > This is an umbrella issue for tasks that need to be performed when upgrading > to Calcite 1.17 once it is released. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9135) Remove AggregateReduceFunctionsRule once CALCITE-2216 is fixed
[ https://issues.apache.org/jira/browse/FLINK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen closed FLINK-9135. - Resolution: Fixed > Remove AggregateReduceFunctionsRule once CALCITE-2216 is fixed > -- > > Key: FLINK-9135 > URL: https://issues.apache.org/jira/browse/FLINK-9135 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.5.0, 1.6.0 >Reporter: Fabian Hueske >Assignee: Shuyi Chen >Priority: Major > > We had to copy and slightly modify {{AggregateReduceFunctionsRule}} from > Calcite to fix FLINK-8903. > We proposed the changes to Calcite as CALCITE-2216. Once this issue is fixed > and we updated to Calcite dependency to a version that includes the fix, we > can remove our custom rule. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9135) Remove AggregateReduceFunctionsRule once CALCITE-2216 is fixed
[ https://issues.apache.org/jira/browse/FLINK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570890#comment-16570890 ] Shuyi Chen commented on FLINK-9135: --- Addressed in pull request (https://github.com/apache/flink/pull/6484) > Remove AggregateReduceFunctionsRule once CALCITE-2216 is fixed > -- > > Key: FLINK-9135 > URL: https://issues.apache.org/jira/browse/FLINK-9135 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.5.0, 1.6.0 >Reporter: Fabian Hueske >Assignee: Shuyi Chen >Priority: Major > > We had to copy and slightly modify {{AggregateReduceFunctionsRule}} from > Calcite to fix FLINK-8903. > We proposed the changes to Calcite as CALCITE-2216. Once this issue is fixed > and we updated to Calcite dependency to a version that includes the fix, we > can remove our custom rule. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9134) Update Calcite dependency to 1.17
[ https://issues.apache.org/jira/browse/FLINK-9134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570889#comment-16570889 ] Shuyi Chen commented on FLINK-9134: --- PR merged. > Update Calcite dependency to 1.17 > - > > Key: FLINK-9134 > URL: https://issues.apache.org/jira/browse/FLINK-9134 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > Labels: pull-request-available > > This is an umbrella issue for tasks that need to be performed when upgrading > to Calcite 1.17 once it is released. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped
[ https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen reassigned FLINK-9891: - Assignee: Shuyi Chen > Flink cluster is not shutdown in YARN mode when Flink client is stopped > --- > > Key: FLINK-9891 > URL: https://issues.apache.org/jira/browse/FLINK-9891 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0, 1.5.1 >Reporter: Sergey Krasovskiy >Assignee: Shuyi Chen >Priority: Blocker > > We are not using session mode and detached mode. The command to run Flink job > on YARN is: > {code:java} > /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm > 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount > {code} > Flink CLI logs: > {code:java} > Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set. > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > 2018-07-18 12:47:03,747 INFO > org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service > address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/ > 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - > No path for the flink jar passed. Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - > No path for the flink jar passed. 
Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-07-18 12:47:04,248 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the > HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink > YARN Client needs one of these to be set to properly load the Hadoop > configuration for accessing YARN. > 2018-07-18 12:47:04,409 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: > ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, > numberTaskManagers=1, slotsPerTaskManager=1} > 2018-07-18 12:47:04,783 WARN > org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit > local reads feature cannot be used because libhadoop cannot be loaded. > 2018-07-18 12:47:04,788 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration > directory > ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf') > contains both LOG4J and Logback configuration files. Please delete or rename > one of them. > 2018-07-18 12:47:07,846 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application > master application_1531474158783_10814 > 2018-07-18 12:47:08,073 INFO > org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application > application_1531474158783_10814 > 2018-07-18 12:47:08,074 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster > to be allocated > 2018-07-18 12:47:08,076 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, > current state ACCEPTED > 2018-07-18 12:47:12,864 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has > been deployed successfully. 
> {code} > Job Manager logs: > {code:java} > 2018-07-18 12:47:09,913 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > > 2018-07-18 12:47:09,915 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting > YarnSessionClusterEntrypoint (Version: 1.5.1, Rev:3488f8b, Date:10.07.2018 @ > 11:51:27 GMT) > ... > {code} > Issues: > # Flink job is running as a Flink session > # Ctrl+C or 'stop' doesn't stop a job and YARN cluster > # Cancel job via Job Maanager web ui doesn't stop Flink cluster. To kill the > cluster we need to run: yarn application -kill > We also tried to run a flink job with 'mode: legacy' and we have the same > issues: > # Add property 'mode: legacy' to ./conf/flink-conf.yaml > # Execute the following command: > {code:java} > /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm > 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount > {code} > Flink CLI logs: > {code:java} > Setting HADOOP_CONF_DIR=/etc/hadoop/conf because
[jira] [Commented] (FLINK-6962) SQL DDL for input and output tables
[ https://issues.apache.org/jira/browse/FLINK-6962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16525753#comment-16525753 ] Shuyi Chen commented on FLINK-6962: --- Hi [~shenyufeng], I am finalizing the DDL design doc and should share it with the community within the next week or two. We would definitely like to get your feedback. > SQL DDL for input and output tables > --- > > Key: FLINK-6962 > URL: https://issues.apache.org/jira/browse/FLINK-6962 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: lincoln.lee >Priority: Major > > This Jira adds support to allow user define the DDL for source and sink > tables, including the waterMark(on source table) and emit SLA (on result > table). The detailed design doc will be attached soon. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9606) Support ParquetTableSource
[ https://issues.apache.org/jira/browse/FLINK-9606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16515386#comment-16515386 ] Shuyi Chen commented on FLINK-9606: --- I think this is duplicate of [FLINK-2169|https://issues.apache.org/jira/browse/FLINK-2169]. > Support ParquetTableSource > -- > > Key: FLINK-9606 > URL: https://issues.apache.org/jira/browse/FLINK-9606 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: mingleizhang >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR
[ https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-9559: -- Issue Type: Sub-task (was: Improvement) Parent: FLINK-9134 > The type of a union of CHAR columns of different lengths should be VARCHAR > -- > > Key: FLINK-9559 > URL: https://issues.apache.org/jira/browse/FLINK-9559 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > Currently, if a case-when expression has two branches that return string > literals, redundant white spaces will be appended to the shorter string literal. > For example, for the SQL: case 1 when 1 then 'a' when 2 then 'bcd' end, the > return value will be 'a ' of CHAR(3) instead of 'a'. > Although this follows the behavior of strict SQL standard mode (SQL:2003), we > should return the pragmatic type in real scenarios, without blank-padding. > Happily, this problem has been fixed by > [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321]; we can > upgrade Calcite to the next release (1.17.0) and override > {{RelDataTypeSystem}} in Flink to configure the return type, i.e., make > {{shouldConvertRaggedUnionTypesToVarying()}} return true. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
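The padding behavior can be demonstrated outside SQL with a small simulation of CHAR(n) versus VARCHAR semantics (plain Java, not Calcite code; the helper name asChar is made up):

```java
// Demonstrates why a ragged union typed as CHAR(3) blank-pads short values,
// while VARCHAR leaves them untouched. This simulates the SQL semantics only.
public class RaggedUnionDemo {
    // SQL CHAR(n) semantics: values are blank-padded to exactly n characters.
    static String asChar(String s, int n) {
        StringBuilder sb = new StringBuilder(s);
        while (sb.length() < n) sb.append(' ');
        return sb.toString();
    }

    public static void main(String[] args) {
        // CASE 1 WHEN 1 THEN 'a' WHEN 2 THEN 'bcd' END
        // Strict SQL:2003 mode: the result type is CHAR(3), so 'a' is padded.
        String strict = asChar("a", 3);
        System.out.println("[" + strict + "]"); // [a  ]

        // With shouldConvertRaggedUnionTypesToVarying() returning true, the
        // result type becomes VARCHAR(3) and no padding happens.
        String pragmatic = "a";
        System.out.println("[" + pragmatic + "]"); // [a]
    }
}
```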
[jira] [Closed] (FLINK-9482) Not applicable functions for TIME
[ https://issues.apache.org/jira/browse/FLINK-9482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen closed FLINK-9482. - Resolution: Fixed > Not applicable functions for TIME > - > > Key: FLINK-9482 > URL: https://issues.apache.org/jira/browse/FLINK-9482 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Viktor Vlasov >Assignee: Viktor Vlasov >Priority: Minor > > While working on https://issues.apache.org/jira/browse/FLINK-9432 I faced > the question of how to check the DECADE function with tests in > _org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala._ > > Because I used the CENTURY function as an example, I checked it first. > During the test I found that when used with TIME it returns 0. > I suppose the arguments for such functions (the same applies to YEAR, MONTH, > MILLENNIUM, etc.) need to be checked, and an exception should be thrown if the > type is not suitable. > As an example, in the Apache Calcite project (checked in the sqlline shell), > trying to use CENTURY with TIME throws: > {code:java} > java.lang.AssertionError: unexpected TIME > {code} > We need to determine why such a check does not exist and add it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9523) Add Kafka examples for Flink Table/SQL API
Shuyi Chen created FLINK-9523: - Summary: Add Kafka examples for Flink Table/SQL API Key: FLINK-9523 URL: https://issues.apache.org/jira/browse/FLINK-9523 Project: Flink Issue Type: Task Components: Examples Reporter: Shuyi Chen Given the popularity of Flink SQL and Kafka as streaming source, we want to add some examples of using Kafka JSON/Avro TableSource in flink-examples/flink-examples-table module. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9477) Support SQL 2016 JSON functions in Flink SQL
Shuyi Chen created FLINK-9477: - Summary: Support SQL 2016 JSON functions in Flink SQL Key: FLINK-9477 URL: https://issues.apache.org/jira/browse/FLINK-9477 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Shuyi Chen Assignee: Shuyi Chen -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9430) Support Casting of Object to Primitive types for Flink SQL UDF
[ https://issues.apache.org/jira/browse/FLINK-9430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494761#comment-16494761 ] Shuyi Chen commented on FLINK-9430: --- Hi [~twalthr], I've attached the PR. Having an end-to-end JSON story in Flink SQL will be great. I'll create a separate ticket for it, we will need to first get SQL JSON grammar ready in Calcite. > Support Casting of Object to Primitive types for Flink SQL UDF > -- > > Key: FLINK-9430 > URL: https://issues.apache.org/jira/browse/FLINK-9430 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > We want to add a SQL UDF to access specific element in a JSON string using > JSON path. However, the JSON element can be of different types, e.g. Int, > Float, Double, String, Boolean and etc.. Since return type is not part of the > method signature, we can not use overload. So we will end up writing a UDF > for each type, e.g. GetFloatFromJSON, GetIntFromJSON and etc., which has a > lot of duplication. > One way to unify all these UDF functions is to implement one UDF and return > java.lang.Object, and in the SQL statement, use CAST AS to cast the returned > Object into the correct type. Below is an example: > > {code:java} > object JsonPathUDF extends ScalarFunction { > def eval(jsonStr: String, path: String): Object = { >JSONParser.parse(jsonStr).read(path) > } > }{code} > {code:java} > SELECT CAST(jsonpath(json, "$.store.book.title") AS VARCHAR(32)) as > bookTitle FROM table1{code} > The current Flink SQL cast implementation does not support casting from > GenericTypeInfo to another type, I have already got a local > branch to fix this. Please comment if there are alternatives to the problem > above. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9430) Support Casting of Object to Primitive types for Flink SQL UDF
[ https://issues.apache.org/jira/browse/FLINK-9430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490026#comment-16490026 ] Shuyi Chen commented on FLINK-9430: --- The JSON object can be an int, float, double, boolean, array, map, etc. Are you suggesting serializing the JSON object to a String? But in that case, I would need to write more UDFs to parse the String back into the desired type, not cast it, and that is also inefficient. What are the issues with returning Object? > Support Casting of Object to Primitive types for Flink SQL UDF > -- > > Key: FLINK-9430 > URL: https://issues.apache.org/jira/browse/FLINK-9430 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > We want to add a SQL UDF to access specific element in a JSON string using > JSON path. However, the JSON element can be of different types, e.g. Int, > Float, Double, String, Boolean and etc.. Since return type is not part of the > method signature, we can not use overload. So we will end up writing a UDF > for each type, e.g. GetFloatFromJSON, GetIntFromJSON and etc., which has a > lot of duplication. > One way to unify all these UDF functions is to implement one UDF and return > java.lang.Object, and in the SQL statement, use CAST AS to cast the returned > Object into the correct type. Below is an example: > > {code:java} > object JsonPathUDF extends ScalarFunction { > def eval(jsonStr: String, path: String): Object = { >JSONParser.parse(jsonStr).read(path) > } > }{code} > {code:java} > SELECT CAST(jsonpath(json, "$.store.book.title") AS VARCHAR(32)) as > bookTitle FROM table1{code} > The current Flink SQL cast implementation does not support casting from > GenericTypeInfo to another type, I have already got a local > branch to fix this. Please comment if there are alternatives to the problem > above. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
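The pattern under discussion can be sketched in plain Java (the getField lookup here is a hypothetical stand-in for a JSON-path parser, not Flink's or any real library's API): one method returns Object, and the caller casts at the call site, mirroring SELECT CAST(jsonpath(...) AS VARCHAR) in SQL.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal illustration of the "return Object, cast at the call site" pattern.
public class ObjectCastDemo {
    // Stand-in for a JSON-path lookup; the actual value type varies per path,
    // so the declared return type must be Object.
    static Object getField(Map<String, Object> json, String path) {
        return json.get(path);
    }

    public static void main(String[] args) {
        Map<String, Object> json = new HashMap<>();
        json.put("$.store.book.title", "Flink SQL");
        json.put("$.store.book.price", 9.99);

        // One method, many result types: the cast happens at the call site,
        // just like CAST(... AS VARCHAR) / CAST(... AS DOUBLE) in SQL.
        String title = (String) getField(json, "$.store.book.title");
        double price = (Double) getField(json, "$.store.book.price");
        System.out.println(title);
        System.out.println(price);
    }
}
```

The alternative of serializing every result to String would force a second round of type-specific parsing UDFs, which is exactly the duplication the Object-plus-CAST approach avoids.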
[jira] [Updated] (FLINK-9430) Support Casting of Object to Primitive types for Flink SQL UDF
[ https://issues.apache.org/jira/browse/FLINK-9430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-9430: -- Description: We want to add a SQL UDF to access specific element in a JSON string using JSON path. However, the JSON element can be of different types, e.g. Int, Float, Double, String, Boolean and etc.. Since return type is not part of the method signature, we can not use overload. So we will end up writing a UDF for each type, e.g. GetFloatFromJSON, GetIntFromJSON and etc., which has a lot of duplication. One way to unify all these UDF functions is to implement one UDF and return java.lang.Object, and in the SQL statement, use CAST AS to cast the returned Object into the correct type. Below is an example: {code:java} object JsonPathUDF extends ScalarFunction { def eval(jsonStr: String, path: String): Object = { JSONParser.parse(jsonStr).read(path) } }{code} {code:java} SELECT CAST(jsonpath(json, "$.store.book.title") AS VARCHAR(32)) as bookTitle FROM table1{code} The current Flink SQL cast implementation does not support casting from GenericTypeInfo to another type, I have already got a local branch to fix this. Please comment if there are alternatives to the problem above. was: We want to add a SQL UDF to access specific element in a JSON string using JSON path. However, the JSON element can be of different types, e.g. Int, Float, Double, String, Boolean and etc.. Since return type is not part of the method signature, we can not use overload. So we will end up writing a UDF for each type, e.g. GetFloatFromJSON, GetIntFromJSON and etc., which has a lot of duplication. One way to unify all these UDF functions is to implement one UDF and return java.lang.Object, and in the SQL statement, use CAST AS to cast the returned Object into the correct type. 
Below is an example: {code:java} object JsonPathUDF extends ScalarFunction { def eval(jsonStr: String, path: String): Object = { JSONParser.parse(jsonStr).read(path) } }{code} {code:java} SELECT CAST(jsonpath(json, "$.store.book.title") AS VARCHAR(32)) as bookTitle FROM table1{code} I have already got a local branch working. Please comment if there are alternatives. > Support Casting of Object to Primitive types for Flink SQL UDF > -- > > Key: FLINK-9430 > URL: https://issues.apache.org/jira/browse/FLINK-9430 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > We want to add a SQL UDF to access specific element in a JSON string using > JSON path. However, the JSON element can be of different types, e.g. Int, > Float, Double, String, Boolean and etc.. Since return type is not part of the > method signature, we can not use overload. So we will end up writing a UDF > for each type, e.g. GetFloatFromJSON, GetIntFromJSON and etc., which has a > lot of duplication. > One way to unify all these UDF functions is to implement one UDF and return > java.lang.Object, and in the SQL statement, use CAST AS to cast the returned > Object into the correct type. Below is an example: > > {code:java} > object JsonPathUDF extends ScalarFunction { > def eval(jsonStr: String, path: String): Object = { >JSONParser.parse(jsonStr).read(path) > } > }{code} > {code:java} > SELECT CAST(jsonpath(json, "$.store.book.title") AS VARCHAR(32)) as > bookTitle FROM table1{code} > The current Flink SQL cast implementation does not support casting from > GenericTypeInfo to another type, I have already got a local > branch to fix this. Please comment if there are alternatives to the problem > above. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9430) Support Casting of Object to Primitive types for Flink SQL UDF
Shuyi Chen created FLINK-9430: - Summary: Support Casting of Object to Primitive types for Flink SQL UDF Key: FLINK-9430 URL: https://issues.apache.org/jira/browse/FLINK-9430 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Shuyi Chen Assignee: Shuyi Chen We want to add a SQL UDF to access a specific element in a JSON string using JSON path. However, the JSON element can be of different types, e.g., Int, Float, Double, String, Boolean, etc. Since the return type is not part of the method signature, we cannot use overloading. So we would end up writing a UDF for each type, e.g., GetFloatFromJSON, GetIntFromJSON, etc., which leads to a lot of duplication. One way to unify all these UDFs is to implement one UDF that returns java.lang.Object and, in the SQL statement, use CAST AS to cast the returned Object into the correct type. Below is an example: {code:java} object JsonPathUDF extends ScalarFunction { def eval(jsonStr: String, path: String): Object = { JSONParser.parse(jsonStr).read(path) } }{code} {code:java} SELECT CAST(jsonpath(json, "$.store.book.title") AS VARCHAR(32)) as bookTitle FROM table1{code} I already have a local branch working. Please comment if there are alternatives. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
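The Object-plus-CAST pattern discussed in FLINK-9430 can be sketched in plain Java, where an explicit Java cast plays the role that SQL's CAST AS would play. The class and method names below are hypothetical stand-ins for illustration, not the actual JSONParser or Flink UDF API:

```java
import java.util.Map;

public class JsonPathSketch {
    // Hypothetical stand-in for JSONParser.parse(jsonStr).read(path): a single
    // accessor returning java.lang.Object so one function covers all element types.
    public static Object read(Map<String, Object> parsedJson, String path) {
        return parsedJson.get(path);
    }

    public static void main(String[] args) {
        Map<String, Object> doc = Map.of("title", "Moby-Dick", "price", 9.99, "inStock", true);
        // The Java casts below correspond to CAST(... AS VARCHAR) / CAST(... AS DOUBLE)
        // in the SQL statement: one UDF, with the caller choosing the concrete type.
        String title = (String) read(doc, "title");
        double price = (Double) read(doc, "price");
        boolean inStock = (Boolean) read(doc, "inStock");
        System.out.println(title + " " + price + " " + inStock);
    }
}
```

The point of the sketch is that a single Object-returning accessor avoids one overload per result type; the cast site, not the function signature, fixes the type.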
[jira] [Comment Edited] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16474801#comment-16474801 ] Shuyi Chen edited comment on FLINK-8866 at 5/14/18 9:07 PM: Hi [~walterddr], [~twalthr], [~fhueske], thanks a lot for the comments. I think there are a few challenges for this JIRA: 1) there can be a lot of duplicate code dealing with the unified table sink instantiation if we do it in the same way as TableSourceFactory/TableSourceFactoryService, so we should try to refactor/redesign it to make it cleaner; 2) to support a table which can be both a source and a sink, we need a unified interface at least when interacting with Calcite, so the same table name can be used for the source and the sink in SQL; 3) when registering table sinks, the current registerTableSink interface takes additional parameters _fieldName_ and _fieldTypes_, which I don't think are necessary and which add complexity when integrating with the SQL DDL and SQL client. I am experimenting with the changes needed in my local branch and writing a design doc. I would love to share the design doc soon when it's ready. was (Author: suez1224): Hi [~walterddr], [~twalthr], [~fhueske], thanks a lot for the comments. I think there are a few challenges for this JIRA, 1) there can be a lot of duplicate code dealing with the unified table sink instantiation if we do it in the same way as TableSourceFactory/TableSourceFactoryService. So we should try to refactor/redesign it to make it cleaner. 2) to support a table which can be both source and sink, we need to have a unified interface at least when interacting with Calcite, so the same table name can be used for the source and sink in SQL. 3) when registering tableSinks, the current registerTableSink interface took additional parameters _fieldName_ and _fieldTypes_, which I dont think it's necessary and add complexity when integrating with SQL DDL and SQL client. 
I am experimenting the changes needed in my local branch, and writing a design doc. Would love to share the design doc soon when I think it's ready. > Create unified interfaces to configure and instatiate TableSinks > > > Key: FLINK-8866 > URL: https://issues.apache.org/jira/browse/FLINK-8866 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > > Similar to the efforts done in FLINK-8240. We need unified ways to configure > and instantiate TableSinks. Among other applications, this is necessary in > order to declare table sinks in an environment file of the SQL client. Such > that the sink can be used for {{INSERT INTO}} statements. > Below are a few major changes in mind. > 1) Add TableSinkFactory/TableSinkFactoryService similar to > TableSourceFactory/TableSourceFactoryService > 2) Add a common property called "type" with values (source, sink and both) > for both TableSource and TableSink. > 3) in yaml file, replace "sources" with "tables", and use tableType to > identify whether it's source or sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16474801#comment-16474801 ] Shuyi Chen commented on FLINK-8866: --- Hi [~walterddr], [~twalthr], [~fhueske], thanks a lot for the comments. I think there are a few challenges for this JIRA: 1) there can be a lot of duplicate code dealing with the unified table sink instantiation if we do it in the same way as TableSourceFactory/TableSourceFactoryService, so we should try to refactor/redesign it to make it cleaner; 2) to support a table which can be both a source and a sink, we need a unified interface at least when interacting with Calcite, so the same table name can be used for the source and the sink in SQL; 3) when registering table sinks, the current registerTableSink interface takes additional parameters _fieldName_ and _fieldTypes_, which I don't think are necessary and which add complexity when integrating with the SQL DDL and SQL client. I am experimenting with the changes needed in my local branch and writing a design doc. I would love to share the design doc soon when I think it's ready. > Create unified interfaces to configure and instantiate TableSinks > > > Key: FLINK-8866 > URL: https://issues.apache.org/jira/browse/FLINK-8866 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > > Similar to the efforts done in FLINK-8240. We need unified ways to configure > and instantiate TableSinks. Among other applications, this is necessary in > order to declare table sinks in an environment file of the SQL client. Such > that the sink can be used for {{INSERT INTO}} statements. > Below are a few major changes in mind. > 1) Add TableSinkFactory/TableSinkFactoryService similar to > TableSourceFactory/TableSourceFactoryService > 2) Add a common property called "type" with values (source, sink and both) > for both TableSource and TableSink. 
> 3) in yaml file, replace "sources" with "tables", and use tableType to > identify whether it's source or sink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
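The property-based factory discovery described in points 1) and 2) above can be illustrated with a few lines of plain Java: a factory declares the context it supports, and a service picks the first factory whose required properties all match the table descriptor. The interface and class names here are hypothetical sketches, not Flink's actual API:

```java
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Illustrative sketch only: a factory advertises required properties
// (e.g. {"type": "sink", "connector": "csv"}), and the service matches
// a table's properties against each factory's required context.
interface SketchTableSinkFactory {
    Map<String, String> requiredContext();
    Object createSink(Map<String, String> properties); // would return a TableSink in Flink
}

class SketchFactoryService {
    static Object findAndCreate(List<SketchTableSinkFactory> factories, Map<String, String> props) {
        for (SketchTableSinkFactory factory : factories) {
            // A factory matches if every required (key, value) pair appears in props.
            if (props.entrySet().containsAll(factory.requiredContext().entrySet())) {
                return factory.createSink(props);
            }
        }
        throw new NoSuchElementException("No factory matches properties: " + props);
    }
}
```

With a common "type" property carrying source/sink/both, the same lookup mechanism can serve both TableSource and TableSink registration, which is the unification the comment argues for.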
[jira] [Created] (FLINK-9327) Support explicit ROW value constructor in Flink SQL
Shuyi Chen created FLINK-9327: - Summary: Support explicit ROW value constructor in Flink SQL Key: FLINK-9327 URL: https://issues.apache.org/jira/browse/FLINK-9327 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Shuyi Chen Assignee: Shuyi Chen Currently, the explicit ROW value constructor can only be used in a VALUES() statement. The parser will fail if ROW is used explicitly in SELECT, WHERE, etc. [CALCITE-2276|https://issues.apache.org/jira/browse/CALCITE-2276] fixes the problem. We should integrate this as part of the Calcite 1.17 upgrade and add unit tests for it in Flink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7001) Improve performance of Sliding Time Window with pane optimization
[ https://issues.apache.org/jira/browse/FLINK-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464544#comment-16464544 ] Shuyi Chen commented on FLINK-7001: --- Hi [~pgrulich], the paper is a nice read. The technique applies to tumbling, sliding and session windows, which is a good win, and the evaluation results look good. Also, it seems you already have an implementation of Scotty on Apache Flink based on the paper. Maybe you and [~jark] can share more about the detailed design, pros and cons of each approach, and we can discuss them here? > Improve performance of Sliding Time Window with pane optimization > - > > Key: FLINK-7001 > URL: https://issues.apache.org/jira/browse/FLINK-7001 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Jark Wu >Assignee: Jark Wu >Priority: Major > > Currently, the implementation of time-based sliding windows treats each > window individually and replicates records to each window. For a window of 10 > minute size that slides by 1 second the data is replicated 600 fold (10 > minutes / 1 second). We can optimize sliding window by divide windows into > panes (aligned with slide), so that we can avoid record duplication and > leverage the checkpoint. > I will attach a more detail design doc to the issue. > The following issues are similar to this issue: FLINK-5387, FLINK-6990 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
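The pane optimization the issue describes can be sketched as follows: each record is pre-aggregated once into the slide-aligned pane it falls into, and each sliding window is then assembled from size/slide consecutive panes, instead of replicating every record into all windows that contain it. This is an illustrative summing example assuming non-negative millisecond timestamps, not Flink's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PaneSlidingSum {
    /** Per-window sums, emitting one window starting at each non-empty pane. */
    public static List<Long> windowSums(long sizeMs, long slideMs, Map<Long, Long> timestampedValues) {
        // Step 1: one pass over the records, pre-aggregating into slide-aligned panes.
        // Each record is touched once, regardless of how many windows contain it.
        TreeMap<Long, Long> panes = new TreeMap<>();
        for (Map.Entry<Long, Long> record : timestampedValues.entrySet()) {
            long paneStart = record.getKey() - (record.getKey() % slideMs);
            panes.merge(paneStart, record.getValue(), Long::sum);
        }
        // Step 2: each window of length sizeMs combines sizeMs/slideMs consecutive panes.
        List<Long> sums = new ArrayList<>();
        for (long windowStart : new ArrayList<>(panes.keySet())) {
            long sum = 0;
            for (long pane = windowStart; pane < windowStart + sizeMs; pane += slideMs) {
                sum += panes.getOrDefault(pane, 0L);
            }
            sums.add(sum);
        }
        return sums;
    }
}
```

For the 10-minute window sliding by 1 second from the issue description, this replaces 600-fold record replication with one pane update per record plus 600 pane combines per emitted window.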
[jira] [Assigned] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen reassigned FLINK-7151: - Assignee: Shuyi Chen (was: yuemeng) > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng >Assignee: Shuyi Chen >Priority: Major > > Based on create temporary function and table.we can register a udf,udaf,udtf > use sql: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9235) Add Integration test for Flink-Yarn-Kerberos integration for flip-6
Shuyi Chen created FLINK-9235: - Summary: Add Integration test for Flink-Yarn-Kerberos integration for flip-6 Key: FLINK-9235 URL: https://issues.apache.org/jira/browse/FLINK-9235 Project: Flink Issue Type: Test Affects Versions: 1.5.0 Reporter: Shuyi Chen Assignee: Shuyi Chen We need to provide an integration test for flip-6 similar to YARNSessionFIFOSecuredITCase for the legacy deployment mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8286) Fix Flink-Yarn-Kerberos integration for flip-6
[ https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16447411#comment-16447411 ] Shuyi Chen commented on FLINK-8286: --- Hi [~till.rohrmann] and [~aljoscha], I spent some time on this; the Flink-Yarn-Kerberos integration is also broken. I'll provide a fix soon. > Fix Flink-Yarn-Kerberos integration for flip-6 > -- > > Key: FLINK-8286 > URL: https://issues.apache.org/jira/browse/FLINK-8286 > Project: Flink > Issue Type: Task > Components: Security >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > The current Flink-Yarn-Kerberos in Flip-6 is broken. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8286) Fix Flink-Yarn-Kerberos integration for flip-6
[ https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-8286: -- Description: The current Flink-Yarn-Kerberos in Flip-6 is broken. (was: The current ) > Fix Flink-Yarn-Kerberos integration for flip-6 > -- > > Key: FLINK-8286 > URL: https://issues.apache.org/jira/browse/FLINK-8286 > Project: Flink > Issue Type: Task > Components: Security >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > The current Flink-Yarn-Kerberos in Flip-6 is broken. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8286) Fix Flink-Yarn-Kerberos integration for flip-6
[ https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-8286: -- Summary: Fix Flink-Yarn-Kerberos integration for flip-6 (was: Investigate Flink-Yarn-Kerberos integration for flip-6) > Fix Flink-Yarn-Kerberos integration for flip-6 > -- > > Key: FLINK-8286 > URL: https://issues.apache.org/jira/browse/FLINK-8286 > Project: Flink > Issue Type: Task > Components: Security >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > We've found some issues with the Flink-Yarn-Kerberos integration in the > current deployment model, we will need to investigate and test it for flip-6 > when it's ready. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8286) Fix Flink-Yarn-Kerberos integration for flip-6
[ https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-8286: -- Description: The current (was: We've found some issues with the Flink-Yarn-Kerberos integration in the current deployment model, we will need to investigate and test it for flip-6 when it's ready.) > Fix Flink-Yarn-Kerberos integration for flip-6 > -- > > Key: FLINK-8286 > URL: https://issues.apache.org/jira/browse/FLINK-8286 > Project: Flink > Issue Type: Task > Components: Security >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > The current -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8286) Investigate Flink-Yarn-Kerberos integration for flip-6
[ https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16445560#comment-16445560 ] Shuyi Chen commented on FLINK-8286: --- Hi [~till.rohrmann] and [~aljoscha], the context is that there was a regression in the Flink Kerberos YARN integration in 1.4, which was addressed in [FLINK-8275|https://issues.apache.org/jira/browse/FLINK-8275]. This task was created at that time to make sure there is no regression in flip-6 as well. I'll take a look in the next few days. Also, can you point me to some existing integration tests for flip-6 deployment that actually run a streaming/batch job on a mini-YARN cluster? Thanks. > Investigate Flink-Yarn-Kerberos integration for flip-6 > -- > > Key: FLINK-8286 > URL: https://issues.apache.org/jira/browse/FLINK-8286 > Project: Flink > Issue Type: Task > Components: Security >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > We've found some issues with the Flink-Yarn-Kerberos integration in the > current deployment model, we will need to investigate and test it for flip-6 > when it's ready. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9157) Support Apache HCatalog in SQL client
[ https://issues.apache.org/jira/browse/FLINK-9157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-9157: -- Summary: Support Apache HCatalog in SQL client (was: Support for commonly used external catalog) > Support Apache HCatalog in SQL client > - > > Key: FLINK-9157 > URL: https://issues.apache.org/jira/browse/FLINK-9157 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Rong Rong >Priority: Major > > It will be great to have SQL-Client to support some external catalogs > out-of-the-box for SQL users to configure and utilize easily. Such as Apache > HCatalog. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-1466) Add InputFormat to read HCatalog tables
[ https://issues.apache.org/jira/browse/FLINK-1466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-1466: -- Issue Type: Sub-task (was: New Feature) Parent: FLINK-9171 > Add InputFormat to read HCatalog tables > --- > > Key: FLINK-1466 > URL: https://issues.apache.org/jira/browse/FLINK-1466 > Project: Flink > Issue Type: Sub-task > Components: Java API, Scala API >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Minor > > HCatalog is a metadata repository and InputFormat to make Hive tables > accessible to other frameworks such as Pig. > Adding support for HCatalog would give access to Hive managed data. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9157) Support for commonly used external catalog
[ https://issues.apache.org/jira/browse/FLINK-9157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-9157: -- Issue Type: Bug (was: Sub-task) Parent: (was: FLINK-7594) > Support for commonly used external catalog > -- > > Key: FLINK-9157 > URL: https://issues.apache.org/jira/browse/FLINK-9157 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Rong Rong >Priority: Major > > It will be great to have SQL-Client to support some external catalogs > out-of-the-box for SQL users to configure and utilize easily. Such as Apache > HCatalog. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9157) Support for commonly used external catalog
[ https://issues.apache.org/jira/browse/FLINK-9157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-9157: -- Issue Type: Sub-task (was: Bug) Parent: FLINK-9171 > Support for commonly used external catalog > -- > > Key: FLINK-9157 > URL: https://issues.apache.org/jira/browse/FLINK-9157 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Rong Rong >Priority: Major > > It will be great to have SQL-Client to support some external catalogs > out-of-the-box for SQL users to configure and utilize easily. Such as Apache > HCatalog. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-1913) Document how to access data in HCatalog
[ https://issues.apache.org/jira/browse/FLINK-1913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-1913: -- Issue Type: Sub-task (was: Bug) Parent: FLINK-9171 > Document how to access data in HCatalog > --- > > Key: FLINK-1913 > URL: https://issues.apache.org/jira/browse/FLINK-1913 > Project: Flink > Issue Type: Sub-task > Components: Documentation, flink-hcatalog >Reporter: Robert Metzger >Assignee: Zhenqiu Huang >Priority: Major > > Reading from HCatalog was added in FLINK-1466, but not documented > We should document how to use the code in {{flink-hcatalog}}. > Also, there should be an example on how to write to HCatalog using the Hadoop > wrappers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9170) HCatalog integration with Table/SQL API
[ https://issues.apache.org/jira/browse/FLINK-9170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-9170: -- Issue Type: Sub-task (was: New Feature) Parent: FLINK-9171 > HCatolog integration with Table/SQL API > --- > > Key: FLINK-9170 > URL: https://issues.apache.org/jira/browse/FLINK-9170 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Zhenqiu Huang >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9171) Flink HCatalog integration
Shuyi Chen created FLINK-9171: - Summary: Flink HCatalog integration Key: FLINK-9171 URL: https://issues.apache.org/jira/browse/FLINK-9171 Project: Flink Issue Type: Task Reporter: Shuyi Chen This is a parent task for all HCatalog-related integration in Flink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9170) HCatalog integration with Table/SQL API
Shuyi Chen created FLINK-9170: - Summary: HCatalog integration with Table/SQL API Key: FLINK-9170 URL: https://issues.apache.org/jira/browse/FLINK-9170 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Shuyi Chen Assignee: Zhenqiu Huang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-1913) Document how to access data in HCatalog
[ https://issues.apache.org/jira/browse/FLINK-1913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen reassigned FLINK-1913: - Assignee: Zhenqiu Huang > Document how to access data in HCatalog > --- > > Key: FLINK-1913 > URL: https://issues.apache.org/jira/browse/FLINK-1913 > Project: Flink > Issue Type: Bug > Components: Documentation, flink-hcatalog >Reporter: Robert Metzger >Assignee: Zhenqiu Huang >Priority: Major > > Reading from HCatalog was added in FLINK-1466, but not documented > We should document how to use the code in {{flink-hcatalog}}. > Also, there should be an example on how to write to HCatalog using the Hadoop > wrappers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9161) Support AS STRUCT syntax to create named STRUCT in SQL
Shuyi Chen created FLINK-9161: - Summary: Support AS STRUCT syntax to create named STRUCT in SQL Key: FLINK-9161 URL: https://issues.apache.org/jira/browse/FLINK-9161 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Shuyi Chen Assignee: Shuyi Chen As discussed in the [calcite dev mailing list|https://mail-archives.apache.org/mod_mbox/calcite-dev/201804.mbox/%3cCAMZk55avGNmp1vXeJwA1B_a8bGyCQ9ahxmE=R=6fklpf7jt...@mail.gmail.com%3e], we want to add support for named structure construction in SQL, e.g., {code:java} SELECT STRUCT(a as first_name, b as last_name, STRUCT(c as zip_code, d as street, e as state) as address) as record FROM example_table {code} This would require making the necessary changes in Calcite first. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9161) Support STRUCT syntax to create named STRUCT in SQL
[ https://issues.apache.org/jira/browse/FLINK-9161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-9161: -- Summary: Support STRUCT syntax to create named STRUCT in SQL (was: Support AS STRUCT syntax to create named STRUCT in SQL) > Support STRUCT syntax to create named STRUCT in SQL > --- > > Key: FLINK-9161 > URL: https://issues.apache.org/jira/browse/FLINK-9161 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > As discussed in [calcite dev mailing > list|https://mail-archives.apache.org/mod_mbox/calcite-dev/201804.mbox/%3cCAMZk55avGNmp1vXeJwA1B_a8bGyCQ9ahxmE=R=6fklpf7jt...@mail.gmail.com%3e], > we want add support for adding named structure construction in SQL, e.g., > {code:java} > SELECT STRUCT(a as first_name, b as last_name, STRUCT(c as zip code, d as > street, e as state) as address) as record FROM example_table > {code} > This would require adding necessary change in Calcite first. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9135) Remove AggregateReduceFunctionsRule once CALCITE-2216 is fixed
[ https://issues.apache.org/jira/browse/FLINK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen reassigned FLINK-9135: - Assignee: Shuyi Chen > Remove AggregateReduceFunctionsRule once CALCITE-2216 is fixed > -- > > Key: FLINK-9135 > URL: https://issues.apache.org/jira/browse/FLINK-9135 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.5.0, 1.6.0 >Reporter: Fabian Hueske >Assignee: Shuyi Chen >Priority: Major > > We had to copy and slightly modify {{AggregateReduceFunctionsRule}} from > Calcite to fix FLINK-8903. > We proposed the changes to Calcite as CALCITE-2216. Once this issue is fixed > and we updated to Calcite dependency to a version that includes the fix, we > can remove our custom rule. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9015) Upgrade Calcite dependency to 1.17
[ https://issues.apache.org/jira/browse/FLINK-9015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16426458#comment-16426458 ] Shuyi Chen commented on FLINK-9015: --- Duplicate of [FLINK-9134|https://issues.apache.org/jira/browse/FLINK-9134]. I'll close this one. > Upgrade Calcite dependency to 1.17 > -- > > Key: FLINK-9015 > URL: https://issues.apache.org/jira/browse/FLINK-9015 > Project: Flink > Issue Type: Task >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9015) Upgrade Calcite dependency to 1.17
[ https://issues.apache.org/jira/browse/FLINK-9015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen closed FLINK-9015. - Resolution: Duplicate > Upgrade Calcite dependency to 1.17 > -- > > Key: FLINK-9015 > URL: https://issues.apache.org/jira/browse/FLINK-9015 > Project: Flink > Issue Type: Task >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9134) Update Calcite dependency to 1.17
[ https://issues.apache.org/jira/browse/FLINK-9134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16426455#comment-16426455 ] Shuyi Chen commented on FLINK-9134: --- Actually, let's close [FLINK-9015|https://issues.apache.org/jira/browse/FLINK-9015]. > Update Calcite dependency to 1.17 > - > > Key: FLINK-9134 > URL: https://issues.apache.org/jira/browse/FLINK-9134 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > > This is an umbrella issue for tasks that need to be performed when upgrading > to Calcite 1.17 once it is released. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-9134) Update Calcite dependency to 1.17
[ https://issues.apache.org/jira/browse/FLINK-9134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen reopened FLINK-9134: --- Assignee: Shuyi Chen > Update Calcite dependency to 1.17 > - > > Key: FLINK-9134 > URL: https://issues.apache.org/jira/browse/FLINK-9134 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > > This is an umbrella issue for tasks that need to be performed when upgrading > to Calcite 1.17 once it is released. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9134) Update Calcite dependency to 1.17
[ https://issues.apache.org/jira/browse/FLINK-9134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen closed FLINK-9134. - Resolution: Duplicate > Update Calcite dependency to 1.17 > - > > Key: FLINK-9134 > URL: https://issues.apache.org/jira/browse/FLINK-9134 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Priority: Major > > This is an umbrella issue for tasks that need to be performed when upgrading > to Calcite 1.17 once it is released. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16426316#comment-16426316 ] Shuyi Chen commented on FLINK-7151: --- I don't have a concrete timeline, but I will try to implement the table DDL before the Flink 1.6 release. > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng >Assignee: yuemeng >Priority: Major > > Based on create temporary function and table.we can register a udf,udaf,udtf > use sql: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9134) Update Calcite dependency to 1.17
[ https://issues.apache.org/jira/browse/FLINK-9134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16425926#comment-16425926 ] Shuyi Chen commented on FLINK-9134: --- Hi [~twalthr], this is a duplicate of [FLINK-9015|https://issues.apache.org/jira/browse/FLINK-9015]. I'll merge and close this one. > Update Calcite dependency to 1.17 > - > > Key: FLINK-9134 > URL: https://issues.apache.org/jira/browse/FLINK-9134 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Priority: Major > > This is an umbrella issue for tasks that need to be performed when upgrading > to Calcite 1.17 once it is released. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8981) End-to-end test: Kerberos security
[ https://issues.apache.org/jira/browse/FLINK-8981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417997#comment-16417997 ] Shuyi Chen commented on FLINK-8981: --- Setting up a real YARN cluster from the binary might be tricky, as it requires things like "passphraseless ssh". I am thinking of using the MiniCluster and MiniKDC to do so instead. I'd prefer to write the test in Java and provide a shell script to invoke it. What do you guys think? > End-to-end test: Kerberos security > -- > > Key: FLINK-8981 > URL: https://issues.apache.org/jira/browse/FLINK-8981 > Project: Flink > Issue Type: Sub-task > Components: Security, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Shuyi Chen >Priority: Blocker > Fix For: 1.5.0 > > > We should add an end-to-end test which verifies Flink's integration with > Kerberos security. In order to do this, we should start a Kerberos secured > Hadoop, ZooKeeper and Kafka cluster. Then we should start a Flink cluster > with HA enabled and run a job which reads from and writes to Kafka. We could > use a simple pipe job for that purpose which has some state for checkpointing > to HDFS. > See [security docs| > https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html] > for how more information about Flink's Kerberos integration. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8981) End-to-end test: Kerberos security
[ https://issues.apache.org/jira/browse/FLINK-8981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415146#comment-16415146 ] Shuyi Chen commented on FLINK-8981: --- Hi [~tzulitai], I plan to work on it this week. Yes, I agree we should set up a multi-NM YARN cluster. Are you aware of any example programs that I can use that write checkpoints to HDFS? Thanks. > End-to-end test: Kerberos security > -- > > Key: FLINK-8981 > URL: https://issues.apache.org/jira/browse/FLINK-8981 > Project: Flink > Issue Type: Sub-task > Components: Security, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Shuyi Chen >Priority: Blocker > Fix For: 1.5.0 > > > We should add an end-to-end test which verifies Flink's integration with > Kerberos security. In order to do this, we should start a Kerberos-secured > Hadoop, ZooKeeper and Kafka cluster. Then we should start a Flink cluster > with HA enabled and run a job which reads from and writes to Kafka. We could > use a simple pipe job for that purpose which has some state for checkpointing > to HDFS. > See the [security docs| > https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html] > for more information about Flink's Kerberos integration.
[jira] [Commented] (FLINK-9049) Create unified interfaces to configure and instantiate TableSink
[ https://issues.apache.org/jira/browse/FLINK-9049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414626#comment-16414626 ] Shuyi Chen commented on FLINK-9049: --- Thanks a lot, [~twalthr]. Assigned [FLINK-8866|https://issues.apache.org/jira/browse/FLINK-8866] to me. > Create unified interfaces to configure and instantiate TableSink > --- > > Key: FLINK-9049 > URL: https://issues.apache.org/jira/browse/FLINK-9049 > Project: Flink > Issue Type: Task > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > This is a similar effort to > [FLINK-8240|https://issues.apache.org/jira/browse/FLINK-8240]: we want to > create a unified interface for discovery and instantiation of TableSink, and > later support table DDL in Flink. The proposed solution would use a similar > approach to [FLINK-8240|https://issues.apache.org/jira/browse/FLINK-8240], > and can reuse most of the implementations already done in > [FLINK-8240|https://issues.apache.org/jira/browse/FLINK-8240]. > Below are a few major changes we have in mind. > 1) Add TableSinkFactory/TableSinkFactoryService similar to > TableSourceFactory/TableSourceFactoryService > 2) Add a common property called "tableType" with values (source, sink and > both) for both TableSource and TableSink. > 3) In the YAML file, replace "sources" with "tables", and use tableType to > identify whether it's source or sink.
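Change (1) above could be sketched roughly as follows. This is an illustrative sketch only: the interface, method names, and the CSV factory are assumptions for the sake of the example, not Flink's actual API (and later comments on FLINK-9059 rename "tableType" to "type"):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: names and signatures are assumptions, not Flink's real API.
interface TableSinkFactory {
    // Properties a candidate factory must match during discovery,
    // e.g. "tableType" = "sink".
    Map<String, String> requiredContext();

    // Instantiate a sink from the validated property map.
    Object createTableSink(Map<String, String> properties);
}

// A trivial factory illustrating discovery by the "tableType" property.
class CsvTableSinkFactory implements TableSinkFactory {
    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put("tableType", "sink");
        context.put("connector.type", "csv");
        return context;
    }

    @Override
    public Object createTableSink(Map<String, String> properties) {
        // A real factory would build and return a configured TableSink here.
        return "CsvTableSink(path=" + properties.get("connector.path") + ")";
    }
}
```

A TableSinkFactoryService would then scan the classpath (e.g. via ServiceLoader) for factories whose requiredContext() matches the table's properties, mirroring the existing TableSourceFactoryService.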
[jira] [Updated] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen updated FLINK-8866: -- Description: Similar to the efforts done in FLINK-8240, we need unified ways to configure and instantiate TableSinks. Among other applications, this is necessary in order to declare table sinks in an environment file of the SQL client, so that the sink can be used for {{INSERT INTO}} statements. Below are a few major changes we have in mind. 1) Add TableSinkFactory/TableSinkFactoryService similar to TableSourceFactory/TableSourceFactoryService 2) Add a common property called "type" with values (source, sink and both) for both TableSource and TableSink. 3) In the YAML file, replace "sources" with "tables", and use "type" to identify whether it's source or sink. was: Similar to the efforts done in FLINK-8240. We need unified ways to configure and instantiate TableSinks. Among other applications, this is necessary in order to declare table sinks in an environment file of the SQL client. Such that the sink can be used for {{INSERT INTO}} statements. Below are a few major changes in mind. 1) Add TableSinkFactory/TableSinkFactoryService similar to TableSourceFactory/TableSourceFactoryService 2) Add a common property called "tableType" with values (source, sink and both) for both TableSource and TableSink. 3) in yaml file, replace "sources" with "tables", and use tableType to identify whether it's source or sink. > Create unified interfaces to configure and instantiate TableSinks > > > Key: FLINK-8866 > URL: https://issues.apache.org/jira/browse/FLINK-8866 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > > Similar to the efforts done in FLINK-8240, we need unified ways to configure > and instantiate TableSinks. Among other applications, this is necessary in > order to declare table sinks in an environment file of the SQL client, so > that the sink can be used for {{INSERT INTO}} statements. > Below are a few major changes we have in mind. > 1) Add TableSinkFactory/TableSinkFactoryService similar to > TableSourceFactory/TableSourceFactoryService > 2) Add a common property called "type" with values (source, sink and both) > for both TableSource and TableSink. > 3) In the YAML file, replace "sources" with "tables", and use "type" to > identify whether it's source or sink.
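Under change (3), the SQL client environment file might then look roughly like this. The table names and comments are placeholders, not a confirmed schema:

```yaml
# Hypothetical environment file: "tables" replaces "sources", and "type"
# marks each entry as a source, a sink, or both.
tables:
  - name: TaxiRides
    type: source      # read-only table
  - name: RideResults
    type: sink        # usable as the target of INSERT INTO
  - name: RideCache
    type: both        # registered as both source and sink
```

Connector and format properties would sit under each entry as today for sources; only the section name and the new "type" key change.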
[jira] [Assigned] (FLINK-8866) Create unified interfaces to configure and instantiate TableSinks
[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen reassigned FLINK-8866: - Assignee: Shuyi Chen (was: Timo Walther) > Create unified interfaces to configure and instantiate TableSinks > > > Key: FLINK-8866 > URL: https://issues.apache.org/jira/browse/FLINK-8866 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > > Similar to the efforts done in FLINK-8240, we need unified ways to configure > and instantiate TableSinks. Among other applications, this is necessary in > order to declare table sinks in an environment file of the SQL client, so > that the sink can be used for {{INSERT INTO}} statements. > Below are a few major changes we have in mind. > 1) Add TableSinkFactory/TableSinkFactoryService similar to > TableSourceFactory/TableSourceFactoryService > 2) Add a common property called "tableType" with values (source, sink and > both) for both TableSource and TableSink. > 3) In the YAML file, replace "sources" with "tables", and use tableType to > identify whether it's source or sink.
[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file
[ https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414618#comment-16414618 ] Shuyi Chen commented on FLINK-9059: --- Hi [~twalthr], thanks a lot for the comments. The Pull Request is actually already using "type" instead of "tableType". Could you please take a look at the PR? Thanks a lot. > Add support for unified table source and sink declaration in environment file > - > > Key: FLINK-9059 > URL: https://issues.apache.org/jira/browse/FLINK-9059 > Project: Flink > Issue Type: Task > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > Fix For: 1.5.0 > > > 1) Add a common property called "type" with the single value 'source'. > 2) In the YAML file, replace "sources" with "tables".