Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.
@Shengkai > About the catalog jar hot updates Currently we do not have a similar requirement, but if the catalog management interface is opened up, it can indeed enable hot loading of catalog jars. > do we need to instantiate the Catalog immediately or defer to the usage I think this can behave the same as before. @Jark > There only can be a single catalog manager in TableEnvironment. Big +1 for this. This avoids conflicts and also meets the catalog persistence requirement. Best, Feng On Fri, Feb 10, 2023 at 3:09 PM Jark Wu wrote: > > Hi Feng, > > It's still easy to conflict and be inconsistent even if we have only one > CatalogProvider, because CatalogProvider only provides readable interfaces > (listCatalogs, getCatalog). For example, you may register a catalog X, but > can't list it because it's not in the external metadata service. > > To avoid catalog conflicts and keep consistent, we can extract the catalog > management logic as a pluggable interface, including listCatalog, > getCatalog, registerCatalog, unregisterCatalog, etc. The > current CatalogManager is a default in-memory implementation, you can > replace it with user-defined managers, such as > - file-based: which manages catalog information on local files, just like > how Presto/Trino manages catalogs > - metaservice-based: which manages catalog information on external > metadata service. > > There only can be a single catalog manager in TableEnvironment. This > guarantees data consistency and avoids conflicts. This approach can address > another pain point of Flink SQL: the catalog information is not persisted. > > Can this approach satisfy your requirements? > > Best, > Jark > > > > > > On Fri, 10 Feb 2023 at 11:21, Shengkai Fang wrote: > > > Hi Feng. > > > > I think your idea is very interesting! > > > > 1. I just wonder after initializing the Catalog, will the Session reuse the > > same Catalog instance or build a new one for later usage? If we reuse the > > same Catalog, I think it's more like lazy initialization. I am a > > little prone to rebuild a new one because it's easier for us to catalog jar > > hot updates. > > > > 2. Users use the `CREATE CATALOG` statement in the CatalogManager. In this > > case, do we need to instantiate the Catalog immediately or defer to the > > usage? > > > > Best, > > Shengkai > > > > Feng Jin wrote on Thu, Feb 9, 2023 at 20:13: > > > > > Thanks for your reply. > > > > > > @Timo > > > > > > > 2) avoid the default in-memory catalog and offer their catalog before > > > a TableEnvironment session starts > > > > 3) whether this can be disabled and SHOW CATALOGS can be used for > > > listing first without having a default catalog. > > > > > > > > > Regarding 2 and 3, I think this problem can be solved by introducing > > > catalog providers, and users can control some default catalog > > > behavior. > > > > > > > > > > We could also use the org.apache.flink.table.factories.Factory infra > > > and allow catalog providers via pure string properties > > > > > > I think this is also very useful. In our usage scenarios, it is > > > usually multi-cluster management, and it is also necessary to pass > > > different configurations through parameters. > > > > > > > > > @Jark @Huang > > > > > > > About the lazy catalog initialization > > > > > > Our needs may be different.
If these properties already exist in an > > > external system, especially when there may be thousands of these > > > catalog properties, I don’t think it is necessary to register all > > > these properties in the Flink env at startup, but we need is that we > > > can register a catalog when it needs and we can get the properties > > > from the external meta system . > > > > > > > > > > It may be hard to avoid conflicts and duplicates between > > > CatalogProvider and CatalogManager > > > > > > It is indeed easy to conflict. My idea is that if we separate the > > > catalog management of the current CatalogManager as the default > > > CatalogProvider behavior, at the same time, only one CatalogProvider > > > exists in a Flink Env. This may avoid catalog conflicts. > > > > > > > > > Best, > > > Feng > > > > > > On Tue, Feb 7, 2023 at 1:01 PM Hang Ruan wrote: > > > > > > > > Hi Feng, > > > > I agree with what Jark said. I think what you are looking for is lazy > > > > initialization. > > > > > > > > I don't think we should introduce the new interface CatalogProvider for > > > > lazy initialization. What we should do is to store the catalog > > properties > > > > and initialize the catalog when we need it. Could you please introduce > > > some > > > > other scenarios that we need the CatalogProvider besides the lazy > > > > initialization? > > > > > > > > If we really need the CatalogProvider, I think it is better to be a > > > single > > > > instance. Multiple instances are difficult to manage and there are name > > > > conflicts among providers. > > > > > > > > Best, > > > > Hang > > > > > > > > Jark Wu 于2023年2月7日周二 10:48写道: > > >
[jira] [Created] (FLINK-31008) [Flink][Table Store] The Split allocation of the same bucket in ContinuousFileSplitEnumerator may be out of order
ming li created FLINK-31008: --- Summary: [Flink][Table Store] The Split allocation of the same bucket in ContinuousFileSplitEnumerator may be out of order Key: FLINK-31008 URL: https://issues.apache.org/jira/browse/FLINK-31008 Project: Flink Issue Type: Bug Components: Table Store Reporter: ming li

There are two places in {{ContinuousFileSplitEnumerator}} that add {{FileStoreSourceSplit}} to {{bucketSplits}}: {{addSplitsBack}} and {{processDiscoveredSplits}}. {{processDiscoveredSplits}} continuously checks for new splits and adds them to the queue, so at that point the splits are in order.

{code:java}
private void addSplits(Collection<FileStoreSourceSplit> splits) {
    splits.forEach(this::addSplit);
}

private void addSplit(FileStoreSourceSplit split) {
    bucketSplits
            .computeIfAbsent(((DataSplit) split.split()).bucket(), i -> new LinkedList<>())
            .add(split);
}
{code}

However, when a task fails over, the splits that had already been allocated are returned. These returned splits are also appended to the tail of the queue, which makes the allocation of splits out of order. I think the returned splits should be added to the head of the queue to preserve the allocation order.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
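A minimal sketch of the proposed fix, assuming a per-bucket {{Deque}}: splits returned by {{addSplitsBack}} go to the head of the queue, while newly discovered splits keep going to the tail. The class and helper names are illustrative, not the actual table store code.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: returned splits are re-queued at the head of the
// per-bucket queue so the original assignment order survives a failover.
class OrderedBucketSplits<S> {
    private final Map<Integer, Deque<S>> bucketSplits = new HashMap<>();

    // Newly discovered splits keep going to the tail, as today.
    void addDiscovered(int bucket, S split) {
        bucketSplits.computeIfAbsent(bucket, b -> new ArrayDeque<>()).addLast(split);
    }

    // Splits handed back via addSplitsBack() are re-queued at the head.
    // Iterate in reverse so repeated addFirst() calls keep the relative order.
    void addBack(int bucket, Collection<S> returned) {
        List<S> list = new ArrayList<>(returned);
        Collections.reverse(list);
        Deque<S> queue = bucketSplits.computeIfAbsent(bucket, b -> new ArrayDeque<>());
        list.forEach(queue::addFirst);
    }

    S poll(int bucket) {
        Deque<S> queue = bucketSplits.get(bucket);
        return queue == null ? null : queue.pollFirst();
    }
}
{code}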
Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.
Hi Feng, It's still easy to conflict and be inconsistent even if we have only one CatalogProvider, because CatalogProvider only provides readable interfaces (listCatalogs, getCatalog). For example, you may register a catalog X, but can't list it because it's not in the external metadata service. To avoid catalog conflicts and keep consistent, we can extract the catalog management logic as a pluggable interface, including listCatalog, getCatalog, registerCatalog, unregisterCatalog, etc. The current CatalogManager is a default in-memory implementation, you can replace it with user-defined managers, such as - file-based: which manages catalog information on local files, just like how Presto/Trino manages catalogs - metaservice-based: which manages catalog information on external metadata service. There only can be a single catalog manager in TableEnvironment. This guarantees data consistency and avoids conflicts. This approach can address another pain point of Flink SQL: the catalog information is not persisted. Can this approach satisfy your requirements? Best, Jark On Fri, 10 Feb 2023 at 11:21, Shengkai Fang wrote: > Hi Feng. > > I think your idea is very interesting! > > 1. I just wonder after initializing the Catalog, will the Session reuse the > same Catalog instance or build a new one for later usage? If we reuse the > same Catalog, I think it's more like lazy initialization. I am a > little prone to rebuild a new one because it's easier for us to catalog jar > hot updates. > > 2. Users use the `CREATE CATALOG` statement in the CatalogManager. In this > case, do we need to instantiate the Catalog immediately or defer to the > usage? > > Best, > Shengkai > > Feng Jin 于2023年2月9日周四 20:13写道: > > > Thanks for your reply. > > > > @Timo > > > > > 2) avoid the default in-memory catalog and offer their catalog before > > a TableEnvironment session starts > > > 3) whether this can be disabled and SHOW CATALOGS can be used for > > listing first without having a default catalog. > > > > > > Regarding 2 and 3, I think this problem can be solved by introducing > > catalog providers, and users can control some default catalog > > behavior. > > > > > > > We could also use the org.apache.flink.table.factories.Factory infra > > and allow catalog providers via pure string properties > > > > I think this is also very useful. In our usage scenarios, it is > > usually multi-cluster management, and it is also necessary to pass > > different configurations through parameters. > > > > > > @Jark @Huang > > > > > About the lazy catalog initialization > > > > Our needs may be different. If these properties already exist in an > > external system, especially when there may be thousands of these > > catalog properties, I don’t think it is necessary to register all > > these properties in the Flink env at startup, but we need is that we > > can register a catalog when it needs and we can get the properties > > from the external meta system . > > > > > > > It may be hard to avoid conflicts and duplicates between > > CatalogProvider and CatalogManager > > > > It is indeed easy to conflict. My idea is that if we separate the > > catalog management of the current CatalogManager as the default > > CatalogProvider behavior, at the same time, only one CatalogProvider > > exists in a Flink Env. This may avoid catalog conflicts. > > > > > > Best, > > Feng > > > > On Tue, Feb 7, 2023 at 1:01 PM Hang Ruan wrote: > > > > > > Hi Feng, > > > I agree with what Jark said. 
I think what you are looking for is lazy > > > initialization. > > > > > > I don't think we should introduce the new interface CatalogProvider for > > > lazy initialization. What we should do is to store the catalog > properties > > > and initialize the catalog when we need it. Could you please introduce > > some > > > other scenarios that we need the CatalogProvider besides the lazy > > > initialization? > > > > > > If we really need the CatalogProvider, I think it is better to be a > > single > > > instance. Multiple instances are difficult to manage and there are name > > > conflicts among providers. > > > > > > Best, > > > Hang > > > > > > Jark Wu 于2023年2月7日周二 10:48写道: > > > > > > > Hi Feng, > > > > > > > > I think this feature makes a lot of sense. If I understand correctly, > > what > > > > you are looking for is lazy catalog initialization. > > > > > > > > However, I have some concerns about introducing CatalogProvider, > which > > > > delegates catalog management to users. It may be hard to avoid > > conflicts > > > > and duplicates between CatalogProvider and CatalogManager. Is it > > possible > > > > to have a built-in CatalogProvider to instantiate catalogs lazily? > > > > > > > > An idea in my mind is to introduce another catalog registration API > > > > without instantiating the catalog, e.g., registerCatalog(String > > > > catalogName, Map catalogProperties). The catalog > > > > information is stored in CatalogManager as pure strings. The
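For readers following the thread, a rough sketch of the registration idea Jark describes: catalog properties are stored as pure strings and the catalog is instantiated lazily on first use. The use of FactoryUtil.createCatalog as the instantiation path is an assumption for illustration; the actual wiring inside CatalogManager may differ.

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.FactoryUtil;

// Sketch only: mirrors the proposed registerCatalog(String, Map<String, String>).
class LazyCatalogRegistry {
    private final Map<String, Map<String, String>> registered = new HashMap<>();
    private final Map<String, Catalog> instantiated = new HashMap<>();

    void registerCatalog(String name, Map<String, String> properties) {
        registered.put(name, properties); // no catalog instance is created here
    }

    Catalog getCatalog(String name) {
        // Instantiated and initialized only on first use.
        return instantiated.computeIfAbsent(
                name,
                n ->
                        // Assumption: FactoryUtil.createCatalog resolves the
                        // CatalogFactory from the options, as CREATE CATALOG does.
                        FactoryUtil.createCatalog(
                                n,
                                registered.get(n),
                                new Configuration(),
                                Thread.currentThread().getContextClassLoader()));
    }
}
{code}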
[jira] [Created] (FLINK-31007) The code generated by the IF function throws NullPointerException
tivanli created FLINK-31007: --- Summary: The code generated by the IF function throws NullPointerException Key: FLINK-31007 URL: https://issues.apache.org/jira/browse/FLINK-31007 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.15.3, 1.15.2 Environment:

{code:java}
// code placeholder
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
final DataStream<Tuple2<Long, String>> tab =
        env.fromCollection(
                Arrays.asList(
                        new Tuple2<>(1L, "a_b_c"),
                        new Tuple2<>(-1L, "a_b_c")));
final Table tableA = tableEnv.fromDataStream(tab);
tableEnv.executeSql("SELECT if(f0 = -1, '', split_index(f1, '_', 0)) as id FROM " + tableA)
        .print();
{code}

Reporter: tivanli

Caused by: java.lang.NullPointerException
    at StreamExecCalc$19.processElement_split1(Unknown Source)
    at StreamExecCalc$19.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
    at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:231)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] FLIP-289: Support online inference (Flink ML)
Hi Dong, Thanks for the reply. Got your points and it makes sense to me now~ Regards, Dian On Thu, Feb 9, 2023 at 3:45 PM Dong Lin wrote: > Hi Dian, > > Thanks for the review! Please see my reply inline. > > Regards, > Dong > > On Wed, Feb 8, 2023 at 11:58 AM Dian Fu wrote: > > > Hi Dong, > > > > > > Thanks for driving this effort! This FLIP LGTM overall. I have just a few > > minor comments regarding the proposed API: > > > > 1) For the method `DataFrame.collect()`, why is it named `collect` > instead > > of something else, e.g. `get`? Does it mean that the result will be > > computed during this method call? > > > > I have chosen the names to be closer to the names of similar classes or > concepts in the existing popular open-source projects. > > The reasons for using `collect` include: > - mleap is a popular machine learning serving framework and it uses > LeapFrame#collect() > < > https://github.com/combust/mleap/blob/master/mleap-runtime/src/main/scala/ml/combust/mleap/runtime/frame/LeapFrame.scala#L13 > > > to represent this concept. > - Collect rows from table seems close to the concept of > TableResult#collect() in Flink. > - The method returns a *collection* of rows from this table, which makes > `collect()` is bit more relevant than `get()`. > > Currently, I expect DataFrame.collect() to just return the values already > computed before this method is called. > > > > 2) For the method `DataFrame.addColumn`, when will this method be used? > > > > For example, we might want to implement KMeansModelServable#transform > that appends the prediction result to the input dataFrame. This is similar > to the existing way that KMeansModel#transform > < > https://github.com/apache/flink-ml/blob/master/flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/kmeans/KMeansModel.java#L84 > > > appends > the result to the input Table. Note that we probably don't want to copy the > entire input DataFrame to the outpuDataFrame. > > We will need to use DataFrame.addColumn to append the prediction result in > this case. > > > > 3) In the example `runOnlineInferenceOnWebServer`, it uses > > `output_df.getDataType("output")` to get the result type. It's not quite > > intuitive. Does it make sense to add a method `getDataType()` in > DataFrame? > > > I guess you are asking whether we should just have a method getDataType() > that returns types of all columns of the dataframe. > > Here is a scenario where users and algorithms will read only selected > columns from DataFrame: > - Users provided a DataFrame with columnA and columnB > - The 1st algorithm will read columnA and columnB, then computes/appends > columnC to the DataFrame. > - The 2nd algorithm will read columnB and columnC, then computes/appends > columnD to the DataFrame. > - Users need to read columnD in the final DataFrame. > > > > Regards, > > Dian > > > > > > > > On Tue, Feb 7, 2023 at 12:37 PM Dong Lin wrote: > > > > > Hi all, > > > > > > If there is no question related to this FLIP, we will start the voting > > > thread on 2/10. > > > > > > Regards, > > > Dong > > > > > > On Wed, Feb 1, 2023 at 8:38 PM Dong Lin wrote: > > > > > > > Hi all, > > > > > > > > Fan, Jiang, Zhipeng, and I have created FLIP-289: Support online > > > inference > > > > (Flink ML). > > > > > > > > The goal of this FLIP is to enable users to use the model trained by > > > Flink > > > > ML to do online inference. More details can be found in the FLIP doc > at > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240881268 > > > > . 
> > > > > > > > We are looking forward to your comments. > > > > > > > > Regards, > > > > Dong > > > > > > > > > >
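Since much of the thread above is about the semantics of collect() and addColumn(), here is a plain-Java sketch of the behavior being discussed. All names are illustrative; this is not the FLIP-289 API.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Sketch: collect() returns already-computed values, and addColumn() appends
// a column without copying the existing ones (column-oriented storage).
final class DataFrame {
    private final List<String> columnNames;
    private final List<List<Object>> columns;

    DataFrame(List<String> columnNames, List<List<Object>> columns) {
        this.columnNames = columnNames;
        this.columns = columns;
    }

    // E.g. a KMeans servable can append a "prediction" column cheaply:
    // the existing columns are shared by reference, not copied.
    DataFrame addColumn(String name, List<Object> values) {
        List<String> names = new ArrayList<>(columnNames);
        names.add(name);
        List<List<Object>> cols = new ArrayList<>(columns);
        cols.add(values);
        return new DataFrame(names, cols);
    }

    // No deferred computation: rows are assembled from stored values.
    List<List<Object>> collect() {
        int numRows = columns.isEmpty() ? 0 : columns.get(0).size();
        List<List<Object>> rows = new ArrayList<>(numRows);
        for (int r = 0; r < numRows; r++) {
            List<Object> row = new ArrayList<>(columns.size());
            for (List<Object> column : columns) {
                row.add(column.get(r));
            }
            rows.add(row);
        }
        return rows;
    }
}
{code}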
[jira] [Created] (FLINK-31006) job is not finished when using pipeline mode to run bounded source like kafka/pulsar
jackylau created FLINK-31006: Summary: job is not finished when using pipeline mode to run bounded source like kafka/pulsar Key: FLINK-31006 URL: https://issues.apache.org/jira/browse/FLINK-31006 Project: Flink Issue Type: Bug Components: Connectors / Kafka, Connectors / Pulsar Affects Versions: 1.17.0 Reporter: jackylau Fix For: 1.17.0 Attachments: image-2023-02-10-13-20-52-890.png, image-2023-02-10-13-23-38-430.png, image-2023-02-10-13-24-46-929.png When I ran failover tests (such as killing the JM/TM) while using pipeline mode to run a bounded source like Kafka, I found the job does not finish even though every partition's data has been consumed. After digging into the code, I found this logic does not run when the JM recovers: the partition infos are unchanged, so noMoreNewPartitionSplits is not set to true, and then this branch never runs. !image-2023-02-10-13-23-38-430.png! !image-2023-02-10-13-24-46-929.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
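For context, a simplified sketch of the end-of-input signaling involved. SplitEnumeratorContext#signalNoMoreSplits and Boundedness are the real source API; the surrounding enumerator class is abbreviated and illustrative (not the actual KafkaSourceEnumerator).

{code:java}
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Queue;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

// Sketch: if the "no more new partitions" flag is not re-established after a
// JM recovery, readers are never told the input ended and the job hangs.
class BoundedEnumeratorSketch<SplitT extends SourceSplit> {
    private final Boundedness boundedness;
    private final SplitEnumeratorContext<SplitT> context;
    private final Queue<SplitT> pendingSplits = new ArrayDeque<>();
    private boolean noMoreNewPartitionSplits;

    BoundedEnumeratorSketch(Boundedness boundedness, SplitEnumeratorContext<SplitT> context) {
        this.boundedness = boundedness;
        this.context = context;
    }

    // Runs on every discovery pass, including the first one after recovery.
    void onPartitionDiscovery(Collection<SplitT> newSplits) {
        pendingSplits.addAll(newSplits);
        if (boundedness == Boundedness.BOUNDED && newSplits.isEmpty()) {
            noMoreNewPartitionSplits = true;
        }
    }

    void handleSplitRequest(int subtaskId) {
        SplitT split = pendingSplits.poll();
        if (split != null) {
            context.assignSplit(split, subtaskId);
        } else if (noMoreNewPartitionSplits) {
            context.signalNoMoreSplits(subtaskId); // lets the reader finish
        }
    }
}
{code}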
Re: Confusion about some overlapping functionality of SupportsProjectionPushDown and SupportsReadingMetadata
Hi, Ran. I think it's a little difficult to split the rule into two parts, because projection push-down and metadata reading both need to reorder the fields: SupportsReadingMetadata requires the metadata columns to be at the end, and SupportsProjectionPushDown is currently responsible for reordering the columns as the user specified. One more concern: the push-down optimization occurs in the logical phase, which uses the volcano planner. I am not sure whether the rule would be applied in the end, because metadata push-down doesn't change the cost. Best, Shengkai
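To make the call order concrete, a skeletal source implementing both abilities; it assumes (as the planner rules do today) that applyProjection runs before applyReadableMetadata, so the type passed to the latter is the final produced type.

{code:java}
import java.util.List;
import java.util.Map;

import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.types.DataType;

// Skeleton sketch: when both abilities are implemented, the producedDataType
// handed to applyReadableMetadata() already reflects the projection and has
// the metadata columns appended at the end, so that type should win.
abstract class DualAbilitySource
        implements SupportsProjectionPushDown, SupportsReadingMetadata {

    protected int[][] projectedFields;
    protected DataType producedDataType;

    @Override
    public boolean supportsNestedProjection() {
        return false;
    }

    @Override
    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
        // Called first; do not treat this type as final yet.
        this.projectedFields = projectedFields;
        this.producedDataType = producedDataType;
    }

    @Override
    public abstract Map<String, DataType> listReadableMetadata();

    @Override
    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        // Called last; overrides the type remembered above.
        this.producedDataType = producedDataType;
    }
}
{code}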
[jira] [Created] (FLINK-31005) Release Testing: Verify FLIP-281 Supports speculative execution of sinks
Zhu Zhu created FLINK-31005: --- Summary: Release Testing: Verify FLIP-281 Supports speculative execution of sinks Key: FLINK-31005 URL: https://issues.apache.org/jira/browse/FLINK-31005 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhu Zhu Fix For: 1.17.0 This task aims to verify [FLIP-281 Supports speculative execution of sinks|https://issues.apache.org/jira/browse/FLINK-30725]. The documentation can be found [here|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#enable-sinks-for-speculative-execution]. Things to verify: 1. If a sink implements the decorative interface {{SupportsConcurrentExecutionAttempts}}, speculative executions can be performed; otherwise not. Sinks to verify include SinkFunction, OutputFormat and Sink(V2). 2. These built-in sinks support speculative execution: DiscardingSink, PrintSinkFunction, PrintSink, FileSink, FileSystemOutputFormat, HiveTableSink. If it's hard to construct a case in which speculative execution happens, especially for the built-in sinks, the speculative execution configuration can be tuned to make it easier to trigger, e.g. set {{slow-task-detector.execution-time.baseline-lower-bound}} and {{slow-task-detector.execution-time.baseline-ratio}} to {{0}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
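A minimal sketch of the opt-in from a tester's perspective. The decorative interface comes from FLIP-281 (its package is assumed here), and the configuration keys are the ones quoted in the ticket; further speculative-execution settings from the linked documentation still apply.

{code:java}
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Illustrative test job: a SinkFunction opting into speculative execution.
public class SpeculativeSinkSketch {

    static class ConcurrentAttemptsSink extends RichSinkFunction<String>
            implements SupportsConcurrentExecutionAttempts {
        @Override
        public void invoke(String value, Context context) {
            // Must tolerate the same record arriving from concurrent attempts.
            System.out.println(value);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Tuned as suggested above so that speculation triggers easily.
        conf.setString("slow-task-detector.execution-time.baseline-lower-bound", "0");
        conf.setString("slow-task-detector.execution-time.baseline-ratio", "0");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(2, conf);
        env.fromElements("a", "b", "c").addSink(new ConcurrentAttemptsSink());
        env.execute();
    }
}
{code}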
[jira] [Created] (FLINK-31004) Introduce data input and output stream for table store
Shammon created FLINK-31004: --- Summary: Introduce data input and output stream for table store Key: FLINK-31004 URL: https://issues.apache.org/jira/browse/FLINK-31004 Project: Flink Issue Type: Sub-task Components: Table Store Affects Versions: table-store-0.4.0 Reporter: Shammon Introduce data input/output stream for table store -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31003) Flink SQL IF / CASE WHEN Function incorrect
weiqinpan created FLINK-31003: - Summary: Flink SQL IF / CASE WHEN Function incorrect Key: FLINK-31003 URL: https://issues.apache.org/jira/browse/FLINK-31003 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.16.1, 1.15.3, 1.15.2, 1.16.0, 1.15.1, 1.15.0 Reporter: weiqinpan When I execute the SQL below using sql-client, I found something wrong.

{code:java}
CREATE TEMPORARY TABLE source (
    mktgmsg_biz_type STRING,
    marketing_flow_id STRING,
    mktgmsg_campaign_id STRING
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///Users/xxx/Desktop/demo.json',
    'format' = 'json'
);

-- return correct value('marketing_flow_id')
SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM source;

-- return incorrect value('')
SELECT IF(`marketing_flow_id` IS NULL, '', `marketing_flow_id`) FROM source;
{code}

The demo.json data is

{code:java}
{"mktgmsg_biz_type": "marketing_flow", "marketing_flow_id": "marketing_flow_id", "mktgmsg_campaign_id": "mktgmsg_campaign_id"}
{code}

BTW, CASE WHEN combined with IF / IFNULL also goes wrong.

{code:java}
-- return wrong value(''), expect return marketing_flow_id
select CASE WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN IF(`marketing_flow_id` IS NULL, `marketing_flow_id`, '')
            WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN IF(`mktgmsg_campaign_id` IS NULL, '', `mktgmsg_campaign_id`)
            ELSE '' END AS `message_campaign_instance_id`
FROM source;

-- return wrong value('')
select CASE WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN IFNULL(`marketing_flow_id`, '')
            WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN IFNULL(`mktgmsg_campaign_id`, '')
            ELSE '' END AS `message_campaign_instance_id`
FROM source;

-- return correct value, the difference is [else return ' ']
select CASE WHEN `mktgmsg_biz_type` = 'marketing_flow' THEN IFNULL(`marketing_flow_id`, '')
            WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign' THEN IFNULL(`mktgmsg_campaign_id`, '')
            ELSE ' ' END AS `message_campaign_instance_id`
FROM source;
{code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.
Hi Feng. I think your idea is very interesting! 1. I just wonder after initializing the Catalog, will the Session reuse the same Catalog instance or build a new one for later usage? If we reuse the same Catalog, I think it's more like lazy initialization. I am a little prone to rebuild a new one because it's easier for us to catalog jar hot updates. 2. Users use the `CREATE CATALOG` statement in the CatalogManager. In this case, do we need to instantiate the Catalog immediately or defer to the usage? Best, Shengkai Feng Jin 于2023年2月9日周四 20:13写道: > Thanks for your reply. > > @Timo > > > 2) avoid the default in-memory catalog and offer their catalog before > a TableEnvironment session starts > > 3) whether this can be disabled and SHOW CATALOGS can be used for > listing first without having a default catalog. > > > Regarding 2 and 3, I think this problem can be solved by introducing > catalog providers, and users can control some default catalog > behavior. > > > > We could also use the org.apache.flink.table.factories.Factory infra > and allow catalog providers via pure string properties > > I think this is also very useful. In our usage scenarios, it is > usually multi-cluster management, and it is also necessary to pass > different configurations through parameters. > > > @Jark @Huang > > > About the lazy catalog initialization > > Our needs may be different. If these properties already exist in an > external system, especially when there may be thousands of these > catalog properties, I don’t think it is necessary to register all > these properties in the Flink env at startup, but we need is that we > can register a catalog when it needs and we can get the properties > from the external meta system . > > > > It may be hard to avoid conflicts and duplicates between > CatalogProvider and CatalogManager > > It is indeed easy to conflict. My idea is that if we separate the > catalog management of the current CatalogManager as the default > CatalogProvider behavior, at the same time, only one CatalogProvider > exists in a Flink Env. This may avoid catalog conflicts. > > > Best, > Feng > > On Tue, Feb 7, 2023 at 1:01 PM Hang Ruan wrote: > > > > Hi Feng, > > I agree with what Jark said. I think what you are looking for is lazy > > initialization. > > > > I don't think we should introduce the new interface CatalogProvider for > > lazy initialization. What we should do is to store the catalog properties > > and initialize the catalog when we need it. Could you please introduce > some > > other scenarios that we need the CatalogProvider besides the lazy > > initialization? > > > > If we really need the CatalogProvider, I think it is better to be a > single > > instance. Multiple instances are difficult to manage and there are name > > conflicts among providers. > > > > Best, > > Hang > > > > Jark Wu 于2023年2月7日周二 10:48写道: > > > > > Hi Feng, > > > > > > I think this feature makes a lot of sense. If I understand correctly, > what > > > you are looking for is lazy catalog initialization. > > > > > > However, I have some concerns about introducing CatalogProvider, which > > > delegates catalog management to users. It may be hard to avoid > conflicts > > > and duplicates between CatalogProvider and CatalogManager. Is it > possible > > > to have a built-in CatalogProvider to instantiate catalogs lazily? > > > > > > An idea in my mind is to introduce another catalog registration API > > > without instantiating the catalog, e.g., registerCatalog(String > > > catalogName, Map catalogProperties). 
The catalog > > > information is stored in CatalogManager as pure strings. The catalog is > > > instantiated and initialized when used. > > > > > > This new API is very similar to other pure-string metadata > registration, > > > such as "createTable(String path, TableDescriptor descriptor)" and > > > "createFunction(String path, String className, List > > > resourceUris)". > > > > > > Can this approach satisfy your requirement? > > > > > > Best, > > > Jark > > > > > > On Mon, 6 Feb 2023 at 22:53, Timo Walther wrote: > > > > > > > Hi Feng, > > > > > > > > this is indeed a good proposal. > > > > > > > > 1) It makes sense to improve the catalog listing for platform > providers. > > > > > > > > 2) Other feedback from the past has shown that users would like to > avoid > > > > the default in-memory catalog and offer their catalog before a > > > > TableEnvironment session starts. > > > > > > > > 3) Also we might reconsider whether a default catalog and default > > > > database make sense. Or whether this can be disabled and SHOW > CATALOGS > > > > can be used for listing first without having a default catalog. > > > > > > > > What do you think about option 2 and 3? > > > > > > > > In any case, I would propose we pass a CatalogProvider to > > > > EnvironmentSettings and only allow a single instance. Catalogs should > > > > never shadow other catalogs. > > > > > > > > We could also use the
[jira] [Created] (FLINK-31002) Provide data sampling query
Jingsong Lee created FLINK-31002: Summary: Provide data sampling query Key: FLINK-31002 URL: https://issues.apache.org/jira/browse/FLINK-31002 Project: Flink Issue Type: New Feature Components: Table Store Reporter: Jingsong Lee We want to take several rows randomly from each partition, but LIMIT is always fixed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31001) Introduce Hive writer
Jingsong Lee created FLINK-31001: Summary: Introduce Hive writer Key: FLINK-31001 URL: https://issues.apache.org/jira/browse/FLINK-31001 Project: Flink Issue Type: New Feature Components: Table Store Reporter: Jingsong Lee -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31000) Upgrade test units in flink-table-store-common to junit5
Shammon created FLINK-31000: --- Summary: Upgrade test units in flink-table-store-common to junit5 Key: FLINK-31000 URL: https://issues.apache.org/jira/browse/FLINK-31000 Project: Flink Issue Type: Sub-task Components: Table Store Affects Versions: table-store-0.4.0 Reporter: Shammon -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30999) Introduce flink-table-store-test-utils for table store
Shammon created FLINK-30999: --- Summary: Introduce flink-table-store-test-utils for table store Key: FLINK-30999 URL: https://issues.apache.org/jira/browse/FLINK-30999 Project: Flink Issue Type: Sub-task Components: Table Store Affects Versions: table-store-0.4.0 Reporter: Shammon Introduce flink-table-store-test-utils module for table store -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [VOTE] Release flink-connector-kafka, release candidate #1
+1 (binding) - Verified legals (license headers and root LICENSE / NOTICE file). AFAICT no dependencies require explicit acknowledgement in the NOTICE files. - No binaries in staging area - Built source with tests - Verified signatures and hashes - Web PR changes LGTM Thanks Martijn! Cheers, Gordon On Mon, Feb 6, 2023 at 6:12 PM Mason Chen wrote: > That makes sense, thanks for the clarification! > > Best, > Mason > > On Wed, Feb 1, 2023 at 7:16 AM Martijn Visser > wrote: > > > Hi Mason, > > > > Thanks, [4] is indeed a copy-paste error and you've made the right > > assumption that > > > > > https://repository.apache.org/content/repositories/orgapacheflink-1582/org/apache/flink/ > > is the correct maven central link. > > > > I think we should use FLINK-30052 to move the Kafka connector code from > the > > 1.17 release also over the Kafka connector repo (especially since there's > > now a v3.0 branch for the Kafka connector, so it can be merged in main). > > When those commits have been merged, we can make a next Kafka connector > > release (which is equivalent to the 1.17 release, which can only be done > > when 1.17 is done because of the split level watermark alignment) and > then > > FLINK-30859 can be finished. > > > > Best regards, > > > > Martijn > > > > Op wo 1 feb. 2023 om 09:16 schreef Mason Chen : > > > > > +1 (non-binding) > > > > > > * Verified hashes and signatures > > > * Verified no binaries > > > * Verified LICENSE and NOTICE files > > > * Verified poms point to 3.0.0-1.16 > > > * Reviewed web PR > > > * Built from source > > > * Verified git tag > > > > > > I think [4] your is a copy-paste error and I did all the verification > > > assuming that > > > > > > > > > https://repository.apache.org/content/repositories/orgapacheflink-1582/org/apache/flink/ > > > is the correct maven central link. > > > > > > Regarding the release notes, should we close > > > https://issues.apache.org/jira/browse/FLINK-30052 and link it there? > > I've > > > created https://issues.apache.org/jira/browse/FLINK-30859 to remove > the > > > existing code from the master branch. > > > > > > Best, > > > Mason > > > > > > On Tue, Jan 31, 2023 at 6:23 AM Martijn Visser < > martijnvis...@apache.org > > > > > > wrote: > > > > > > > Hi everyone, > > > > Please review and vote on the release candidate #1 for > > > > flink-connector-kafka version 3.0.0, as follows: > > > > [ ] +1, Approve the release > > > > [ ] -1, Do not approve the release (please provide specific comments) > > > > > > > > Note: this is the same code as the Kafka connector for the Flink 1.16 > > > > release. > > > > > > > > The complete staging area is available for your review, which > includes: > > > > * JIRA release notes [1], > > > > * the official Apache source release to be deployed to > dist.apache.org > > > > [2], > > > > which are signed with the key with fingerprint > > > > A5F3BCE4CBE993573EC5966A65321B8382B219AF [3], > > > > * all artifacts to be deployed to the Maven Central Repository [4], > > > > * source code tag v3.0.0-rc1 [5], > > > > * website pull request listing the new release [6]. > > > > > > > > The vote will be open for at least 72 hours. It is adopted by > majority > > > > approval, with at least 3 PMC affirmative votes. 
> > > > > > > > Thanks, > > > > Release Manager > > > > > > > > [1] > > > > > > > > > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352577 > > > > [2] > > > > > > > > > > > > > > https://dist.apache.org/repos/dist/dev/flink/flink-connector-kafka-3.0.0-rc1 > > > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS > > > > [4] > > > > > > > > > > > > > > https://dist.apache.org/repos/dist/dev/flink/flink-connector-kafka-3.0.0-rc1/ > > > > [5] > > > > > > https://github.com/apache/flink-connector-kafka/releases/tag/v3.0.0-rc1 > > > > [6] https://github.com/apache/flink-web/pull/606 > > > > > > > > > >
[jira] [Created] (FLINK-30998) Add optional exception handler to flink-connector-opensearch
Leonid Ilyevsky created FLINK-30998: --- Summary: Add optional exception handler to flink-connector-opensearch Key: FLINK-30998 URL: https://issues.apache.org/jira/browse/FLINK-30998 Project: Flink Issue Type: Improvement Components: Connectors / Opensearch Affects Versions: 1.16.1 Reporter: Leonid Ilyevsky Currently, when there is a failure coming from Opensearch, a FlinkRuntimeException is thrown from OpensearchWriter.java (line 346). This makes the Flink pipeline fail, and there is no way to handle the exception in the client code. I suggest adding an option to set a failure handler, similar to the way it is done in the Elasticsearch connector. This way the client code has a chance to examine the failure and handle it. Here is a use case where this would be very useful: we are using streams on the Opensearch side, and we are setting our own document IDs. Sometimes these IDs are duplicated; we need to ignore this situation and continue (this is how it works for us with Elasticsearch). However, with the Opensearch connector, the error comes back saying that the batch failed (even though most of the documents were indexed and only the ones with duplicated IDs were rejected), and the whole Flink job fails. -- This message was sent by Atlassian Jira (v8.20.10#820010)
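A hypothetical sketch of what such a hook could look like, modeled on the Elasticsearch connector's ActionRequestFailureHandler; none of these names exist in flink-connector-opensearch today, and the error string is illustrative.

{code:java}
import java.io.Serializable;

// Hypothetical API sketch: client code inspects a per-record failure and
// either swallows it or rethrows to fail the pipeline.
interface FailureHandler extends Serializable {
    void onFailure(Throwable failure) throws Throwable;
}

class FailureHandlers {
    // Policy from the use case above: duplicate-ID rejections are expected,
    // so ignore them and rethrow everything else.
    static final FailureHandler IGNORE_DUPLICATES =
            failure -> {
                String message = String.valueOf(failure.getMessage());
                if (!message.contains("version_conflict_engine_exception")) {
                    throw failure;
                }
            };
}
{code}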
[jira] [Created] (FLINK-30997) Refactor tests in connector to extend AbstractTestBase
Shammon created FLINK-30997: --- Summary: Refactor tests in connector to extend AbstractTestBase Key: FLINK-30997 URL: https://issues.apache.org/jira/browse/FLINK-30997 Project: Flink Issue Type: Sub-task Components: Table Store Affects Versions: table-store-0.4.0 Reporter: Shammon Refactor tests in connector to extend `AbstractTestBase` -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30996) Sync Kafka 1.17 commits from apache/flink repo to flink-connector-kafka
Martijn Visser created FLINK-30996: -- Summary: Sync Kafka 1.17 commits from apache/flink repo to flink-connector-kafka Key: FLINK-30996 URL: https://issues.apache.org/jira/browse/FLINK-30996 Project: Flink Issue Type: Sub-task Components: Connectors / Kafka Affects Versions: kafka-4.0.0 Reporter: Martijn Visser Assignee: Martijn Visser -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30995) Introduce ByteSerializer for Table Store
Feng Wang created FLINK-30995: - Summary: Introduce ByteSerializer for Table Store Key: FLINK-30995 URL: https://issues.apache.org/jira/browse/FLINK-30995 Project: Flink Issue Type: New Feature Components: Table Store Reporter: Feng Wang Fix For: table-store-0.4.0 Introduce ByteSerializer for Table Store -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30993) Introduce FloatSerializer for Table Store
Feng Wang created FLINK-30993: - Summary: Introduce FloatSerializer for Table Store Key: FLINK-30993 URL: https://issues.apache.org/jira/browse/FLINK-30993 Project: Flink Issue Type: New Feature Reporter: Feng Wang Fix For: table-store-0.4.0 Introduce FloatSerializer for Table Store -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30994) Introduce DoubleSerializer for Table Store
Feng Wang created FLINK-30994: - Summary: Introduce DoubleSerializer for Table Store Key: FLINK-30994 URL: https://issues.apache.org/jira/browse/FLINK-30994 Project: Flink Issue Type: New Feature Components: Table Store Reporter: Feng Wang Fix For: table-store-0.4.0 Introduce DoubleSerializer for Table Store -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30992) Introduce ShortSerializer for Table Store
Feng Wang created FLINK-30992: - Summary: Introduce ShortSerializer for Table Store Key: FLINK-30992 URL: https://issues.apache.org/jira/browse/FLINK-30992 Project: Flink Issue Type: New Feature Components: Table Store Reporter: Feng Wang Fix For: table-store-0.4.0 Introduce ShortSerializer for Table Store -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30991) Introduce LongSerializer for Table Store
Feng Wang created FLINK-30991: - Summary: Introduce LongSerializer for Table Store Key: FLINK-30991 URL: https://issues.apache.org/jira/browse/FLINK-30991 Project: Flink Issue Type: New Feature Components: Table Store Reporter: Feng Wang Fix For: table-store-0.4.0 Introduce LongSerializer for Table Store -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30990) Performance of deployDownstreamTasks.BATCH has worsened
Martijn Visser created FLINK-30990: -- Summary: Performance of deployDownstreamTasks.BATCH has worsened Key: FLINK-30990 URL: https://issues.apache.org/jira/browse/FLINK-30990 Project: Flink Issue Type: Bug Affects Versions: 1.17.0 Reporter: Martijn Visser See http://codespeed.dak8s.net:8000/timeline/#/?exe=8=deployDownstreamTasks.BATCH=on=on=off=2=200 It appears that the trend has now stabilized upwards, from 60-70 ms/op to 80-90 ms/op, where less is better. Not sure if this has a meaningful impact in a production setup, but I would like to get this verified. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: Fwd: [FLINK-30596] Duplicate jobs on /run with the same jobId
FYI, the original mail never actually made it to the dev mailing list (maybe it got stuck in the moderation queue). On 08/02/2023 18:21, Mohsen Rezaei wrote: Hi, I was hoping to resurface this issue given that it affects any application using the REST endpoints to run/submit jobs, causing confusion about the actual state of the submitted jobs in a cluster. I've created a PR against the "master" branch, and I'd like to be able to port it back to 1.16.2 and 1.15.4 to close the loop on the issue for the last three major releases: https://github.com/apache/flink/pull/21849 Cheers, Mohsen Rezaei -- Forwarded message - From: Mohsen Rezaei Date: Sat, Jan 21, 2023 at 5:58 PM Subject: Duplicate jobs on /run with the same jobId To: Hi everyone, I have run into an issue with the Dispatcher's runJob() that accepts multiple jobs and starts them with the same jobId. The filtering fix in waitForTerminatingJob [1] fixes the issue, but I need consensus from the devs to ensure this is the proper fix before submitting a PR. Cheers, Mohsen Rezaei [1] https://issues.apache.org/jira/browse/FLINK-30596
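For context, a simplified illustration of the guard being discussed; this is not the actual Dispatcher code (the real fix filters terminating jobs inside waitForTerminatingJob), just the idea that a second submission with a known jobId must be rejected instead of racing a duplicate execution.

{code:java}
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustration only: reject re-submission of a jobId that is already known.
class SubmissionGuard {
    private final Set<String> knownJobIds = ConcurrentHashMap.newKeySet();

    void submit(String jobId, Runnable startJob) {
        // add() is atomic, so only the first submission with this id wins.
        if (!knownJobIds.add(jobId)) {
            throw new IllegalStateException("Job has already been submitted: " + jobId);
        }
        startJob.run();
    }
}
{code}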
Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.
Thanks for your reply. @Timo > 2) avoid the default in-memory catalog and offer their catalog before a > TableEnvironment session starts > 3) whether this can be disabled and SHOW CATALOGS can be used for listing > first without having a default catalog. Regarding 2 and 3, I think this problem can be solved by introducing catalog providers, and users can control some default catalog behavior. > We could also use the org.apache.flink.table.factories.Factory infra and > allow catalog providers via pure string properties I think this is also very useful. In our usage scenarios, it is usually multi-cluster management, and it is also necessary to pass different configurations through parameters. @Jark @Huang > About the lazy catalog initialization Our needs may be different. If these properties already exist in an external system, especially when there may be thousands of these catalog properties, I don’t think it is necessary to register all these properties in the Flink env at startup; what we need is to be able to register a catalog when it is needed and fetch its properties from the external meta system. > It may be hard to avoid conflicts and duplicates between CatalogProvider > and CatalogManager It is indeed easy to conflict. My idea is to separate the catalog management of the current CatalogManager into the default CatalogProvider behavior and, at the same time, allow only one CatalogProvider in a Flink env. This may avoid catalog conflicts. Best, Feng On Tue, Feb 7, 2023 at 1:01 PM Hang Ruan wrote: > > Hi Feng, > I agree with what Jark said. I think what you are looking for is lazy > initialization. > > I don't think we should introduce the new interface CatalogProvider for > lazy initialization. What we should do is to store the catalog properties > and initialize the catalog when we need it. Could you please introduce some > other scenarios that we need the CatalogProvider besides the lazy > initialization? > > If we really need the CatalogProvider, I think it is better to be a single > instance. Multiple instances are difficult to manage and there are name > conflicts among providers. > > Best, > Hang > > Jark Wu wrote on Tue, Feb 7, 2023 at 10:48: > > > Hi Feng, > > > > I think this feature makes a lot of sense. If I understand correctly, what > > you are looking for is lazy catalog initialization. > > > > However, I have some concerns about introducing CatalogProvider, which > > delegates catalog management to users. It may be hard to avoid conflicts > > and duplicates between CatalogProvider and CatalogManager. Is it possible > > to have a built-in CatalogProvider to instantiate catalogs lazily? > > > > An idea in my mind is to introduce another catalog registration API > > without instantiating the catalog, e.g., registerCatalog(String > > catalogName, Map catalogProperties). The catalog > > information is stored in CatalogManager as pure strings. The catalog is > > instantiated and initialized when used. > > > > This new API is very similar to other pure-string metadata registration, > > such as "createTable(String path, TableDescriptor descriptor)" and > > "createFunction(String path, String className, List > > resourceUris)". > > > > Can this approach satisfy your requirement? > > > > Best, > > Jark > > > > On Mon, 6 Feb 2023 at 22:53, Timo Walther wrote: > > > > > Hi Feng, > > > > > > this is indeed a good proposal. > > > > > > 1) It makes sense to improve the catalog listing for platform providers.
> > > > > > 2) Other feedback from the past has shown that users would like to avoid > > > the default in-memory catalog and offer their catalog before a > > > TableEnvironment session starts. > > > > > > 3) Also we might reconsider whether a default catalog and default > > > database make sense. Or whether this can be disabled and SHOW CATALOGS > > > can be used for listing first without having a default catalog. > > > > > > What do you think about option 2 and 3? > > > > > > In any case, I would propose we pass a CatalogProvider to > > > EnvironmentSettings and only allow a single instance. Catalogs should > > > never shadow other catalogs. > > > > > > We could also use the org.apache.flink.table.factories.Factory infra and > > > allow catalog providers via pure string properties. Not sure if we need > > > this in the first version though. > > > > > > Cheers, > > > Timo > > > > > > > > > On 06.02.23 11:21, Feng Jin wrote: > > > > Hi everyone, > > > > > > > > The original discussion address is > > > > https://issues.apache.org/jira/browse/FLINK-30126 > > > > > > > > Currently, Flink has access to many systems, including kafka, hive, > > > > iceberg, hudi, elasticsearch, mysql... The corresponding catalog name > > > > might be: > > > > kafka_cluster1, kafka_cluster2, hive_cluster1, hive_cluster2, > > > > iceberg_cluster2, elasticsearch_cluster1, mysql_database1_xxx, > > > > mysql_database2_ > > > > > > > > As the platform of the Flink SQL job, we need to maintain
Fwd: [FLINK-30596] Duplicate jobs on /run with the same jobId
Hi, I was hoping to resurface this issue given that it affects any application using the REST endpoints to run/submit jobs, causing confusion about the actual state of the submitted jobs in a cluster. I've created a PR against the "master" branch, and I'd like to be able to port it back to 1.16.2 and 1.15.4 to close the loop on the issue for the last three major releases: https://github.com/apache/flink/pull/21849 Cheers, Mohsen Rezaei -- Forwarded message - From: Mohsen Rezaei Date: Sat, Jan 21, 2023 at 5:58 PM Subject: Duplicate jobs on /run with the same jobId To: Hi everyone, I have run into an issue with the Dispatcher's runJob() that accepts multiple jobs and starts them with the same jobId. The filtering fix in waitForTerminatingJob [1] fixes the issue, but I need consensus from the devs to ensure this is the proper fix before submitting a PR. Cheers, Mohsen Rezaei [1] https://issues.apache.org/jira/browse/FLINK-30596
[jira] [Created] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job
shen created FLINK-30989: Summary: Configuration table.exec.spill-compression.block-size does not take effect in batch job Key: FLINK-30989 URL: https://issues.apache.org/jira/browse/FLINK-30989 Project: Flink Issue Type: Bug Components: API / DataStream, Runtime / Configuration Affects Versions: 1.16.1 Reporter: shen Attachments: image-2023-02-09-19-37-44-927.png

h1. Description
I tried to configure table.exec.spill-compression.block-size via the TableEnv in my job, but it did not take effect. I attached to the TaskManager and found that the conf passed to the constructor of [BinaryExternalSorter|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L204] is empty: !image-2023-02-09-19-37-44-927.png|width=306,height=185!

h1. How to reproduce
A simple program to reproduce this problem:

{code:java}
// App.java
package test.flink403;

import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

import java.util.Arrays;

public class App {
    public static void main(String[] argc) throws Exception {
        Configuration config = new Configuration();
        config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, true);
        config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
        config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); // <-- cannot take effect
        config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, Float.valueOf(0.5f));

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, config);
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 m"); // <-- cannot take effect

        final DataStream<Order> orderA =
                env.fromCollection(
                        Arrays.asList(
                                new Order(1L, "beer", 3),
                                new Order(1L, "diaper", 4),
                                new Order(3L, "rubber", 2)));
        final Table tableA = tableEnv.fromDataStream(orderA);
        final Table result = tableEnv.sqlQuery("SELECT * FROM " + tableA + " " + " order by user");
        tableEnv.toDataStream(result, Order.class).print();
        env.execute();
    }
}

// ---
// Order.java
package test.flink403;

public class Order {
    public Long user;
    public String product;
    public int amount;

    // for POJO detection in DataStream API
    public Order() {}

    // for structured type detection in Table API
    public Order(Long user, String product, int amount) {
        this.user = user;
        this.product = product;
        this.amount = amount;
    }

    @Override
    public String toString() {
        return "Order{" + "user=" + user + ", product='" + product + '\'' + ", amount=" + amount + '}';
    }
}
{code}

I think it is because [SortOperator|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java#L88] tries to get the conf from the JobConfiguration, which should be set in the JobGraph.

The following classes use the same method to get their conf from the JobConfiguration:
* BinaryExternalSorter
** ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
** ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
* BinaryHashTable, BaseHybridHashTable
** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
* SortDataInput
** AlgorithmOptions.SORT_SPILLING_THRESHOLD
** AlgorithmOptions.SPILLING_MAX_FAN
** AlgorithmOptions.USE_LARGE_RECORDS_HANDLER

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30988) Refactor E2E tests to get rid of managed table
yuzelin created FLINK-30988: --- Summary: Refactor E2E tests to get rid of managed table Key: FLINK-30988 URL: https://issues.apache.org/jira/browse/FLINK-30988 Project: Flink Issue Type: Sub-task Components: Table Store Affects Versions: table-store-0.4.0 Reporter: yuzelin -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: Confusion about some overlapping functionality of SupportsProjectionPushDown and SupportsReadingMetadata
Hi yuxia, thanks for your feedback. The point I am making here is that when implementing the two interfaces at the same time, the SupportsProjectionPushDown methods can actually be left blank: SupportsProjectionPushDown then serves only as a marker, and SupportsReadingMetadata can complete both the physical column pushdown and the metadata processing. Another point is that if the methods of both interfaces are implemented at the same time, the developer may see unexpected behavior due to the calling-order problem. Of course, I agree with updating the Java doc notes. yuxia wrote on Thu, Feb 9, 2023 at 16:09: > Hi, Ran Tao. > Thanks for bring it up. > TBH, to me, it's not as so confusing. > Is that the fact that the applyReadableMetadata and applyProjection all > will pass producedDataType and the source conneector developer will > need to choose which one as the finnal output type? > > As the Java doc of applyReadableMetadata said, use the producedDataType in > this method instead of applyProjection. > > For you question, I think the responsibilities of these two interfaces are > quite independent. What kind of independence are you expecting? > > Btw, to aovid confusing, i think we may need to specific it that the > applyProjection will be called before method applyReadableMetadata > in the java doc of applyReadableMetadata. > > > Best regards, > Yuxia > > ----- Original Message ----- > From: "Ran Tao" > To: "dev" > Sent: Wednesday, Feb 8, 2023, 8:46:45 PM > Subject: Confusion about some overlapping functionality of > SupportsProjectionPushDown and SupportsReadingMetadata > > Currently we use SupportsProjectionPushDown to push down physical columns, > and SupportsReadingMetadata is used to read metadata columns. > There is no problem when implementing one of the interfaces alone. If two > interfaces are implemented at the same time, there will be confusing > semantics. > > For example, if we update the schema or producedDataType in > SupportsProjectionPushDown#applyProjection and > SupportsReadingMetadata#applyReadableMetadata at the same time, the former > is actually invalid, because the former is called first, and then the > latter will overwrite it. > > There are some similar usage notes in the interface's documentation. But > this is very confusing. In this case, you only need to implement > SupportsReadingMetadata#applyReadableMetadata (only implement > SupportsProjectionPushDown, the override method is empty), and the rule > match logic of SupportsReadingMetadata will push down the physical column > and metadata columns to generate producedDataType and return it. > > At this point SupportsProjectionPushDown is more like a marker interface. > In addition, if some member variables are relied on in the implementation > of SupportsReadingMetadata, and the member variables are also updated in > SupportsProjectionPushDown, unexpected problems may occur. Developers > should clearly read the implementation of these two interfaces and > understand that these overlapping functions will cause a certain > development cost to the developer of the connector (normally, the two > interfaces should be isolated functions, developers see the meaning of the > name ). > > I wonder if the community has considered making the responsibilities of > these two interfaces more independent and clear in subsequent updates. > Maybe my understanding is not very sufficient, looking forward to your > opinions. > > -- > Best Regards, > Ran Tao > https://github.com/chucheng92 -- Best Regards, Ran Tao https://github.com/chucheng92
[jira] [Created] (FLINK-30987) output source exception for SocketStreamIterator
chenyuzhi created FLINK-30987: - Summary: output source exception for SocketStreamIterator Key: FLINK-30987 URL: https://issues.apache.org/jira/browse/FLINK-30987 Project: Flink Issue Type: Improvement Reporter: chenyuzhi Attachments: image-2023-02-09-16-47-49-928.png Sometimes we hit errors when using `org.apache.flink.streaming.experimental.SocketStreamIterator` for testing or output. However, we can't see the source exception in the log. Maybe we could throw the source exception directly. !image-2023-02-09-16-47-49-928.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30986) Refactor CreateTableITCase and DropTableITCase to get rid of managed table
yuzelin created FLINK-30986: --- Summary: Refactor CreateTableITCase and DropTableITCase to get rid of managed table Key: FLINK-30986 URL: https://issues.apache.org/jira/browse/FLINK-30986 Project: Flink Issue Type: Improvement Components: Table Store Affects Versions: table-store-0.4.0 Reporter: yuzelin -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: Confusion about some overlapping functionality of SupportsProjectionPushDown and SupportsReadingMetadata
Hi, Ran Tao. Thanks for bringing it up. TBH, to me, it's not so confusing. Is it the fact that applyReadableMetadata and applyProjection both pass a producedDataType, and the source connector developer needs to choose which one to use as the final output type? As the Java doc of applyReadableMetadata says, use the producedDataType in this method instead of the one from applyProjection. For your question, I think the responsibilities of these two interfaces are quite independent. What kind of independence are you expecting? Btw, to avoid confusion, I think we may need to specify in the Java doc of applyReadableMetadata that applyProjection will be called before it. Best regards, Yuxia ----- Original Message ----- From: "Ran Tao" To: "dev" Sent: Wednesday, Feb 8, 2023, 8:46:45 PM Subject: Confusion about some overlapping functionality of SupportsProjectionPushDown and SupportsReadingMetadata Currently we use SupportsProjectionPushDown to push down physical columns, and SupportsReadingMetadata is used to read metadata columns. There is no problem when implementing one of the interfaces alone. If two interfaces are implemented at the same time, there will be confusing semantics. For example, if we update the schema or producedDataType in SupportsProjectionPushDown#applyProjection and SupportsReadingMetadata#applyReadableMetadata at the same time, the former is actually invalid, because the former is called first, and then the latter will overwrite it. There are some similar usage notes in the interface's documentation. But this is very confusing. In this case, you only need to implement SupportsReadingMetadata#applyReadableMetadata (only implement SupportsProjectionPushDown, the override method is empty), and the rule match logic of SupportsReadingMetadata will push down the physical column and metadata columns to generate producedDataType and return it. At this point SupportsProjectionPushDown is more like a marker interface. In addition, if some member variables are relied on in the implementation of SupportsReadingMetadata, and the member variables are also updated in SupportsProjectionPushDown, unexpected problems may occur. Developers should clearly read the implementation of these two interfaces and understand that these overlapping functions will cause a certain development cost to the developer of the connector (normally, the two interfaces should be isolated functions, developers see the meaning of the name ). I wonder if the community has considered making the responsibilities of these two interfaces more independent and clear in subsequent updates. Maybe my understanding is not very sufficient, looking forward to your opinions. -- Best Regards, Ran Tao https://github.com/chucheng92
[jira] [Created] (FLINK-30985) [Flink][table-store] Change the Splits allocation algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm.
ming li created FLINK-30985: --- Summary: [Flink][table-store] Change the Splits allocation algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm. Key: FLINK-30985 URL: https://issues.apache.org/jira/browse/FLINK-30985 Project: Flink Issue Type: Improvement Components: Table Store Reporter: ming li

Currently, {{assignSplits}} of {{ContinuousFileSplitEnumerator}} in {{TableStore}} is performed by traversing the {{HashMap}}, but since the number of buckets is fixed, the order of traversal is also fixed.

{code:java}
private void assignSplits() {
    bucketSplits.forEach(
            (bucket, splits) -> {
                if (splits.size() > 0) {
                    // To ensure the order of consumption, the data of the same bucket is given
                    // to a task to be consumed.
                    int task = bucket % context.currentParallelism();
                    if (readersAwaitingSplit.remove(task)) {
                        // if the reader that requested another split has failed in the
                        // meantime, remove it from the list of waiting readers
                        if (!context.registeredReaders().containsKey(task)) {
                            return;
                        }
                        context.assignSplit(splits.poll(), task);
                    }
                }
            });
}
{code}

Assume that a {{task}} consumes multiple {{buckets}} and there are enough splits in each {{bucket}}: the first {{bucket}} will always be the one assigned to the task, and the other buckets may not be consumed for a long time, resulting in uneven consumption and difficulty in advancing the {{watermark}}. So I think we should change the split allocation algorithm to a fair algorithm.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
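A minimal sketch of a fairer variant, assuming the same bucket-to-task mapping (bucket % parallelism): each task remembers where it stopped and rotates over its buckets, so every bucket makes progress. Names are illustrative, not the actual enumerator code.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Sketch: round-robin over a task's buckets instead of always draining the
// first bucket in HashMap iteration order.
class FairBucketAssigner<S> {
    private final Map<Integer, Queue<S>> bucketSplits = new HashMap<>();
    private final Map<Integer, Integer> nextBucketIndexPerTask = new HashMap<>();
    private final int parallelism;

    FairBucketAssigner(int parallelism) {
        this.parallelism = parallelism;
    }

    void add(int bucket, S split) {
        bucketSplits.computeIfAbsent(bucket, b -> new ArrayDeque<>()).add(split);
    }

    // Returns the next split for the task, cycling through its buckets.
    S pollFor(int task) {
        List<Integer> buckets = new ArrayList<>();
        for (int bucket : bucketSplits.keySet()) {
            if (bucket % parallelism == task) {
                buckets.add(bucket);
            }
        }
        if (buckets.isEmpty()) {
            return null;
        }
        buckets.sort(Integer::compare); // stable order across calls
        int start = nextBucketIndexPerTask.getOrDefault(task, 0) % buckets.size();
        for (int i = 0; i < buckets.size(); i++) {
            int idx = (start + i) % buckets.size();
            S split = bucketSplits.get(buckets.get(idx)).poll();
            if (split != null) {
                nextBucketIndexPerTask.put(task, (idx + 1) % buckets.size());
                return split;
            }
        }
        return null;
    }
}
{code}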
[jira] [Created] (FLINK-30984) Remove explicit cast required by 3.1.x janino
Sergey Nuyanzin created FLINK-30984: --- Summary: Remove explicit cast required by 3.1.x janino Key: FLINK-30984 URL: https://issues.apache.org/jira/browse/FLINK-30984 Project: Flink Issue Type: Technical Debt Components: Table SQL / Runtime Reporter: Sergey Nuyanzin This is a follow-up task. Currently, 3.1.x Janino has [https://github.com/janino-compiler/janino/issues/188], which causes several Flink tests to fail. Once it is fixed on the Janino side, the workarounds should be removed together with the Janino update. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30983) the security.ssl.algorithms configuration does not take effect in rest ssl
luyuan created FLINK-30983: -- Summary: the security.ssl.algorithms configuration does not take effect in rest ssl Key: FLINK-30983 URL: https://issues.apache.org/jira/browse/FLINK-30983 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.16.0 Reporter: luyuan Attachments: image-2023-02-09-15-58-36-254.png, image-2023-02-09-15-58-43-963.png The security.ssl.algorithms configuration does not take effect for REST SSL: SSLUtils#createRestNettySSLContext does not call SslContextBuilder#ciphers the way SSLUtils#createInternalNettySSLContext does. !image-2023-02-09-15-58-36-254.png! !image-2023-02-09-15-58-43-963.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
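A sketch of the suggested fix: apply the configured cipher suites to the REST SSL context the same way the internal context already does. Netty's SslContextBuilder#ciphers is a real API (shaded inside Flink); the variables below stand in for values SSLUtils derives from the configuration.

{code:java}
import java.util.Arrays;

import io.netty.handler.ssl.SslContextBuilder;

// Sketch only: 'builder' and 'sslAlgorithms' are placeholders for what
// SSLUtils#createRestNettySSLContext computes from security.ssl.algorithms.
class RestSslCiphersSketch {
    static SslContextBuilder applyCiphers(SslContextBuilder builder, String[] sslAlgorithms) {
        // Without this call the configured algorithms are silently ignored.
        return builder.ciphers(Arrays.asList(sslAlgorithms));
    }
}
{code}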