[jira] [Created] (FLINK-34964) ScheduledTask leak in registering the processing timer

2024-03-29 Thread zoucao (Jira)
zoucao created FLINK-34964:
--

 Summary: ScheduledTask leak in registering the processing timer
 Key: FLINK-34964
 URL: https://issues.apache.org/jira/browse/FLINK-34964
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.20.0
Reporter: zoucao
 Attachments: image-2024-03-29-16-40-11-928.png

I have come across a 'ScheduledTask' leak when registering processing timers. Upon 
further investigation, I have identified two factors responsible for the leak.


*1. Registered 'ScheduledTask' has not been canceled*


See 
`org.apache.flink.streaming.api.operators.InternalTimerServiceImpl#deleteProcessingTimeTimer`:
when a registered timer is deleted, Flink only removes it from the 
'processingTimeTimersQueue'. However, this timer may be the earliest one to be 
triggered in the future and may already have been scheduled as a task submitted to 
the ScheduledThreadPoolExecutor.

When deleting a registered timer, Flink should check whether this timer is the next 
one to be triggered; if so, the current 'ScheduledTask' should be canceled.
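
A self-contained demo (plain JDK only, no Flink classes) of why removing a timer from Flink's own queue is not enough: the task that was already handed to the ScheduledThreadPoolExecutor stays pending until it is explicitly canceled.
{code:java}
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScheduledTaskLeakDemo {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

        // Analogous to scheduling the earliest processing-time timer.
        ScheduledFuture<?> nextTimer =
                executor.schedule(() -> System.out.println("fired"), 1, TimeUnit.HOURS);

        // "Deleting" the timer by only touching a Flink-side queue leaves the task
        // alive in the executor: it still occupies a slot in the work queue.
        System.out.println("pending tasks: " + executor.getQueue().size()); // 1

        // Canceling the future (plus remove-on-cancel) is what actually frees the slot.
        executor.setRemoveOnCancelPolicy(true);
        nextTimer.cancel(false);
        System.out.println("pending tasks after cancel: " + executor.getQueue().size()); // 0

        executor.shutdown();
    }
}
{code}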

 

*2. Re-submit a timer earlier than the System.currentTimeMillis*

Consider a case where the current time in millis is 100, the processing-time queue 
contains timers 100, 101, and 102, and timer-100 has been submitted to the 
ScheduledThreadPool. At this moment, the user registers timer-99. Since 99 is less 
than 100 (the peek timer in the queue), Flink cancels timer-100's task and 
re-registers using timer-99. However, before timer-100 is canceled, the thread pool 
has already submitted it to the mailbox.
The mails in the mailbox are then as follows:
{code:java}
-> register timer-99
-> trigger timer-100
-> trigger timer-99
{code}
 - When executing 'trigger timer-100', Flink flushes the records whose timers 
belong to 99 and 100, then submits timer-101 to the scheduled thread pool.
 - When executing 'trigger timer-99', no records need to be flushed, but it also 
submits timer-101 to the scheduled thread pool, because timer-101 is the next timer 
that needs to be triggered.
Obviously, two tasks for the same timer are registered in Flink's scheduled thread 
pool.
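
A minimal sketch of one possible guard against the duplicate submission, assuming the service remembers the trigger time of the currently pending ScheduledTask in a field such as 'nextTriggerTime' (a hypothetical name; the real class only keeps the ScheduledFuture itself):
{code:java}
// Sketch only: skip re-registration when a task that fires at or before the requested
// timestamp is already pending, so a stale trigger mail cannot enqueue a second
// ScheduledTask for the same head timer (timer-101 in the example above).
private void scheduleNextTimer(long timestamp) {
    if (nextTimer != null && !nextTimer.isDone() && nextTriggerTime <= timestamp) {
        return;
    }
    if (nextTimer != null) {
        nextTimer.cancel(false);
    }
    nextTriggerTime = timestamp;
    nextTimer = processingTimeService.registerTimer(timestamp, this::onProcessingTime);
}
{code}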

In our online job, the number of these leaked ScheduledTasks can reach the 
thousands; see the attached figure.

 

Here is an example, convenient for reproducing case 2.
{code:java}
@Test
public void testTimerTaskLeak() {
    TaskMailboxImpl mailbox = new TaskMailboxImpl();
    MailboxExecutor mailboxExecutor =
            new MailboxExecutorImpl(mailbox, 0, StreamTaskActionExecutor.IMMEDIATE);
    SystemProcessingTimeService processingTimeService =
            new SystemProcessingTimeService(ex -> handleException(ex));

    ProcessingTimeServiceImpl timeService =
            new ProcessingTimeServiceImpl(
                    processingTimeService,
                    callback -> deferCallbackToMailbox(mailboxExecutor, callback));

    TestKeyContext keyContext = new TestKeyContext();

    Queue mailQueue = new LinkedBlockingDeque<>();
    long curr = System.currentTimeMillis();
    InternalTimerServiceImpl timerService =
            createAndStartInternalTimerService(
                    mock(Triggerable.class),
                    keyContext,
                    timeService,
                    testKeyGroupRange,
                    createQueueFactory());

    ExecutorService executorService = Executors.newFixedThreadPool(1);
    executorService.execute(
            () -> {
                try {
                    keyContext.setCurrentKey(1);
                    mailboxExecutor.execute(
                            () -> timerService.registerProcessingTimeTimer("void", curr + 6 * 1000L),
                            "6");

                    Thread.sleep(2L);

                    mailboxExecutor.execute(
                            () -> timerService.registerProcessingTimeTimer("void", curr + 7 * 1000L),
                            "7");
                    Thread.sleep(2L);

                    mailboxExecutor.execute(
                            () -> timerService.registerProcessingTimeTimer("void", curr + 8 * 1000L),
                            "8");
                    Thread.sleep(2L);
                    mailboxExecutor.execute(
                            () -> timerService.registerProcessingTimeTimer("void", curr + 1),
                            "1");

                    mailboxExecutor.execute(
                            () -> {
                                Thread.sleep(3); // wait timer +1 submitted to mailbox
                                timerService.registerProcessingTimeTimer("void", curr - 5);
                            },
                            "-5");

[jira] [Created] (FLINK-27953) using the original order to add the primary key in PushProjectIntoTableSourceScanRule

2022-06-08 Thread zoucao (Jira)
zoucao created FLINK-27953:
--

 Summary: using the original order to add the primary key in 
PushProjectIntoTableSourceScanRule
 Key: FLINK-27953
 URL: https://issues.apache.org/jira/browse/FLINK-27953
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.14.4
Reporter: zoucao


In PushProjectIntoTableSourceScanRule, if the source produces a changelog stream, 
the primary key fields are added to the end of the projected fields; see the 
following SQL:
{code:java}
StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
TableEnvironment tEnv = util.getTableEnv();
String srcTableDdl =
"CREATE TABLE fs (\n"
+ "  a bigint,\n"
+ "  b int,\n"
+ "  c varchar,\n"
+ "  d int,\n"
+ "  e int,\n "
+ "  primary key (a,b) not enforced \n"
+ ") with (\n"
+ " 'connector' = 'values',\n"
+ " 'disable-lookup'='true',\n"
+ " 'changelog-mode' = 'I,UB,UA,D')";
tEnv.executeSql(srcTableDdl);
tEnv.getConfig().set("table.exec.source.cdc-events-duplicate", "true");
{code}
{code:java}
System.out.println(tEnv.explainSql("select a, c from fs where c > 0 and b = 0"));

projected list:
[[0],[1],[2]]

== Optimized Execution Plan ==
Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 0)])
+- ChangelogNormalize(key=[a, b])
   +- Exchange(distribution=[hash[a, b]])
  +- Calc(select=[a, b, c], where=[(b = 0)])
 +- DropUpdateBefore
+- TableSourceScan(table=[[default_catalog, default_database, fs, 
filter=[], project=[a, b, c], metadata=[]]], fields=[a, b, c])
{code}
{code:java}
 System.out.println(tEnv.explainSql("select a, c from fs where c > 0")); 

projected list:
[[0],[2],[1]]

 == Optimized Execution Plan ==
Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 0)])
+- ChangelogNormalize(key=[a, b])
   +- Exchange(distribution=[hash[a, b]])
  +- DropUpdateBefore
 +- TableSourceScan(table=[[default_catalog, default_database, fs, 
filter=[], project=[a, c, b], metadata=[]]], fields=[a, c, b])
{code}
Field b is not involved in
{code:sql}
select a, c from fs where c > 0{code}
but it is part of the primary key, so it is appended to the end of the projected 
list when 'table.exec.source.cdc-events-duplicate' is enabled. Whether field b 
appears in the condition therefore changes the output type of the scan: the 
deduplicate node gets a different input type, its state serializer changes as well, 
and this leads to state incompatibility.

I think we can use the original order of the source table when adding the primary 
key fields to the projected list.
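
A minimal sketch of the idea in plain Java (the variable names are illustrative, not the rule's actual code): build the pushed-down project list as the union of the query's projected fields and the primary-key fields, ordered by their position in the source schema, so the scan's row type is the same no matter which key fields the query itself references.
{code:java}
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// With primary key (a, b) this yields [0, 1, 2] (= [a, b, c]) for both queries above,
// so ChangelogNormalize always sees the same input type and state stays compatible.
List<Integer> queryProjection = List.of(0, 2); // select a, c ...
List<Integer> primaryKey = List.of(0, 1);      // primary key (a, b)

List<Integer> pushedProjection =
        Stream.concat(queryProjection.stream(), primaryKey.stream())
                .distinct()
                .sorted()                      // original field order of the source table
                .collect(Collectors.toList()); // [0, 1, 2]
{code}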



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27911) Set HDFS LEASE_TIMEOUT as user-configurable

2022-06-06 Thread zoucao (Jira)
zoucao created FLINK-27911:
--

 Summary: Set HDFS LEASE_TIMEOUT as user-configurable
 Key: FLINK-27911
 URL: https://issues.apache.org/jira/browse/FLINK-27911
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.12.2
Reporter: zoucao
 Attachments: exceptions

The HDFS LEASE_TIMEOUT in *HadoopRecoverableFsDataOutputStream* is hard-coded to 
100_000 millis. In some cases, 100 seconds is not enough: in our company, when using 
StreamingFileSink to write records to HDFS with a parallelism of 512 or larger, the 
exceptions in the attachment occur frequently. I think we should make LEASE_TIMEOUT 
user-configurable.
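
A minimal sketch of what a user-facing option could look like (the option key, class name, and default here are assumptions for illustration, not an agreed-upon design):
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.time.Duration;

public class HdfsRecoverableWriterOptions {
    // Hypothetical key; today the value is the hard-coded LEASE_TIMEOUT constant.
    public static final ConfigOption<Duration> LEASE_TIMEOUT =
            ConfigOptions.key("fs.hdfs.recoverable-writer.lease-timeout")
                    .durationType()
                    .defaultValue(Duration.ofMillis(100_000))
                    .withDescription(
                            "How long HadoopRecoverableFsDataOutputStream waits for the HDFS "
                                    + "lease of a file to be revoked before recovery fails.");
}
{code}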





--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27898) fix PartitionPushDown in streaming mode for hive source

2022-06-05 Thread zoucao (Jira)
zoucao created FLINK-27898:
--

 Summary: fix PartitionPushDown in streaming mode for hive source
 Key: FLINK-27898
 URL: https://issues.apache.org/jira/browse/FLINK-27898
 Project: Flink
  Issue Type: Bug
Reporter: zoucao


In the hive source, PartitionPushDown causes some problems in streaming mode; we 
can run the following test in {*}HiveTableSourceITCase{*}:

{code:java}
@Test
public void testPushDown() throws Exception {
final String catalogName = "hive";
final String dbName = "source_db";
final String tblName = "stream_test";
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10 * 1000);
StreamTableEnvironment tEnv =
HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
tEnv.registerCatalog(catalogName, hiveCatalog);
tEnv.useCatalog(catalogName);
tEnv.executeSql(
"CREATE TABLE source_db.stream_test ("
+ " a INT,"
+ " b STRING"
+ ") PARTITIONED BY (ts int) TBLPROPERTIES ("
+ "'streaming-source.enable'='true',"
+ "'streaming-source.monitor-interval'='10s',"
+ "'streaming-source.consume-order'='partition-name',"
+ "'streaming-source.consume-start-offset'='ts=1'"
+ ")");

HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
.addRow(new Object[]{0, "a0"})
.addRow(new Object[]{1, "a0"})
.commit("ts=0");
HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
.addRow(new Object[]{1, "a1"})
.addRow(new Object[]{2, "a1"})
.commit("ts=1");

HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
.addRow(new Object[]{1, "a2"})
.addRow(new Object[]{2, "a2"})
.commit("ts=2");
System.out.println(tEnv.explainSql("select * from hive.source_db.stream_test 
where ts > 1"));
TableResult result = tEnv.executeSql("select * from hive.source_db.stream_test 
where ts > 1");
result.print();
)
{code}

{code:java}
+----+-----+------+-----+
| op |   a |    b |  ts |
+----+-----+------+-----+
| +I |   1 |   a2 |   2 |
| +I |   2 |   a2 |   2 |
| +I |   1 |   a1 |   1 |
| +I |   2 |   a1 |   1 |
{code}

{code:java}
== Abstract Syntax Tree ==
LogicalProject(a=[$0], b=[$1], ts=[$2])
+- LogicalFilter(condition=[>($2, 1)])
   +- LogicalTableScan(table=[[hive, source_db, stream_test]])

== Optimized Physical Plan ==
TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]], 
fields=[a, b, ts])

== Optimized Execution Plan ==
TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]], 
fields=[a, b, ts])
{code}

The PartitionPushDown rule computes the partitions to consume from the partitions 
that exist at planning time. Once the partitions are pushed into the hive source, 
the filter node is removed. However, in streaming mode the hive source does not use 
the pushed-down partition info, so newly monitored partitions are no longer 
filtered, which I think causes the wrong results shown above.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-26982) strike a balance between reusing the same RelNode and project/filter/limit/partition push down

2022-04-01 Thread zoucao (Jira)
zoucao created FLINK-26982:
--

 Summary: strike a balance between reusing the same RelNode and 
project/filter/limit/partition push down
 Key: FLINK-26982
 URL: https://issues.apache.org/jira/browse/FLINK-26982
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: zoucao


Flink already has effective logic for reusing the same RelNode and subplan, but it 
loses its effect in some situations, such as project/filter/limit/partition push 
down: once one of them is applied, the new source is no longer the same as the old 
one, so the sources cannot be reused anymore.
For complicated SQL, many views may be created from the same table and the scan 
RelNodes cannot be reused, so many identical source-reading threads are created in 
one task, which causes memory problems and sometimes read amplification.
Should we offer a way to let users themselves enforce the reuse?
The following SQL shows the situation described above.

{code:java}
create table fs(
a int,
b string,
c  bigint
) PARTITIONED by ( c )with (
'connector' = 'filesystem',
'format' = 'csv',
'path' = 'file:///tmp/test'
);
select * from
   (select * from fs limit 1)
union all
   (select * from fs where a = 2)
union all
   (select 1, b, c from fs)
union all
   (select 1, b, c from fs where c = 1)
{code}
== Optimized Execution Plan ==
{code:java}
Union(all=[true], union=[a, b, c])
:- Union(all=[true], union=[a, b, c])
:  :- Union(all=[true], union=[a, b, c])
:  :  :- Limit(offset=[0], fetch=[1])
:  :  :  +- Exchange(distribution=[single])
:  :  : +- TableSourceScan(table=[[default_catalog, default_database, fs, 
limit=[1]]], fields=[a, b, c])
:  :  +- Calc(select=[CAST(2 AS INTEGER) AS a, b, c], where=[(a = 2)])
:  : +- TableSourceScan(table=[[default_catalog, default_database, fs, 
filter=[=(a, 2)]]], fields=[a, b, c])
:  +- Calc(select=[CAST(1 AS INTEGER) AS EXPR$0, b, c])
: +- TableSourceScan(table=[[default_catalog, default_database, fs, 
project=[b, c], metadata=[]]], fields=[b, c])
+- Calc(select=[CAST(1 AS INTEGER) AS EXPR$0, b, CAST(1 AS BIGINT) AS c])
   +- TableSourceScan(table=[[default_catalog, default_database, fs, 
partitions=[{c=1}], project=[b], metadata=[]]], fields=[b])
{code}





--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26761) Fix the cast exception thrown by PreValidateReWriter when insert into/overwrite a partitioned table.

2022-03-21 Thread zoucao (Jira)
zoucao created FLINK-26761:
--

 Summary: Fix the cast exception thrown by PreValidateReWriter when 
insert into/overwrite a partitioned table.
 Key: FLINK-26761
 URL: https://issues.apache.org/jira/browse/FLINK-26761
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: zoucao


In `PreValidateReWriter#appendPartitionAndNullsProjects`, we should use
{code:java}
val names = sqlInsert.getTargetTableID.asInstanceOf[SqlIdentifier].names
{code}
to get the table name, instead of
{code:java}
val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
{code}
When we execute the following SQL:
{code:java}
insert into/overwrite table_name /*+ options(xxx) */ partition(xxx) select  
{code}
invoking `sqlInsert.getTargetTable` returns a SqlTableRef, which cannot be cast to 
SqlIdentifier.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26726) Remove the unregistered task from readersAwaitingSplit

2022-03-18 Thread zoucao (Jira)
zoucao created FLINK-26726:
--

 Summary: Remove the unregistered  task from readersAwaitingSplit
 Key: FLINK-26726
 URL: https://issues.apache.org/jira/browse/FLINK-26726
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Ecosystem
Reporter: zoucao
 Attachments: stack.txt

Recently, we faced a problem caused by unregistered tasks when using a hive table 
as a source for streaming reading.
I think the problem is that we do not remove unregistered tasks from 
`readersAwaitingSplit` in `ContinuousHiveSplitEnumerator` and 
`ContinuousFileSplitEnumerator`.

Assume we have two tasks, 0 and 1, and both are in `readersAwaitingSplit` because no 
new file has appeared in the path for a long time. Then a new split is generated and 
assigned to task-1. Unfortunately, task-1 cannot consume the split successfully; an 
exception is thrown and causes all tasks to restart. The failover does not affect 
`readersAwaitingSplit`, but it clears `SourceCoordinatorContext#registeredReaders`.
After restarting, task-0 is still in `readersAwaitingSplit` but not in 
`registeredReaders`. If task-1 registers first and sends a split request, the 
SplitEnumerator will assign splits to both task-1 and task-0, even though task-0 has 
not been registered yet.
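
A minimal sketch of the idea for `ContinuousFileSplitEnumerator` (simplified; the same guard would apply to `ContinuousHiveSplitEnumerator`):
{code:java}
// Sketch only: before assigning a split to a subtask waiting in readersAwaitingSplit,
// drop the subtask if it is no longer registered with the coordinator (e.g. the
// registration was cleared by a failover).
private void assignSplits() {
    final Iterator<Map.Entry<Integer, String>> awaitingReader =
            readersAwaitingSplit.entrySet().iterator();
    while (awaitingReader.hasNext()) {
        final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
        final int awaitingSubtask = nextAwaiting.getKey();

        if (!context.registeredReaders().containsKey(awaitingSubtask)) {
            awaitingReader.remove();
            continue;
        }
        // ... existing logic: take the next split and context.assignSplit(split, awaitingSubtask) ...
    }
}
{code}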


The stack exists in the attachment.








--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26723) Fix the content of exception in SourceCoordinatorContext

2022-03-18 Thread zoucao (Jira)
zoucao created FLINK-26723:
--

 Summary: Fix the content of exception in SourceCoordinatorContext
 Key: FLINK-26723
 URL: https://issues.apache.org/jira/browse/FLINK-26723
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: zoucao


The exception thrown in `SourceCoordinatorContext#SourceCoordinatorContext` will 
always be
"Cannot assign splits null to subtask xxx because the subtask is not registered."

We should fix it by including the actual splits info in the message.
[see|https://github.com/apache/flink/blob/ccbb05ea4f11aac51103cadd13a6a2e38e319e8b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java#L202]
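
A minimal sketch of the intended message (simplified, not the exact code at the linked line): pass the splits of the current subtask into the format arguments so the error no longer prints "splits null".
{code:java}
assignment.assignment().forEach(
        (subtaskId, splits) ->
                Preconditions.checkArgument(
                        registeredReaders.containsKey(subtaskId),
                        String.format(
                                "Cannot assign splits %s to subtask %d because the subtask is not registered.",
                                splits, subtaskId)));
{code}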



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26401) improve the compatibility for hive catalogView

2022-02-28 Thread zoucao (Jira)
zoucao created FLINK-26401:
--

 Summary: improve the compatibility for hive catalogView
 Key: FLINK-26401
 URL: https://issues.apache.org/jira/browse/FLINK-26401
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Reporter: zoucao


For now, Flink SQL reserves many keywords[1], and some of them differ from other 
big-data systems such as Spark[2] and Hive[3]. In this context, users may be 
affected by the compatibility differences between these systems.
For example, the keyword 'DATE' is marked as reserved by Flink and Hive, but not by 
Spark. If a hive view is defined as
{code:java}
select a, b, date from hive_tb where date = '2022-02-28' 
{code}
it works well in Spark, but in Flink a SqlParserException is thrown: 'SQL parse 
failed. Encountered "date" at line xxx, column xxx'. IIUC, if we do nothing to 
improve the compatibility, users must change the query of the hive view to
{code:java}
select a, b, `date` from hive_tb where `date` = '2022-02-28'
{code}
to make Flink work.
I think we can add a converter that surrounds reserved keywords with backticks 
before parsing.
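
A minimal, illustrative sketch of such a converter (a naive regex rewrite over a small keyword set; a real implementation would have to respect string literals, already-quoted identifiers, and the full reserved-keyword list from [1]):
{code:java}
import java.util.Set;
import java.util.regex.Pattern;

public class ReservedKeywordQuoter {

    // Illustrative subset of the reserved keywords.
    private static final Set<String> RESERVED = Set.of("date", "time", "timestamp", "value");

    /** Surrounds bare reserved keywords with backticks before handing the view text to the parser. */
    public static String quoteReservedKeywords(String query) {
        String result = query;
        for (String keyword : RESERVED) {
            // (?i) case-insensitive, \b word boundaries, skip identifiers already in backticks.
            Pattern p = Pattern.compile("(?i)(?<!`)\\b" + keyword + "\\b(?!`)");
            result = p.matcher(result).replaceAll(m -> "`" + m.group() + "`");
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(
                quoteReservedKeywords("select a, b, date from hive_tb where date = '2022-02-28'"));
        // -> select a, b, `date` from hive_tb where `date` = '2022-02-28'
    }
}
{code}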

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/overview/
[2] 
https://spark.apache.org/docs/3.2.1/sql-ref-ansi-compliance.html#sql-keywords
[3] 
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Keywords,Non-reservedKeywordsandReservedKeywords



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26355) VarCharType was not considered in HiveTableSqlFunction

2022-02-24 Thread zoucao (Jira)
zoucao created FLINK-26355:
--

 Summary: VarCharType was not considered in HiveTableSqlFunction
 Key: FLINK-26355
 URL: https://issues.apache.org/jira/browse/FLINK-26355
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Ecosystem
Reporter: zoucao


VarCharType is not considered in `HiveTableSqlFunction#coerce`, see 
[link|https://github.com/apache/flink/blob/a7192af8707f3f0d0f30fc71f3477edd92135cac/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java#L146].
Before invoking `HiveTableSqlFunction#coerce`, Flink calls the method 
`createFieldTypeFromLogicalType` to build the argumentsArray; if a field's type is 
varchar, an exception is thrown.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25476) CharType lost in the creation of MaxAggFunction & MinAggFunction

2021-12-29 Thread zoucao (Jira)
zoucao created FLINK-25476:
--

 Summary: CharType lost in the creation of MaxAggFunction & 
MinAggFunction
 Key: FLINK-25476
 URL: https://issues.apache.org/jira/browse/FLINK-25476
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.15.0
Reporter: zoucao


CharType is lost in AggFunctionFactory#createMinAggFunction(Retract) and 
AggFunctionFactory#createMaxAggFunction(Retract).
If we execute
{code:java}
SELECT Max('a') ...
{code}
the following exception is thrown:



Caused by: org.apache.flink.table.api.TableException: Max aggregate function 
does not support type: ''CHAR''.
Please re-check the data type.
at 
org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createMaxAggFunction(AggFunctionFactory.scala:395)
at 
org.apache.flink.table.planner.plan.utils.AggFunctionFactory.createAggFunction(AggFunctionFactory.scala:76)
at 
org.apache.flink.table.planner.plan.utils.AggregateUtil$.$anonfun$transformToAggregateInfoList$1(AggregateUtil.scala:444)





--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24958) correct the example and link for temporal table function documentation

2021-11-18 Thread zoucao (Jira)
zoucao created FLINK-24958:
--

 Summary: correct the example and link for temporal table function 
documentation 
 Key: FLINK-24958
 URL: https://issues.apache.org/jira/browse/FLINK-24958
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.14.0
Reporter: zoucao
 Fix For: 1.15.0


correct the example and link for temporal table function documentation 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-23865) Class cast error caused by nested Pojo in legacy outputConversion

2021-08-18 Thread zoucao (Jira)
zoucao created FLINK-23865:
--

 Summary: Class cast error caused by nested Pojo in legacy 
outputConversion
 Key: FLINK-23865
 URL: https://issues.apache.org/jira/browse/FLINK-23865
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.13.2
Reporter: zoucao


code:
{code:java}
Table table = tbEnv.fromValues(
        DataTypes.ROW(
                DataTypes.FIELD("innerPojo", DataTypes.ROW(DataTypes.FIELD("c", STRING()))),
                DataTypes.FIELD("b", STRING()),
                DataTypes.FIELD("a", INT())),
        Row.of(Row.of("str-c"), "str-b", 1));
DataStream pojoDataStream = tbEnv.toAppendStream(table, Pojo.class);
-
public static class Pojo {
    public InnerPojo innerPojo;
    public String b;
    public int a;

    public Pojo() {
    }
}

public static class InnerPojo {
    public String c;

    public InnerPojo() {
    }
}{code}
error:
{code:java}
java.lang.ClassCastException: org.apache.flink.table.types.logical.IntType cannot be cast to org.apache.flink.table.types.logical.RowType
	at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$1.apply(TableSinkUtils.scala:163)
	at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$1.apply(TableSinkUtils.scala:155)
{code}
The fields of PojoTypeInfo are in alphabetical order, so in 
`expandPojoTypeToSchema`, 'pojoType' and 'queryLogicalType' should each use their 
own index. But currently we use the pojo field index to look up 'queryLogicalType', 
which causes the field type mismatch. It could be fixed like:
{code:java}
val queryIndex = queryLogicalType.getFieldIndex(name)
val nestedLogicalType = 
queryLogicalType.getFields()(queryIndex).getType.asInstanceOf[RowType]{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23777) Support dynamic parameters for sink table in multiple connectors

2021-08-15 Thread zoucao (Jira)
zoucao created FLINK-23777:
--

 Summary: Support dynamic parameters for sink table in multiple 
connectors
 Key: FLINK-23777
 URL: https://issues.apache.org/jira/browse/FLINK-23777
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / ElasticSearch, Connectors / Kafka
Reporter: zoucao


For now, metadata is supported in SQL; it is simple, elegant, and easy to use.

But it is mostly used to read meta info from source tables. I think we could also 
use it to support dynamic parameters for sink tables.

For example,
 * In the Kafka sink, we can use metadata to achieve dynamic partitioning, letting 
each record choose its partition by calculation logic.
{code:java}
CREATE TABLE KafkaSinkTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `partition` INT METADATA FROM 'partition'
) WITH (
  'connector' = 'kafka'
)
{code}

 * In the Elasticsearch sink, we can also use metadata to support a dynamic index, 
like
{code:java}
CREATE TABLE ESSinkTable (
  index  VARCHAR  METADATA FROM 'index',
  log_ts VARCHAR
) with (
  'connector' = 'elasticsearch-7'
)
{code}
which is more elegant than deriving a partial index from a specific field like 
`myusers-\{log_ts|yyyy-MM-dd}`.

If there are other opinions or other dynamic parameters worth supporting, they can 
be proposed and discussed here.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23597) support Add Jar in Table api

2021-08-03 Thread zoucao (Jira)
zoucao created FLINK-23597:
--

 Summary: support Add Jar in Table api
 Key: FLINK-23597
 URL: https://issues.apache.org/jira/browse/FLINK-23597
 Project: Flink
  Issue Type: Improvement
Reporter: zoucao






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23511) Fix the location error in metric page.

2021-07-27 Thread zoucao (Jira)
zoucao created FLINK-23511:
--

 Summary: Fix the location error in metric page.
 Key: FLINK-23511
 URL: https://issues.apache.org/jira/browse/FLINK-23511
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: zoucao
 Attachments: image-2021-07-27-15-17-57-654.png

This error exists in 
[master|https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#default-shuffle-service]
 and 
[1.13|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/]

The affected metrics are :
 * 'inputFloatingBuffersUsage'
 * 'inputExclusiveBuffersUsage'
 * 'maxQueueLen'
 * 'avgQueueLen'
 * 'backPressuredTimeMsPerSecond'
 * 'busyTimeMsPerSecond'

!image-2021-07-27-15-17-57-654.png!

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23510) add a new metric to show the max usage of inputExclusiveBuffer for each inputChannel

2021-07-27 Thread zoucao (Jira)
zoucao created FLINK-23510:
--

 Summary: add a new metric to show the max usage of 
inputExclusiveBuffer for each inputChannel
 Key: FLINK-23510
 URL: https://issues.apache.org/jira/browse/FLINK-23510
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Reporter: zoucao


Now, 'inPoolUsage' and 'inputExclusiveBuffersUsage' describe the overall network 
buffer usage of a task. Sometimes, when back pressure was present, I found that the 
values of 'inPoolUsage' or 'inputExclusiveBuffersUsage' were still very low, because 
they are averaged over input channels whose buffer usage is low. I propose adding a 
new metric for each task that records the maximum buffer usage across all input 
channels belonging to that task.
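
A minimal sketch of such a gauge (illustrative only; the metric name and the per-channel usage suppliers are hypothetical, not existing Flink code):
{code:java}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

import java.util.List;
import java.util.function.Supplier;

public class MaxChannelUsageMetric {

    /** Registers a gauge that reports the maximum buffer usage over all input channels of a task. */
    public static void register(MetricGroup group, List<Supplier<Float>> channelUsages) {
        Gauge<Float> maxUsage =
                () -> (float) channelUsages.stream()
                        .mapToDouble(s -> s.get())
                        .max()
                        .orElse(0.0);
        group.gauge("maxInputExclusiveBuffersUsage", maxUsage); // hypothetical metric name
    }
}
{code}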



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23156) Change the reference of 'docs/dev/table/sql/queries'

2021-06-25 Thread zoucao (Jira)
zoucao created FLINK-23156:
--

 Summary: Change the reference of 'docs/dev/table/sql/queries'
 Key: FLINK-23156
 URL: https://issues.apache.org/jira/browse/FLINK-23156
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.13.0, 1.14.0
Reporter: zoucao


The reference to 
[docs/dev/table/sql/queries|https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/overview/]
 points to a page without any content; should we change it to 
[/docs/dev/table/sql/queries/overview/|https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/queries/overview/]
 ?

The problem exists in both the English and Chinese docs on branch 1.13 and master.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22437) Missing parallelism for filter operator in batch mode

2021-04-23 Thread zoucao (Jira)
zoucao created FLINK-22437:
--

 Summary: Missing parallelism for filter operator in batch mode
 Key: FLINK-22437
 URL: https://issues.apache.org/jira/browse/FLINK-22437
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.12.2
Reporter: zoucao


When I executed the batch SQL below on flink-1.12.2, I found lots of small files in 
HDFS. The filesystem connector uses `GroupedPartitionWriter`, which closes the last 
partition when a new record does not belong to the existing partition, so the 
phenomenon occurs when records of more than one partition are sent to the filesystem 
sink at the same time. The hive source can determine its parallelism from the number 
of files and partitions, and that parallelism is extended by the sort operator, but 
in `CommonPhysicalSink#createSinkTransformation` a filter operator is added to 
support `SinkNotNullEnforcer` without any parallelism set for it, so the filesystem 
sink operator cannot get the correct parallelism from its input stream.
{code:java}
CREATE CATALOG myHive with (
'type'='hive',
'property-version'='1',
'default-database' = 'flink_sql_online_test'
);
-- SET table.sql-dialect=hive;
-- CREATE TABLE IF NOT EXISTS myHive.flink_sql_online_test.hive_sink (
--`timestamp` BIGINT,
--`time` STRING,
--id BIGINT,
--product STRING,
--price DOUBLE,
--canSell STRING,
--selledNum BIGINT
-- ) PARTITIONED BY (
--dt STRING,
--`hour` STRING,
--   `min` STRING
-- ) TBLPROPERTIES (
--'partition.time-extractor.timestamp-pattern'='$dt $hr:$min:00',
--'sink.partition-commit.trigger'='partition-time',
--'sink.partition-commit.delay'='1 min',
--'sink.partition-commit.policy.kind'='metastore,success-file'
-- );
create table fs_sink (
`timestamp` BIGINT,
`time` STRING,
id BIGINT,
product STRING,
price DOUBLE,
canSell STRING,
selledNum BIGINT,
dt STRING,
`hour` STRING,
`min` STRING
) PARTITIONED BY (dt, `hour`, `min`) with (
'connector'='filesystem',
'path'='hdfs://',
'format'='csv'
);

insert into fs_sink
select * from myHive.flink_sql_online_test.hive_sink;
{code}
I think this problem can be fixed by setting a parallelism for it, just like
{code:java}
val dataStream = new DataStream(env, inputTransformation).filter(enforcer)
  .setParallelism(inputTransformation.getParallelism)
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21738) reduce unnecessary method calls in ModuleManager

2021-03-11 Thread zoucao (Jira)
zoucao created FLINK-21738:
--

 Summary: reduce unnecessary method calls  in ModuleManager
 Key: FLINK-21738
 URL: https://issues.apache.org/jira/browse/FLINK-21738
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: zoucao


In Flink SQL, if we use many functions (hive functions or Flink built-in 
functions), Flink will call the method
`getFunctionDefinition` in 
[ModuleManager|https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java#L44]
 many times to load them, and each module's `listFunctions` method is called every 
time. For a given module the same result is returned on each call, so a cache could 
be used here to avoid the repeated work.
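
A minimal sketch of the idea (illustrative; the field names are assumptions about ModuleManager's internals, and the cache would have to be invalidated whenever a module is loaded or unloaded):
{code:java}
// Sketch only: cache each module's listFunctions() result so repeated
// getFunctionDefinition(...) calls do not re-enumerate the same function names.
private final Map<String, Set<String>> functionNamesByModule = new HashMap<>();

public Optional<FunctionDefinition> getFunctionDefinition(String name) {
    for (Map.Entry<String, Module> entry : modules.entrySet()) {
        Set<String> functionNames =
                functionNamesByModule.computeIfAbsent(
                        entry.getKey(), k -> entry.getValue().listFunctions());
        if (functionNames.contains(name)) {
            return entry.getValue().getFunctionDefinition(name);
        }
    }
    return Optional.empty();
}
{code}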



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20827) Just read record correlating to join key in FilesystemLookUpFunc

2021-01-01 Thread zoucao (Jira)
zoucao created FLINK-20827:
--

 Summary: Just read record correlating to join key in 
FilesystemLookUpFunc
 Key: FLINK-20827
 URL: https://issues.apache.org/jira/browse/FLINK-20827
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Reporter: zoucao


When using a temporal table join, all of the hive table's records are loaded into 
the cache. But sometimes the hive temporal table is larger than expected, and users 
cannot know exactly how big it is in memory. In this situation, errors such as `GC 
overhead limit exceeded` or `the heartbeat of TaskManager timeout (caused by gc)` 
occur.

Maybe we can optimize the number of records read from the hive table? If the 
upstream records can be hashed by the join key, then each subtask only needs to load 
into its cache the records whose hashed join key maps to that subtask, so the whole 
table is divided by the parallelism. I don't know whether this can be done on the 
upstream side under the existing framework, but it is easy to support in 
`FileSystemLookupFunction`.

If not, we can at least log the size of the cache to help users set the MemorySize 
or other TaskManager parameters.
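
A minimal sketch of the cache-loading side of the idea (illustrative only; `extractJoinKey` is a hypothetical helper, and the upstream would have to route lookup requests by the same hash for this to be correct):
{code:java}
// Sketch only: cache just the rows whose join key hashes to this subtask, instead of
// the whole hive table, so the cached data shrinks roughly by the lookup parallelism.
private void reloadCache(Iterable<RowData> allRows, int subtaskIndex, int parallelism) {
    cache.clear();
    for (RowData row : allRows) {
        RowData key = extractJoinKey(row); // hypothetical helper building the join-key row
        if (Math.floorMod(key.hashCode(), parallelism) == subtaskIndex) {
            cache.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
    }
}
{code}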



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20795) add a parameter to decide whether or not print dirty record when `ignore-parse-errors` is true

2020-12-28 Thread zoucao (Jira)
zoucao created FLINK-20795:
--

 Summary: add a parameter to decide whether or not print dirty 
record when `ignore-parse-errors` is true
 Key: FLINK-20795
 URL: https://issues.apache.org/jira/browse/FLINK-20795
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.13.0
Reporter: zoucao


Add a parameter to decide whether or not to print dirty records when 
`ignore-parse-errors` = true. Some users want to keep their job stable and still see 
the dirty records so that they can fix the upstream.
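
A minimal sketch of what the option could look like for the JSON format (the key 'json.log-parse-errors' is a hypothetical name used only for illustration):
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class JsonFormatOptionsSketch {
    // Hypothetical companion to 'json.ignore-parse-errors': when both are true, the
    // deserializer would skip the bad record but still log it for troubleshooting.
    public static final ConfigOption<Boolean> LOG_PARSE_ERRORS =
            ConfigOptions.key("json.log-parse-errors")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Whether to log records that fail to parse when "
                                    + "'json.ignore-parse-errors' is enabled.");
}
{code}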



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20701) print connector supports outputting serialized record

2020-12-21 Thread zoucao (Jira)
zoucao created FLINK-20701:
--

 Summary: print connector supports outputting serialized record
 Key: FLINK-20701
 URL: https://issues.apache.org/jira/browse/FLINK-20701
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Ecosystem
Affects Versions: 1.13.0
Reporter: zoucao


Hi all, the print connector is designed for debugging, and it gives us better 
access to the data. If I want to debug a source connector and a SQL statement, I 
use it to print the result. However, there is no good way to observe the serialized 
record that would be sent to the external connector when debugging. Consequently, 
we had better make the print connector support outputting serialized records. We 
can use format=XXX to identify a serialization schema, helping users debug and 
understand the serialization process.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20546) Misuse of the method in KafkaDynamicTableFactoryTest

2020-12-09 Thread zoucao (Jira)
zoucao created FLINK-20546:
--

 Summary: Misuse of the method in KafkaDynamicTableFactoryTest
 Key: FLINK-20546
 URL: https://issues.apache.org/jira/browse/FLINK-20546
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.12.0
Reporter: zoucao


Hi all, I have been reading about the DynamicTable Source/Sink Factory recently, 
and one thing confused me. In `KafkaDynamicTableFactoryTest`, 
`getBasicSourceOptions()` and `getBasicSinkOptions()` are created for the source 
table and the sink table, respectively. However, when validating the sink table, 
`getBasicSourceOptions()` is called, for example in the methods 
`testSinkWithTopicListOrTopicPattern`, `testInvalidSinkSemantic`, etc. Although no 
exception is thrown, it should still be fixed. WDYT?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19893) stop consuming in specific offset or timestamp

2020-10-30 Thread zoucao (Jira)
zoucao created FLINK-19893:
--

 Summary: stop consuming in specific offset or timestamp
 Key: FLINK-19893
 URL: https://issues.apache.org/jira/browse/FLINK-19893
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: zoucao


Hi all, I think something new should be added to the kafka connector.

When using Flink to consume messages from an MQ, there can be a large number of 
unconsumed messages in the topic while the production rate equals the consumption 
rate, so the latency stays high. In that situation, two jobs need to be started: one 
consumes the latest messages, the other consumes from the earliest messages and 
stops consuming at the current offset or timestamp. Maybe we should support the 
kafka connector stopping consumption at a specific offset or timestamp.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)