Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

2023-02-09 Thread Feng Jin
@Shengkai
> About the catalog jar hot updates

We do not have such a requirement at the moment, but if the catalog
management interface is opened up, it would indeed make hot loading
of the catalog jar possible.


>  do we need to instantiate the Catalog immediately or defer to the usage

I think this can stay the same as before.



@Jark
> There only can be a single catalog manager in TableEnvironment.

big +1 for this.  This can avoid conflicts and also meet the catalog
persistence requirements.


Best,
Feng

On Fri, Feb 10, 2023 at 3:09 PM Jark Wu  wrote:
>
> Hi Feng,
>
> It's still easy to conflict and be inconsistent even if we have only one
> CatalogProvider, because CatalogProvider only provides readable interfaces
> (listCatalogs, getCatalog). For example, you may register a catalog X, but
> can't list it because it's not in the external metadata service.
>
> To avoid catalog conflicts and keep consistent, we can extract the catalog
> management logic as a pluggable interface, including listCatalog,
> getCatalog, registerCatalog, unregisterCatalog, etc. The
> current CatalogManager is a default in-memory implementation, you can
> replace it with user-defined managers, such as
>  - file-based: which manages catalog information on local files, just like
> how Presto/Trino manages catalogs
>  - metaservice-based: which manages catalog information on external
> metadata service.
>
> There only can be a single catalog manager in TableEnvironment. This
> guarantees data consistency and avoids conflicts. This approach can address
> another pain point of Flink SQL: the catalog information is not persisted.
>
> Can this approach satisfy your requirements?
>
> Best,
> Jark
>
>
>
>
>
> On Fri, 10 Feb 2023 at 11:21, Shengkai Fang  wrote:
>
> > Hi Feng.
> >
> > I think your idea is very interesting!
> >
> > 1. I just wonder after initializing the Catalog, will the Session reuse the
> > same Catalog instance or build a new one for later usage? If we reuse the
> > same Catalog, I think it's more like lazy initialization. I am a
> > little prone to rebuild a new one because it's easier for us to catalog jar
> > hot updates.
> >
> > 2. Users use the `CREATE CATALOG` statement in the CatalogManager. In this
> > case, do we need to instantiate the Catalog immediately or defer to the
> > usage?
> >
> > Best,
> > Shengkai
> >
> > On Thu, Feb 9, 2023 at 20:13, Feng Jin wrote:
> >
> > > Thanks for your reply.
> > >
> > > @Timo
> > >
> > > >  2) avoid  the default in-memory catalog and offer their catalog before
> > > a  TableEnvironment session starts
> > > >  3) whether this can be disabled and SHOW CATALOGS  can be used for
> > > listing first without having a default catalog.
> > >
> > >
> > > Regarding 2 and 3, I think this problem can be solved by introducing
> > > catalog providers, and users can control some default catalog
> > > behavior.
> > >
> > >
> > > > We could also use the org.apache.flink.table.factories.Factory infra
> > > and  allow catalog providers via pure string properties
> > >
> > > I think this is also very useful. In our usage scenarios, it is
> > > usually multi-cluster management, and it is also necessary to pass
> > > different configurations through parameters.
> > >
> > >
> > > @Jark @Huang
> > >
> > > >  About the lazy catalog initialization
> > >
> > > Our needs may be different. If these properties already exist in an
> > > external system, especially when there may be thousands of catalog
> > > properties, I don't think it is necessary to register all of them in
> > > the Flink env at startup. What we need is to register a catalog only
> > > when it is needed, fetching its properties from the external meta
> > > system at that point.
> > >
> > >
> > > >  It may be hard to avoid conflicts  and duplicates between
> > > CatalogProvider and CatalogManager
> > >
> > > Conflicts are indeed easy to run into. My idea is to extract the
> > > catalog management of the current CatalogManager into the default
> > > CatalogProvider behavior and, at the same time, to allow only one
> > > CatalogProvider per Flink env. This may avoid catalog conflicts.
> > >
> > >
> > > Best,
> > > Feng
> > >
> > > On Tue, Feb 7, 2023 at 1:01 PM Hang Ruan  wrote:
> > > >
> > > > Hi Feng,
> > > > I agree with what Jark said. I think what you are looking for is lazy
> > > > initialization.
> > > >
> > > > I don't think we should introduce the new interface CatalogProvider for
> > > > lazy initialization. What we should do is to store the catalog properties
> > > > and initialize the catalog when we need it. Could you please introduce some
> > > > other scenarios that we need the CatalogProvider besides the lazy
> > > > initialization?
> > > >
> > > > If we really need the CatalogProvider, I think it is better to be a single
> > > > instance. Multiple instances are difficult to manage and there are name
> > > > conflicts among providers.
> > > >
> > > > Best,
> > > > Hang
> > > >
> > > > On Tue, Feb 7, 2023 at 10:48, Jark Wu wrote:
> > > 

[jira] [Created] (FLINK-31008) [Flink][Table Store] The Split allocation of the same bucket in ContinuousFileSplitEnumerator may be out of order

2023-02-09 Thread ming li (Jira)
ming li created FLINK-31008:
---

 Summary: [Flink][Table Store] The Split allocation of the same 
bucket in ContinuousFileSplitEnumerator may be out of order
 Key: FLINK-31008
 URL: https://issues.apache.org/jira/browse/FLINK-31008
 Project: Flink
  Issue Type: Bug
  Components: Table Store
Reporter: ming li


There are two places in {{ContinuousFileSplitEnumerator}} that add 
{{FileStoreSourceSplit}} to {{bucketSplits}}: {{addSplitsBack}} and 
{{processDiscoveredSplits}}. {{processDiscoveredSplits}} continuously 
checks for new splits and adds them to the queue. At this point, the splits 
in the queue are in order.
{code:java}
private void addSplits(Collection<FileStoreSourceSplit> splits) {
    splits.forEach(this::addSplit);
}

private void addSplit(FileStoreSourceSplit split) {
    bucketSplits
            .computeIfAbsent(((DataSplit) split.split()).bucket(), i -> new LinkedList<>())
            .add(split);
}{code}
However, when a task fails over, the splits that were already allocated to it 
are returned. These returned splits are also added to the end of the queue, 
so splits of the same bucket are then allocated out of order.

 

I think these returned splits should be added to the head of the queue to 
ensure the order of allocation.
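
A minimal sketch of the proposed fix, under simplifying assumptions: the class {{BucketSplitQueues}} and the use of plain {{String}} split ids are hypothetical and only illustrate the idea, they are not the actual Table Store code. Newly discovered splits are appended at the tail of their bucket's queue, while splits returned after a failover are re-inserted at the head in their original order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the enumerator's per-bucket split queues.
// Splits are represented by plain String ids to keep the sketch self-contained.
class BucketSplitQueues {
    private final Map<Integer, Deque<String>> bucketSplits = new HashMap<>();

    // Newly discovered splits go to the tail, preserving discovery order.
    void addDiscovered(int bucket, List<String> splits) {
        Deque<String> queue = bucketSplits.computeIfAbsent(bucket, b -> new ArrayDeque<>());
        splits.forEach(queue::addLast);
    }

    // Splits handed back after a failover go to the head. Iterating in reverse
    // keeps the returned splits in their original relative order.
    void addBack(int bucket, List<String> returned) {
        Deque<String> queue = bucketSplits.computeIfAbsent(bucket, b -> new ArrayDeque<>());
        for (int i = returned.size() - 1; i >= 0; i--) {
            queue.addFirst(returned.get(i));
        }
    }

    // Hands out the next split of a bucket, or null if none is pending.
    String poll(int bucket) {
        Deque<String> queue = bucketSplits.get(bucket);
        return queue == null ? null : queue.pollFirst();
    }

    // Snapshot of the pending splits of a bucket, in allocation order.
    List<String> pending(int bucket) {
        Deque<String> queue = bucketSplits.get(bucket);
        return queue == null ? new ArrayList<>() : new ArrayList<>(queue);
    }
}
```

With this layout, a split that was assigned and then returned is handed out again before any split that was discovered later, which is the ordering the issue asks for.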



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

2023-02-09 Thread Jark Wu
Hi Feng,

It's still easy to conflict and be inconsistent even if we have only one
CatalogProvider, because CatalogProvider only provides readable interfaces
(listCatalogs, getCatalog). For example, you may register a catalog X, but
can't list it because it's not in the external metadata service.

To avoid catalog conflicts and keep consistent, we can extract the catalog
management logic as a pluggable interface, including listCatalog,
getCatalog, registerCatalog, unregisterCatalog, etc. The
current CatalogManager is a default in-memory implementation, you can
replace it with user-defined managers, such as
 - file-based: which manages catalog information on local files, just like
how Presto/Trino manages catalogs
 - metaservice-based: which manages catalog information on external
metadata service.

There only can be a single catalog manager in TableEnvironment. This
guarantees data consistency and avoids conflicts. This approach can address
another pain point of Flink SQL: the catalog information is not persisted.
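
To make the idea concrete, here is a rough sketch of what such a pluggable interface could look like. The names PluggableCatalogManager and InMemoryCatalogManager are hypothetical, and a catalog is reduced to a map of string properties so that the example stays self-contained; this is not an existing Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical interface: read and write operations live behind the same
// abstraction, so whatever is registered can always be listed.
interface PluggableCatalogManager {
    Set<String> listCatalogs();

    Optional<Map<String, String>> getCatalog(String name);

    void registerCatalog(String name, Map<String, String> properties);

    void unregisterCatalog(String name);
}

// Default in-memory variant; a file-based or metadata-service-based variant
// would implement the same four methods against its own storage.
class InMemoryCatalogManager implements PluggableCatalogManager {
    private final Map<String, Map<String, String>> catalogs = new LinkedHashMap<>();

    @Override
    public Set<String> listCatalogs() {
        return new TreeSet<>(catalogs.keySet());
    }

    @Override
    public Optional<Map<String, String>> getCatalog(String name) {
        return Optional.ofNullable(catalogs.get(name));
    }

    @Override
    public void registerCatalog(String name, Map<String, String> properties) {
        if (catalogs.containsKey(name)) {
            throw new IllegalStateException("Catalog already exists: " + name);
        }
        catalogs.put(name, new LinkedHashMap<>(properties));
    }

    @Override
    public void unregisterCatalog(String name) {
        catalogs.remove(name);
    }
}
```

Because registration and listing go through one implementation, the "registered but not listable" situation described above cannot occur within a single manager.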

Can this approach satisfy your requirements?

Best,
Jark





On Fri, 10 Feb 2023 at 11:21, Shengkai Fang  wrote:

> Hi Feng.
>
> I think your idea is very interesting!
>
> 1. I just wonder after initializing the Catalog, will the Session reuse the
> same Catalog instance or build a new one for later usage? If we reuse the
> same Catalog, I think it's more like lazy initialization. I am a
> little prone to rebuild a new one because it's easier for us to catalog jar
> hot updates.
>
> 2. Users use the `CREATE CATALOG` statement in the CatalogManager. In this
> case, do we need to instantiate the Catalog immediately or defer to the
> usage?
>
> Best,
> Shengkai
>
> On Thu, Feb 9, 2023 at 20:13, Feng Jin wrote:
>
> > Thanks for your reply.
> >
> > @Timo
> >
> > >  2) avoid  the default in-memory catalog and offer their catalog before
> > a  TableEnvironment session starts
> > >  3) whether this can be disabled and SHOW CATALOGS  can be used for
> > listing first without having a default catalog.
> >
> >
> > Regarding 2 and 3, I think this problem can be solved by introducing
> > catalog providers, and users can control some default catalog
> > behavior.
> >
> >
> > > We could also use the org.apache.flink.table.factories.Factory infra
> > and  allow catalog providers via pure string properties
> >
> > I think this is also very useful. In our usage scenarios, it is
> > usually multi-cluster management, and it is also necessary to pass
> > different configurations through parameters.
> >
> >
> > @Jark @Huang
> >
> > >  About the lazy catalog initialization
> >
> > Our needs may be different. If these properties already exist in an
> > external system, especially when there may be thousands of catalog
> > properties, I don't think it is necessary to register all of them in
> > the Flink env at startup. What we need is to register a catalog only
> > when it is needed, fetching its properties from the external meta
> > system at that point.
> >
> >
> > >  It may be hard to avoid conflicts  and duplicates between
> > CatalogProvider and CatalogManager
> >
> > Conflicts are indeed easy to run into. My idea is to extract the
> > catalog management of the current CatalogManager into the default
> > CatalogProvider behavior and, at the same time, to allow only one
> > CatalogProvider per Flink env. This may avoid catalog conflicts.
> >
> >
> > Best,
> > Feng
> >
> > On Tue, Feb 7, 2023 at 1:01 PM Hang Ruan  wrote:
> > >
> > > Hi Feng,
> > > I agree with what Jark said. I think what you are looking for is lazy
> > > initialization.
> > >
> > > I don't think we should introduce the new interface CatalogProvider for
> > > lazy initialization. What we should do is to store the catalog properties
> > > and initialize the catalog when we need it. Could you please introduce some
> > > other scenarios that we need the CatalogProvider besides the lazy
> > > initialization?
> > >
> > > If we really need the CatalogProvider, I think it is better to be a single
> > > instance. Multiple instances are difficult to manage and there are name
> > > conflicts among providers.
> > >
> > > Best,
> > > Hang
> > >
> > > On Tue, Feb 7, 2023 at 10:48, Jark Wu wrote:
> > >
> > > > Hi Feng,
> > > >
> > > > I think this feature makes a lot of sense. If I understand correctly, what
> > > > you are looking for is lazy catalog initialization.
> > > >
> > > > However, I have some concerns about introducing CatalogProvider, which
> > > > delegates catalog management to users. It may be hard to avoid conflicts
> > > > and duplicates between CatalogProvider and CatalogManager. Is it possible
> > > > to have a built-in CatalogProvider to instantiate catalogs lazily?
> > > >
> > > > An idea in my mind is to introduce another catalog registration API
> > > > without instantiating the catalog, e.g., registerCatalog(String
> > > > catalogName, Map<String, String> catalogProperties). The catalog
> > > > information is stored in CatalogManager as pure strings. The 

[jira] [Created] (FLINK-31007) The code generated by the IF function throws NullPointerException

2023-02-09 Thread tivanli (Jira)
tivanli created FLINK-31007:
---

 Summary: The code generated by the IF function throws 
NullPointerException
 Key: FLINK-31007
 URL: https://issues.apache.org/jira/browse/FLINK-31007
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.15.3, 1.15.2
 Environment: {code:java}
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

final DataStream<Tuple2<Long, String>> tab =
        env.fromCollection(Arrays.asList(
                new Tuple2<>(1L, "a_b_c"),
                new Tuple2<>(-1L, "a_b_c")));

final Table tableA = tableEnv.fromDataStream(tab);

tableEnv.executeSql("SELECT if(f0 = -1, '', split_index(f1, '_', 0)) as id FROM " + tableA)
        .print(); {code}
Reporter: tivanli


Caused by: java.lang.NullPointerException
    at StreamExecCalc$19.processElement_split1(Unknown Source)
    at StreamExecCalc$19.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
    at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:231)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)





Re: [DISCUSS] FLIP-289: Support online inference (Flink ML)

2023-02-09 Thread Dian Fu
Hi Dong,

Thanks for the reply. Got your points and it makes sense to me now~

Regards,
Dian

On Thu, Feb 9, 2023 at 3:45 PM Dong Lin  wrote:

> Hi Dian,
>
> Thanks for the review! Please see my reply inline.
>
> Regards,
> Dong
>
> On Wed, Feb 8, 2023 at 11:58 AM Dian Fu  wrote:
>
> > Hi Dong,
> >
> >
> > Thanks for driving this effort! This FLIP LGTM overall. I have just a few
> > minor comments regarding the proposed API:
> >
> > 1) For the method `DataFrame.collect()`, why is it named `collect` instead
> > of something else, e.g. `get`? Does it mean that the result will be
> > computed during this method call?
> >
>
> I have chosen the names to be closer to the names of similar classes or
> concepts in the existing popular open-source projects.
>
> The reasons for using `collect` include:
> - mleap is a popular machine learning serving framework and it uses
> LeapFrame#collect()
> <https://github.com/combust/mleap/blob/master/mleap-runtime/src/main/scala/ml/combust/mleap/runtime/frame/LeapFrame.scala#L13>
> to represent this concept.
> - Collecting rows from a table seems close to the concept of
> TableResult#collect() in Flink.
> - The method returns a *collection* of rows from this table, which makes
> `collect()` a bit more relevant than `get()`.
>
> Currently, I expect DataFrame.collect() to just return the values already
> computed before this method is called.
>
>
> > 2) For the method `DataFrame.addColumn`, when will this method be used?
> >
>
> For example, we might want to implement KMeansModelServable#transform
> that appends the prediction result to the input DataFrame. This is similar
> to the existing way that KMeansModel#transform
> <https://github.com/apache/flink-ml/blob/master/flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/kmeans/KMeansModel.java#L84>
> appends the result to the input Table. Note that we probably don't want to
> copy the entire input DataFrame to the output DataFrame.
>
> We will need to use DataFrame.addColumn to append the prediction result in
> this case.
>
>
> > 3) In the example `runOnlineInferenceOnWebServer`, it uses
> > `output_df.getDataType("output")` to get the result type. It's not quite
> > intuitive. Does it make sense to add a method `getDataType()` in DataFrame?
>
>
> I guess you are asking whether we should just have a method getDataType()
> that returns types of all columns of the dataframe.
>
> Here is a scenario where users and algorithms will read only selected
> columns from DataFrame:
> - Users provided a DataFrame with columnA and columnB
> - The 1st algorithm will read columnA and columnB, then computes/appends
> columnC to the DataFrame.
> - The 2nd algorithm will read columnB and columnC, then computes/appends
> columnD to the DataFrame.
> - Users need to read columnD in the final DataFrame.
>
>
> > Regards,
> > Dian
> >
> >
> >
> > On Tue, Feb 7, 2023 at 12:37 PM Dong Lin  wrote:
> >
> > > Hi all,
> > >
> > > If there is no question related to this FLIP, we will start the voting
> > > thread on 2/10.
> > >
> > > Regards,
> > > Dong
> > >
> > > On Wed, Feb 1, 2023 at 8:38 PM Dong Lin  wrote:
> > >
> > > > Hi all,
> > > >
> > > > Fan, Jiang, Zhipeng, and I have created FLIP-289: Support online inference
> > > > (Flink ML).
> > > >
> > > > The goal of this FLIP is to enable users to use the model trained by Flink
> > > > ML to do online inference. More details can be found in the FLIP doc at
> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240881268.
> > > >
> > > > We are looking forward to your comments.
> > > >
> > > > Regards,
> > > > Dong
> > > >
> > >
> >
>


[jira] [Created] (FLINK-31006) job is not finished when using pipeline mode to run bounded source like kafka/pulsar

2023-02-09 Thread jackylau (Jira)
jackylau created FLINK-31006:


 Summary: job is not finished when using pipeline mode to run 
bounded source like kafka/pulsar
 Key: FLINK-31006
 URL: https://issues.apache.org/jira/browse/FLINK-31006
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Connectors / Pulsar
Affects Versions: 1.17.0
Reporter: jackylau
 Fix For: 1.17.0
 Attachments: image-2023-02-10-13-20-52-890.png, 
image-2023-02-10-13-23-38-430.png, image-2023-02-10-13-24-46-929.png

When I trigger a failover (for example by killing the JM or TM) while running a 
bounded source such as Kafka in pipeline mode, the job does not finish even 
after the data of every partition has been consumed.

After digging into the code, I found that the logic below does not run when the 
JM recovers: the partition info has not changed, so noMoreNewPartitionSplits is 
never set to true, and as a result this code path is not executed.

 

!image-2023-02-10-13-23-38-430.png!

 

!image-2023-02-10-13-24-46-929.png!





Re: Confusion about some overlapping functionality of SupportsProjectionPushDown and SupportsReadingMetadata

2023-02-09 Thread Shengkai Fang
Hi, Ran.

I think it's a little difficult to split the rule into two parts, because
ProjectionPushDown and ReadingMetadata both need to reorder the fields.
ReadingMetadata requires the metadata columns to come last, while
ProjectionPushDown is currently responsible for reordering the columns as
the user specified.
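
To illustrate why the two concerns are entangled, here is a small sketch of the column order a source would have to produce when both are applied. ProducedLayout and producedColumns are hypothetical names used only for this example, not planner code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper showing the column order a source would have to produce
// when both projection and metadata push-down are applied.
class ProducedLayout {
    // physicalColumns: all physical columns of the table, in declaration order.
    // projection: indices into physicalColumns, in the order the query asks for.
    // metadataKeys: requested metadata columns, which must come last.
    static List<String> producedColumns(
            List<String> physicalColumns, int[] projection, List<String> metadataKeys) {
        List<String> produced = new ArrayList<>();
        for (int index : projection) {
            produced.add(physicalColumns.get(index));
        }
        produced.addAll(metadataKeys);
        return produced;
    }
}
```

Both steps write into the same output layout, so a rule that handles only one of them still has to know what the other one did.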

One more concern: the push-down optimization occurs in the logical phase,
which uses the Volcano planner. I am not sure whether the rule would be
applied in the end, because metadata push-down doesn't change the cost.

Best,
Shengkai


[jira] [Created] (FLINK-31005) Release Testing: Verify FLIP-281 Supports speculative execution of sinks

2023-02-09 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-31005:
---

 Summary: Release Testing: Verify FLIP-281 Supports speculative 
execution of sinks
 Key: FLINK-31005
 URL: https://issues.apache.org/jira/browse/FLINK-31005
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.17.0


This task aims to verify [FLIP-281 Supports speculative execution of 
sinks|https://issues.apache.org/jira/browse/FLINK-30725].
The documentation can be found 
[here|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#enable-sinks-for-speculative-execution].

Things to verify:
1. If a sink implements the decorative interface 
{{SupportsConcurrentExecutionAttempts}}, speculative executions can be 
performed; otherwise they cannot. Sinks to verify include SinkFunction, 
OutputFormat and Sink(V2).
2. These built-in sinks support speculative execution: DiscardingSink, 
PrintSinkFunction, PrintSink, FileSink, FileSystemOutputFormat, HiveTableSink.

If it's hard to construct a case in which speculative execution happens, 
especially for the built-in sinks, the speculative execution configuration 
can be tuned to make it more likely, e.g. set 
{{slow-task-detector.execution-time.baseline-lower-bound}} and 
{{slow-task-detector.execution-time.baseline-ratio}} to {{0}}.





[jira] [Created] (FLINK-31004) Introduce data input and output stream for table store

2023-02-09 Thread Shammon (Jira)
Shammon created FLINK-31004:
---

 Summary: Introduce data input and output stream for table store
 Key: FLINK-31004
 URL: https://issues.apache.org/jira/browse/FLINK-31004
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: Shammon


Introduce data input/output stream for table store





[jira] [Created] (FLINK-31003) Flink SQL IF / CASE WHEN Funcation incorrect

2023-02-09 Thread weiqinpan (Jira)
weiqinpan created FLINK-31003:
-

 Summary: Flink SQL IF / CASE WHEN Funcation incorrect
 Key: FLINK-31003
 URL: https://issues.apache.org/jira/browse/FLINK-31003
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.16.1, 1.15.3, 1.15.2, 1.16.0, 1.15.1, 1.15.0
Reporter: weiqinpan


When I execute the SQL below using sql-client, I get incorrect results.

 
{code:java}
CREATE TEMPORARY TABLE source (
  mktgmsg_biz_type STRING,
  marketing_flow_id STRING,
  mktgmsg_campaign_id STRING
)
WITH
(
  'connector' = 'filesystem',
  'path' = 'file:///Users/xxx/Desktop/demo.json',
  'format' = 'json'
); 

-- return correct value('marketing_flow_id')
SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM source;

-- return incorrect value('')
SELECT IF(`marketing_flow_id` IS NULL, '', `marketing_flow_id`) FROM source;{code}
The demo.json data is 

 
{code:java}
{"mktgmsg_biz_type": "marketing_flow", "marketing_flow_id": "marketing_flow_id", "mktgmsg_campaign_id": "mktgmsg_campaign_id"} {code}
 

 

BTW, combining CASE WHEN with IF / IFNULL also returns wrong results.

 
{code:java}
-- return wrong value(''), expect return marketing_flow_id
select CASE
  WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN IF(`marketing_flow_id` IS NULL, `marketing_flow_id`, '')
  WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN IF(`mktgmsg_campaign_id` IS NULL, '', `mktgmsg_campaign_id`)
  ELSE ''
  END AS `message_campaign_instance_id` FROM source;

-- return wrong value('')
select CASE
  WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN IFNULL(`marketing_flow_id`, '')
  WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN IFNULL(`mktgmsg_campaign_id`, '')
  ELSE ''
  END AS `message_campaign_instance_id` FROM source;

-- return correct value, the difference is [else return ' ']
select CASE
  WHEN `mktgmsg_biz_type` = 'marketing_flow'     THEN IFNULL(`marketing_flow_id`, '')
  WHEN `mktgmsg_biz_type` = 'mktgmsg_campaign'   THEN IFNULL(`mktgmsg_campaign_id`, '')
  ELSE ' '
  END AS `message_campaign_instance_id` FROM source;
{code}
 

 

 





Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

2023-02-09 Thread Shengkai Fang
Hi Feng.

I think your idea is very interesting!

1. I just wonder whether, after the Catalog is initialized, the Session will
reuse the same Catalog instance or build a new one for later usage. If we reuse
the same Catalog, I think it's more like lazy initialization. I lean slightly
towards building a new one, because that makes hot updates of the catalog jar
easier.

2. Users use the `CREATE CATALOG` statement in the CatalogManager. In this
case, do we need to instantiate the Catalog immediately or defer to the
usage?
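
The two questions can be sketched together as follows. This is only an illustration under assumptions: LazyCatalogRegistry, its factory function and the reuseInstances flag are hypothetical names, not existing Flink classes. Registration only stores string properties (the deferred option from question 2), and the flag switches between reusing one instance and rebuilding on every access (question 1).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical registry: CREATE CATALOG only stores string properties;
// the catalog object is built by the factory when it is first requested.
class LazyCatalogRegistry<C> {
    private final Map<String, Map<String, String>> properties = new HashMap<>();
    private final Map<String, C> instances = new HashMap<>();
    private final Function<Map<String, String>, C> factory;
    private final boolean reuseInstances;

    LazyCatalogRegistry(Function<Map<String, String>, C> factory, boolean reuseInstances) {
        this.factory = factory;
        this.reuseInstances = reuseInstances;
    }

    // Deferred variant of registration: nothing is instantiated here.
    void register(String name, Map<String, String> catalogProperties) {
        properties.put(name, new HashMap<>(catalogProperties));
        instances.remove(name); // drop any instance built from older properties
    }

    // Instantiates on demand. With reuseInstances=false every call builds a
    // fresh catalog, which is the "rebuild" option from question 1.
    C get(String name) {
        Map<String, String> props = properties.get(name);
        if (props == null) {
            throw new IllegalArgumentException("Unknown catalog: " + name);
        }
        if (!reuseInstances) {
            return factory.apply(props);
        }
        return instances.computeIfAbsent(name, n -> factory.apply(props));
    }
}
```

With reuse enabled this behaves like plain lazy initialization; with reuse disabled each access pays the construction cost again, which is the trade-off behind the preference stated in point 1.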

Best,
Shengkai

On Thu, Feb 9, 2023 at 20:13, Feng Jin wrote:

> Thanks for your reply.
>
> @Timo
>
> >  2) avoid  the default in-memory catalog and offer their catalog before
> a  TableEnvironment session starts
> >  3) whether this can be disabled and SHOW CATALOGS  can be used for
> listing first without having a default catalog.
>
>
> Regarding 2 and 3, I think this problem can be solved by introducing
> catalog providers, and users can control some default catalog
> behavior.
>
>
> > We could also use the org.apache.flink.table.factories.Factory infra
> and  allow catalog providers via pure string properties
>
> I think this is also very useful. In our usage scenarios, it is
> usually multi-cluster management, and it is also necessary to pass
> different configurations through parameters.
>
>
> @Jark @Huang
>
> >  About the lazy catalog initialization
>
> Our needs may be different. If these properties already exist in an
> external system, especially when there may be thousands of catalog
> properties, I don't think it is necessary to register all of them in
> the Flink env at startup. What we need is to register a catalog only
> when it is needed, fetching its properties from the external meta
> system at that point.
>
>
> >  It may be hard to avoid conflicts  and duplicates between
> CatalogProvider and CatalogManager
>
> Conflicts are indeed easy to run into. My idea is to extract the
> catalog management of the current CatalogManager into the default
> CatalogProvider behavior and, at the same time, to allow only one
> CatalogProvider per Flink env. This may avoid catalog conflicts.
>
>
> Best,
> Feng
>
> On Tue, Feb 7, 2023 at 1:01 PM Hang Ruan  wrote:
> >
> > Hi Feng,
> > I agree with what Jark said. I think what you are looking for is lazy
> > initialization.
> >
> > I don't think we should introduce the new interface CatalogProvider for
> > lazy initialization. What we should do is to store the catalog properties
> > and initialize the catalog when we need it. Could you please introduce some
> > other scenarios that we need the CatalogProvider besides the lazy
> > initialization?
> >
> > If we really need the CatalogProvider, I think it is better to be a single
> > instance. Multiple instances are difficult to manage and there are name
> > conflicts among providers.
> >
> > Best,
> > Hang
> >
> > On Tue, Feb 7, 2023 at 10:48, Jark Wu wrote:
> >
> > > Hi Feng,
> > >
> > > I think this feature makes a lot of sense. If I understand correctly, what
> > > you are looking for is lazy catalog initialization.
> > >
> > > However, I have some concerns about introducing CatalogProvider, which
> > > delegates catalog management to users. It may be hard to avoid conflicts
> > > and duplicates between CatalogProvider and CatalogManager. Is it possible
> > > to have a built-in CatalogProvider to instantiate catalogs lazily?
> > >
> > > An idea in my mind is to introduce another catalog registration API
> > > without instantiating the catalog, e.g., registerCatalog(String
> > > catalogName, Map<String, String> catalogProperties). The catalog
> > > information is stored in CatalogManager as pure strings. The catalog is
> > > instantiated and initialized when used.
> > >
> > > This new API is very similar to other pure-string metadata registration,
> > > such as "createTable(String path, TableDescriptor descriptor)" and
> > > "createFunction(String path, String className, List<ResourceUri>
> > > resourceUris)".
> > >
> > > Can this approach satisfy your requirement?
> > >
> > > Best,
> > > Jark
> > >
> > > On Mon, 6 Feb 2023 at 22:53, Timo Walther  wrote:
> > >
> > > > Hi Feng,
> > > >
> > > > this is indeed a good proposal.
> > > >
> > > > 1) It makes sense to improve the catalog listing for platform providers.
> > > >
> > > > 2) Other feedback from the past has shown that users would like to avoid
> > > > the default in-memory catalog and offer their catalog before a
> > > > TableEnvironment session starts.
> > > >
> > > > 3) Also we might reconsider whether a default catalog and default
> > > > database make sense. Or whether this can be disabled and SHOW CATALOGS
> > > > can be used for listing first without having a default catalog.
> > > >
> > > > What do you think about option 2 and 3?
> > > >
> > > > In any case, I would propose we pass a CatalogProvider to
> > > > EnvironmentSettings and only allow a single instance. Catalogs should
> > > > never shadow other catalogs.
> > > >
> > > > We could also use the 

[jira] [Created] (FLINK-31002) Provide data sampling query

2023-02-09 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-31002:


 Summary: Provide data sampling query
 Key: FLINK-31002
 URL: https://issues.apache.org/jira/browse/FLINK-31002
 Project: Flink
  Issue Type: New Feature
  Components: Table Store
Reporter: Jingsong Lee


We want to take several records at random from each partition, but LIMIT always 
returns the same fixed records.





[jira] [Created] (FLINK-31001) Introduce Hive writer

2023-02-09 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-31001:


 Summary: Introduce Hive writer
 Key: FLINK-31001
 URL: https://issues.apache.org/jira/browse/FLINK-31001
 Project: Flink
  Issue Type: New Feature
  Components: Table Store
Reporter: Jingsong Lee








[jira] [Created] (FLINK-31000) Upgrade test units in flink-table-store-common to junit5

2023-02-09 Thread Shammon (Jira)
Shammon created FLINK-31000:
---

 Summary: Upgrade test units in flink-table-store-common to junit5
 Key: FLINK-31000
 URL: https://issues.apache.org/jira/browse/FLINK-31000
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: Shammon








[jira] [Created] (FLINK-30999) Introduce flink-table-store-test-utils for table store

2023-02-09 Thread Shammon (Jira)
Shammon created FLINK-30999:
---

 Summary: Introduce flink-table-store-test-utils for table store
 Key: FLINK-30999
 URL: https://issues.apache.org/jira/browse/FLINK-30999
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: Shammon


Introduce flink-table-store-test-utils module for table store





Re: [VOTE] Release flink-connector-kafka, release candidate #1

2023-02-09 Thread Tzu-Li (Gordon) Tai
+1 (binding)

- Verified legals (license headers and root LICENSE / NOTICE file). AFAICT
no dependencies require explicit acknowledgement in the NOTICE files.
- No binaries in staging area
- Built source with tests
- Verified signatures and hashes
- Web PR changes LGTM

Thanks Martijn!

Cheers,
Gordon

On Mon, Feb 6, 2023 at 6:12 PM Mason Chen  wrote:

> That makes sense, thanks for the clarification!
>
> Best,
> Mason
>
> On Wed, Feb 1, 2023 at 7:16 AM Martijn Visser 
> wrote:
>
> > Hi Mason,
> >
> > Thanks, [4] is indeed a copy-paste error and you've made the right
> > assumption that
> >
> >
> https://repository.apache.org/content/repositories/orgapacheflink-1582/org/apache/flink/
> > is the correct maven central link.
> >
> > I think we should use FLINK-30052 to move the Kafka connector code from
> the
> > 1.17 release also over to the Kafka connector repo (especially since there's
> > now a v3.0 branch for the Kafka connector, so it can be merged in main).
> > When those commits have been merged, we can make a next Kafka connector
> > release (which is equivalent to the 1.17 release, which can only be done
> > when 1.17 is done because of the split level watermark alignment) and
> then
> > FLINK-30859 can be finished.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Wed, Feb 1, 2023 at 09:16, Mason Chen wrote:
> >
> > > +1 (non-binding)
> > >
> > > * Verified hashes and signatures
> > > * Verified no binaries
> > > * Verified LICENSE and NOTICE files
> > > * Verified poms point to 3.0.0-1.16
> > > * Reviewed web PR
> > > * Built from source
> > > * Verified git tag
> > >
> > > I think your [4] is a copy-paste error and I did all the verification
> > > assuming that
> > >
> > >
> >
> https://repository.apache.org/content/repositories/orgapacheflink-1582/org/apache/flink/
> > > is the correct maven central link.
> > >
> > > Regarding the release notes, should we close
> > > https://issues.apache.org/jira/browse/FLINK-30052 and link it there?
> > I've
> > > created https://issues.apache.org/jira/browse/FLINK-30859 to remove
> the
> > > existing code from the master branch.
> > >
> > > Best,
> > > Mason
> > >
> > > On Tue, Jan 31, 2023 at 6:23 AM Martijn Visser <
> martijnvis...@apache.org
> > >
> > > wrote:
> > >
> > > > Hi everyone,
> > > > Please review and vote on the release candidate #1 for
> > > > flink-connector-kafka version 3.0.0, as follows:
> > > > [ ] +1, Approve the release
> > > > [ ] -1, Do not approve the release (please provide specific comments)
> > > >
> > > > Note: this is the same code as the Kafka connector for the Flink 1.16
> > > > release.
> > > >
> > > > The complete staging area is available for your review, which
> includes:
> > > > * JIRA release notes [1],
> > > > * the official Apache source release to be deployed to
> dist.apache.org
> > > > [2],
> > > > which are signed with the key with fingerprint
> > > > A5F3BCE4CBE993573EC5966A65321B8382B219AF [3],
> > > > * all artifacts to be deployed to the Maven Central Repository [4],
> > > > * source code tag v3.0.0-rc1 [5],
> > > > * website pull request listing the new release [6].
> > > >
> > > > The vote will be open for at least 72 hours. It is adopted by
> majority
> > > > approval, with at least 3 PMC affirmative votes.
> > > >
> > > > Thanks,
> > > > Release Manager
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352577
> > > > [2]
> > > >
> > > >
> > >
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-connector-kafka-3.0.0-rc1
> > > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > > [4]
> > > >
> > > >
> > >
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-connector-kafka-3.0.0-rc1/
> > > > [5]
> > > >
> > https://github.com/apache/flink-connector-kafka/releases/tag/v3.0.0-rc1
> > > > [6] https://github.com/apache/flink-web/pull/606
> > > >
> > >
> >
>


[jira] [Created] (FLINK-30998) Add optional exception handler to flink-connector-opensearch

2023-02-09 Thread Leonid Ilyevsky (Jira)
Leonid Ilyevsky created FLINK-30998:
---

 Summary: Add optional exception handler to 
flink-connector-opensearch
 Key: FLINK-30998
 URL: https://issues.apache.org/jira/browse/FLINK-30998
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Opensearch
Affects Versions: 1.16.1
Reporter: Leonid Ilyevsky


Currently, when a failure comes back from Opensearch, a 
FlinkRuntimeException is thrown from OpensearchWriter.java (line 346). 
This makes the Flink pipeline fail, and there is no way to handle the exception 
in client code.

I suggest adding an option to set a failure handler, similar to the way it is 
done in the Elasticsearch connector. This way the client code has a chance to 
examine the failure and handle it.

Here is a use case where this would be very useful. We are using streams 
on the Opensearch side, and we set our own document IDs. Sometimes these 
IDs are duplicated; we need to ignore this situation and continue (this is how 
it works for us with Elasticsearch).
However, with the Opensearch connector, the error comes back saying that the 
batch failed (even though most of the documents were indexed and only the ones 
with duplicated IDs were rejected), and the whole Flink job fails.
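
To make the request concrete, here is a minimal sketch of what such an optional 
handler could look like. All names in it (FailureHandlerSketch, BulkItemFailure, 
FailureHandler, IGNORE_DUPLICATE_IDS) are hypothetical and not part of 
flink-connector-opensearch, and it assumes that a duplicated ID is reported 
with HTTP status 409.

```java
import java.util.List;

final class FailureHandlerSketch {

    /** Hypothetical description of one item rejected in a bulk request. */
    static final class BulkItemFailure {
        final String documentId;
        final int restStatus;
        final String message;

        BulkItemFailure(String documentId, int restStatus, String message) {
            this.documentId = documentId;
            this.restStatus = restStatus;
            this.message = message;
        }
    }

    /** Hypothetical callback: throw to fail the job, return normally to continue. */
    interface FailureHandler {
        void onFailure(BulkItemFailure failure);
    }

    /** Mirrors the behaviour described in the issue: any failure fails the job. */
    static final FailureHandler FAIL_ALWAYS =
            failure -> {
                throw new IllegalStateException(failure.message);
            };

    /** Tolerates duplicate IDs (assumed to surface as HTTP 409), fails on anything else. */
    static final FailureHandler IGNORE_DUPLICATE_IDS =
            failure -> {
                if (failure.restStatus != 409) {
                    throw new IllegalStateException(failure.message);
                }
            };

    /** Passes every failed item to the handler and returns how many were tolerated. */
    static int handleAll(List<BulkItemFailure> failures, FailureHandler handler) {
        int tolerated = 0;
        for (BulkItemFailure failure : failures) {
            handler.onFailure(failure);
            tolerated++;
        }
        return tolerated;
    }
}
```

With a handler like IGNORE_DUPLICATE_IDS the writer could continue after a 
partially rejected batch, while FAIL_ALWAYS would keep today's behaviour as the 
default.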





[jira] [Created] (FLINK-30997) Refactor tests in connector to extends AbstractTestBase

2023-02-09 Thread Shammon (Jira)
Shammon created FLINK-30997:
---

 Summary: Refactor tests in connector to extends AbstractTestBase
 Key: FLINK-30997
 URL: https://issues.apache.org/jira/browse/FLINK-30997
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: Shammon


Refactor tests in connector to extend `AbstractTestBase`





[jira] [Created] (FLINK-30996) Sync Kafka 1.17 commits from apache/flink repo to flink-connector-kafka

2023-02-09 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-30996:
--

 Summary: Sync Kafka 1.17 commits from apache/flink repo to 
flink-connector-kafka
 Key: FLINK-30996
 URL: https://issues.apache.org/jira/browse/FLINK-30996
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Kafka
Affects Versions: kafka-4.0.0
Reporter: Martijn Visser
Assignee: Martijn Visser








[jira] [Created] (FLINK-30995) Introduce ByteSerializer for Table Store

2023-02-09 Thread Feng Wang (Jira)
Feng Wang created FLINK-30995:
-

 Summary: Introduce ByteSerializer for Table Store
 Key: FLINK-30995
 URL: https://issues.apache.org/jira/browse/FLINK-30995
 Project: Flink
  Issue Type: New Feature
  Components: Table Store
Reporter: Feng Wang
 Fix For: table-store-0.4.0


Introduce ByteSerializer for Table Store





[jira] [Created] (FLINK-30993) Introduce FloatSerializer for Table Store

2023-02-09 Thread Feng Wang (Jira)
Feng Wang created FLINK-30993:
-

 Summary: Introduce FloatSerializer for Table Store
 Key: FLINK-30993
 URL: https://issues.apache.org/jira/browse/FLINK-30993
 Project: Flink
  Issue Type: New Feature
Reporter: Feng Wang
 Fix For: table-store-0.4.0


Introduce FloatSerializer for Table Store





[jira] [Created] (FLINK-30994) Introduce DoubleSerializer for Table Store

2023-02-09 Thread Feng Wang (Jira)
Feng Wang created FLINK-30994:
-

 Summary: Introduce DoubleSerializer for Table Store
 Key: FLINK-30994
 URL: https://issues.apache.org/jira/browse/FLINK-30994
 Project: Flink
  Issue Type: New Feature
  Components: Table Store
Reporter: Feng Wang
 Fix For: table-store-0.4.0


Introduce DoubleSerializer for Table Store





[jira] [Created] (FLINK-30992) Introduce ShortSerializer for Table Store

2023-02-09 Thread Feng Wang (Jira)
Feng Wang created FLINK-30992:
-

 Summary: Introduce ShortSerializer for Table Store
 Key: FLINK-30992
 URL: https://issues.apache.org/jira/browse/FLINK-30992
 Project: Flink
  Issue Type: New Feature
  Components: Table Store
Reporter: Feng Wang
 Fix For: table-store-0.4.0


Introduce ShortSerializer for Table Store





[jira] [Created] (FLINK-30991) Introduce LongSerializer for Table Store

2023-02-09 Thread Feng Wang (Jira)
Feng Wang created FLINK-30991:
-

 Summary: Introduce LongSerializer for Table Store
 Key: FLINK-30991
 URL: https://issues.apache.org/jira/browse/FLINK-30991
 Project: Flink
  Issue Type: New Feature
  Components: Table Store
Reporter: Feng Wang
 Fix For: table-store-0.4.0


Introduce LongSerializer for Table Store





[jira] [Created] (FLINK-30990) Performance of has worsened DownstreamTasks.BATCH

2023-02-09 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-30990:
--

 Summary: Performance of has worsened DownstreamTasks.BATCH
 Key: FLINK-30990
 URL: https://issues.apache.org/jira/browse/FLINK-30990
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.17.0
Reporter: Martijn Visser


See 
http://codespeed.dak8s.net:8000/timeline/#/?exe=8&ben=deployDownstreamTasks.BATCH&extr=on&quarts=on&equid=off&env=2&revs=200

It appears that the trend has now stabilized at a higher level, going from 
60-70 ms/op to 80-90 ms/op (lower is better). 

Not sure if this has meaningful impact in a production setup, but I would like 
to get this verified. 





Re: Fwd: [FLINK-30596] Duplicate jobs on /run with the same jobId

2023-02-09 Thread Chesnay Schepler
FYI, the original mail never actually made it to the dev mailing list 
(maybe got stuck in moderation queue).


On 08/02/2023 18:21, Mohsen Rezaei wrote:

Hi,

I was hoping to resurface this issue given that it affects any application
using the REST endpoints to run/submit jobs, causing confusion on the
actual state of the submitted jobs in a cluster.

I've created a PR against the "master" branch, and I'd like to be able to
port it back to 1.16.2 and 1.15.4 to close the loop on the issue for the
last three major releases:
https://github.com/apache/flink/pull/21849

Cheers,
Mohsen Rezaei

-- Forwarded message -
From: Mohsen Rezaei 
Date: Sat, Jan 21, 2023 at 5:58 PM
Subject: Duplicate jobs on /run with the same jobId
To: 


Hi everyone,

I have run into an issue with the Dispatcher's runJob() that accepts
multiple jobs and starts them with the same jobId.

The filtering fix in waitForTerminatingJob [1] fixes the issue, but I need
a consensus from devs to ensure this is the proper fix, before submitting a
PR.

Cheers,
Mohsen Rezaei

[1] https://issues.apache.org/jira/browse/FLINK-30596





Re: [Discuss] :Introduce Catalog dynamic registration in flink catalog manager.

2023-02-09 Thread Feng Jin
Thanks for your reply.

@Timo

>  2) avoid  the default in-memory catalog and offer their catalog before a  
> TableEnvironment session starts
>  3) whether this can be disabled and SHOW CATALOGS  can be used for listing 
> first without having a default catalog.


Regarding 2 and 3, I think both can be solved by introducing catalog
providers, which would let users control the default catalog
behavior.


> We could also use the org.apache.flink.table.factories.Factory infra and  
> allow catalog providers via pure string properties

I think this is also very useful. Our scenarios usually involve
managing multiple clusters, so we also need to pass different
configurations through parameters.


@Jark @Huang

>  About the lazy catalog initialization

Our needs may be different. If these properties already exist in an
external system, especially when there may be thousands of these
catalog properties, I don't think it is necessary to register all
of them in the Flink env at startup. What we need is to be able to
register a catalog when it is needed and to fetch its properties
from the external meta system.
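
To illustrate the on-demand behavior described above, here is a minimal
sketch, not Flink code. LazyCatalogRegistry, propertyLookup and
catalogFactory are hypothetical names; the lookup function stands in for
the external meta system, and the catalog type is left generic.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class LazyCatalogRegistry<C> {

    // Hypothetical hook into the external meta system: catalog name -> properties.
    private final Function<String, Optional<Map<String, String>>> propertyLookup;
    // Hypothetical factory that turns pure string properties into a catalog instance.
    private final Function<Map<String, String>, C> catalogFactory;
    // Only catalogs that were actually used end up in this map.
    private final Map<String, C> instantiated = new ConcurrentHashMap<>();

    LazyCatalogRegistry(
            Function<String, Optional<Map<String, String>>> propertyLookup,
            Function<Map<String, String>, C> catalogFactory) {
        this.propertyLookup = propertyLookup;
        this.catalogFactory = catalogFactory;
    }

    /** Instantiates the catalog on first use; later calls return the cached instance. */
    Optional<C> getCatalog(String name) {
        C cached = instantiated.get(name);
        if (cached != null) {
            return Optional.of(cached);
        }
        Optional<Map<String, String>> properties = propertyLookup.apply(name);
        if (!properties.isPresent()) {
            return Optional.empty();
        }
        return Optional.of(
                instantiated.computeIfAbsent(
                        name, unused -> catalogFactory.apply(properties.get())));
    }

    /** Number of catalogs that have been instantiated so far. */
    int instantiatedCount() {
        return instantiated.size();
    }
}
```

Nothing is registered at startup in this sketch; a catalog that is never
referenced is never looked up or created.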


>  It may be hard to avoid conflicts  and duplicates between CatalogProvider 
> and CatalogManager

It is indeed easy to run into conflicts. My idea is to turn the
catalog management of the current CatalogManager into the default
CatalogProvider behavior and to allow only one CatalogProvider
in a Flink env. This may avoid catalog conflicts.


Best,
Feng

On Tue, Feb 7, 2023 at 1:01 PM Hang Ruan  wrote:
>
> Hi Feng,
> I agree with what Jark said. I think what you are looking for is lazy
> initialization.
>
> I don't think we should introduce the new interface CatalogProvider for
> lazy initialization. What we should do is to store the catalog properties
> and initialize the catalog when we need it. Could you please introduce some
> other scenarios that we need the CatalogProvider besides the lazy
> initialization?
>
> If we really need the CatalogProvider, I think it is better to be a single
> instance. Multiple instances are difficult to manage and there are name
> conflicts among providers.
>
> Best,
> Hang
>
> On Tue, Feb 7, 2023 at 10:48, Jark Wu wrote:
>
> > Hi Feng,
> >
> > I think this feature makes a lot of sense. If I understand correctly, what
> > you are looking for is lazy catalog initialization.
> >
> > However, I have some concerns about introducing CatalogProvider, which
> > delegates catalog management to users. It may be hard to avoid conflicts
> > and duplicates between CatalogProvider and CatalogManager. Is it possible
> > to have a built-in CatalogProvider to instantiate catalogs lazily?
> >
> > An idea in my mind is to introduce another catalog registration API
> > without instantiating the catalog, e.g., registerCatalog(String
> > catalogName, Map<String, String> catalogProperties). The catalog
> > information is stored in CatalogManager as pure strings. The catalog is
> > instantiated and initialized when used.
> >
> > This new API is very similar to other pure-string metadata registration,
> > such as "createTable(String path, TableDescriptor descriptor)" and
> > "createFunction(String path, String className, List<ResourceUri>
> > resourceUris)".
> >
> > Can this approach satisfy your requirement?
> >
> > Best,
> > Jark
> >
> > On Mon, 6 Feb 2023 at 22:53, Timo Walther  wrote:
> >
> > > Hi Feng,
> > >
> > > this is indeed a good proposal.
> > >
> > > 1) It makes sense to improve the catalog listing for platform providers.
> > >
> > > 2) Other feedback from the past has shown that users would like to avoid
> > > the default in-memory catalog and offer their catalog before a
> > > TableEnvironment session starts.
> > >
> > > 3) Also we might reconsider whether a default catalog and default
> > > database make sense. Or whether this can be disabled and SHOW CATALOGS
> > > can be used for listing first without having a default catalog.
> > >
> > > What do you think about option 2 and 3?
> > >
> > > In any case, I would propose we pass a CatalogProvider to
> > > EnvironmentSettings and only allow a single instance. Catalogs should
> > > never shadow other catalogs.
> > >
> > > We could also use the org.apache.flink.table.factories.Factory infra and
> > > allow catalog providers via pure string properties. Not sure if we need
> > > this in the first version though.
> > >
> > > Cheers,
> > > Timo
> > >
> > >
> > > On 06.02.23 11:21, Feng Jin wrote:
> > > > Hi everyone,
> > > >
> > > > The original discussion address is
> > > > https://issues.apache.org/jira/browse/FLINK-30126
> > > >
> > > > Currently, Flink has access to many systems, including kafka, hive,
> > > > iceberg, hudi, elasticsearch, mysql...  The corresponding catalog name
> > > > might be:
> > > > kafka_cluster1, kafka_cluster2, hive_cluster1, hive_cluster2,
> > > > iceberg_cluster2, elasticsearch_cluster1,  mysql_database1_xxx,
> > > > mysql_database2_
> > > >
> > > > As the platform of the Flink SQL job, we need to maintain 

Fwd: [FLINK-30596] Duplicate jobs on /run with the same jobId

2023-02-09 Thread Mohsen Rezaei
Hi,

I was hoping to resurface this issue given that it affects any application
using the REST endpoints to run/submit jobs, causing confusion on the
actual state of the submitted jobs in a cluster.

I've created a PR against the "master" branch, and I'd like to be able to
port it back to 1.16.2 and 1.15.4 to close the loop on the issue for the
last three major releases:
https://github.com/apache/flink/pull/21849

Cheers,
Mohsen Rezaei

-- Forwarded message -
From: Mohsen Rezaei 
Date: Sat, Jan 21, 2023 at 5:58 PM
Subject: Duplicate jobs on /run with the same jobId
To: 


Hi everyone,

I have run into an issue with the Dispatcher's runJob() that accepts
multiple jobs and starts them with the same jobId.

The filtering fix in waitForTerminatingJob [1] fixes the issue, but I need
a consensus from devs to ensure this is the proper fix, before submitting a
PR.

Cheers,
Mohsen Rezaei

[1] https://issues.apache.org/jira/browse/FLINK-30596


[jira] [Created] (FLINK-30989) Configuration table.exec.spill-compression.block-size not take effect in batch job

2023-02-09 Thread shen (Jira)
shen created FLINK-30989:


 Summary: Configuration table.exec.spill-compression.block-size not 
take effect in batch job
 Key: FLINK-30989
 URL: https://issues.apache.org/jira/browse/FLINK-30989
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream, Runtime / Configuration
Affects Versions: 1.16.1
Reporter: shen
 Attachments: image-2023-02-09-19-37-44-927.png

h1. Description

I tried to configure table.exec.spill-compression.block-size in TableEnv in my 
job, but it did not take effect. I attached to the TaskManager and found that 
the conf passed to the constructor of 
[BinaryExternalSorter|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L204]
 is empty:

!image-2023-02-09-19-37-44-927.png|width=306,height=185!
h1. How to reproduce

A simple program to reproduce this problem:
{code:java}
// App.java

package test.flink403;

import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

import java.util.Arrays;

public class App {

  public static void main(String argc[]) throws Exception {

    Configuration config = new Configuration();
    config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
    config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, true);
    config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
    config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); // <-- cannot take effect
    config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, Float.valueOf(0.5f));
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(1, config);

    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 m"); // <-- cannot take effect
    final DataStream<Order> orderA =
        env.fromCollection(
            Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2)));

    final Table tableA = tableEnv.fromDataStream(orderA);

    final Table result =
        tableEnv.sqlQuery(
            "SELECT * FROM "
                + tableA
                + " "
                + " order by user");

    tableEnv.toDataStream(result, Order.class).print();
    env.execute();
  }
}

// ---
// Order.java
package test.flink403;

public class Order {
  public Long user;
  public String product;
  public int amount;

  // for POJO detection in DataStream API
  public Order() {}

  // for structured type detection in Table API
  public Order(Long user, String product, int amount) {
    this.user = user;
    this.product = product;
    this.amount = amount;
  }

  @Override
  public String toString() {
    return "Order{"
        + "user="
        + user
        + ", product='"
        + product
        + '\''
        + ", amount="
        + amount
        + '}';
  }
}{code}
 

I think it is because 
[SortOperator|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java#L88]
 tries to get the conf from JobConfiguration, which should be set in the JobGraph. 
The following classes use the same method to get the conf from JobConfiguration:
 * BinaryExternalSorter
 ** ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
 ** ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
 ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
 ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
 * BinaryHashTable,BaseHybridHashTable
 ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
 ** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
 * SortDataInput
 ** AlgorithmOptions.SORT_SPILLING_THRESHOLD
 ** AlgorithmOptions.SPILLING_MAX_FAN
 ** AlgorithmOptions.USE_LARGE_RECORDS_HANDLER





[jira] [Created] (FLINK-30988) Refactor E2E tests to get rid of managed table

2023-02-09 Thread yuzelin (Jira)
yuzelin created FLINK-30988:
---

 Summary: Refactor E2E tests to get rid of managed table
 Key: FLINK-30988
 URL: https://issues.apache.org/jira/browse/FLINK-30988
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: yuzelin








Re: Confusion about some overlapping functionality of SupportsProjectionPushDown and SupportsReadingMetadata

2023-02-09 Thread Ran Tao
Hi yuxia, thanks for your feedback.

My point is that when both interfaces are implemented at the same time,
the SupportsProjectionPushDown method can actually be left empty, with
SupportsProjectionPushDown serving only as a marker, and
SupportsReadingMetadata can then handle both the physical column pushdown
and the metadata processing.

Another point is that if the methods of both interfaces are implemented,
the developer may see unexpected behavior because of the call order. Of
course, I agree with updating the Javadoc notes.
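
To show the ordering issue in isolation, here is a minimal sketch. The two
nested interfaces are simplified stand-ins that I made up for illustration
(the real Flink interfaces take int[][] and DataType arguments), and the
plan() method only mimics the call order discussed in this thread.

```java
import java.util.Arrays;
import java.util.List;

final class PushDownOrderSketch {

    /** Simplified stand-in for SupportsProjectionPushDown (not the real signature). */
    interface ProjectionPushDown {
        void applyProjection(int[] projectedFields, String producedType);
    }

    /** Simplified stand-in for SupportsReadingMetadata (not the real signature). */
    interface ReadingMetadata {
        void applyReadableMetadata(List<String> metadataKeys, String producedType);
    }

    /** A source that stores the produced type in both methods. */
    static final class Source implements ProjectionPushDown, ReadingMetadata {
        String producedType = "ROW<a, b, c>";

        @Override
        public void applyProjection(int[] projectedFields, String producedType) {
            this.producedType = producedType;
        }

        @Override
        public void applyReadableMetadata(List<String> metadataKeys, String producedType) {
            // Called second, so it overwrites whatever applyProjection stored.
            this.producedType = producedType;
        }
    }

    /** Mimics the order described in the thread: projection first, metadata second. */
    static String plan(Source source) {
        source.applyProjection(new int[] {0, 2}, "ROW<a, c>");
        source.applyReadableMetadata(Arrays.asList("timestamp"), "ROW<a, c, timestamp>");
        return source.producedType;
    }
}
```

In this sketch the value stored by applyProjection never survives, which is
why the projection method ends up looking like a marker.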

On Thu, Feb 9, 2023 at 16:09, yuxia wrote:

> Hi, Ran Tao.
> Thanks for bringing it up.
> TBH, to me, it's not that confusing.
> Is the problem that applyReadableMetadata and applyProjection both
> pass a producedDataType, and the source connector developer needs
> to choose which one to use as the final output type?
>
> As the Javadoc of applyReadableMetadata says, use the producedDataType in
> this method instead of the one in applyProjection.
>
> As for your question, I think the responsibilities of these two interfaces
> are quite independent. What kind of independence are you expecting?
>
> Btw, to avoid confusion, I think we may need to specify in the Javadoc of
> applyReadableMetadata that applyProjection will be called before
> applyReadableMetadata.
>
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "Ran Tao" 
> To: "dev" 
> Sent: Wednesday, February 8, 2023, 8:46:45 PM
> Subject: Confusion about some overlapping functionality of
> SupportsProjectionPushDown and SupportsReadingMetadata
>
> Currently we use SupportsProjectionPushDown to push down physical columns,
> and SupportsReadingMetadata is used to read metadata columns.
> There is no problem when implementing one of the interfaces alone. If two
> interfaces are implemented at the same time, there will be confusing
> semantics.
>
> For example, if we update the schema or producedDataType in
> SupportsProjectionPushDown#applyProjection and
> SupportsReadingMetadata#applyReadableMetadata at the same time, the former
> is actually invalid, because the former is called first, and then the
> latter will overwrite it.
>
> There are some similar usage notes in the interface's documentation. But
> this is very confusing. In this case, you only need to implement
> SupportsReadingMetadata#applyReadableMetadata (only implement
> SupportsProjectionPushDown, the override method is empty), and the rule
> match logic of SupportsReadingMetadata will push down the physical column
> and metadata columns to generate producedDataType and return it.
>
> At this point SupportsProjectionPushDown is more like a marker interface.
> In addition, if some member variables are relied on in the implementation
> of SupportsReadingMetadata, and the member variables are also updated in
> SupportsProjectionPushDown, unexpected problems may occur. Developers
> should clearly read the implementation of these two interfaces and
> understand that these overlapping functions will cause a certain
> development cost to the developer of the connector (normally, the two
> interfaces should be isolated functions, developers see the meaning of the
> name ).
>
> I wonder if the community has considered making the responsibilities of
> these two interfaces more independent and clear in subsequent updates.
> Maybe my understanding is not very sufficient, looking forward to your
> opinions.
>
> --
> Best Regards,
> Ran Tao
> https://github.com/chucheng92
>


-- 
Best Regards,
Ran Tao
https://github.com/chucheng92


[jira] [Created] (FLINK-30987) output source exception for SocketStreamIterator

2023-02-09 Thread chenyuzhi (Jira)
chenyuzhi created FLINK-30987:
-

 Summary: output source exception for SocketStreamIterator
 Key: FLINK-30987
 URL: https://issues.apache.org/jira/browse/FLINK-30987
 Project: Flink
  Issue Type: Improvement
Reporter: chenyuzhi
 Attachments: image-2023-02-09-16-47-49-928.png

Sometimes we run into errors when using 
`org.apache.flink.streaming.experimental.SocketStreamIterator` for testing or 
output.

However, the source exception does not show up in the log. Maybe we could 
throw the source exception directly.

 

!image-2023-02-09-16-47-49-928.png!





[jira] [Created] (FLINK-30986) Refactor CreateTableITCase and DropTableITCase to get rid of managed table

2023-02-09 Thread yuzelin (Jira)
yuzelin created FLINK-30986:
---

 Summary: Refactor CreateTableITCase and DropTableITCase to get rid 
of managed table
 Key: FLINK-30986
 URL: https://issues.apache.org/jira/browse/FLINK-30986
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: yuzelin








Re: Confusion about some overlapping functionality of SupportsProjectionPushDown and SupportsReadingMetadata

2023-02-09 Thread yuxia
Hi, Ran Tao.
Thanks for bringing it up. 
TBH, to me, it's not that confusing. 
Is the problem that applyReadableMetadata and applyProjection both pass a 
producedDataType, and the source connector developer needs to choose which 
one to use as the final output type? 

As the Javadoc of applyReadableMetadata says, use the producedDataType in this 
method instead of the one in applyProjection.

As for your question, I think the responsibilities of these two interfaces are 
quite independent. What kind of independence are you expecting?

Btw, to avoid confusion, I think we may need to specify in the Javadoc of 
applyReadableMetadata that applyProjection will be called before 
applyReadableMetadata.


Best regards,
Yuxia

- Original Message -
From: "Ran Tao" 
To: "dev" 
Sent: Wednesday, February 8, 2023, 8:46:45 PM
Subject: Confusion about some overlapping functionality of 
SupportsProjectionPushDown and SupportsReadingMetadata

Currently we use SupportsProjectionPushDown to push down physical columns,
and SupportsReadingMetadata is used to read metadata columns.
There is no problem when implementing one of the interfaces alone. If two
interfaces are implemented at the same time, there will be confusing
semantics.

For example, if we update the schema or producedDataType in
SupportsProjectionPushDown#applyProjection and
SupportsReadingMetadata#applyReadableMetadata at the same time, the former
is actually invalid, because the former is called first, and then the
latter will overwrite it.

There are some similar usage notes in the interface's documentation. But
this is very confusing. In this case, you only need to implement
SupportsReadingMetadata#applyReadableMetadata (only implement
SupportsProjectionPushDown, the override method is empty), and the rule
match logic of SupportsReadingMetadata will push down the physical column
and metadata columns to generate producedDataType and return it.

At this point SupportsProjectionPushDown is more like a marker interface.
In addition, if some member variables are relied on in the implementation
of SupportsReadingMetadata, and the member variables are also updated in
SupportsProjectionPushDown, unexpected problems may occur. Developers
should clearly read the implementation of these two interfaces and
understand that these overlapping functions will cause a certain
development cost to the developer of the connector (normally, the two
interfaces should be isolated functions, developers see the meaning of the
name ).

I wonder if the community has considered making the responsibilities of
these two interfaces more independent and clear in subsequent updates.
Maybe my understanding is not very sufficient, looking forward to your
opinions.

-- 
Best Regards,
Ran Tao
https://github.com/chucheng92


[jira] [Created] (FLINK-30985) [Flink][table-store] Change the Splits allocation algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm.

2023-02-09 Thread ming li (Jira)
ming li created FLINK-30985:
---

 Summary: [Flink][table-store] Change the Splits allocation 
algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm.
 Key: FLINK-30985
 URL: https://issues.apache.org/jira/browse/FLINK-30985
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Reporter: ming li


Currently, {{assignSplits}} of {{ContinuousFileSplitEnumerator}} in 
{{TableStore}} is performed by traversing the {{HashMap}}, but since the 
number of buckets is fixed, the order of traversal is also fixed.
{code:java}
private void assignSplits() {
    bucketSplits.forEach(
            (bucket, splits) -> {
                if (splits.size() > 0) {
                    // To ensure the order of consumption, the data of the same bucket is given
                    // to a task to be consumed.
                    int task = bucket % context.currentParallelism();
                    if (readersAwaitingSplit.remove(task)) {
                        // if the reader that requested another split has failed in the
                        // meantime, remove
                        // it from the list of waiting readers
                        if (!context.registeredReaders().containsKey(task)) {
                            return;
                        }
                        context.assignSplit(splits.poll(), task);
                    }
                }
            });
}{code}
Assume that a {{task}} consumes multiple {{buckets}} and there are enough 
splits in each {{bucket}}. Then the first {{bucket}} will always be assigned 
to the task, and the other buckets may not be consumed for a long time, 
resulting in uneven consumption and difficulty in advancing the {{watermark}}. 
So I think we should change the split allocation algorithm to a fair algorithm.
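
One possible fair strategy, shown here only as a sketch and not as the 
actual Table Store implementation, is to rotate through the buckets of each 
task in round-robin order. FairSplitAssigner, addSplit and nextSplit are 
hypothetical names, and splits are plain strings to keep the example 
self-contained. A bucket is still always served by the same task, so the 
per-bucket consumption order is kept.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

final class FairSplitAssigner {

    private final int parallelism;
    // Pending splits per bucket, in arrival order.
    private final Map<Integer, Queue<String>> bucketSplits = new HashMap<>();
    // For each task, the buckets it owns in the order they will be served next.
    private final Map<Integer, Deque<Integer>> bucketOrderPerTask = new HashMap<>();

    FairSplitAssigner(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Queues a split; a bucket always belongs to task (bucket % parallelism). */
    void addSplit(int bucket, String split) {
        Queue<String> splits = bucketSplits.get(bucket);
        if (splits == null) {
            splits = new ArrayDeque<>();
            bucketSplits.put(bucket, splits);
            bucketOrderPerTask
                    .computeIfAbsent(bucket % parallelism, task -> new ArrayDeque<>())
                    .addLast(bucket);
        }
        splits.add(split);
    }

    /** Returns the next split for the task, rotating over its buckets, or null if none. */
    String nextSplit(int task) {
        Deque<Integer> order = bucketOrderPerTask.get(task);
        if (order == null) {
            return null;
        }
        // Visit each owned bucket at most once, moving every visited bucket to the back.
        for (int i = 0; i < order.size(); i++) {
            int bucket = order.pollFirst();
            order.addLast(bucket);
            String split = bucketSplits.get(bucket).poll();
            if (split != null) {
                return split;
            }
        }
        return null;
    }
}
```

With two buckets owned by one task, requests alternate between the buckets 
instead of draining the first bucket before touching the second.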





[jira] [Created] (FLINK-30984) Remove explicit cast required by 3.1.x janino

2023-02-09 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-30984:
---

 Summary: Remove explicit cast required by 3.1.x janino 
 Key: FLINK-30984
 URL: https://issues.apache.org/jira/browse/FLINK-30984
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / Runtime
Reporter: Sergey Nuyanzin


This is a follow up task.

Janino 3.1.x currently has an issue, 
[https://github.com/janino-compiler/janino/issues/188], that causes several 
Flink tests to fail. Once it is fixed on the Janino side, the workarounds 
should be removed together with the Janino update.





[jira] [Created] (FLINK-30983) the security.ssl.algorithms configuration does not take effect in rest ssl

2023-02-09 Thread luyuan (Jira)
luyuan created FLINK-30983:
--

 Summary: the security.ssl.algorithms configuration does not take 
effect in rest ssl
 Key: FLINK-30983
 URL: https://issues.apache.org/jira/browse/FLINK-30983
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.16.0
Reporter: luyuan
 Attachments: image-2023-02-09-15-58-36-254.png, 
image-2023-02-09-15-58-43-963.png

The security.ssl.algorithms configuration does not take effect in rest ssl.

 

SSLUtils#createRestNettySSLContext does not call SslContextBuilder#ciphers, 
whereas SSLUtils#createInternalNettySSLContext does.
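
For illustration only, the following sketch shows the general idea of 
applying a configured algorithm list to an SSL endpoint. It uses the plain 
JDK SSLEngine API as an analogue and is not the Flink or Netty code; 
RestCipherSketch, parseAlgorithms and createEngine are hypothetical names, 
and the comma-separated format of the configured value is an assumption.

```java
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

final class RestCipherSketch {

    /** Parses a comma-separated algorithm list (assumed format of the configured value). */
    static List<String> parseAlgorithms(String configured) {
        List<String> algorithms = new ArrayList<>();
        for (String part : configured.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                algorithms.add(trimmed);
            }
        }
        return algorithms;
    }

    /** Creates a JDK engine whose enabled cipher suites are exactly the configured ones. */
    static SSLEngine createEngine(String configured) {
        try {
            SSLEngine engine = SSLContext.getDefault().createSSLEngine();
            // The JDK rejects suite names it does not support with IllegalArgumentException.
            engine.setEnabledCipherSuites(parseAlgorithms(configured).toArray(new String[0]));
            return engine;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("No default SSLContext available", e);
        }
    }
}
```

If the configured list is never passed on in this way, the endpoint keeps 
its default cipher suites, which matches the symptom reported here.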

!image-2023-02-09-15-58-36-254.png!

 

!image-2023-02-09-15-58-43-963.png!


