[jira] [Created] (FLINK-34964) ScheduledTask leak in registering the processing timer
zoucao created FLINK-34964:
-------------------------------

             Summary: ScheduledTask leak in registering the processing timer
                 Key: FLINK-34964
                 URL: https://issues.apache.org/jira/browse/FLINK-34964
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.20.0
            Reporter: zoucao
         Attachments: image-2024-03-29-16-40-11-928.png

I have come across a leak of 'ScheduledTask's when registering processing-time timers. Upon further investigation, I have identified two factors that are responsible for the leak.

*1. A registered 'ScheduledTask' is not canceled*

See `org.apache.flink.streaming.api.operators.InternalTimerServiceImpl#deleteProcessingTimeTimer`: when a registered timer is deleted, Flink only removes it from the 'processingTimeTimersQueue'. However, this timer may be the earliest one to be triggered and may already have been scheduled as a task in the ScheduledThreadPoolExecutor. When deleting a registered timer, Flink should check whether the timer is the next one to be triggered; if so, the current 'ScheduledTask' should be canceled.

*2. Re-submitting a timer earlier than System.currentTimeMillis()*

Consider a case where the current time in millis is 100 and the processing queue contains 100, 101, 102; timer-100 has already been submitted to the ScheduledThreadPool. At this moment the user registers timer-99. Since 99 is less than 100 (the head of the queue), Flink cancels timer-100's task and re-registers with timer-99. However, before timer-100 is canceled, the thread pool has already submitted it to the mailbox. The mails in the mailbox are then as follows:
{code:java}
-> register timer-99 -> trigger timer-100 -> trigger timer-99
{code}
- When executing 'trigger timer-100', Flink flushes the records whose timers belong to 99 and 100, then submits timer-101 to the scheduled thread pool.
- When executing 'trigger timer-99', no records need to be flushed, and it also submits timer-101 to the scheduled thread pool, because timer-101 is the next timer that needs to be triggered.

As a result, two tasks for the same timer are registered in Flink's scheduled thread pool. In our online job, the number of these leaked scheduled tasks could be in the thousands, see the attached figure. The following example is convenient for reproducing case 2.
{code:java}
@Test
public void testTimerTaskLeak() {
    TaskMailboxImpl mailbox = new TaskMailboxImpl();
    MailboxExecutor mailboxExecutor =
            new MailboxExecutorImpl(mailbox, 0, StreamTaskActionExecutor.IMMEDIATE);
    SystemProcessingTimeService processingTimeService =
            new SystemProcessingTimeService(ex -> handleException(ex));
    ProcessingTimeServiceImpl timeService =
            new ProcessingTimeServiceImpl(
                    processingTimeService,
                    callback -> deferCallbackToMailbox(mailboxExecutor, callback));
    TestKeyContext keyContext = new TestKeyContext();
    Queue mailQueue = new LinkedBlockingDeque<>();
    long curr = System.currentTimeMillis();
    InternalTimerServiceImpl timerService =
            createAndStartInternalTimerService(
                    mock(Triggerable.class),
                    keyContext,
                    timeService,
                    testKeyGroupRange,
                    createQueueFactory());
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    executorService.execute(
            () -> {
                try {
                    keyContext.setCurrentKey(1);
                    mailboxExecutor.execute(
                            () -> timerService.registerProcessingTimeTimer("void", curr + 6 * 1000L),
                            "6");
                    Thread.sleep(2L);
                    mailboxExecutor.execute(
                            () -> timerService.registerProcessingTimeTimer("void", curr + 7 * 1000L),
                            "7");
                    Thread.sleep(2L);
                    mailboxExecutor.execute(
                            () -> timerService.registerProcessingTimeTimer("void", curr + 8 * 1000L),
                            "8");
                    Thread.sleep(2L);
                    mailboxExecutor.execute(
                            () -> {
                                timerService.registerProcessingTimeTimer("void", curr + 1);
                            },
                            "1");
                    mailboxExecutor.execute(
                            () -> {
                                Thread.sleep(3); // wait for timer curr+1 to be submitted to the mailbox
                                timerService.registerProcessingTimeTimer("void", curr - 5);
                            },
                            "-5");
{code}
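To make factor 1 easier to see outside of Flink, here is a minimal, self-contained sketch using only JDK classes; the plain PriorityQueue and ScheduledThreadPoolExecutor are simplified stand-ins for the 'processingTimeTimersQueue' and the processing-time service's executor. Deleting the head timer from the queue without cancelling its ScheduledFuture leaves the task behind in the executor until it fires.
{code:java}
import java.util.PriorityQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TimerLeakDemo {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        PriorityQueue<Long> processingTimeTimersQueue = new PriorityQueue<>();

        // Register a timer 60s in the future: it becomes the head of the queue and is
        // also scheduled as a task on the executor (as the processing-time service does).
        long timer = System.currentTimeMillis() + 60_000L;
        processingTimeTimersQueue.add(timer);
        ScheduledFuture<?> scheduledTask =
                executor.schedule(() -> System.out.println("fired"), 60_000L, TimeUnit.MILLISECONDS);

        // Delete the timer the way deleteProcessingTimeTimer does today:
        // only the queue entry is removed, the scheduled task is not cancelled.
        processingTimeTimersQueue.remove(timer);

        // The task keeps occupying the executor's work queue until it eventually fires.
        System.out.println("tasks still scheduled: " + executor.getQueue().size()); // prints 1

        // What factor 1 asks for: if the deleted timer was the head of the queue,
        // cancel the pending task (and reschedule for the new head, if any).
        scheduledTask.cancel(false);
        executor.shutdown();
    }
}
{code}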
[jira] [Created] (FLINK-27953) using the original order to add the primary key in PushProjectIntoTableSourceScanRule
zoucao created FLINK-27953:
-------------------------------

             Summary: using the original order to add the primary key in PushProjectIntoTableSourceScanRule
                 Key: FLINK-27953
                 URL: https://issues.apache.org/jira/browse/FLINK-27953
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
    Affects Versions: 1.14.4
            Reporter: zoucao

In PushProjectIntoTableSourceScanRule, if the source produces a changelog stream, the primary key is added to the end of the projected fields. See the following SQL:
{code:java}
StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
TableEnvironment tEnv = util.getTableEnv();
String srcTableDdl =
        "CREATE TABLE fs (\n"
                + " a bigint,\n"
                + " b int,\n"
                + " c varchar,\n"
                + " d int,\n"
                + " e int,\n"
                + " primary key (a,b) not enforced \n"
                + ") with (\n"
                + " 'connector' = 'values',\n"
                + " 'disable-lookup'='true',\n"
                + " 'changelog-mode' = 'I,UB,UA,D')";
tEnv.executeSql(srcTableDdl);
tEnv.getConfig().set("table.exec.source.cdc-events-duplicate", "true");
{code}
{code:java}
System.out.println(tEnv.explainSql("select a, c from fs where c > 0 and b = 0"));

projected list: [[0],[1],[2]]

== Optimized Execution Plan ==
Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 0)])
+- ChangelogNormalize(key=[a, b])
   +- Exchange(distribution=[hash[a, b]])
      +- Calc(select=[a, b, c], where=[(b = 0)])
         +- DropUpdateBefore
            +- TableSourceScan(table=[[default_catalog, default_database, fs, filter=[], project=[a, b, c], metadata=[]]], fields=[a, b, c])
{code}
{code:java}
System.out.println(tEnv.explainSql("select a, c from fs where c > 0"));

projected list: [[0],[2],[1]]

== Optimized Execution Plan ==
Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 0)])
+- ChangelogNormalize(key=[a, b])
   +- Exchange(distribution=[hash[a, b]])
      +- DropUpdateBefore
         +- TableSourceScan(table=[[default_catalog, default_database, fs, filter=[], project=[a, c, b], metadata=[]]], fields=[a, c, b])
{code}
Field b is not involved in
{code:sql}
select a, c from fs where c > 0
{code}
but it is part of the primary key, so it is appended to the end of the projected list. If 'table.exec.source.cdc-events-duplicate' is enabled, a condition on field b changes the output type: the duplicate node gets a different input type, the state serializer changes as well, and this leads to state incompatibility. I think we can add the primary key to the projected list in the original field order of the source table.
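A minimal sketch of the proposed ordering, with a hypothetical helper name rather than the actual rule code: instead of appending the remaining primary-key fields at the end, keep the pushed-down projection in the declaration order of the source schema, so both queries above produce the same scan row type [a, b, c].
{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ProjectionWithPrimaryKey {
    // Hypothetical helper, not the actual rule code: keep every field that is either
    // projected or part of the primary key, in the declaration order of the schema.
    static List<String> projectWithPrimaryKey(
            List<String> projected, List<String> schema, List<String> primaryKey) {
        List<String> result = new ArrayList<>();
        for (String field : schema) {
            if (projected.contains(field) || primaryKey.contains(field)) {
                result.add(field);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("a", "b", "c", "d", "e");
        List<String> primaryKey = Arrays.asList("a", "b");
        // "select a, c from fs where c > 0"           -> [a, b, c]
        System.out.println(projectWithPrimaryKey(Arrays.asList("a", "c"), schema, primaryKey));
        // "select a, c from fs where c > 0 and b = 0" -> [a, b, c] as well,
        // so ChangelogNormalize sees the same input type for both queries.
        System.out.println(projectWithPrimaryKey(Arrays.asList("a", "b", "c"), schema, primaryKey));
    }
}
{code}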
[jira] [Created] (FLINK-27911) Set HDFS LEASE_TIMEOUT as user-configurable
zoucao created FLINK-27911:
-------------------------------

             Summary: Set HDFS LEASE_TIMEOUT as user-configurable
                 Key: FLINK-27911
                 URL: https://issues.apache.org/jira/browse/FLINK-27911
             Project: Flink
          Issue Type: Improvement
    Affects Versions: 1.12.2
            Reporter: zoucao
         Attachments: exceptions

The HDFS LEASE_TIMEOUT in *HadoopRecoverableFsDataOutputStream* is hard-coded to 100,000 millis. In some cases, 100 seconds is not enough. In our company, when using StreamingFileSink to write records to HDFS with a parallelism of 512 or larger, the exceptions in the attachment occur frequently. I think we should make LEASE_TIMEOUT user-configurable.
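A minimal sketch of what a user-facing option could look like; the option key and description are assumptions for illustration, not an existing Flink option:
{code:java}
import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class HdfsRecoverableWriterOptions {
    // Hypothetical option key; today LEASE_TIMEOUT is a hard-coded constant of
    // 100_000 ms in HadoopRecoverableFsDataOutputStream.
    public static final ConfigOption<Duration> LEASE_TIMEOUT =
            ConfigOptions.key("fs.hdfs.recoverable-writer.lease-timeout")
                    .durationType()
                    .defaultValue(Duration.ofMillis(100_000))
                    .withDescription(
                            "How long to wait when trying to (re)acquire the HDFS lease "
                                    + "on recovery before giving up.");
}
{code}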
[jira] [Created] (FLINK-27898) fix PartitionPushDown in streaming mode for hive source
zoucao created FLINK-27898:
-------------------------------

             Summary: fix PartitionPushDown in streaming mode for hive source
                 Key: FLINK-27898
                 URL: https://issues.apache.org/jira/browse/FLINK-27898
             Project: Flink
          Issue Type: Bug
            Reporter: zoucao

For the hive source, partition push-down causes problems in streaming mode; the following test in *HiveTableSourceITCase* demonstrates it:
{code:java}
@Test
public void testPushDown() throws Exception {
    final String catalogName = "hive";
    final String dbName = "source_db";
    final String tblName = "stream_test";
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10 * 1000);
    StreamTableEnvironment tEnv =
            HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
    tEnv.registerCatalog(catalogName, hiveCatalog);
    tEnv.useCatalog(catalogName);
    tEnv.executeSql(
            "CREATE TABLE source_db.stream_test ("
                    + " a INT,"
                    + " b STRING"
                    + ") PARTITIONED BY (ts int) TBLPROPERTIES ("
                    + "'streaming-source.enable'='true',"
                    + "'streaming-source.monitor-interval'='10s',"
                    + "'streaming-source.consume-order'='partition-name',"
                    + "'streaming-source.consume-start-offset'='ts=1'"
                    + ")");
    HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
            .addRow(new Object[]{0, "a0"})
            .addRow(new Object[]{1, "a0"})
            .commit("ts=0");
    HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
            .addRow(new Object[]{1, "a1"})
            .addRow(new Object[]{2, "a1"})
            .commit("ts=1");
    HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
            .addRow(new Object[]{1, "a2"})
            .addRow(new Object[]{2, "a2"})
            .commit("ts=2");
    System.out.println(
            tEnv.explainSql("select * from hive.source_db.stream_test where ts > 1"));
    TableResult result =
            tEnv.executeSql("select * from hive.source_db.stream_test where ts > 1");
    result.print();
}
{code}
{code:java}
+----+---+----+----+
| op | a |  b | ts |
+----+---+----+----+
| +I | 1 | a2 |  2 |
| +I | 2 | a2 |  2 |
| +I | 1 | a1 |  1 |
| +I | 2 | a1 |  1 |
{code}
{code:java}
== Abstract Syntax Tree ==
LogicalProject(a=[$0], b=[$1], ts=[$2])
+- LogicalFilter(condition=[>($2, 1)])
   +- LogicalTableScan(table=[[hive, source_db, stream_test]])

== Optimized Physical Plan ==
TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]], fields=[a, b, ts])

== Optimized Execution Plan ==
TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]], fields=[a, b, ts])
{code}
The partition push-down rule determines the partitions to consume from the partitions that exist at planning time, and once they are pushed into the hive source the filter node is removed. But in streaming mode the hive source does not use the pushed-down partition info, which leads to the incorrect result shown above: rows from partition ts=1 are returned although the query asks for ts > 1.
[jira] [Created] (FLINK-26982) strike a balance between reusing the same RelNode and project/filter/limit/partition push-down
zoucao created FLINK-26982:
-------------------------------

             Summary: strike a balance between reusing the same RelNode and project/filter/limit/partition push-down
                 Key: FLINK-26982
                 URL: https://issues.apache.org/jira/browse/FLINK-26982
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: zoucao

Flink has effective logic to reuse the same RelNode and sub-plan, but it loses its effect in some situations involving project/filter/limit/partition push-down: if any of them is applied, the new source differs from the old one, so the source can no longer be reused. For some complicated SQL, many views are created from the same table and the scan RelNode cannot be reused, so many identical source-reading threads are created in one task, which causes memory problems and sometimes read amplification. Should we let users decide whether to enforce reuse themselves?

The following SQL shows the situation described above.
{code:java}
create table fs(
  a int,
  b string,
  c bigint
) PARTITIONED by ( c ) with (
  'connector' = 'filesystem',
  'format' = 'csv',
  'path' = 'file:///tmp/test'
);

select * from
  (select * from fs limit 1)
union all
  (select * from fs where a = 2)
union all
  (select 1, b, c from fs)
union all
  (select 1, b, c from fs where c = 1)
{code}
== Optimized Execution Plan ==
{code:java}
Union(all=[true], union=[a, b, c])
:- Union(all=[true], union=[a, b, c])
:  :- Union(all=[true], union=[a, b, c])
:  :  :- Limit(offset=[0], fetch=[1])
:  :  :  +- Exchange(distribution=[single])
:  :  :     +- TableSourceScan(table=[[default_catalog, default_database, fs, limit=[1]]], fields=[a, b, c])
:  :  +- Calc(select=[CAST(2 AS INTEGER) AS a, b, c], where=[(a = 2)])
:  :     +- TableSourceScan(table=[[default_catalog, default_database, fs, filter=[=(a, 2)]]], fields=[a, b, c])
:  +- Calc(select=[CAST(1 AS INTEGER) AS EXPR$0, b, c])
:     +- TableSourceScan(table=[[default_catalog, default_database, fs, project=[b, c], metadata=[]]], fields=[b, c])
+- Calc(select=[CAST(1 AS INTEGER) AS EXPR$0, b, CAST(1 AS BIGINT) AS c])
   +- TableSourceScan(table=[[default_catalog, default_database, fs, partitions=[{c=1}], project=[b], metadata=[]]], fields=[b])
{code}
[jira] [Created] (FLINK-26761) Fix the cast exception thrown by PreValidateReWriter when insert into/overwrite a partitioned table.
zoucao created FLINK-26761:
-------------------------------

             Summary: Fix the cast exception thrown by PreValidateReWriter when insert into/overwrite a partitioned table.
                 Key: FLINK-26761
                 URL: https://issues.apache.org/jira/browse/FLINK-26761
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: zoucao

In `PreValidateReWriter#appendPartitionAndNullsProjects`, we should use
{code:java}
val names = sqlInsert.getTargetTableID.asInstanceOf[SqlIdentifier].names
{code}
to get the table name, instead of
{code:java}
val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
{code}
When we execute the following SQL:
{code:java}
insert into/overwrite table_name /*+ options(xxx) */ partition(xxx) select
{code}
invoking `sqlInsert.getTargetTable` returns a SqlTableRef, which cannot be cast to SqlIdentifier.
[jira] [Created] (FLINK-26726) Remove the unregistered task from readersAwaitingSplit
zoucao created FLINK-26726:
-------------------------------

             Summary: Remove the unregistered task from readersAwaitingSplit
                 Key: FLINK-26726
                 URL: https://issues.apache.org/jira/browse/FLINK-26726
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Ecosystem
            Reporter: zoucao
         Attachments: stack.txt

Recently we hit a problem caused by an unregistered task when using a hive table as a source for streaming reading. The problem is that the unregistered task is not removed from `readersAwaitingSplit` in `ContinuousHiveSplitEnumerator` and `ContinuousFileSplitEnumerator`.

Assume tasks 0 and 1 both exist in `readersAwaitingSplit` because no new file has appeared in the path for a long time. Then a new split is generated and assigned to task-1. Unfortunately, task-1 fails to consume the split, an exception is thrown, and all tasks restart. The failover does not affect `readersAwaitingSplit`, but it clears `SourceCoordinatorContext#registeredReaders`. After restarting, task-0 exists in `readersAwaitingSplit` but not in `registeredReaders`. If task-1 registers first and sends a split request, the SplitEnumerator assigns splits to both task-1 and task-0, although task-0 has not been registered yet. The stack trace is in the attachment.
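A minimal, self-contained sketch of the proposed guard; plain collections stand in for the enumerator's `readersAwaitingSplit` and the coordinator's `registeredReaders`. Before assigning a newly discovered split, awaiting readers that are no longer registered are dropped.
{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class AwaitingReadersCleanup {
    public static void main(String[] args) {
        // State kept by the enumerator (simplified): readers waiting for splits ...
        Set<Integer> readersAwaitingSplit = new HashSet<>();
        readersAwaitingSplit.add(0);
        readersAwaitingSplit.add(1);

        // ... and the readers currently registered with the coordinator.
        // After a failover only task-1 has re-registered so far.
        Map<Integer, String> registeredReaders = new HashMap<>();
        registeredReaders.put(1, "task-1");

        // Proposed guard before handing out new splits: forget awaiting readers
        // that are no longer registered, so splits are never assigned to them.
        readersAwaitingSplit.removeIf(subtask -> !registeredReaders.containsKey(subtask));

        System.out.println(readersAwaitingSplit); // [1] -> only registered readers get splits
    }
}
{code}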
[jira] [Created] (FLINK-26723) Fix the content of exception in SourceCoordinatorContext
zoucao created FLINK-26723:
-------------------------------

             Summary: Fix the content of exception in SourceCoordinatorContext
                 Key: FLINK-26723
                 URL: https://issues.apache.org/jira/browse/FLINK-26723
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Coordination
            Reporter: zoucao

The exception message in `SourceCoordinatorContext` is always "Cannot assign splits null to subtask xxx because the subtask is not registered." We should fix it by including the split info, [see|https://github.com/apache/flink/blob/ccbb05ea4f11aac51103cadd13a6a2e38e319e8b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java#L202].
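A small sketch of what the corrected message could look like, interpolating the splits that failed to be assigned instead of a value that is always null; variable names here are illustrative only.
{code:java}
import java.util.Arrays;
import java.util.List;

public class AssignmentErrorMessage {
    public static void main(String[] args) {
        List<String> splitsToAssign = Arrays.asList("split-0", "split-1");
        int subtaskId = 3;
        // Proposed wording: mention the splits that could not be assigned,
        // so the message no longer reads "Cannot assign splits null to subtask 3 ...".
        String message = String.format(
                "Cannot assign splits %s to subtask %d because the subtask is not registered.",
                splitsToAssign, subtaskId);
        System.out.println(message);
    }
}
{code}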
[jira] [Created] (FLINK-26401) improve the compatibility for hive catalogView
zoucao created FLINK-26401:
-------------------------------

             Summary: improve the compatibility for hive catalogView
                 Key: FLINK-26401
                 URL: https://issues.apache.org/jira/browse/FLINK-26401
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Hive
            Reporter: zoucao

Flink SQL reserves many keywords[1], and some of them differ from other big-data systems such as Spark[2] and Hive[3]. Users may therefore be affected by compatibility issues between these systems. For example, the keyword 'DATE' is marked as reserved by Flink and Hive, but not by Spark. If a hive view is defined as
{code:java}
select a, b, date from hive_tb where date = '2022-02-28'
{code}
it works well in Spark, but in Flink it fails with a SqlParserException: "SQL parse failed. Encountered 'date' at line xxx, column xxx". IIUC, if we do nothing to improve compatibility, users must change the query of the hive view to
{code:java}
select a, b, `date` from hive_tb where date = '2022-02-28'
{code}
to make Flink work. I think we could add a converter that surrounds reserved keywords with backticks before parsing.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/overview/
[2] https://spark.apache.org/docs/3.2.1/sql-ref-ansi-compliance.html#sql-keywords
[3] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Keywords,Non-reservedKeywordsandReservedKeywords
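A naive illustration of such a converter; the helper below is an assumption for demonstration, not production-ready, since it ignores string literals, qualified names, and identifiers that are already quoted.
{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class ReservedKeywordQuoting {
    // Naive illustration only: quote occurrences of reserved keywords used as plain
    // identifiers. A real converter would have to work on the parsed view definition.
    static String quoteReserved(String sql, List<String> reservedKeywords) {
        String result = sql;
        for (String kw : reservedKeywords) {
            // word boundary match, skipping function-call style usages like DATE(...)
            result = result.replaceAll(
                    "(?i)\\b" + Pattern.quote(kw) + "\\b(?!\\s*\\()", "`" + kw + "`");
        }
        return result;
    }

    public static void main(String[] args) {
        String hiveViewQuery = "select a, b, date from hive_tb where date = '2022-02-28'";
        System.out.println(quoteReserved(hiveViewQuery, Arrays.asList("date")));
        // select a, b, `date` from hive_tb where `date` = '2022-02-28'
    }
}
{code}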
[jira] [Created] (FLINK-26355) VarCharType is not considered in HiveTableSqlFunction
zoucao created FLINK-26355:
-------------------------------

             Summary: VarCharType is not considered in HiveTableSqlFunction
                 Key: FLINK-26355
                 URL: https://issues.apache.org/jira/browse/FLINK-26355
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Ecosystem
            Reporter: zoucao

VarCharType is not considered in `HiveTableSqlFunction#coerce`, see [link|https://github.com/apache/flink/blob/a7192af8707f3f0d0f30fc71f3477edd92135cac/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java#L146]. Before invoking `HiveTableSqlFunction#coerce`, Flink calls `createFieldTypeFromLogicalType` to build the arguments array; if a field's type is varchar, an exception is thrown.
[jira] [Created] (FLINK-25476) CharType lost in the creation of MaxAggFunction & MinAggFunction
zoucao created FLINK-25476:
-------------------------------

             Summary: CharType lost in the creation of MaxAggFunction & MinAggFunction
                 Key: FLINK-25476
                 URL: https://issues.apache.org/jira/browse/FLINK-25476
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.15.0
            Reporter: zoucao

CharType is missing in AggFunctionFactory#createMinAggFunction(Retract) and AggFunctionFactory#createMaxAggFunction(Retract). If we execute
{code:java}
SELECT Max('a') ...
{code}
the following exception is thrown:
{code:java}
Caused by: org.apache.flink.table.api.TableException: Max aggregate function does not support type: ''CHAR''. Please re-check the data type.
    at org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createMaxAggFunction(AggFunctionFactory.scala:395)
    at org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:76)
    at org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:444)
{code}
[jira] [Created] (FLINK-24958) correct the example and link for temporal table function documentation
zoucao created FLINK-24958:
-------------------------------

             Summary: correct the example and link for temporal table function documentation
                 Key: FLINK-24958
                 URL: https://issues.apache.org/jira/browse/FLINK-24958
             Project: Flink
          Issue Type: Improvement
          Components: Documentation
    Affects Versions: 1.14.0
            Reporter: zoucao
             Fix For: 1.15.0

Correct the example and link for the temporal table function documentation.
[jira] [Created] (FLINK-23865) Class cast error caused by nested Pojo in legacy outputConversion
zoucao created FLINK-23865:
-------------------------------

             Summary: Class cast error caused by nested Pojo in legacy outputConversion
                 Key: FLINK-23865
                 URL: https://issues.apache.org/jira/browse/FLINK-23865
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.13.2
            Reporter: zoucao

Code:
{code:java}
Table table = tbEnv.fromValues(
        DataTypes.ROW(
                DataTypes.FIELD("innerPojo", DataTypes.ROW(DataTypes.FIELD("c", STRING()))),
                DataTypes.FIELD("b", STRING()),
                DataTypes.FIELD("a", INT())),
        Row.of(Row.of("str-c"), "str-b", 1));
DataStream<Pojo> pojoDataStream = tbEnv.toAppendStream(table, Pojo.class);

public static class Pojo {
    public InnerPojo innerPojo;
    public String b;
    public int a;

    public Pojo() {}
}

public static class InnerPojo {
    public String c;

    public InnerPojo() {}
}
{code}
Error:
{code:java}
java.lang.ClassCastException: org.apache.flink.table.types.logical.IntType cannot be cast to org.apache.flink.table.types.logical.RowType
    at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$1.apply(TableSinkUtils.scala:163)
    at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$1.apply(TableSinkUtils.scala:155)
{code}
The fields of PojoTypeInfo are in alphabetical order, so in `expandPojoTypeToSchema` the 'pojoType' and 'queryLogicalType' should each be accessed with their own index. Currently the pojo field index is used to look up 'queryLogicalType', which causes a field type mismatch. It should be fixed like:
{code:java}
val queryIndex = queryLogicalType.getFieldIndex(name)
val nestedLogicalType = queryLogicalType.getFields()(queryIndex).getType.asInstanceOf[RowType]
{code}
[jira] [Created] (FLINK-23777) Support dynamic parameters for sink table in multiple connectors
zoucao created FLINK-23777:
-------------------------------

             Summary: Support dynamic parameters for sink table in multiple connectors
                 Key: FLINK-23777
                 URL: https://issues.apache.org/jira/browse/FLINK-23777
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / ElasticSearch, Connectors / Kafka
            Reporter: zoucao

Metadata columns are now supported in SQL; they are simple, elegant, and easy to use. So far they are mostly used to read meta info from source tables, but I think we could also use them to support dynamic parameters for sink tables. For example:
 * In the Kafka sink, we could use metadata to achieve dynamic partitioning and let each record choose its partition by calculation logic.
{code:java}
CREATE TABLE KafkaSinkTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `partition` INT METADATA FROM 'partition'
) WITH (
  'connector' = 'kafka'
)
{code}
 * In the Elasticsearch sink, we could use metadata to support a dynamic index, like
{code:java}
CREATE TABLE ESSinkTable (
  index VARCHAR METADATA FROM 'index',
  log_ts VARCHAR
) with (
  'connector' = 'elasticsearch-7'
)
{code}
which is more elegant than deriving a partial index from a specific field like `myusers-{log_ts|yyyy-MM-dd}`.

If there are other opinions or other dynamic parameters worth supporting, they can be proposed and discussed here.
[jira] [Created] (FLINK-23597) support Add Jar in Table api
zoucao created FLINK-23597:
-------------------------------

             Summary: support Add Jar in Table api
                 Key: FLINK-23597
                 URL: https://issues.apache.org/jira/browse/FLINK-23597
             Project: Flink
          Issue Type: Improvement
            Reporter: zoucao
[jira] [Created] (FLINK-23511) Fix the location error in metric page.
zoucao created FLINK-23511:
-------------------------------

             Summary: Fix the location error in metric page.
                 Key: FLINK-23511
                 URL: https://issues.apache.org/jira/browse/FLINK-23511
             Project: Flink
          Issue Type: Improvement
          Components: Documentation
            Reporter: zoucao
         Attachments: image-2021-07-27-15-17-57-654.png

This error exists in [master|https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#default-shuffle-service] and [1.13|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/].

The affected metrics are:
 * 'inputFloatingBuffersUsage'
 * 'inputExclusiveBuffersUsage'
 * 'maxQueueLen'
 * 'avgQueueLen'
 * 'backPressuredTimeMsPerSecond'
 * 'busyTimeMsPerSecond'

!image-2021-07-27-15-17-57-654.png!
[jira] [Created] (FLINK-23510) add a new metric to show the max usage of inputExclusiveBuffer for each inputChannel
zoucao created FLINK-23510:
-------------------------------

             Summary: add a new metric to show the max usage of inputExclusiveBuffer for each inputChannel
                 Key: FLINK-23510
                 URL: https://issues.apache.org/jira/browse/FLINK-23510
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Metrics
            Reporter: zoucao

Today, 'inPoolUsage' and 'inputExclusiveBuffersUsage' describe the overall network buffer usage of a task. Sometimes, when back pressure was present, I found the values of 'inPoolUsage' or 'inputExclusiveBuffersUsage' were still very low, because they are averaged over input channels whose buffer usage is low. I propose to add a new metric for each task that records the maximum buffer usage across all input channels belonging to the task.
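A minimal sketch of such a per-task gauge; the `ChannelBufferUsage` interface is a made-up stand-in for however the runtime exposes per-channel usage. It reports the maximum over all input channels instead of the average.
{code:java}
import java.util.List;

import org.apache.flink.metrics.Gauge;

public class MaxExclusiveBufferUsageGauge implements Gauge<Float> {
    /** Hypothetical interface standing in for the per-channel usage the runtime tracks. */
    public interface ChannelBufferUsage {
        float exclusiveBuffersUsage();
    }

    private final List<ChannelBufferUsage> inputChannels;

    public MaxExclusiveBufferUsageGauge(List<ChannelBufferUsage> inputChannels) {
        this.inputChannels = inputChannels;
    }

    @Override
    public Float getValue() {
        // Report the *maximum* usage over all input channels of the task, so a single
        // back-pressured channel stays visible even when the average is low.
        float max = 0f;
        for (ChannelBufferUsage channel : inputChannels) {
            max = Math.max(max, channel.exclusiveBuffersUsage());
        }
        return max;
    }
}
{code}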
[jira] [Created] (FLINK-23156) Change the reference of 'docs/dev/table/sql/queries'
zoucao created FLINK-23156:
-------------------------------

             Summary: Change the reference of 'docs/dev/table/sql/queries'
                 Key: FLINK-23156
                 URL: https://issues.apache.org/jira/browse/FLINK-23156
             Project: Flink
          Issue Type: Improvement
          Components: Documentation
    Affects Versions: 1.13.0, 1.14.0
            Reporter: zoucao

The reference to [docs/dev/table/sql/queries|https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/overview/] points to a page without any content. Should we change it to [/docs/dev/table/sql/queries/overview/|https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/overview/]? The problem exists in the English and Chinese docs on branch 1.13 and master.
[jira] [Created] (FLINK-22437) Miss adding parallelism for filter operator in batch mode
zoucao created FLINK-22437:
-------------------------------

             Summary: Miss adding parallelism for filter operator in batch mode
                 Key: FLINK-22437
                 URL: https://issues.apache.org/jira/browse/FLINK-22437
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.12.2
            Reporter: zoucao

When I execute the batch SQL below in flink-1.12.2, I find lots of small files in HDFS. In the filesystem connector, `GroupedPartitionWriter` is used, and it closes the last partition when a new record does not belong to the existing partition. This happens whenever records of more than one partition are sent to the filesystem sink at the same time. The hive source can determine its parallelism from the number of files and partitions, and this parallelism is propagated through the sort operator, but in `CommonPhysicalSink#createSinkTransformation` a filter operator is added to support `SinkNotNullEnforcer` without setting a parallelism for it, so the filesystem sink operator cannot get the correct parallelism from its input stream.
{code:java}
CREATE CATALOG myHive with (
    'type'='hive',
    'property-version'='1',
    'default-database' = 'flink_sql_online_test'
);
-- SET table.sql-dialect=hive;
-- CREATE TABLE IF NOT EXISTS myHive.flink_sql_online_test.hive_sink (
--     `timestamp` BIGINT,
--     `time` STRING,
--     id BIGINT,
--     product STRING,
--     price DOUBLE,
--     canSell STRING,
--     selledNum BIGINT
-- ) PARTITIONED BY (
--     dt STRING,
--     `hour` STRING,
--     `min` STRING
-- ) TBLPROPERTIES (
--     'partition.time-extractor.timestamp-pattern'='$dt $hr:$min:00',
--     'sink.partition-commit.trigger'='partition-time',
--     'sink.partition-commit.delay'='1 min',
--     'sink.partition-commit.policy.kind'='metastore,success-file'
-- );

create table fs_sink (
    `timestamp` BIGINT,
    `time` STRING,
    id BIGINT,
    product STRING,
    price DOUBLE,
    canSell STRING,
    selledNum BIGINT,
    dt STRING,
    `hour` STRING,
    `min` STRING
) PARTITIONED BY (dt, `hour`, `min`) with (
    'connector'='filesystem',
    'path'='hdfs://',
    'format'='csv'
);

insert into fs_sink select * from myHive.flink_sql_online_test.hive_sink;
{code}
I think this problem can be fixed by setting the parallelism explicitly, just like:
{code:java}
val dataStream = new DataStream(env, inputTransformation).filter(enforcer)
  .setParallelism(inputTransformation.getParallelism)
{code}
[jira] [Created] (FLINK-21738) reduce unnecessary method calls in ModuleManager
zoucao created FLINK-21738:
-------------------------------

             Summary: reduce unnecessary method calls in ModuleManager
                 Key: FLINK-21738
                 URL: https://issues.apache.org/jira/browse/FLINK-21738
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / API
            Reporter: zoucao

In Flink SQL, if we use many functions (hive functions or Flink built-in functions), Flink calls `getFunctionDefinition` in [ModuleManager|https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java#L44] many times to resolve the functions, and each module's `listFunctions` method is called every time. Since the same result is returned for a given module, a cache could be used here to reduce the wasted time.
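A minimal sketch of the idea, as a hypothetical class rather than the real ModuleManager: remember each module's `listFunctions()` result so repeated function look-ups do not re-list the functions every time. A real version would need to invalidate the cache when modules are loaded or unloaded.
{code:java}
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class FunctionLookupCache {
    /** Hypothetical stand-in for org.apache.flink.table.module.Module. */
    public interface Module {
        Set<String> listFunctions();
    }

    private final Map<String, Module> modules;                        // loaded modules, in order
    private final Map<String, Set<String>> functionNamesPerModule = new HashMap<>();

    public FunctionLookupCache(LinkedHashMap<String, Module> modules) {
        this.modules = modules;
    }

    /** Returns the first module providing the function, calling listFunctions() once per module. */
    public Optional<String> findProvidingModule(String functionName) {
        for (Map.Entry<String, Module> entry : modules.entrySet()) {
            Set<String> names = functionNamesPerModule.computeIfAbsent(
                    entry.getKey(), ignored -> entry.getValue().listFunctions());
            if (names.contains(functionName)) {
                return Optional.of(entry.getKey());
            }
        }
        return Optional.empty();
    }
}
{code}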
[jira] [Created] (FLINK-20827) Just read record correlating to join key in FilesystemLookUpFunc
zoucao created FLINK-20827:
-------------------------------

             Summary: Just read record correlating to join key in FilesystemLookUpFunc
                 Key: FLINK-20827
                 URL: https://issues.apache.org/jira/browse/FLINK-20827
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / FileSystem
            Reporter: zoucao

When using a temporal table join, all records of the hive table are loaded into the cache. Sometimes the hive temporal table is larger than expected, and users cannot know exactly how big it is in memory. In this situation, errors occur, for example `GC overhead limit exceeded` or `the heartbeat of TaskManager timeout` (caused by GC).

Maybe we can optimize the number of records read from the hive table. If the upstream records can be hashed by the join key, then each parallel instance only needs to load into its cache the records whose hashed join key equals one fixed hash value, so the whole table is divided across the parallel instances. I don't know whether this can be achieved on the upstream side under the existing framework, but it is easy to support in `FileSystemLookupFunction`.

If not, we could at least log the size of the cache to help users set the MemorySize or other TM parameters.
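A minimal, self-contained sketch of the idea, in which array rows and a String join key stand in for the real hive rows: each parallel instance of the lookup function only caches the rows whose join key hashes to its own subtask. This only works if the probe side is shuffled by the same key and hash function.
{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionedLookupCacheSketch {
    /**
     * Keep only the rows whose join key hashes to this subtask, so each parallel
     * instance caches roughly 1/parallelism of the hive table instead of all of it.
     */
    static List<String[]> loadPartitionOfCache(
            List<String[]> hiveRows, int joinKeyIndex, int numSubtasks, int subtaskIndex) {
        List<String[]> cache = new ArrayList<>();
        for (String[] row : hiveRows) {
            int bucket = Math.abs(row[joinKeyIndex].hashCode() % numSubtasks);
            if (bucket == subtaskIndex) {
                cache.add(row);
            }
        }
        return cache;
    }

    public static void main(String[] args) {
        List<String[]> hiveRows = Arrays.asList(
                new String[] {"k1", "v1"}, new String[] {"k2", "v2"}, new String[] {"k3", "v3"});
        // Subtask 0 of 2 only caches the rows whose key hashes to bucket 0.
        System.out.println(loadPartitionOfCache(hiveRows, 0, 2, 0).size());
    }
}
{code}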
[jira] [Created] (FLINK-20795) add a parameter to decide whether or not print dirty record when `ignore-parse-errors` is true
zoucao created FLINK-20795:
-------------------------------

             Summary: add a parameter to decide whether or not print dirty record when `ignore-parse-errors` is true
                 Key: FLINK-20795
                 URL: https://issues.apache.org/jira/browse/FLINK-20795
             Project: Flink
          Issue Type: Improvement
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.13.0
            Reporter: zoucao

Add a parameter that decides whether or not to print dirty records when `ignore-parse-errors`='true'. Some users want to keep their job stable and still see the dirty records so they can fix the upstream.
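A minimal sketch of the behaviour being asked for; the `log-parse-errors` name is made up here and `doParse` stands in for the real format's deserializer. With `ignore-parse-errors` the record is still skipped, but the raw bytes are printed so the upstream can be fixed.
{code:java}
import java.nio.charset.StandardCharsets;

public class DirtyRecordLogging {
    // Hypothetical flags mirroring the proposal: 'ignore-parse-errors' already exists,
    // 'log-parse-errors' (name made up here) would additionally print the bad record.
    static String parseOrSkip(byte[] message, boolean ignoreParseErrors, boolean logParseErrors) {
        try {
            return doParse(message);
        } catch (RuntimeException e) {
            if (!ignoreParseErrors) {
                throw e;
            }
            if (logParseErrors) {
                // Let users see the dirty record so the producer can be fixed,
                // while the job keeps running.
                System.err.println(
                        "Skipped unparsable record: " + new String(message, StandardCharsets.UTF_8));
            }
            return null; // record skipped
        }
    }

    private static String doParse(byte[] message) {
        throw new RuntimeException("not valid JSON"); // stand-in for the real format parser
    }

    public static void main(String[] args) {
        parseOrSkip("{bad json".getBytes(StandardCharsets.UTF_8), true, true);
    }
}
{code}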
[jira] [Created] (FLINK-20701) print connector supports outputting serialized record
zoucao created FLINK-20701:
-------------------------------

             Summary: print connector supports outputting serialized record
                 Key: FLINK-20701
                 URL: https://issues.apache.org/jira/browse/FLINK-20701
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Ecosystem
    Affects Versions: 1.13.0
            Reporter: zoucao

Hi all, the print connector is designed for debugging and gives us convenient access to the data. If I want to debug a source connector and a SQL statement, I use it to print the result. However, there is no good way to observe the serialized record that would be sent to an external connector when debugging. Consequently, we had better make the print connector support outputting serialized records. We could use format=XXX to identify a serialization schema and help users debug and understand the serialization process.
[jira] [Created] (FLINK-20546) Misuse of the method in KafkaDynamicTableFactoryTest
zoucao created FLINK-20546:
-------------------------------

             Summary: Misuse of the method in KafkaDynamicTableFactoryTest
                 Key: FLINK-20546
                 URL: https://issues.apache.org/jira/browse/FLINK-20546
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: 1.12.0
            Reporter: zoucao

Hi all, I have been reading about the DynamicTable Source/Sink Factory recently, and one thing confused me. In `KafkaDynamicTableFactoryTest`, `getBasicSourceOptions()` and `getBasicSinkOptions()` are intended for the source table and the sink table, respectively. However, when validating the sink table, `getBasicSourceOptions()` is called, for example in `testSinkWithTopicListOrTopicPattern`, `testInvalidSinkSemantic`, etc. Although no exception is thrown, it should still be fixed. WDYT?
[jira] [Created] (FLINK-19893) stop consuming in specific offset or timestamp
zoucao created FLINK-19893:
-------------------------------

             Summary: stop consuming in specific offset or timestamp
                 Key: FLINK-19893
                 URL: https://issues.apache.org/jira/browse/FLINK-19893
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
            Reporter: zoucao

Hi all, I think something new should be added to the Kafka connector. When using Flink to consume messages from the MQ, there can be a large backlog of unconsumed messages in the topic while the production rate equals the consumption rate, so the latency stays high. In that situation, two tasks need to be started: one consumes the latest messages, and the other consumes from the earliest messages and stops consuming at the current offset or timestamp. Maybe the Kafka connector should support stopping consumption at a specific offset or timestamp.