[jira] [Assigned] (FLINK-22113) UniqueKey constraint is lost with multiple sources join in SQL

2021-04-14 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-22113:
---

Assignee: Xu Guangheng

> UniqueKey constraint is lost with multiple sources join in SQL
> --
>
> Key: FLINK-22113
> URL: https://issues.apache.org/jira/browse/FLINK-22113
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Fu Kai
>Assignee: Xu Guangheng
>Priority: Major
> Fix For: 1.14.0
>
>
> Hi team,
>
> We have a use case that joins multiple data sources to generate a 
> continuously updated view. We defined a primary key constraint on all the 
> input sources, and all the keys are subsets of the join conditions. All 
> joins are left joins.
>
> In our case, the first two inputs produce a *JoinKeyContainsUniqueKey* 
> input spec, which is good and performant. But when it comes to the third 
> input source, it is joined with the intermediate output table of the first 
> two input tables, and the intermediate table does not carry key constraint 
> information (although the third source input table does), so it results in a 
> *NoUniqueKey* input spec. Given that NoUniqueKey inputs have dramatic 
> performance implications per the [Force Join Unique 
> Key|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Force-Join-Unique-Key-td39521.html#a39651]
>  email thread, we want to know if there is any mitigation for this.
>  
> Example:
> Take the example from 
> [https://github.com/ververica/flink-sql-cookbook/blob/master/joins/05/05_star_schema.md]
> {code:java}
> CREATE TEMPORARY TABLE passengers (
>   passenger_key STRING,
>   first_name STRING,
>   last_name STRING,
>   update_time TIMESTAMP(3),
>   PRIMARY KEY (passenger_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'passengers',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE stations (
>   station_key STRING,
>   update_time TIMESTAMP(3),
>   city STRING,
>   PRIMARY KEY (station_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'stations',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE booking_channels (
>   booking_channel_key STRING,
>   update_time TIMESTAMP(3),
>   channel STRING,
>   PRIMARY KEY (booking_channel_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'booking_channels',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE train_activities (
>   scheduled_departure_time TIMESTAMP(3),
>   actual_departure_date TIMESTAMP(3),
>   passenger_key STRING,
>   origin_station_key STRING,
>   destination_station_key STRING,
>   booking_channel_key STRING,
>   PRIMARY KEY (booking_channel_key, origin_station_key, 
> destination_station_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'train_activities',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'json',
>   'value.format' = 'json'
> );
> SELECT 
>   t.actual_departure_date, 
>   p.first_name,
>   p.last_name,
>   b.channel, 
>   os.city AS origin_station,
>   ds.city AS destination_station
> FROM train_activities t
> LEFT JOIN booking_channels b 
> ON t.booking_channel_key = b.booking_channel_key
> LEFT JOIN passengers p
> ON t.passenger_key = p.passenger_key
> LEFT JOIN stations os
> ON t.origin_station_key = os.station_key
> LEFT JOIN stations ds
> ON t.destination_station_key = ds.station_key
> {code}
>  
>  The query generates the following execution plan:
>  
> {code:java}
> Flink SQL> explain
> >  SELECT
> >t.actual_departure_date,
> >p.first_name,
> >p.last_name,
> >b.channel,
> >os.city AS origin_station,
> >ds.city AS destination_station
> >  FROM train_activities t
> >  LEFT JOIN booking_channels b
> >  ON t.booking_channel_key = b.booking_channel_key
> >  LEFT JOIN passengers p
> >  ON t.passenger_key = p.passenger_key
> >  LEFT JOIN stations os
> >  ON t.origin_station_key = os.station_key
> >  LEFT JOIN stations ds
> >  ON t.destination_station_key = ds.station_key;
> == Abstract Syntax Tree ==
> LogicalProject(actual_departure_date=[$1], first_name=[$10], last_name=[$11], 
> channel=[$8], origin_station=[$15], destination_station=[$18])
> +- LogicalJoin(condition=[=($4, $16)], joinType=[left])
>:- LogicalJoin(condition=[=($3, $13)], joinType=[left])
>:  :- LogicalJoin(condition=[=($2, $9)], joinType=[left])
>:  :  :- LogicalJoin(condition=[=($5, $6)], joinType=[left])
>:  :  :  :- 

[jira] [Closed] (FLINK-22267) Savepoint an application for source of upsert-kafka, then restart the application from the savepoint, state not be recovered.

2021-04-14 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-22267.
---
Resolution: Not A Problem

> Savepoint an application for source of upsert-kafka, then restart the 
> application from the savepoint, state not be recovered.  
> ---
>
> Key: FLINK-22267
> URL: https://issues.apache.org/jira/browse/FLINK-22267
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.12.2
>Reporter: Carl
>Priority: Major
> Attachments: image-2021-04-14-11-17-00-207.png
>
>
> My operations are as follows:
> 1. Take a savepoint of an application with an upsert-kafka source
> 2. Delete the upsert-kafka topic data
> 3. Restart the application from the savepoint
> 4. The log shows that the offset has been restored, but the state has not 
> been restored
> What I want to confirm is: does restarting from a savepoint in an 
> upsert-kafka source application not restore the state from the savepoint, 
> but instead rebuild it from the earliest Kafka offsets?
> If so, why reset the offset:
> !image-2021-04-14-11-17-00-207.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22054) Using a shared watcher for ConfigMap watching

2021-04-14 Thread hayden zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321916#comment-17321916
 ] 

hayden zhou edited comment on FLINK-22054 at 4/15/21, 5:53 AM:
---

[~trohrmann] [~yittg] I can't submit more than about 60 batch jobs. I 
submitted one job every 5 minutes; is the watch connection pool exhausted? 
Is the connection not released back to the pool when one batch job is closed?


was (Author: hayden zhou):
[~trohrmann] [~yittg] I can't submit more than about 60 batch jobs. I 
submitted one job every 5 minutes; is the watch connection pool exhausted? 
Is the connection not closed when one batch job is closed?

> Using a shared watcher for ConfigMap watching
> -
>
> Key: FLINK-22054
> URL: https://issues.apache.org/jira/browse/FLINK-22054
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.12.2, 1.13.0
>Reporter: Yi Tang
>Assignee: Yi Tang
>Priority: Major
>  Labels: k8s-ha, pull-request-available
> Fix For: 1.14.0
>
>
> While using the K8s HA service, ConfigMap watching is separate for each 
> job. As the number of running jobs increases, this consumes a large number 
> of connections.
> Here we propose to use a shared watcher for each FlinkKubeClient and 
> dispatch events to different listeners. At the same time, we should keep 
> the same semantics as watching separately.
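For illustration, a minimal sketch of the proposed dispatch pattern, written 
independently of the real FlinkKubeClient/fabric8 watch APIs (the event type 
and class names below are hypothetical):

{code:java}
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for a ConfigMap change notification. */
final class ConfigMapEvent {
    final String configMapName;
    final String action; // "ADDED", "MODIFIED" or "DELETED"

    ConfigMapEvent(String configMapName, String action) {
        this.configMapName = configMapName;
        this.action = action;
    }
}

/** One shared watch connection per client; per-ConfigMap listeners register here. */
final class SharedConfigMapWatcher {
    private final ConcurrentMap<String, List<Consumer<ConfigMapEvent>>> listeners =
            new ConcurrentHashMap<>();

    void register(String configMapName, Consumer<ConfigMapEvent> listener) {
        listeners
                .computeIfAbsent(configMapName, name -> new CopyOnWriteArrayList<>())
                .add(listener);
    }

    void unregister(String configMapName, Consumer<ConfigMapEvent> listener) {
        listeners.computeIfPresent(
                configMapName,
                (name, list) -> {
                    list.remove(listener);
                    return list.isEmpty() ? null : list;
                });
    }

    /** Invoked once per event by the single underlying watch connection. */
    void onEvent(ConfigMapEvent event) {
        List<Consumer<ConfigMapEvent>> list = listeners.get(event.configMapName);
        if (list != null) {
            list.forEach(listener -> listener.accept(event));
        }
    }
}
{code}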



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22267) Savepoint an application for source of upsert-kafka, then restart the application from the savepoint, state not be recovered.

2021-04-14 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321920#comment-17321920
 ] 

Jark Wu commented on FLINK-22267:
-

From the log information, it did restore the source offset state from the 
savepoint. However, the restored source offset no longer exists in the Kafka 
topic (it was cleaned by your step 2). In this case, Kafka will reset the 
offset to the earliest by default; see the Kafka property {{auto.offset.reset}}.
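For illustration, a minimal sketch of overriding that default through the 
connector's {{properties.*}} pass-through (table name and options are 
placeholders; {{none}} makes the consumer fail fast instead of silently 
rewinding, which may or may not be preferable for a given use case):

{code:java}
CREATE TEMPORARY TABLE example_source (
  id STRING,
  payload STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'example-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  -- forwarded to the Kafka client: throw instead of resetting to earliest
  -- when a restored offset no longer exists in the topic
  'properties.auto.offset.reset' = 'none',
  'key.format' = 'raw',
  'value.format' = 'json'
);
{code}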

> Savepoint an application for source of upsert-kafka, then restart the 
> application from the savepoint, state not be recovered.  
> ---
>
> Key: FLINK-22267
> URL: https://issues.apache.org/jira/browse/FLINK-22267
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.12.2
>Reporter: Carl
>Priority: Major
> Attachments: image-2021-04-14-11-17-00-207.png
>
>
> My operations are as follows:
> 1. Take a savepoint of an application with an upsert-kafka source
> 2. Delete the upsert-kafka topic data
> 3. Restart the application from the savepoint
> 4. The log shows that the offset has been restored, but the state has not 
> been restored
> What I want to confirm is: does restarting from a savepoint in an 
> upsert-kafka source application not restore the state from the savepoint, 
> but instead rebuild it from the earliest Kafka offsets?
> If so, why reset the offset:
> !image-2021-04-14-11-17-00-207.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22169) Beautify the CliTableauResultView when print

2021-04-14 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-22169.
--
Resolution: Fixed

fixed: 326a564ebc0e397654ccd4e5a83a77ef811c6a76

> Beautify the CliTableauResultView when print
> 
>
> Key: FLINK-22169
> URL: https://issues.apache.org/jira/browse/FLINK-22169
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.0
>Reporter: Shengkai Fang
>Assignee: Shengkai Fang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
> Attachments: print.png
>
>
> In batch mode, the printed result is not the same as before.
>  !print.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] KurtYoung merged pull request #15603: [FLINK-22169][sql-client] Beautify the CliTableauResultView when print

2021-04-14 Thread GitBox


KurtYoung merged pull request #15603:
URL: https://github.com/apache/flink/pull/15603


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] KurtYoung commented on a change in pull request #15603: [FLINK-22169][sql-client] Beautify the CliTableauResultView when print

2021-04-14 Thread GitBox


KurtYoung commented on a change in pull request #15603:
URL: https://github.com/apache/flink/pull/15603#discussion_r613735406



##
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
##
@@ -233,22 +233,7 @@ public void testCancelBatchResult() throws Exception {
 furture.get(5, TimeUnit.SECONDS);
 
 Assert.assertEquals(
-
"+-+-+--++++"
-+ System.lineSeparator()
-+ "| boolean | int |   bigint |
varchar | decimal(10, 5) |  timestamp |"
-+ System.lineSeparator()
-+ 
"+-+-+--++++"
-+ System.lineSeparator()
-+ "|  (NULL) |   1 |2 |
abc |   1.23 |  2020-03-01 18:39:14.0 |"
-+ System.lineSeparator()
-+ "|   false |  (NULL) |0 |
|  1 |  2020-03-01 18:39:14.1 |"
-+ System.lineSeparator()
-+ "|true |  2147483647 |   (NULL) |
abcdefg | 1234567890 | 2020-03-01 18:39:14.12 |"
-+ System.lineSeparator()
-+ "|   false | -2147483648 |  9223372036854775807 |
 (NULL) |12345.06789 |2020-03-01 18:39:14.123 |"
-+ System.lineSeparator()
-+ "Query terminated, received a total of 4 rows"
-+ System.lineSeparator(),
+"Query terminated, received a total of 0 row" + 
System.lineSeparator(),

Review comment:
   I think this is unexpected. The test is trying to verify that we can 
actually receive part of the data until the user terminates the batch query.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15607: [FLINK-20779][python][docs] Add documentation for row-based operation in Python Table API

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #15607:
URL: https://github.com/apache/flink/pull/15607#issuecomment-819349925


   
   ## CI report:
   
   * 8a62da0c014fc75b3bad145c2fd46fc1809a9c28 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16580)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15595: [FLINK-22171][sql-client][docs] Update the SQL Client doc

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #15595:
URL: https://github.com/apache/flink/pull/15595#issuecomment-818681408


   
   ## CI report:
   
   * 6e06c0cb3d52f23f31ab7855880da5177f7d1627 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16532)
 
   * b7a82f07683d5a76f6743d68cb4cd174c0453bca UNKNOWN
   * 0b78a5c66116de0181cc645da264115b4b1ee79c Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16582)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (FLINK-22054) Using a shared watcher for ConfigMap watching

2021-04-14 Thread hayden zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321916#comment-17321916
 ] 

hayden zhou edited comment on FLINK-22054 at 4/15/21, 5:48 AM:
---

[~trohrmann] [~yittg] I can't submit more than about 60 batch jobs. I 
submitted one job every 5 minutes; is the watch connection pool exhausted? 
Is the connection not closed when one batch job is closed?


was (Author: hayden zhou):
[~trohrmann] [~yittg] I can't submit more than about 60 batch jobs. I 
submitted one job every 5 minutes; is the watch connection pool exhausted? 
Is the connection not closed when one job is closed?

> Using a shared watcher for ConfigMap watching
> -
>
> Key: FLINK-22054
> URL: https://issues.apache.org/jira/browse/FLINK-22054
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.12.2, 1.13.0
>Reporter: Yi Tang
>Assignee: Yi Tang
>Priority: Major
>  Labels: k8s-ha, pull-request-available
> Fix For: 1.14.0
>
>
> While using the K8s HA service, ConfigMap watching is separate for each 
> job. As the number of running jobs increases, this consumes a large number 
> of connections.
> Here we propose to use a shared watcher for each FlinkKubeClient and 
> dispatch events to different listeners. At the same time, we should keep 
> the same semantics as watching separately.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #15438: [FLINK-22000] Set a default character set in InputStreamReader to sol…

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #15438:
URL: https://github.com/apache/flink/pull/15438#issuecomment-810469495


   
   ## CI report:
   
   * d16ff5596d58f30aa2752007efc8f7908059596d Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16353)
 
   * 6459e87724e47a8c6cf7c0478f498c48d860c809 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-22281) flink sql consumer kakfa canal-json message then sum(amount)

2021-04-14 Thread xx chai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321917#comment-17321917
 ] 

xx chai commented on FLINK-22281:
-

Yes, I have added this parameter correctly

> flink sql consumer kakfa canal-json message then sum(amount)  
> --
>
> Key: FLINK-22281
> URL: https://issues.apache.org/jira/browse/FLINK-22281
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0
> Environment: flink 1.12 local
>Reporter: xx chai
>Priority: Major
> Attachments: screenshot-1.png
>
>
> I use Flink SQL to consume Kafka canal-json messages. The SQL is:
> CREATE TABLE kafka_mall_order_info (
>   id int,
>   amount double,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'topic_yx-dc-3-102_3306',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'kafka_to_hive',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'canal-json');
> create table t2 (amount double) with ('connector' = 'print');
>
> insert into t2 select sum(amount) from kafka_mall_order_info;
> but the result is not what I expect;
> the result is in the attached image



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22054) Using a shared watcher for ConfigMap watching

2021-04-14 Thread hayden zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321916#comment-17321916
 ] 

hayden zhou commented on FLINK-22054:
-

[~trohrmann] [~yittg] I can't submit more than about 60 batch jobs. I 
submitted one job every 5 minutes; is the watch connection pool exhausted? 
Is the connection not closed when one job is closed?

> Using a shared watcher for ConfigMap watching
> -
>
> Key: FLINK-22054
> URL: https://issues.apache.org/jira/browse/FLINK-22054
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.12.2, 1.13.0
>Reporter: Yi Tang
>Assignee: Yi Tang
>Priority: Major
>  Labels: k8s-ha, pull-request-available
> Fix For: 1.14.0
>
>
> While using the K8s HA service, ConfigMap watching is separate for each 
> job. As the number of running jobs increases, this consumes a large number 
> of connections.
> Here we propose to use a shared watcher for each FlinkKubeClient and 
> dispatch events to different listeners. At the same time, we should keep 
> the same semantics as watching separately.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22281) flink sql consumer kakfa canal-json message then sum(amount)

2021-04-14 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321914#comment-17321914
 ] 

Jark Wu commented on FLINK-22281:
-

[~chaixiaoxue] you should set the configuration via 
{{TableConfig#getConfiguration#set(...)}}.
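For illustration, a minimal sketch of setting options that way; it assumes 
{{tEnv}} is an existing TableEnvironment, and the mini-batch keys are example 
option names only, not necessarily the parameter under discussion here:

{code:java}
// a sketch, assuming `tEnv` is an existing TableEnvironment
org.apache.flink.configuration.Configuration conf =
        tEnv.getConfig().getConfiguration();
// example keys only; substitute the option being discussed
conf.setString("table.exec.mini-batch.enabled", "true");
conf.setString("table.exec.mini-batch.allow-latency", "1 s");
conf.setString("table.exec.mini-batch.size", "1000");
{code}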

> flink sql consumer kakfa canal-json message then sum(amount)  
> --
>
> Key: FLINK-22281
> URL: https://issues.apache.org/jira/browse/FLINK-22281
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0
> Environment: flink 1.12 local
>Reporter: xx chai
>Priority: Major
> Attachments: screenshot-1.png
>
>
> I use Flink SQL to consume Kafka canal-json messages. The SQL is:
> CREATE TABLE kafka_mall_order_info (
>   id int,
>   amount double,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'topic_yx-dc-3-102_3306',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'kafka_to_hive',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'canal-json');
> create table t2 (amount double) with ('connector' = 'print');
>
> insert into t2 select sum(amount) from kafka_mall_order_info;
> but the result is not what I expect;
> the result is in the attached image



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #15595: [FLINK-22171][sql-client][docs] Update the SQL Client doc

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #15595:
URL: https://github.com/apache/flink/pull/15595#issuecomment-818681408


   
   ## CI report:
   
   * 6e06c0cb3d52f23f31ab7855880da5177f7d1627 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16532)
 
   * b7a82f07683d5a76f6743d68cb4cd174c0453bca UNKNOWN
   * 0b78a5c66116de0181cc645da264115b4b1ee79c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] kezhuw commented on a change in pull request #15605: [FLINK-21996][coordination] - Part 3&4: Ensure OperatorEvent transport losses are handled

2021-04-14 Thread GitBox


kezhuw commented on a change in pull request #15605:
URL: https://github.com/apache/flink/pull/15605#discussion_r613758218



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
##
@@ -64,6 +69,17 @@
 subtaskAccess.createEventSendAction(serializedEvent);
 
 final CompletableFuture result = new 
CompletableFuture<>();
+result.whenCompleteAsync(

Review comment:
   It took me some time to reach this conclusion; I hope I am not wrong 
somewhere (please forgive me if so). It is just hard to evaluate everything 
when multiple futures are involved. But thanks to all the existing work, 
almost everything is single-threaded now, which is much better than the 
asynchronous counterparts.
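   As a self-contained illustration of that single-threaded completion model 
(the class and names below are mine, not from the PR):

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WhenCompleteAsyncDemo {
    public static void main(String[] args) {
        // stands in for the coordinator's single main-thread executor
        ExecutorService mainThread = Executors.newSingleThreadExecutor();

        CompletableFuture<String> result = new CompletableFuture<>();
        result.whenCompleteAsync(
                (value, failure) ->
                        // always runs on mainThread, never on the completing thread
                        System.out.println(
                                failure == null ? "ack: " + value : "failed: " + failure),
                mainThread);

        result.complete("event delivered");
        mainThread.shutdown(); // the queued callback still runs before termination
    }
}
{code}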




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] kezhuw commented on a change in pull request #15605: [FLINK-21996][coordination] - Part 3&4: Ensure OperatorEvent transport losses are handled

2021-04-14 Thread GitBox


kezhuw commented on a change in pull request #15605:
URL: https://github.com/apache/flink/pull/15605#discussion_r613755154



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
##
@@ -64,6 +69,17 @@
 subtaskAccess.createEventSendAction(serializedEvent);
 
 final CompletableFuture result = new 
CompletableFuture<>();
+result.whenCompleteAsync(

Review comment:
   I guess I now see the correctness in my mental model:
   * `SubtaskGateway` is eagerly bound to `Execution`, so there will be no 
mis-targeted events.
   * Both checkpoint completion and event sending are dispatched in order to 
the main thread for execution. So the checkpointing rendezvous on the user 
side is preserved on the holder side. This guarantees that "the events that 
are sent later cannot overtake that action".
   * `createEventSendAction` constructs a lazy sending action to execute in 
the main thread. This sending action inspects the execution state 
(`Execution.state` or `IncompleteFuturesTracker.failureCause`) to **fail 
immediately rather than become pending**.
   
   It might be nice in the design part (e.g., for correctness evaluation), but 
I would say it is pretty unintuitive in the implementation, at least for me. 
Here is what I considered unintuitive:
   * The `Execution.state` inspection in the sending action is quite important 
here, as it contributes to failing immediately. Normally it should not; that 
is to say, a delayed failure should not affect correctness. I could not even 
tell whether `taskExecution.getState()` is a deliberate duplication of the 
similar inspection in `sendOperatorEvent`.
   * Based on the above (i.e., delayed failure), 
`taskExecution.getTerminalStateFuture().thenAccept()` becomes crucial. It is 
no longer a safeguard to "speed things up", but a safeguard for correctness. 
Without it, a delayed failure will leak into the next checkpoint.
   
   I think the key point is that `OperatorEventValve` is neutral to (i.e., has 
no knowledge of) `Execution`, so some work has to be done outside it to 
prevent outdated sending actions from contributing to the incomplete futures 
that are inspected in the next checkpoint. I have no strong objection to the 
current approach, but I would suggest adjusting the docs a bit in 
`ExecutionSubtaskAccess` and/or other places to emphasize that this immediate 
failure is crucial to correctness.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15595: [FLINK-22171][sql-client][docs] Update the SQL Client doc

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #15595:
URL: https://github.com/apache/flink/pull/15595#issuecomment-818681408


   
   ## CI report:
   
   * 6e06c0cb3d52f23f31ab7855880da5177f7d1627 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16532)
 
   * b7a82f07683d5a76f6743d68cb4cd174c0453bca UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15546: [FLINK-21346][build system] Adding timeout to all tasks

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #15546:
URL: https://github.com/apache/flink/pull/15546#issuecomment-816538766


   
   ## CI report:
   
   * e16ddc3fb9157cc0b70ea07023202a2ebdd9fe15 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16569)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15549: [FLINK-20761][hive] Escape the location path when creating input splits

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #15549:
URL: https://github.com/apache/flink/pull/15549#issuecomment-816597084


   
   ## CI report:
   
   * 6bc0d4aa007b6fe629822142a3e6eb1d6c6fe7d9 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16293)
 
   * 387e94130b66c6fb2dd5ac36133b2c317d83060e Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16581)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-22280) Add XML output format for Flink Table

2021-04-14 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321901#comment-17321901
 ] 

Kurt Young commented on FLINK-22280:


I also think this would be helpful. [~flacombe]

> Add XML output format for Flink Table
> -
>
> Key: FLINK-22280
> URL: https://issues.apache.org/jira/browse/FLINK-22280
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.12.1
>Reporter: François Lacombe
>Priority: Major
>
> Dear maintainers,
> I'm looking for the ability to output XML files from the Flink Table API, 
> just like the already supported CSV and JSON formats.
> To me, a new format may be required to perform the appropriate 
> serialization. Am I missing any existing feature (or duplicate issue) that 
> could allow it without a dedicated format?
> Depending on your feedback and if it makes sense, I could get involved in 
> writing the appropriate format based on the same logic as 
> `JsonRowDataSerializationSchema`.
> Best regards
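For context, a minimal sketch of what such a serializer could look like. The 
class is hypothetical and works on {{Row}} with plain {{toString()}} rendering 
for brevity; a real format would follow {{JsonRowDataSerializationSchema}} and 
operate on {{RowData}} with proper type handling and escaping:

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.types.Row;

/** Hypothetical XML serializer sketch; field names are supplied up front. */
public class XmlRowSerializationSchema implements SerializationSchema<Row> {
    private final String rootElement;
    private final String[] fieldNames;

    public XmlRowSerializationSchema(String rootElement, String[] fieldNames) {
        this.rootElement = rootElement;
        this.fieldNames = fieldNames;
    }

    @Override
    public byte[] serialize(Row row) {
        StringBuilder sb = new StringBuilder("<").append(rootElement).append(">");
        for (int i = 0; i < fieldNames.length; i++) {
            Object value = row.getField(i);
            sb.append("<").append(fieldNames[i]).append(">")
                    .append(value == null ? "" : value.toString())
                    .append("</").append(fieldNames[i]).append(">");
        }
        sb.append("</").append(rootElement).append(">");
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }
}
{code}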



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #15605: [FLINK-21996][coordination] - Part 3&4: Ensure OperatorEvent transport losses are handled

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #15605:
URL: https://github.com/apache/flink/pull/15605#issuecomment-819212302


   
   ## CI report:
   
   * e9f7f5526078a9ed65d6403eab5cde7fc82b177b Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16570)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15607: [FLINK-20779][python][docs] Add documentation for row-based operation in Python Table API

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #15607:
URL: https://github.com/apache/flink/pull/15607#issuecomment-819349925


   
   ## CI report:
   
   * 58d729cf2d2ecb4239cd3a2ee380e64f484e7df0 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16512)
 
   * 8a62da0c014fc75b3bad145c2fd46fc1809a9c28 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16580)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15549: [FLINK-20761][hive] Escape the location path when creating input splits

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #15549:
URL: https://github.com/apache/flink/pull/15549#issuecomment-816597084


   
   ## CI report:
   
   * 6bc0d4aa007b6fe629822142a3e6eb1d6c6fe7d9 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16293)
 
   * 387e94130b66c6fb2dd5ac36133b2c317d83060e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] lirui-apache commented on a change in pull request #15549: [FLINK-20761][hive] Escape the location path when creating input splits

2021-04-14 Thread GitBox


lirui-apache commented on a change in pull request #15549:
URL: https://github.com/apache/flink/pull/15549#discussion_r613742399



##
File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
##
@@ -442,6 +448,41 @@ public void testDynamicPartWithOrderBy() throws Exception {
 }
 }
 
+@Test
+public void testLocationWithComma() throws Exception {
+TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+File location = tempFolder.newFolder(",tbl1,location,");
+try {
+// test table location
+tableEnv.executeSql(
+String.format(
+"create table tbl1 (x int) location '%s'", 
location.getAbsolutePath()));
+tableEnv.executeSql("insert into tbl1 values (1),(2)").await();
+List results =
+CollectionUtil.iteratorToList(
+tableEnv.executeSql("select * from 
tbl1").collect());
+assertEquals("[+I[1], +I[2]]", results.toString());
+// test partition location
+tableEnv.executeSql("create table tbl2 (x int) partitioned by (p 
string)");
+location = tempFolder.newFolder(",");
+tableEnv.executeSql(
+String.format(
+"alter table tbl2 add partition (p='a') location 
'%s'",

Review comment:
   It's the location path that causes the issue. But I added a test case 
where both the partition value and the path contain commas anyway.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-22262) Flink on Kubernetes ConfigMaps are created without OwnerReference

2021-04-14 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321885#comment-17321885
 ] 

Yang Wang commented on FLINK-22262:
---

Could you please share the JobManager logs from when you cancel the Flink job 
successfully and still have residual ConfigMaps? I think you could use 
{{kubectl logs podname}} to get the logs.

 

I have used the following steps to start/stop Flink applications on K8s with HA 
enabled in my minikube, and it works well.

1. Start the native Flink K8s application

 
{code:java}
$FLINK_HOME/bin/flink run-application -d -t kubernetes-application \
-Dkubernetes.cluster-id=$CLUSTER_ID \
-Dkubernetes.namespace=$NAMESPACE \
-Dkubernetes.container.image=wangyang09180523/flink:1.13.0-rc0 \
-Dkubernetes.container.image.pull-policy=Always \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dkubernetes.jobmanager.cpu=0.5 -Djobmanager.memory.process.size=1700m \
-Dkubernetes.jobmanager.service-account=default \
-Dkubernetes.taskmanager.cpu=0.5 -Dtaskmanager.memory.process.size=1500m 
-Dtaskmanager.numberOfTaskSlots=4 \
-Dstate.checkpoints.dir=$HA_STORAGE \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 \
-Dhigh-availability.storageDir=$HA_STORAGE \
-Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=1 \
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.13.0.jar
 
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.13.0.jar
 \
-Dstate.savepoints.dir=$HA_STORAGE \
local:///opt/flink/examples/streaming/StateMachineExample.jar
{code}
 

2. Cancel the Flink job with a savepoint. All the K8s resources will be deleted. 
I did not find residual HA ConfigMaps after the job was cancelled successfully.

 
{code:java}
./bin/flink cancel --target kubernetes-application --withSavepoint 
-Dkubernetes.cluster-id=k8s-app-ha-1-113-rc1 -Dkubernetes.namespace=default 

... ...
Cancelled job . Savepoint stored in 
oss://flink-debug-yiqi/flink-ha/savepoint-00-8741523cb1d1.
{code}
3. Optionally change the user code and resubmit the Flink application with the 
stored savepoint

 
{code:java}
$FLINK_HOME/bin/flink run-application -d -t kubernetes-application \
--fromSavepoint oss://flink-debug-yiqi/flink-ha/savepoint-00-8741523cb1d1 \
... ...
local:///opt/flink/examples/streaming/StateMachineExample.jar{code}
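For reference, a hedged sketch of the ownerReference the reporter expects on 
the HA ConfigMap (names and uid are placeholders); with such a reference, 
deleting the owning Deployment would garbage-collect the ConfigMap:

{code:java}
# placeholder metadata illustrating a Kubernetes ownerReference
metadata:
  name: taxi-ride-fare-processor--jobmanager-leader
  ownerReferences:
    - apiVersion: apps/v1
      kind: Deployment
      name: taxi-ride-fare-processor
      uid: <deployment-uid>
      controller: true
      blockOwnerDeletion: true
{code}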
 

 

> Flink on Kubernetes ConfigMaps are created without OwnerReference
> -
>
> Key: FLINK-22262
> URL: https://issues.apache.org/jira/browse/FLINK-22262
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.13.0
>Reporter: Andrea Peruffo
>Priority: Major
>
> According to the documentation:
> [https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#manual-resource-cleanup]
> The ConfigMaps created along with the Flink deployment are supposed to have an 
> OwnerReference pointing to the Deployment itself. Unfortunately, this doesn't 
> happen, which causes all sorts of issues when the classpath and the jars of 
> the job are updated.
> i.e.:
> Without manually removing the ConfigMap of the Job I cannot update the Jars 
> of the Job.
> Can you please give guidance if there are additional caveats on manually 
> removing the ConfigMap? Any other workaround that can be used?
> Thanks in advance.
> Example ConfigMap:
> {{apiVersion: v1}}
> {{data:}}
> {{ address: akka.tcp://flink@10.0.2.13:6123/user/rpc/jobmanager_2}}
> {{ checkpointID-049: 
> rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAABOEtzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAUC9tbnQvZmxpbmsvc3RvcmFnZS9rc2hhL3RheGktcmlkZS1mYXJlLXByb2Nlc3Nvci9jb21wbGV0ZWRDaGVja3BvaW50MDQ0YTc2OWRkNDgxeA==}}
> {{ counter: "50"}}
> {{ sessionId: 0c2b69ee-6b41-48d3-b7fd-1bf2eda94f0f}}
> {{kind: ConfigMap}}
> {{metadata:}}
> {{ annotations:}}
> {{ control-plane.alpha.kubernetes.io/leader: 
> '\{"holderIdentity":"0f25a2cc-e212-46b0-8ba9-faac0732a316","leaseDuration":15.0,"acquireTime":"2021-04-13T14:30:51.439000Z","renewTime":"2021-04-13T14:39:32.011000Z","leaderTransitions":105}'}}
> {{ creationTimestamp: "2021-04-13T14:30:51Z"}}
> {{ labels:}}
> {{ app: taxi-ride-fare-processor}}
> {{ configmap-type: high-availability}}
> {{ type: 

[jira] [Updated] (FLINK-22262) Flink on Kubernetes ConfigMaps are created without OwnerReference

2021-04-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-22262:
--
Component/s: Deployment / Kubernetes

> Flink on Kubernetes ConfigMaps are created without OwnerReference
> -
>
> Key: FLINK-22262
> URL: https://issues.apache.org/jira/browse/FLINK-22262
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.13.0
>Reporter: Andrea Peruffo
>Priority: Major
>
> According to the documentation:
> [https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#manual-resource-cleanup]
> The ConfigMaps created along with the Flink deployment are supposed to have an 
> OwnerReference pointing to the Deployment itself. Unfortunately, this doesn't 
> happen, which causes all sorts of issues when the classpath and the jars of 
> the job are updated.
> i.e.:
> Without manually removing the ConfigMap of the Job I cannot update the Jars 
> of the Job.
> Can you please give guidance if there are additional caveats on manually 
> removing the ConfigMap? Any other workaround that can be used?
> Thanks in advance.
> Example ConfigMap:
> {{apiVersion: v1}}
> {{data:}}
> {{ address: akka.tcp://flink@10.0.2.13:6123/user/rpc/jobmanager_2}}
> {{ checkpointID-049: 
> rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAABOEtzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAUC9tbnQvZmxpbmsvc3RvcmFnZS9rc2hhL3RheGktcmlkZS1mYXJlLXByb2Nlc3Nvci9jb21wbGV0ZWRDaGVja3BvaW50MDQ0YTc2OWRkNDgxeA==}}
> {{ counter: "50"}}
> {{ sessionId: 0c2b69ee-6b41-48d3-b7fd-1bf2eda94f0f}}
> {{kind: ConfigMap}}
> {{metadata:}}
> {{ annotations:}}
> {{ control-plane.alpha.kubernetes.io/leader: 
> '\{"holderIdentity":"0f25a2cc-e212-46b0-8ba9-faac0732a316","leaseDuration":15.0,"acquireTime":"2021-04-13T14:30:51.439000Z","renewTime":"2021-04-13T14:39:32.011000Z","leaderTransitions":105}'}}
> {{ creationTimestamp: "2021-04-13T14:30:51Z"}}
> {{ labels:}}
> {{ app: taxi-ride-fare-processor}}
> {{ configmap-type: high-availability}}
> {{ type: flink-native-kubernetes}}
> {{ name: 
> taxi-ride-fare-processor--jobmanager-leader}}
> {{ namespace: taxi-ride-fare}}
> {{ resourceVersion: "64100"}}
> {{ selfLink: 
> /api/v1/namespaces/taxi-ride-fare/configmaps/taxi-ride-fare-processor--jobmanager-leader}}
> {{ uid: 9f912495-382a-45de-a789-fd5ad2a2459d}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #15607: [FLINK-20779][python][docs] Add documentation for row-based operation in Python Table API

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #15607:
URL: https://github.com/apache/flink/pull/15607#issuecomment-819349925


   
   ## CI report:
   
   * 58d729cf2d2ecb4239cd3a2ee380e64f484e7df0 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16512)
 
   * 8a62da0c014fc75b3bad145c2fd46fc1809a9c28 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15525: [FLINK-22154][table-planner-blink] Fix bug where PushFilterIntoTableSourceScanRule fails to deal with IN expressions

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #15525:
URL: https://github.com/apache/flink/pull/15525#issuecomment-815836876


   
   ## CI report:
   
   * 00492ee3748f2830378d9f7c73afa8b6e3e48fcb Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16504)
 
   * 98d1ce0ed7ea00fd3cc89854ae38e02522926cf4 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16579)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15020: [FLINK-21445][kubernetes] Application mode does not set the configuration when building PackagedProgram

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #15020:
URL: https://github.com/apache/flink/pull/15020#issuecomment-785804760


   
   ## CI report:
   
   * 4a4f8a643c603007519ebefc4c1e732347b6d3d2 UNKNOWN
   * 6ebcd56d7080224fcd21f0f22ee467a951feb2e0 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16545)
 
   * de3bd6e46144fc21952f2ea1035d278a6dc1076f Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16578)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #14484: [FLINK-20722][hive] HiveTableSink should copy the record when convert…

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #14484:
URL: https://github.com/apache/flink/pull/14484#issuecomment-750730068


   
   ## CI report:
   
   * adaac76c0ae0a62ca7f156340653128a43bf7b20 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12645)
 
   * af484144ed52eb8074b78710915bff0fb049dc97 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16577)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] KurtYoung commented on a change in pull request #15603: [FLINK-22169][sql-client] Beautify the CliTableauResultView when print

2021-04-14 Thread GitBox


KurtYoung commented on a change in pull request #15603:
URL: https://github.com/apache/flink/pull/15603#discussion_r613735406



##
File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java
##
@@ -233,22 +233,7 @@ public void testCancelBatchResult() throws Exception {
 furture.get(5, TimeUnit.SECONDS);
 
 Assert.assertEquals(
-
"+-+-+--++++"
-+ System.lineSeparator()
-+ "| boolean | int |   bigint |
varchar | decimal(10, 5) |  timestamp |"
-+ System.lineSeparator()
-+ 
"+-+-+--++++"
-+ System.lineSeparator()
-+ "|  (NULL) |   1 |2 |
abc |   1.23 |  2020-03-01 18:39:14.0 |"
-+ System.lineSeparator()
-+ "|   false |  (NULL) |0 |
|  1 |  2020-03-01 18:39:14.1 |"
-+ System.lineSeparator()
-+ "|true |  2147483647 |   (NULL) |
abcdefg | 1234567890 | 2020-03-01 18:39:14.12 |"
-+ System.lineSeparator()
-+ "|   false | -2147483648 |  9223372036854775807 |
 (NULL) |12345.06789 |2020-03-01 18:39:14.123 |"
-+ System.lineSeparator()
-+ "Query terminated, received a total of 4 rows"
-+ System.lineSeparator(),
+"Query terminated, received a total of 0 row" + 
System.lineSeparator(),

Review comment:
   I think this is unexpected. The test is trying to verify that we can 
actually receive part of the data until the user terminates the batch query.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] KurtYoung commented on a change in pull request #15549: [FLINK-20761][hive] Escape the location path when creating input splits

2021-04-14 Thread GitBox


KurtYoung commented on a change in pull request #15549:
URL: https://github.com/apache/flink/pull/15549#discussion_r613734122



##
File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
##
@@ -442,6 +448,41 @@ public void testDynamicPartWithOrderBy() throws Exception {
 }
 }
 
+@Test
+public void testLocationWithComma() throws Exception {
+TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+File location = tempFolder.newFolder(",tbl1,location,");
+try {
+// test table location
+tableEnv.executeSql(
+String.format(
+"create table tbl1 (x int) location '%s'", 
location.getAbsolutePath()));
+tableEnv.executeSql("insert into tbl1 values (1),(2)").await();
+List results =
+CollectionUtil.iteratorToList(
+tableEnv.executeSql("select * from 
tbl1").collect());
+assertEquals("[+I[1], +I[2]]", results.toString());
+// test partition location
+tableEnv.executeSql("create table tbl2 (x int) partitioned by (p 
string)");
+location = tempFolder.newFolder(",");
+tableEnv.executeSql(
+String.format(
+"alter table tbl2 add partition (p='a') location 
'%s'",

Review comment:
   According to the jira description, I was assuming that you want to 
address both of these

##
File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
##
@@ -442,6 +448,41 @@ public void testDynamicPartWithOrderBy() throws Exception {
 }
 }
 
+@Test
+public void testLocationWithComma() throws Exception {
+TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+File location = tempFolder.newFolder(",tbl1,location,");
+try {
+// test table location
+tableEnv.executeSql(
+String.format(
+"create table tbl1 (x int) location '%s'", 
location.getAbsolutePath()));
+tableEnv.executeSql("insert into tbl1 values (1),(2)").await();
+List results =
+CollectionUtil.iteratorToList(
+tableEnv.executeSql("select * from 
tbl1").collect());
+assertEquals("[+I[1], +I[2]]", results.toString());
+// test partition location
+tableEnv.executeSql("create table tbl2 (x int) partitioned by (p 
string)");
+location = tempFolder.newFolder(",");
+tableEnv.executeSql(
+String.format(
+"alter table tbl2 add partition (p='a') location 
'%s'",

Review comment:
   According to the jira description, I was assuming that you want to 
address both of them




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-22140) Test the unified binary savepoint

2021-04-14 Thread Yun Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321879#comment-17321879
 ] 

Yun Gao commented on FLINK-22140:
-

Tested the unified binary savepoint with [an artificial 
job|https://github.com/gaoyunhaii/flink1.13test]:
 # The job has a source that generates the tuple (index % numberOfKeys, index / 
numberOfKeys) for index in [0, numberOfRecords). The input is fed into 
operators with different types of keyed state. There are 5 types of state in 
total, namely value state, reducing state, aggregating state, list state and 
map state.
 # During the execution of the job, we execute stop-with-savepoint to create a 
savepoint (a hedged CLI sketch of this cycle is given below).
 # Then we start another job from the savepoint to continue execution. The new 
job continues to run until the expected number of records is emitted. Then the 
operators write their final state content into files, and the files are 
compared with the expected content.

We tested the following cases:
 # Start the first job with one state backend among (HashMap, RocksDB and 
incremental RocksDB), and start the second job with another state backend. We 
tested all 9 combinations.
 # For the 9 cases in the first item, start the second job with a larger 
parallelism.
 # For the 9 cases in the first item, start the second job with a smaller 
parallelism.
 # For all three items above, change the key type and value type to customized 
user types.

We verified that:
 # The savepoint is taken successfully, and both jobs finish normally.
 # The resulting state backend content is as expected.
 # The checkpoints of the two jobs are taken successfully.
 # There is no unexpected behavior during the test process.

The test result is good for all cases, with only two minor issues:
 # The Web UI should show the configured state backend type and storage type; 
it is not easy for users to verify which state backend is currently in use.
 # Do not print the stack trace when checkpoints fail because not all tasks 
are running: https://issues.apache.org/jira/browse/FLINK-22117; otherwise the 
log would be overwhelmed with this kind of exception.
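
For reference, a minimal sketch of the stop-with-savepoint / resume cycle used 
in steps 2 and 3 above (job ID, savepoint paths, jar path and config values 
are placeholders):

{code:java}
# step 2: stop the running job and take a savepoint
bin/flink stop --savepointPath file:///tmp/savepoints <jobId>

# step 3: start a second job from the savepoint, possibly with a different
# state backend and parallelism
bin/flink run -s file:///tmp/savepoints/savepoint-xxxx \
    -p 8 \
    -Dstate.backend=rocksdb \
    ./test-job.jar
{code}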
  

 

> Test the unified binary savepoint
> -
>
> Key: FLINK-22140
> URL: https://issues.apache.org/jira/browse/FLINK-22140
> Project: Flink
>  Issue Type: Task
>  Components: Runtime / State Backends
>Affects Versions: 1.13.0
>Reporter: Dawid Wysakowicz
>Assignee: Yun Gao
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.13.0
>
>
> With https://issues.apache.org/jira/browse/FLINK-20976 we introduced a 
> unified binary savepoint format which should let you switch between different 
> state backends. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #15618: [FLINK-22125][python] Returns None when a key doesn't exist when calling MapState.get()

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #15618:
URL: https://github.com/apache/flink/pull/15618#issuecomment-820024114


   
   ## CI report:
   
   * a5030e29742e58a6df2dbca7d7e1ce27a90e8722 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16576)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15525: [FLINK-22154][table-planner-blink] Fix bug where PushFilterIntoTableSourceScanRule fails to deal with IN expressions

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #15525:
URL: https://github.com/apache/flink/pull/15525#issuecomment-815836876


   
   ## CI report:
   
   * 00492ee3748f2830378d9f7c73afa8b6e3e48fcb Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16504)
 
   * 98d1ce0ed7ea00fd3cc89854ae38e02522926cf4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (FLINK-22113) UniqueKey constraint is lost with multiple sources join in SQL

2021-04-14 Thread Xu Guangheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321868#comment-17321868
 ] 

Xu Guangheng edited comment on FLINK-22113 at 4/15/21, 3:25 AM:


Hello [~jark],

Thanks for your reply. 

Yes, {{FlinkRelMdColumnUniqueness}} has implemented {{areColumnsUnique}} for 
the TableScan node, but there is a {{TODO}} inside the implementation. It seems 
the implementation is not complete.

Could you assign the issue to me? I can try to fix it.
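
To illustrate the missing piece, a hypothetical sketch of the kind of check a 
complete implementation could perform (names and shapes are illustrative, not 
the actual planner code):

{code:java}
import java.util.List;
import java.util.Set;

final class UniqueKeyCheck {
    // Hypothetical sketch, not the actual FlinkRelMdColumnUniqueness code:
    // the queried columns of a TableScan are unique if they cover one of the
    // table's declared unique keys (e.g. the PRIMARY KEY columns).
    static boolean coversUniqueKey(Set<Integer> queriedColumns, List<Set<Integer>> uniqueKeys) {
        for (Set<Integer> key : uniqueKeys) {
            if (queriedColumns.containsAll(key)) {
                return true;
            }
        }
        return false;
    }
}
{code}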


was (Author: xuguangheng):
Hello [~jark],

Thanks for you reply. 

Yes, FlinkRelMdColumnUniqueness has implemented areColumnsUnique for TableScan 
node, but there is a TODO inside the implementation. It seems the 
implementation is not complete.

Can you assign the issue to me, I can try to fix it.

> UniqueKey constraint is lost with multiple sources join in SQL
> --
>
> Key: FLINK-22113
> URL: https://issues.apache.org/jira/browse/FLINK-22113
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Fu Kai
>Priority: Major
> Fix For: 1.14.0
>
>
> Hi team,
>   
>  We have a use case to join multiple data sources to generate a continuous 
> updated view. We defined primary key constraint on all the input sources and 
> all the keys are the subsets in the join condition. All joins are left join.
>   
>  In our case, the first two inputs can produce *JoinKeyContainsUniqueKey* 
> input spec, which is good and performant. However, when it comes to the third 
> input source, it's joined with the intermediate output table of the first two 
> input tables, and the intermediate table does not carry key constraint 
> information (although the third source input table does), so it results in a 
> *NoUniqueKey* input spec. Given NoUniqueKey inputs have dramatic performance 
> implications per the[ Force Join Unique 
> Key|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Force-Join-Unique-Key-td39521.html#a39651]
>  email thread, we want to know if there is any mitigation solution for this.
>  
> Example:
> Take the example from 
> [https://github.com/ververica/flink-sql-cookbook/blob/master/joins/05/05_star_schema.md]
> {code:java}
> CREATE TEMPORARY TABLE passengers (
>   passenger_key STRING,
>   first_name STRING,
>   last_name STRING,
>   update_time TIMESTAMP(3),
>   PRIMARY KEY (passenger_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'passengers',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE stations (
>   station_key STRING,
>   update_time TIMESTAMP(3),
>   city STRING,
>   PRIMARY KEY (station_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'stations',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE booking_channels (
>   booking_channel_key STRING,
>   update_time TIMESTAMP(3),
>   channel STRING,
>   PRIMARY KEY (booking_channel_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'booking_channels',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE train_activities (
>   scheduled_departure_time TIMESTAMP(3),
>   actual_departure_date TIMESTAMP(3),
>   passenger_key STRING,
>   origin_station_key STRING,
>   destination_station_key STRING,
>   booking_channel_key STRING,
>   PRIMARY KEY (booking_channel_key, origin_station_key, 
> destination_station_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'train_activities',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'json',
>   'value.format' = 'json'
> );
> SELECT 
>   t.actual_departure_date, 
>   p.first_name,
>   p.last_name,
>   b.channel, 
>   os.city AS origin_station,
>   ds.city AS destination_station
> FROM train_activities_1 t
> LEFT JOIN booking_channels b 
> ON t.booking_channel_key = b.booking_channel_key
> LEFT JOIN passengers p
> ON t.passenger_key = p.passenger_key
> LEFT JOIN stations os
> ON t.origin_station_key = os.station_key
> LEFT JOIN stations ds
> ON t.destination_station_key = ds.station_key
> {code}
>  
>  The query will generate the following execution plan:
>  
> {code:java}
> Flink SQL> explain
> >  SELECT
> >t.actual_departure_date,
> >p.first_name,
> >p.last_name,
> >b.channel,
> >os.city AS origin_station,
> >ds.city AS destination_station
> >  FROM train_activities_1 t
> >  LEFT JOIN booking_channels b
> >  ON t.booking_channel_key = b.booking_channel_key
> >  LEFT JOIN passengers p
> >  ON t.passenger_key = 

[jira] [Commented] (FLINK-22281) flink sql consumer kafka canal-json message then sum(amount)

2021-04-14 Thread xx chai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321878#comment-17321878
 ] 

xx chai commented on FLINK-22281:
-

Thanks [~jark], I solved the question.

> flink sql consumer kafka canal-json message then sum(amount)  
> --
>
> Key: FLINK-22281
> URL: https://issues.apache.org/jira/browse/FLINK-22281
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0
> Environment: flink 1.12 local
>Reporter: xx chai
>Priority: Major
> Attachments: screenshot-1.png
>
>
> I use Flink SQL to consume Kafka canal-json messages. The SQL is: 
> CREATE TABLE kafka_mall_order_info (
>   id int,
>   amount double,
>PRIMARY KEY (  id) NOT ENFORCED
>) WITH (
>   'connector' = 'kafka',
>   'topic' = 'topic_yx-dc-3-102_3306',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'kafka_to_hive',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'canal-json');
>   create table t2 (amount double) with ('connector' = 'print');
>   
>  
> insert into t2 select sum(amount) from kafka_mall_order_info ;
> but the result is not what I expect; 
> the result is shown in the attached image



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #14484: [FLINK-20722][hive] HiveTableSink should copy the record when convert…

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #14484:
URL: https://github.com/apache/flink/pull/14484#issuecomment-750730068


   
   ## CI report:
   
   * adaac76c0ae0a62ca7f156340653128a43bf7b20 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12645)
 
   * af484144ed52eb8074b78710915bff0fb049dc97 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (FLINK-22275) Datagen add a max lag option for a series of timestamp-related types

2021-04-14 Thread Yi Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321872#comment-17321872
 ] 

Yi Tang edited comment on FLINK-22275 at 4/15/21, 3:23 AM:
---

Of course.

How about introducing a {{fields.#.max-lag}} integer option with a default 
value of {{0}} (expected to be nonnegative)? It is obvious that this option can 
only be applied to the {{random}} kind, and to the {{TimestampType}} / 
{{ZonedTimestampType}} / {{LocalZonedTimestampType}} types, which all return 
the current timestamp for now.

If a user configures the max-lag, the generated timestamp will be 
{{currentTimeMillis - random(0, max-lag)}} instead of the original 
{{currentTimeMillis}} only.

For example, with fields.atime.max-lag = 5000, the generated timestamps may 
look like this (one per second):
|timestamp|1618455622000|1618455623000|1618455624000|1618455625000|
|lag|0|1000|3000|2000|
|generated|1618455622000|1618455622000|1618455621000|1618455623000|
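
A minimal sketch of the proposed semantics (the class and method names are 
illustrative, not the actual datagen implementation):

{code:java}
import java.util.concurrent.ThreadLocalRandom;

final class LaggedTimestampGenerator {
    private final long maxLagMillis; // corresponds to the proposed fields.#.max-lag

    LaggedTimestampGenerator(long maxLagMillis) {
        this.maxLagMillis = maxLagMillis;
    }

    long next() {
        long now = System.currentTimeMillis();
        if (maxLagMillis <= 0) {
            // option not configured: keep the current behavior
            return now;
        }
        // subtract a uniformly random lag in [0, max-lag]
        return now - ThreadLocalRandom.current().nextLong(maxLagMillis + 1);
    }
}
{code}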


was (Author: yittg):
Of course.

How about introduce a {{fields.#.max-lag}} integer option with default value as 
{{0}} (expected to nonnegative)? It is obvious that this option can only be 
applied to {{random}} kind and for {{TimestampType}} / {{ZonedTimestampType}} / 
{{LocalZonedTimestampType}} types which all return the current timestamp for 
now.

If one user configured the max-lag, then the generated timestamp will be 
{{currentTimeMillis - random(0, max-lag)}} instead of the original 
{{currentTimeMillis}} only.

For example fields.atime.max-lag = 5000, then the generated timestamp maybe 
consists of (one per second)
|timestamp|1618455622000|1618455623000|1618455624000|1618455625000|
|delta|0|1000|3000|2000|
|generated|1618455622000|1618455622000|1618455621000|1618455623000|

> Datagen add a max lag option for a series of timestamp-related types
> 
>
> Key: FLINK-22275
> URL: https://issues.apache.org/jira/browse/FLINK-22275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Ecosystem
>Reporter: Yi Tang
>Priority: Minor
>
> For now, the {{datagen}} connector always resolves to the current timestamp 
> for timestamp-related types.
> This proposes to add a max lag option for these types, which will generate a 
> timestamp with a random lag (bounded by the option) relative to the current 
> timestamp. The behavior is left as before if the option is not configured.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22275) Datagen add a max lag option for a series of timestamp-related types

2021-04-14 Thread Yi Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321872#comment-17321872
 ] 

Yi Tang edited comment on FLINK-22275 at 4/15/21, 3:22 AM:
---

Of course.

How about introducing a {{fields.#.max-lag}} integer option with a default 
value of {{0}} (expected to be nonnegative)? It is obvious that this option can 
only be applied to the {{random}} kind, and to the {{TimestampType}} / 
{{ZonedTimestampType}} / {{LocalZonedTimestampType}} types, which all return 
the current timestamp for now.

If a user configures the max-lag, the generated timestamp will be 
{{currentTimeMillis - random(0, max-lag)}} instead of the original 
{{currentTimeMillis}} only.

For example, with fields.atime.max-lag = 5000, the generated timestamps may 
look like this (one per second):
|timestamp|1618455622000|1618455623000|1618455624000|1618455625000|
|delta|0|1000|3000|2000|
|generated|1618455622000|1618455622000|1618455621000|1618455623000|


was (Author: yittg):
Of course.

First of all, it is obvious that this option can only be applied to \{{ random 
}} kind.

How about introduce a {{fields.#.max-lag}} integer option with default value as 
{{0}} (expected to nonnegative)? The option can be applied for 
{{TimestampType}} / {{ZonedTimestampType}} / {{LocalZonedTimestampType}} types 
which all return the current timestamp for now.

If one user configured the max-lag, then the generated timestamp will be 
{{currentTimeMillis - random(0, max-lag)}} instead of the original 
{{currentTimeMillis}} only.

For example fields.atime.max-lag = 5000, then the generated timestamp maybe 
consists of (one per second)
|timestamp|1618455622000|1618455623000|1618455624000|1618455625000|
|delta|0|1000|3000|2000|
|generated|1618455622000|1618455622000|1618455621000|1618455623000|

> Datagen add a max lag option for a series of timestamp-related types
> 
>
> Key: FLINK-22275
> URL: https://issues.apache.org/jira/browse/FLINK-22275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Ecosystem
>Reporter: Yi Tang
>Priority: Minor
>
> For now, the {{datagen}} connector always resolves to the current timestamp 
> for timestamp-related types.
> This proposes to add a max lag option for these types, which will generate a 
> timestamp with a random lag (bounded by the option) relative to the current 
> timestamp. The behavior is left as before if the option is not configured.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-6113) Implement split/select with Side Outputs

2021-04-14 Thread Chen Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-6113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321877#comment-17321877
 ] 

Chen Qin commented on FLINK-6113:
-

It seems the community has already gotten rid of the split and select 
transformations in the master branch, so this JIRA no longer makes sense.

Attaching the patch for curious minds.

[^split_select.patch]
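
For reference, a minimal sketch (not from the attached patch) of expressing the 
old split/select semantics with OutputTag-based side outputs:

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // anonymous subclass so the element type is preserved
        final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd") {};

        SingleOutputStreamOperator<Integer> evens =
                env.fromElements(1, 2, 3, 4)
                        .process(new ProcessFunction<Integer, Integer>() {
                            @Override
                            public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                                if (value % 2 == 0) {
                                    out.collect(value);        // main ("even") output
                                } else {
                                    ctx.output(oddTag, value); // "odd" side output
                                }
                            }
                        });

        DataStream<Integer> odds = evens.getSideOutput(oddTag);
        evens.print();
        odds.print();
        env.execute("split/select via side outputs");
    }
}
{code}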

 

> Implement split/select with Side Outputs
> 
>
> Key: FLINK-6113
> URL: https://issues.apache.org/jira/browse/FLINK-6113
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.3.0
>Reporter: Chen Qin
>Priority: Minor
>  Labels: stale-minor
> Attachments: split_select.patch
>
>
> With the completion of FLINK-4460 (side outputs), this is one of the follow-up 
> items towards replacing string-tag-based split/select with OutputTag-based 
> split/select.
> In Flink 2.0, we might consider eventually deprecating split/select 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-6113) Implement split/select with Side Outputs

2021-04-14 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-6113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin closed FLINK-6113.
---
Resolution: Fixed

> Implement split/select with Side Outputs
> 
>
> Key: FLINK-6113
> URL: https://issues.apache.org/jira/browse/FLINK-6113
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.3.0
>Reporter: Chen Qin
>Priority: Minor
>  Labels: stale-minor
> Attachments: split_select.patch
>
>
> With the completion of FLINK-4460 (side outputs), this is one of the follow-up 
> items towards replacing string-tag-based split/select with OutputTag-based 
> split/select.
> In Flink 2.0, we might consider eventually deprecating split/select 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-6113) Implement split/select with Side Outputs

2021-04-14 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-6113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-6113:

Attachment: split_select.patch

> Implement split/select with Side Outputs
> 
>
> Key: FLINK-6113
> URL: https://issues.apache.org/jira/browse/FLINK-6113
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.3.0
>Reporter: Chen Qin
>Priority: Minor
>  Labels: stale-minor
> Attachments: split_select.patch
>
>
> With the completion of FLINK-4460 (side outputs), this is one of the follow-up 
> items towards replacing string-tag-based split/select with OutputTag-based 
> split/select.
> In Flink 2.0, we might consider eventually deprecating split/select 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22284) Null address will be logged when channel is closed in NettyPartitionRequestClient

2021-04-14 Thread Zhilong Hong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhilong Hong updated FLINK-22284:
-
Attachment: exception.png

> Null address will be logged when channel is closed in 
> NettyPartitionRequestClient
> -
>
> Key: FLINK-22284
> URL: https://issues.apache.org/jira/browse/FLINK-22284
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.13.0
>Reporter: Zhilong Hong
>Priority: Minor
> Fix For: 1.13.0
>
> Attachments: exception.png
>
>
> In NettyPartitionRequestClient#requestSubpartition, when a channel is closed, 
> the channel will throw a LocalTransportException with the error message 
> "Sending the partition request to 'null' failed.". The message is confusing, 
> since we don't know where the remote client connected to this channel is 
> located, and we can't track down that TaskExecutor to find out what happened.
>  
> Also, I'm wondering whether we should use TransportException instead of 
> LocalTransportException here, because it's a little confusing to see a 
> LocalTransportException thrown when a remote channel is closed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22284) Null address will be logged when channel is closed in NettyPartitionRequestClient

2021-04-14 Thread Zhilong Hong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhilong Hong updated FLINK-22284:
-
Description: 
In NettyPartitionRequestClient#requestSubpartition, when a channel is closed, 
the channel will throw a LocalTransportException with the error message 
"Sending the partition request to 'null' failed.". The message is confusing, 
since we don't know where the remote client connected to this channel is 
located, and we can't track down that TaskExecutor to find out what happened.

 

Also, I'm wondering whether we should use TransportException instead of 
LocalTransportException here, because it's a little confusing to see a 
LocalTransportException thrown when a remote channel is closed.

 

!exception.png!

  was:
In NettyPartitionRequestClient#requestSubpartition, when a channel is closed, 
the channel will throw a LocalTransportException with the error message 
"Sending the partition request to 'null' failed.". The message is confusing 
since we wouldn't know where the remote client connected to this channel 
locates, and we couldn't track down to that TaskExecutor and find out what 
happened.

 

Also I'm wondering that should we use TransportException instead of 
LocalTransportException here, because it's a little confusing to see a 
LocalTransportException is thrown out when a remote channel is closed.


> Null address will be logged when channel is closed in 
> NettyPartitionRequestClient
> -
>
> Key: FLINK-22284
> URL: https://issues.apache.org/jira/browse/FLINK-22284
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.13.0
>Reporter: Zhilong Hong
>Priority: Minor
> Fix For: 1.13.0
>
> Attachments: exception.png
>
>
> In NettyPartitionRequestClient#requestSubpartition, when a channel is closed, 
> the channel will throw a LocalTransportException with the error message 
> "Sending the partition request to 'null' failed.". The message is confusing, 
> since we don't know where the remote client connected to this channel is 
> located, and we can't track down that TaskExecutor to find out what happened.
>  
> Also, I'm wondering whether we should use TransportException instead of 
> LocalTransportException here, because it's a little confusing to see a 
> LocalTransportException thrown when a remote channel is closed.
>  
> !exception.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] kezhuw commented on a change in pull request #15601: [FLINK-18071][FLINK-21996][coordination] - Part two: Ensure reliable OperatorEvent to running Task matching

2021-04-14 Thread GitBox


kezhuw commented on a change in pull request #15601:
URL: https://github.com/apache/flink/pull/15601#discussion_r613727138



##
File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
##
@@ -285,41 +348,130 @@ public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exc
 String.format("Don't recognize event '%s' from task %d.", event, subtask));
 }
 
-if (periodicTask != null) {
-throw new Exception("periodic already running");
+synchronized (recoveredTaskRunning) {
+// signal the previous task that its recovered task is now running
+final CompletableFuture prevTaskFuture = recoveredTaskRunning.peekLast();
+if (prevTaskFuture != null) {
+prevTaskFuture.complete(null);
+}
+// add a future for this task
+recoveredTaskRunning.addLast(new CompletableFuture<>());
 }
-periodicTask =
-executor.scheduleWithFixedDelay(this, delay, delay, TimeUnit.MILLISECONDS);
+
+// first, we hand this over to the mailbox thread, so we preserve order on operations,
+// even if the action is only to do a thread safe scheduling into the scheduledExecutor
+runInMailbox(
+() -> {
+checkState(!workLoopRunning);
+checkState(subtaskGateway != null);
+
+workLoopRunning = true;
+scheduleSingleAction();
+});
 }
 
 @Override
 public void subtaskFailed(int subtask, @Nullable Throwable reason) {

Review comment:
   It should only require some minor changes to `subtaskReset`, 
`notifyCheckpointComplete` and `notifyCheckpointAborted`. Personally, I think 
`SubtaskGateway` is more vital to subtask failures than to job failures. For a 
job failure alone, recreating a brand-new coordinator in `resetToCheckpoint` 
should also pass this test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-22284) Null address will be logged when channel is closed in NettyPartitionRequestClient

2021-04-14 Thread Zhilong Hong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhilong Hong updated FLINK-22284:
-
Description: 
In NettyPartitionRequestClient#requestSubpartition, when a channel is closed, 
the channel will throw a LocalTransportException with the error message 
"Sending the partition request to 'null' failed.". The message is confusing, 
since we don't know where the remote client connected to this channel is 
located, and we can't track down that TaskExecutor to find out what happened.

 

Also, I'm wondering whether we should use TransportException instead of 
LocalTransportException here, because it's a little confusing to see a 
LocalTransportException thrown when a remote channel is closed.

  was:
In NettyPartitionRequestClient#requestSubpartition, when channel is closed, the 
channel will throw a LocalTransportException with the error message "Sending 
the partition request to 'null' failed.". The message is confusing since we 
wouldn't know where the remote client connected to this channel locates, and we 
couldn't track down to that TaskExecutor and find out what happened.

 

Also I'm wondering that should we use TransportException instead of 
LocalTransportException here, because it's a little confusing to see a 
LocalTransportException is thrown out when a remote channel is closed.


> Null address will be logged when channel is closed in 
> NettyPartitionRequestClient
> -
>
> Key: FLINK-22284
> URL: https://issues.apache.org/jira/browse/FLINK-22284
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.13.0
>Reporter: Zhilong Hong
>Priority: Minor
> Fix For: 1.13.0
>
>
> In NettyPartitionRequestClient#requestSubpartition, when a channel is closed, 
> the channel will throw a LocalTransportException with the error message 
> "Sending the partition request to 'null' failed.". The message is confusing, 
> since we don't know where the remote client connected to this channel is 
> located, and we can't track down that TaskExecutor to find out what happened.
>  
> Also, I'm wondering whether we should use TransportException instead of 
> LocalTransportException here, because it's a little confusing to see a 
> LocalTransportException thrown when a remote channel is closed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on pull request #15618: [FLINK-22125][python] Returns None when a key doesn't exist when calling MapState.get()

2021-04-14 Thread GitBox


flinkbot commented on pull request #15618:
URL: https://github.com/apache/flink/pull/15618#issuecomment-820024114


   
   ## CI report:
   
   * a5030e29742e58a6df2dbca7d7e1ce27a90e8722 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-22284) Null address will be logged when channel is closed in NettyPartitionRequestClient

2021-04-14 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-22284:


 Summary: Null address will be logged when channel is closed in 
NettyPartitionRequestClient
 Key: FLINK-22284
 URL: https://issues.apache.org/jira/browse/FLINK-22284
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.13.0


In NettyPartitionRequestClient#requestSubpartition, when a channel is closed, 
the channel will throw a LocalTransportException with the error message 
"Sending the partition request to 'null' failed.". The message is confusing, 
since we don't know where the remote client connected to this channel is 
located, and we can't track down that TaskExecutor to find out what happened.

 

Also, I'm wondering whether we should use TransportException instead of 
LocalTransportException here, because it's a little confusing to see a 
LocalTransportException thrown when a remote channel is closed.
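
A hypothetical sketch of how the message could avoid printing 'null' 
(illustrative only; this is not the actual NettyPartitionRequestClient code):

{code:java}
import java.net.SocketAddress;

final class PartitionRequestErrorMessages {
    // Prefer the channel's remote address; fall back to a caller-supplied
    // connection address when the closed channel reports null.
    static String sendingFailed(SocketAddress channelRemote, SocketAddress fallback) {
        SocketAddress target = (channelRemote != null) ? channelRemote : fallback;
        return "Sending the partition request to '" + target + "' failed.";
    }
}
{code}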



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22275) Datagen add a max lag option for a series of timestamp-related types

2021-04-14 Thread Yi Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321872#comment-17321872
 ] 

Yi Tang commented on FLINK-22275:
-

Of course.

First of all, it is obvious that this option can only be applied to the 
{{random}} kind.

How about introducing a {{fields.#.max-lag}} integer option with a default 
value of {{0}} (expected to be nonnegative)? The option can be applied to the 
{{TimestampType}} / {{ZonedTimestampType}} / {{LocalZonedTimestampType}} types, 
which all return the current timestamp for now.

If a user configures the max-lag, the generated timestamp will be 
{{currentTimeMillis - random(0, max-lag)}} instead of the original 
{{currentTimeMillis}} only.

For example, with fields.atime.max-lag = 5000, the generated timestamps may 
look like this (one per second):
|timestamp|1618455622000|1618455623000|1618455624000|1618455625000|
|delta|0|1000|3000|2000|
|generated|1618455622000|1618455622000|1618455621000|1618455623000|

> Datagen add a max lag option for a series of timestamp-related types
> 
>
> Key: FLINK-22275
> URL: https://issues.apache.org/jira/browse/FLINK-22275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Ecosystem
>Reporter: Yi Tang
>Priority: Minor
>
> For now, the {{datagen}} connector always resolves to the current timestamp 
> for timestamp-related types.
> This proposes to add a max lag option for these types, which will generate a 
> timestamp with a random lag (bounded by the option) relative to the current 
> timestamp. The behavior is left as before if the option is not configured.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #15020: [FLINK-21445][kubernetes] Application mode does not set the configuration when building PackagedProgram

2021-04-14 Thread GitBox


flinkbot edited a comment on pull request #15020:
URL: https://github.com/apache/flink/pull/15020#issuecomment-785804760


   
   ## CI report:
   
   * 4a4f8a643c603007519ebefc4c1e732347b6d3d2 UNKNOWN
   * 6ebcd56d7080224fcd21f0f22ee467a951feb2e0 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16545)
 
   * de3bd6e46144fc21952f2ea1035d278a6dc1076f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-22283) GraphiteReporter Metrics named xxx already exists

2021-04-14 Thread HideOnBush (Jira)
HideOnBush created FLINK-22283:
--

 Summary: GraphiteReporter Metrics named xxx already exists
 Key: FLINK-22283
 URL: https://issues.apache.org/jira/browse/FLINK-22283
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.9.0
Reporter: HideOnBush


When I was using the GraphiteReporter with Flink 1.9, an error was reported, 
which caused the relevant metrics of my TaskManager to not be collected.
Error message: a metric named xxx already exists



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22113) UniqueKey constraint is lost with multiple sources join in SQL

2021-04-14 Thread Xu Guangheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321868#comment-17321868
 ] 

Xu Guangheng edited comment on FLINK-22113 at 4/15/21, 3:00 AM:


Hello [~jark],

Thanks for your reply. 

Yes, FlinkRelMdColumnUniqueness has implemented areColumnsUnique for the 
TableScan node, but there is a TODO inside the implementation. It seems the 
implementation is not complete.

Could you assign the issue to me? I can try to fix it.


was (Author: xuguangheng):
Hello [~jark],

Thanks for you reply. 

Yes, FlinkRelMdColumnUniqueness has implemented areColumnsUnique for TableScan 
node, but there is a TODO inside the implementation. It seems the 
implementation is not complete.

Can you assign the issue to me, I cam try to fix it.

> UniqueKey constraint is lost with multiple sources join in SQL
> --
>
> Key: FLINK-22113
> URL: https://issues.apache.org/jira/browse/FLINK-22113
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Fu Kai
>Priority: Major
> Fix For: 1.14.0
>
>
> Hi team,
>   
>  We have a use case to join multiple data sources to generate a continuous 
> updated view. We defined primary key constraint on all the input sources and 
> all the keys are the subsets in the join condition. All joins are left join.
>   
>  In our case, the first two inputs can produce *JoinKeyContainsUniqueKey* 
> input spec, which is good and performant. However, when it comes to the third 
> input source, it's joined with the intermediate output table of the first two 
> input tables, and the intermediate table does not carry key constraint 
> information (although the third source input table does), so it results in a 
> *NoUniqueKey* input spec. Given NoUniqueKey inputs have dramatic performance 
> implications per the[ Force Join Unique 
> Key|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Force-Join-Unique-Key-td39521.html#a39651]
>  email thread, we want to know if there is any mitigation solution for this.
>  
> Example:
> Take the example from 
> [https://github.com/ververica/flink-sql-cookbook/blob/master/joins/05/05_star_schema.md]
> {code:java}
> CREATE TEMPORARY TABLE passengers (
>   passenger_key STRING,
>   first_name STRING,
>   last_name STRING,
>   update_time TIMESTAMP(3),
>   PRIMARY KEY (passenger_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'passengers',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE stations (
>   station_key STRING,
>   update_time TIMESTAMP(3),
>   city STRING,
>   PRIMARY KEY (station_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'stations',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE booking_channels (
>   booking_channel_key STRING,
>   update_time TIMESTAMP(3),
>   channel STRING,
>   PRIMARY KEY (booking_channel_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'booking_channels',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE train_activities (
>   scheduled_departure_time TIMESTAMP(3),
>   actual_departure_date TIMESTAMP(3),
>   passenger_key STRING,
>   origin_station_key STRING,
>   destination_station_key STRING,
>   booking_channel_key STRING,
>   PRIMARY KEY (booking_channel_key, origin_station_key, 
> destination_station_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'train_activities',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'json',
>   'value.format' = 'json'
> );
> SELECT 
>   t.actual_departure_date, 
>   p.first_name,
>   p.last_name,
>   b.channel, 
>   os.city AS origin_station,
>   ds.city AS destination_station
> FROM train_activities_1 t
> LEFT JOIN booking_channels b 
> ON t.booking_channel_key = b.booking_channel_key
> LEFT JOIN passengers p
> ON t.passenger_key = p.passenger_key
> LEFT JOIN stations os
> ON t.origin_station_key = os.station_key
> LEFT JOIN stations ds
> ON t.destination_station_key = ds.station_key
> {code}
>  
>  The query will generate the following execution plan:
>  
> {code:java}
> Flink SQL> explain
> >  SELECT
> >t.actual_departure_date,
> >p.first_name,
> >p.last_name,
> >b.channel,
> >os.city AS origin_station,
> >ds.city AS destination_station
> >  FROM train_activities_1 t
> >  LEFT JOIN booking_channels b
> >  ON t.booking_channel_key = b.booking_channel_key
> >  LEFT JOIN passengers p
> >  ON t.passenger_key = p.passenger_key
> >  

[jira] [Commented] (FLINK-22113) UniqueKey constraint is lost with multiple sources join in SQL

2021-04-14 Thread Xu Guangheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321868#comment-17321868
 ] 

Xu Guangheng commented on FLINK-22113:
--

Hello [~jark],

Thanks for your reply. 

Yes, FlinkRelMdColumnUniqueness has implemented areColumnsUnique for the 
TableScan node, but there is a TODO inside the implementation. It seems the 
implementation is not complete.

Could you assign the issue to me? I can try to fix it.

> UniqueKey constraint is lost with multiple sources join in SQL
> --
>
> Key: FLINK-22113
> URL: https://issues.apache.org/jira/browse/FLINK-22113
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Fu Kai
>Priority: Major
> Fix For: 1.14.0
>
>
> Hi team,
>   
>  We have a use case to join multiple data sources to generate a continuous 
> updated view. We defined primary key constraint on all the input sources and 
> all the keys are the subsets in the join condition. All joins are left join.
>   
>  In our case, the first two inputs can produce *JoinKeyContainsUniqueKey* 
> input spec, which is good and performant. However, when it comes to the third 
> input source, it's joined with the intermediate output table of the first two 
> input tables, and the intermediate table does not carry key constraint 
> information (although the third source input table does), so it results in a 
> *NoUniqueKey* input spec. Given NoUniqueKey inputs have dramatic performance 
> implications per the[ Force Join Unique 
> Key|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Force-Join-Unique-Key-td39521.html#a39651]
>  email thread, we want to know if there is any mitigation solution for this.
>  
> Example:
> Take the example from 
> [https://github.com/ververica/flink-sql-cookbook/blob/master/joins/05/05_star_schema.md]
> {code:java}
> CREATE TEMPORARY TABLE passengers (
>   passenger_key STRING,
>   first_name STRING,
>   last_name STRING,
>   update_time TIMESTAMP(3),
>   PRIMARY KEY (passenger_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'passengers',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE stations (
>   station_key STRING,
>   update_time TIMESTAMP(3),
>   city STRING,
>   PRIMARY KEY (station_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'stations',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE booking_channels (
>   booking_channel_key STRING,
>   update_time TIMESTAMP(3),
>   channel STRING,
>   PRIMARY KEY (booking_channel_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'booking_channels',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'raw',
>   'value.format' = 'json'
> );
> CREATE TEMPORARY TABLE train_activities (
>   scheduled_departure_time TIMESTAMP(3),
>   actual_departure_date TIMESTAMP(3),
>   passenger_key STRING,
>   origin_station_key STRING,
>   destination_station_key STRING,
>   booking_channel_key STRING,
>   PRIMARY KEY (booking_channel_key, origin_station_key, 
> destination_station_key) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'train_activities',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'key.format' = 'json',
>   'value.format' = 'json'
> );
> SELECT 
>   t.actual_departure_date, 
>   p.first_name,
>   p.last_name,
>   b.channel, 
>   os.city AS origin_station,
>   ds.city AS destination_station
> FROM train_activities_1 t
> LEFT JOIN booking_channels b 
> ON t.booking_channel_key = b.booking_channel_key
> LEFT JOIN passengers p
> ON t.passenger_key = p.passenger_key
> LEFT JOIN stations os
> ON t.origin_station_key = os.station_key
> LEFT JOIN stations ds
> ON t.destination_station_key = ds.station_key
> {code}
>  
>  The query will generate the following execution plan:
>  
> {code:java}
> Flink SQL> explain
> >  SELECT
> >t.actual_departure_date,
> >p.first_name,
> >p.last_name,
> >b.channel,
> >os.city AS origin_station,
> >ds.city AS destination_station
> >  FROM train_activities_1 t
> >  LEFT JOIN booking_channels b
> >  ON t.booking_channel_key = b.booking_channel_key
> >  LEFT JOIN passengers p
> >  ON t.passenger_key = p.passenger_key
> >  LEFT JOIN stations os
> >  ON t.origin_station_key = os.station_key
> >  LEFT JOIN stations ds
> >  ON t.destination_station_key = ds.station_key;
> == Abstract Syntax Tree ==
> LogicalProject(actual_departure_date=[$1], first_name=[$10], last_name=[$11], 
> channel=[$8], origin_station=[$15], destination_station=[$18])
> +- 

[jira] [Updated] (FLINK-22125) Revisit the return value of MapState.get when a key doesn't exist

2021-04-14 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-22125:

Summary: Revisit the return value of MapState.get when a key doesn't exist  
(was: Revisit the return value of MapState.get when the key doesn't exist)

> Revisit the return value of MapState.get when a key doesn't exist
> -
>
> Key: FLINK-22125
> URL: https://issues.apache.org/jira/browse/FLINK-22125
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.13.0
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> Currently, it will throw a KeyError if the key doesn't exist for MapState in 
> the Python DataStream API. However, it returns null in the Java DataStream API. 
> Maybe we should keep the behavior the same across the Python DataStream API and 
> the Java DataStream API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22125) Revisit the return value of MapState.get when the key doesn't exist

2021-04-14 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-22125:
---

Assignee: Dian Fu

> Revisit the return value of MapState.get when the key doesn't exist
> ---
>
> Key: FLINK-22125
> URL: https://issues.apache.org/jira/browse/FLINK-22125
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.13.0
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> Currently, it will throw a KeyError if the key doesn't exist for MapState in 
> the Python DataStream API. However, it returns null in the Java DataStream API. 
> Maybe we should keep the behavior the same across the Python DataStream API and 
> the Java DataStream API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22273) Add documentation for General Python Group Window Aggregation in Python Table API

2021-04-14 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-22273.
---
Resolution: Fixed

Merged to master via 00a92b4fb188a8c1504279c07504b8fa10ca46e6

> Add documentation for General Python Group Window Aggregation in Python Table 
> API
> -
>
> Key: FLINK-22273
> URL: https://issues.apache.org/jira/browse/FLINK-22273
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Documentation
>Affects Versions: 1.13.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] dianfu closed pull request #15609: [FLINK-22273][python][docs] Add documentation for General Python Group Window Aggregation in Python Table API

2021-04-14 Thread GitBox


dianfu closed pull request #15609:
URL: https://github.com/apache/flink/pull/15609


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #15618: [FLINK-22125][python] Returns None when a key doesn't exist when calling MapState.get()

2021-04-14 Thread GitBox


flinkbot commented on pull request #15618:
URL: https://github.com/apache/flink/pull/15618#issuecomment-820020073


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit a5030e29742e58a6df2dbca7d7e1ce27a90e8722 (Thu Apr 15 
02:55:52 UTC 2021)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-22125).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-22125) Revisit the return value of MapState.get when the key doesn't exist

2021-04-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-22125:
---
Labels: pull-request-available  (was: )

> Revisit the return value of MapState.get when the key doesn't exist
> ---
>
> Key: FLINK-22125
> URL: https://issues.apache.org/jira/browse/FLINK-22125
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.13.0
>Reporter: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> Currently, it will throw a KeyError if the key doesn't exist for MapState in 
> the Python DataStream API. However, it returns null in the Java DataStream API. 
> Maybe we should keep the behavior the same across the Python DataStream API and 
> the Java DataStream API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] dianfu opened a new pull request #15618: [FLINK-22125][python] Returns None when a key doesn't exist when calling MapState.get()

2021-04-14 Thread GitBox


dianfu opened a new pull request #15618:
URL: https://github.com/apache/flink/pull/15618


   
   ## What is the purpose of the change
   
   *Currently, MapState.get throws a KeyError when the key doesn't exist. This 
pull request changes the behavior of MapState.get to return None when a key 
doesn't exist.*
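   
   For reference, a minimal sketch of the Java-side contract this change mirrors 
(MapState.get returns null for a missing key); illustrative only, not part of 
this PR:
   
   ```java
   import org.apache.flink.api.common.state.MapState;
   
   final class MapStateLookup {
       // Java's MapState.get returns null (rather than raising) when the key is
       // absent, which the Python MapState.get now matches by returning None.
       static Long getOrDefault(MapState<String, Long> state, String key, Long fallback) throws Exception {
           Long value = state.get(key);
           return value != null ? value : fallback;
       }
   }
   ```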
   
   
   ## Verifying this change
   
   This change is a trivial rework without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (FLINK-22125) Revisit the return value of MapState.get when the key doesn't exist

2021-04-14 Thread Roc Marshal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321843#comment-17321843
 ] 

Roc Marshal edited comment on FLINK-22125 at 4/15/21, 2:42 AM:
---

When we are judging whether MapState is empty or not, there may be some logical 
loopholes, such as using the {{is_empty()}} method instead of the {{_is_empty}} variable.


was (Author: rocmarshal):
When we judge whether mapstate is empty or not, there may be some logical 
loopholes, such as using 'is_ empty () 'method instead of`_ is_ empty ` variable

> Revisit the return value of MapState.get when the key doesn't exist
> ---
>
> Key: FLINK-22125
> URL: https://issues.apache.org/jira/browse/FLINK-22125
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.13.0
>Reporter: Dian Fu
>Priority: Major
> Fix For: 1.13.0
>
>
> Currently, it will throw a KeyError if the key doesn't exist for MapState in 
> the Python DataStream API. However, it returns null in the Java DataStream API. 
> Maybe we should keep the behavior the same across the Python DataStream API and 
> the Java DataStream API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22125) Revisit the return value of MapState.get when the key doesn't exist

2021-04-14 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321863#comment-17321863
 ] 

Dian Fu commented on FLINK-22125:
-

Hi [~RocMarshal], there is a method named is_empty() in MapState. You could 
refer to 
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/state.py#L262
 for more details. Not sure what you mean by _is_empty()?

> Revisit the return value of MapState.get when the key doesn't exist
> ---
>
> Key: FLINK-22125
> URL: https://issues.apache.org/jira/browse/FLINK-22125
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.13.0
>Reporter: Dian Fu
>Priority: Major
> Fix For: 1.13.0
>
>
> Currently, it will throw a KeyError if the key doesn't exist for MapState in 
> the Python DataStream API. However, it returns null in the Java DataStream API. 
> Maybe we should keep the behavior the same across the Python DataStream API and 
> the Java DataStream API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22281) flink sql consumer kafka canal-json message then sum(amount)

2021-04-14 Thread xx chai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321856#comment-17321856
 ] 

xx chai commented on FLINK-22281:
-

I added the parameter
 
streamTableEnvironment.getConfig().addJobParameter("table.exec.source.cdc-events-duplicate","true");
 
but the result did not change.
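
A sketch of how this option is usually set so that the planner picks it up 
(illustrative; streamTableEnvironment is the variable from the snippet above):

{code:java}
// Set the option on the TableConfig's underlying Configuration; addJobParameter
// only sets global job parameters and does not affect planner options.
streamTableEnvironment
        .getConfig()
        .getConfiguration()
        .setString("table.exec.source.cdc-events-duplicate", "true");
{code}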

> flink sql consumer kafka canal-json message then sum(amount)  
> --
>
> Key: FLINK-22281
> URL: https://issues.apache.org/jira/browse/FLINK-22281
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0
> Environment: flink 1.12 local
>Reporter: xx chai
>Priority: Major
> Attachments: screenshot-1.png
>
>
> I use Flink SQL to consume Kafka canal-json messages. The SQL is: 
> CREATE TABLE kafka_mall_order_info (
>   id int,
>   amount double,
>PRIMARY KEY (  id) NOT ENFORCED
>) WITH (
>   'connector' = 'kafka',
>   'topic' = 'topic_yx-dc-3-102_3306',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'kafka_to_hive',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'canal-json');
>   create table t2 (amount double) with ('connector' = 'print');
>   
>  
> insert into t2 select sum(amount) from kafka_mall_order_info ;
> but the result is not what I expect; 
> the result is shown in the attached image



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-16325) A connection check is required, and it needs to be reopened when the JDBC connection is interrupted

2021-04-14 Thread todd (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

todd closed FLINK-16325.

Resolution: Invalid

>  A connection check is required, and it needs to be reopened when the JDBC 
> connection is interrupted
> 
>
> Key: FLINK-16325
> URL: https://issues.apache.org/jira/browse/FLINK-16325
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Affects Versions: 1.10.0
>Reporter: todd
>Priority: Minor
>
> JDBCOutputFormat#writeRecord.
> When writing data, if the JDBC connection has been disconnected, the data 
> will be lost. Therefore, a connectivity check is required in the 
> writeRecord method.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22279) Upload the pipeline.classpaths file in yarn application mode

2021-04-14 Thread todd (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

todd closed FLINK-22279.

Resolution: Invalid

> Upload the pipeline.classpaths file in yarn application mode
> 
>
> Key: FLINK-22279
> URL: https://issues.apache.org/jira/browse/FLINK-22279
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.12.0
>Reporter: todd
>Priority: Major
>
> pipeline.classpaths is a local resource package. If this file is used, all 
> nodes must include this file. I think that pipeline.jars should be uploaded 
> from the client.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22279) Upload the pipeline.classpaths file in yarn application mode

2021-04-14 Thread todd (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321854#comment-17321854
 ] 

todd commented on FLINK-22279:
--

Thank you very much, what you said is right. I did not pay attention to this 
parameter.  [~fly_in_gis]

> Upload the pipeline.classpaths file in yarn application mode
> 
>
> Key: FLINK-22279
> URL: https://issues.apache.org/jira/browse/FLINK-22279
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.12.0
>Reporter: todd
>Priority: Major
>
> pipeline.classpaths is a local resource package. If this file is used, all 
> nodes must include this file. I think that pipeline.jars should be uploaded 
> from the client.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14927) Remove LegacyTimestampTypeInfo/LegacyLocalDateTimeTypeInfo/LegacyInstantTypeInfo when the conversion is not needed

2021-04-14 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-14927:

Priority: Major  (was: Minor)

> Remove 
> LegacyTimestampTypeInfo/LegacyLocalDateTimeTypeInfo/LegacyInstantTypeInfo 
> when the conversion is not needed
> --
>
> Key: FLINK-14927
> URL: https://issues.apache.org/jira/browse/FLINK-14927
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Zhenghua Gao
>Priority: Major
>  Labels: stale-minor
>
> 1) PhysicalTableSourceScan.getSourceTransformation uses 
> TypeInfoDataTypeConverter.fromDataTypeToTypeInfo, which bypasses 
> TypeConversions to support precision
> 2) Conversion from DataType to TypeInformation (and back) exists in 
> TableSourceUtil.computeIndexMapping



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14927) Remove LegacyTimestampTypeInfo/LegacyLocalDateTimeTypeInfo/LegacyInstantTypeInfo when the conversion is not needed

2021-04-14 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-14927:

Labels:   (was: stale-minor)

> Remove 
> LegacyTimestampTypeInfo/LegacyLocalDateTimeTypeInfo/LegacyInstantTypeInfo 
> when the conversion is not needed
> --
>
> Key: FLINK-14927
> URL: https://issues.apache.org/jira/browse/FLINK-14927
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Zhenghua Gao
>Priority: Major
>
> 1) PhysicalTableSourceScan.getSourceTransformation uses 
> TypeInfoDataTypeConverter.fromDataTypeToTypeInfo, which bypasses 
> TypeConversions to support precision
> 2) Conversion from DataType to TypeInformation (and back) exists in 
> TableSourceUtil.computeIndexMapping



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16120) Update the hive section of the connectors doc

2021-04-14 Thread Rui Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321852#comment-17321852
 ] 

Rui Li commented on FLINK-16120:


Hive connector doc has been moved to the {{Table & SQL Connectors}} section. 
Closing this one.

> Update the hive section of the connectors doc
> -
>
> Key: FLINK-16120
> URL: https://issues.apache.org/jira/browse/FLINK-16120
> Project: Flink
>  Issue Type: Task
>  Components: Connectors / Hive, Documentation
>Reporter: Rui Li
>Priority: Minor
>  Labels: stale-minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16120) Update the hive section of the connectors doc

2021-04-14 Thread Rui Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Li updated FLINK-16120:
---
Release Note:   (was: Hive connector doc has been moved to the {{Table & 
SQL Connectors}} section. Closing this one.)

> Update the hive section of the connectors doc
> -
>
> Key: FLINK-16120
> URL: https://issues.apache.org/jira/browse/FLINK-16120
> Project: Flink
>  Issue Type: Task
>  Components: Connectors / Hive, Documentation
>Reporter: Rui Li
>Priority: Minor
>  Labels: stale-minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-16120) Update the hive section of the connectors doc

2021-04-14 Thread Rui Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Li resolved FLINK-16120.

Release Note: Hive connector doc has been moved to the {{Table & SQL 
Connectors}} section. Closing this one.
  Resolution: Not A Problem

> Update the hive section of the connectors doc
> -
>
> Key: FLINK-16120
> URL: https://issues.apache.org/jira/browse/FLINK-16120
> Project: Flink
>  Issue Type: Task
>  Components: Connectors / Hive, Documentation
>Reporter: Rui Li
>Priority: Minor
>  Labels: stale-minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22275) Datagen add a max lag option for a series of timestamp-related types

2021-04-14 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321851#comment-17321851
 ] 

Jark Wu commented on FLINK-22275:
-

We should discuss the API/options and behaviors first. 

> Datagen add a max lag option for a series of timestamp-related types
> 
>
> Key: FLINK-22275
> URL: https://issues.apache.org/jira/browse/FLINK-22275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Ecosystem
>Reporter: Yi Tang
>Priority: Minor
>
> For now, the {{datagen}} connector always resolves to the current timestamp 
> for timestamp-related types.
> This issue proposes adding a max-lag option for these types, which would generate a 
> timestamp with a random lag (bounded by the configured maximum) behind the 
> current timestamp. The behavior is left unchanged if the option is not configured.
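A rough sketch of how such an option might be used, assuming a hypothetical option name 'fields.#.max-lag' (the actual name and semantics are still to be discussed):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DatagenMaxLagSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // 'fields.order_time.max-lag' is the proposed, hypothetical option:
        // order_time would fall randomly within [now - 10s, now] instead of
        // always resolving to the current timestamp.
        tEnv.executeSql(
                "CREATE TABLE orders_gen (\n"
                        + "  order_id BIGINT,\n"
                        + "  order_time TIMESTAMP(3)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'datagen',\n"
                        + "  'fields.order_time.max-lag' = '10s'\n"
                        + ")");
    }
}
{code}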



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22275) Datagen add a max lag option for a series of timestamp-related types

2021-04-14 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-22275:
---

Assignee: Yi Tang

> Datagen add a max lag option for a series of timestamp-related types
> 
>
> Key: FLINK-22275
> URL: https://issues.apache.org/jira/browse/FLINK-22275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Ecosystem
>Reporter: Yi Tang
>Assignee: Yi Tang
>Priority: Minor
>
> For now, the {{datagen}} connector always resolves to the current timestamp 
> for timestamp-related types.
> This issue proposes adding a max-lag option for these types, which would generate a 
> timestamp with a random lag (bounded by the configured maximum) behind the 
> current timestamp. The behavior is left unchanged if the option is not configured.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22275) Datagen add a max lag option for a series of timestamp-related types

2021-04-14 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-22275:
---

Assignee: (was: Yi Tang)

> Datagen add a max lag option for a series of timestamp-related types
> 
>
> Key: FLINK-22275
> URL: https://issues.apache.org/jira/browse/FLINK-22275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Ecosystem
>Reporter: Yi Tang
>Priority: Minor
>
> For now, the {{datagen}} connector always resolves to the current timestamp 
> for timestamp-related types.
> This issue proposes adding a max-lag option for these types, which would generate a 
> timestamp with a random lag (bounded by the configured maximum) behind the 
> current timestamp. The behavior is left unchanged if the option is not configured.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-22281) flink sql consumes kafka canal-json messages then sum(amount)

2021-04-14 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-22281.
---
Resolution: Not A Problem

> flink sql consumes kafka canal-json messages then sum(amount)
> --
>
> Key: FLINK-22281
> URL: https://issues.apache.org/jira/browse/FLINK-22281
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0
> Environment: flink 1.12 local
>Reporter: xx chai
>Priority: Major
> Attachments: screenshot-1.png
>
>
> I use Flink SQL to consume Kafka canal-json messages. The SQL is:
> CREATE TABLE kafka_mall_order_info (
>   id int,
>   amount double,
>PRIMARY KEY (  id) NOT ENFORCED
>) WITH (
>   'connector' = 'kafka',
>   'topic' = 'topic_yx-dc-3-102_3306',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'kafka_to_hive',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'canal-json');
>   create table t2 (amount double) with ('connector' = 'print');
>   
>  
> insert into t2 select sum(amount) from kafka_mall_order_info ;
> But the result is not what I expect.
> The result is shown in the attached image.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22281) flink sql consumes kafka canal-json messages then sum(amount)

2021-04-14 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321850#comment-17321850
 ] 

Jark Wu commented on FLINK-22281:
-

If you don't have the full historical data, only the incremental binlog data, 
then aggregating on it will produce wrong results, because the input data is 
incorrect (it updates non-existing rows). 

You can try to turn on {{table.exec.source.cdc-events-duplicate=true}} to 
convert the incomplete changelog into a normalized changelog, e.g. an 
update for a non-existing row will be converted into an insert. 

See 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/canal.html#duplicate-change-events
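A minimal sketch of the suggested setup (table and column names are illustrative; the option itself is documented at the link above):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CdcDeduplicateSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Normalize the incomplete/duplicated changelog: with this option on,
        // an update for a non-existing key is converted into an insert.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.source.cdc-events-duplicate", "true");
        // A PRIMARY KEY on the source is required so the planner can keep
        // per-key state for the normalization.
        tEnv.executeSql(
                "CREATE TABLE kafka_mall_order_info (\n"
                        + "  id INT,\n"
                        + "  amount DOUBLE,\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'topic_yx-dc-3-102_3306',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'format' = 'canal-json'\n"
                        + ")");
    }
}
{code}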

> flink sql consumes kafka canal-json messages then sum(amount)
> --
>
> Key: FLINK-22281
> URL: https://issues.apache.org/jira/browse/FLINK-22281
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0
> Environment: flink 1.12 local
>Reporter: xx chai
>Priority: Major
> Attachments: screenshot-1.png
>
>
> I use Flink SQL to consume Kafka canal-json messages. The SQL is:
> CREATE TABLE kafka_mall_order_info (
>   id int,
>   amount double,
>PRIMARY KEY (  id) NOT ENFORCED
>) WITH (
>   'connector' = 'kafka',
>   'topic' = 'topic_yx-dc-3-102_3306',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'kafka_to_hive',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'canal-json');
>   create table t2 (amount double) with ('connector' = 'print');
>   
>  
> insert into t2 select sum(amount) from kafka_mall_order_info ;
> But the result is not what I expect.
> The result is shown in the attached image.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-12085) Improve Mesos job cluster entry point 'MesosJobClusterEntrypoint'

2021-04-14 Thread Jacky Yin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacky Yin updated FLINK-12085:
--
Labels:   (was: stale-minor)

> Improve Mesos job cluster entry point  'MesosJobClusterEntrypoint'
> --
>
> Key: FLINK-12085
> URL: https://issues.apache.org/jira/browse/FLINK-12085
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Mesos
>Affects Versions: 1.7.2
>Reporter: Jacky Yin
>Assignee: Jacky Yin
>Priority: Minor
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Currently, the Mesos job cluster is not that easy to use. You have to 
> manually serialize the job graph and specify the job graph file path. And the 
> only way to include the user code jars is to put them in the `lib` folder. 
> `StandaloneJobClusterEntrypoint` is a good example to learn from. It can include 
> the user code jars and allows specifying the entry class. That way we would 
> not need to generate the JobGraph ourselves and then serialize it. I would 
> like to enhance `MesosJobClusterEntrypoint` similarly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] lirui-apache commented on a change in pull request #15549: [FLINK-20761][hive] Escape the location path when creating input splits

2021-04-14 Thread GitBox


lirui-apache commented on a change in pull request #15549:
URL: https://github.com/apache/flink/pull/15549#discussion_r613709681



##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
##########
@@ -442,6 +448,41 @@ public void testDynamicPartWithOrderBy() throws Exception {
 }
 }
 
+    @Test
+    public void testLocationWithComma() throws Exception {
+        TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+        File location = tempFolder.newFolder(",tbl1,location,");
+        try {
+            // test table location
+            tableEnv.executeSql(
+                    String.format(
+                            "create table tbl1 (x int) location '%s'", location.getAbsolutePath()));
+            tableEnv.executeSql("insert into tbl1 values (1),(2)").await();
+            List<Row> results =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql("select * from tbl1").collect());
+            assertEquals("[+I[1], +I[2]]", results.toString());
+            // test partition location
+            tableEnv.executeSql("create table tbl2 (x int) partitioned by (p string)");
+            location = tempFolder.newFolder(",");
+            tableEnv.executeSql(
+                    String.format(
+                            "alter table tbl2 add partition (p='a') location '%s'",

Review comment:
   The location path has a comma, and Hive's partition value doesn't have to 
be the same as the location path. But I can also add a test case where the 
partition value also contains a comma.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-22279) Upload the pipeline.classpaths file in yarn application mode

2021-04-14 Thread Yang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Wang updated FLINK-22279:
--
Priority: Major  (was: Blocker)

> Upload the pipeline.classpaths file in yarn application mode
> 
>
> Key: FLINK-22279
> URL: https://issues.apache.org/jira/browse/FLINK-22279
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.12.0
>Reporter: renjianxu
>Priority: Major
>
> pipeline.classpaths refers to local resource files. If this option is used, all 
> nodes must include these files. I think they should be uploaded 
> from the client, as pipeline.jars is.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22279) Upload the pipeline.classpaths file in yarn application mode

2021-04-14 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321846#comment-17321846
 ] 

Yang Wang commented on FLINK-22279:
---

I do not think this is a bug in Flink. The config option 
{{pipeline.classpaths}} is designed to work the same as the CLI option 
{{-C,--classpath}}. If you want to ship files, then you should use 
{{yarn.ship-files}}.

 
{code:java}
 -C,--classpath  Adds a URL to each user code
  classloader  on all nodes in the
  cluster. The paths must specify a
  protocol (e.g. file://) and be
  accessible on all nodes (e.g. by means
  of a NFS share). You can use this
  option multiple times for specifying
  more than one URL. The protocol must
  be supported by the {@link
  java.net.URLClassLoader}.
{code}
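For shipping local files to all containers, a sketch of the suggested alternative (paths are illustrative):

{code}
# Ship local resources to the YARN containers instead of relying on
# pipeline.classpaths pointing at files that exist on every node.
flink run-application -t yarn-application \
    -Dyarn.ship-files="/path/to/local/resources" \
    /path/to/job.jar
{code}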

> Upload the pipeline.classpaths file in yarn application mode
> 
>
> Key: FLINK-22279
> URL: https://issues.apache.org/jira/browse/FLINK-22279
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client
>Affects Versions: 1.12.0
>Reporter: renjianxu
>Priority: Blocker
>
> pipeline.classpaths refers to local resource files. If this option is used, all 
> nodes must include these files. I think they should be uploaded 
> from the client, as pipeline.jars is.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22280) Add XML output format for Flink Table

2021-04-14 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-22280:

Component/s: (was: Table SQL / API)
 Table SQL / Ecosystem
 Formats (JSON, Avro, Parquet, ORC, SequenceFile)

> Add XML output format for Flink Table
> -
>
> Key: FLINK-22280
> URL: https://issues.apache.org/jira/browse/FLINK-22280
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.12.1
>Reporter: François Lacombe
>Priority: Major
>
> Dear maintainers
> I'm looking for the ability to output XML files from the Flink Table API, just 
> like the already supported CSV and JSON formats.
> To me, a new format could be required to provide the appropriate serialization. 
> Am I missing any existing feature (or duplicate issue) that could allow it 
> without a dedicated format?
> Depending on your feedback and if it makes sense, I could get involved in 
> writing the appropriate format based on the same logic as 
> `JsonRowDataSerializationSchema`.
> Best regards



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22280) Add XML output format for Flink Table

2021-04-14 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321845#comment-17321845
 ] 

Jark Wu commented on FLINK-22280:
-

Yes, I think this can be an XML format. 

> Add XML output format for Flink Table
> -
>
> Key: FLINK-22280
> URL: https://issues.apache.org/jira/browse/FLINK-22280
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>Affects Versions: 1.12.1
>Reporter: François Lacombe
>Priority: Major
>
> Dear maintainers
> I'm looking for the ability to output XML files from the Flink Table API, just 
> like the already supported CSV and JSON formats.
> To me, a new format could be required to provide the appropriate serialization. 
> Am I missing any existing feature (or duplicate issue) that could allow it 
> without a dedicated format?
> Depending on your feedback and if it makes sense, I could get involved in 
> writing the appropriate format based on the same logic as 
> `JsonRowDataSerializationSchema`.
> Best regards



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22125) Revisit the return value of MapState.get when the key doesn't exist

2021-04-14 Thread Roc Marshal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321843#comment-17321843
 ] 

Roc Marshal commented on FLINK-22125:
-

When we check whether the MapState is empty or not, there may be some logical 
loopholes, such as using the {{is_empty()}} method instead of the {{_is_empty}} variable.

> Revisit the return value of MapState.get when the key doesn't exist
> ---
>
> Key: FLINK-22125
> URL: https://issues.apache.org/jira/browse/FLINK-22125
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.13.0
>Reporter: Dian Fu
>Priority: Major
> Fix For: 1.13.0
>
>
> Currently, a KeyError will be thrown if the key doesn't exist for MapState in 
> the Python DataStream API. However, it returns null in the Java DataStream API. 
> Maybe we should keep the behavior the same across the Python and Java 
> DataStream APIs.
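For reference, a minimal sketch of the Java DataStream API behavior described above (state name and types are illustrative):

{code:java}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class NullOnMissingKey extends RichFlatMapFunction<String, String> {

    private transient MapState<String, String> mapState;

    @Override
    public void open(Configuration parameters) {
        mapState = getRuntimeContext()
                .getMapState(new MapStateDescriptor<>("m", String.class, String.class));
    }

    @Override
    public void flatMap(String key, Collector<String> out) throws Exception {
        // Java returns null for an absent key; Python currently raises KeyError.
        String value = mapState.get(key);
        if (value != null) {
            out.collect(value);
        }
    }
}
{code}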



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread

2021-04-14 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-22282:
-
Issue Type: Bug  (was: Task)

> Move creation of SplitEnumerator to the SourceCoordinator thread
> 
>
> Key: FLINK-22282
> URL: https://issues.apache.org/jira/browse/FLINK-22282
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.2
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Critical
> Fix For: 1.13.0, 1.12.3
>
>
> Currently the creation of the SplitEnumerator happens in the JM main thread. In 
> case the SplitEnumerator instantiation takes a long time, the job execution will 
> time out. The fix is to move the SplitEnumerator creation to the coordinator 
> thread.
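A conceptual sketch of the fix, assuming a single-threaded coordinator executor (the names are illustrative, not the actual SourceCoordinator code):

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Illustrative only: run a potentially slow factory call on a dedicated
// coordinator thread so the caller's thread is never blocked by it.
public class DeferredCreation<T> {

    private final ExecutorService coordinatorExecutor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "source-coordinator"));

    public CompletableFuture<T> createAsync(Supplier<T> factory) {
        // The caller (e.g. the JM main thread) returns immediately; the
        // instantiation happens asynchronously on the coordinator thread.
        return CompletableFuture.supplyAsync(factory, coordinatorExecutor);
    }
}
{code}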



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread

2021-04-14 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-22282:
-
Priority: Critical  (was: Major)

> Move creation of SplitEnumerator to the SourceCoordinator thread
> 
>
> Key: FLINK-22282
> URL: https://issues.apache.org/jira/browse/FLINK-22282
> Project: Flink
>  Issue Type: Task
>  Components: Connectors / Common
>Affects Versions: 1.12.2
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Critical
> Fix For: 1.13.0, 1.12.3
>
>
> Currently the creation of the SplitEnumerator happens in the JM main thread. In 
> case the SplitEnumerator instantiation takes a long time, the job execution will 
> time out. The fix is to move the SplitEnumerator creation to the coordinator 
> thread.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread

2021-04-14 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-22282:
-
Issue Type: Improvement  (was: Bug)

> Move creation of SplitEnumerator to the SourceCoordinator thread
> 
>
> Key: FLINK-22282
> URL: https://issues.apache.org/jira/browse/FLINK-22282
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.12.2
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Critical
> Fix For: 1.13.0, 1.12.3
>
>
> Currently the creation of the SplitEnumerator happens in the JM main thread. In 
> case the SplitEnumerator instantiation takes a long time, the job execution will 
> time out. The fix is to move the SplitEnumerator creation to the coordinator 
> thread.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread

2021-04-14 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated FLINK-22282:
-
Fix Version/s: 1.12.3
   1.13.0

> Move creation of SplitEnumerator to the SourceCoordinator thread
> 
>
> Key: FLINK-22282
> URL: https://issues.apache.org/jira/browse/FLINK-22282
> Project: Flink
>  Issue Type: Task
>  Components: Connectors / Common
>Affects Versions: 1.12.2
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
> Fix For: 1.13.0, 1.12.3
>
>
> Currently the creation of the SplitEnumerator happens in the JM main thread. In 
> case the SplitEnumerator instantiation takes a long time, the job execution will 
> time out. The fix is to move the SplitEnumerator creation to the coordinator 
> thread.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread

2021-04-14 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-22282:


Assignee: Jiangjie Qin

> Move creation of SplitEnumerator to the SourceCoordinator thread
> 
>
> Key: FLINK-22282
> URL: https://issues.apache.org/jira/browse/FLINK-22282
> Project: Flink
>  Issue Type: Task
>  Components: Connectors / Common
>Affects Versions: 1.12.2
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
>
> Currently the creation of the SplitEnumerator happens in the JM main thread. In 
> case the SplitEnumerator instantiation takes a long time, the job execution will 
> time out. The fix is to move the SplitEnumerator creation to the coordinator 
> thread.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22281) flink sql consumes kafka canal-json messages then sum(amount)

2021-04-14 Thread xx chai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xx chai updated FLINK-22281:

Description: 
I use Flink SQL to consume Kafka canal-json messages. The SQL is:
CREATE TABLE kafka_mall_order_info (
  id int,
  amount double,
   PRIMARY KEY (  id) NOT ENFORCED
   ) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_yx-dc-3-102_3306',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'kafka_to_hive',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'canal-json');
  create table t2 (amount double) with ('connector' = 'print');
  
 
insert into t2 select sum(amount) from kafka_mall_order_info ;

But the result is not what I expect.
The result is shown in the attached image.



  was:
I use Flink SQL to consume Kafka canal-json messages. The SQL is:
CREATE TABLE kafka_mall_order_info (
  id int,
  amount double,
   PRIMARY KEY (  id) NOT ENFORCED
   ) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_yx-dc-3-102_3306',
  'properties.bootstrap.servers' = 
'192.168.3.100:9092,192.168.3.101:9092,192.168.3.102:9092',
  'properties.group.id' = 'kafka_to_hive',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'canal-json');
  create table t2 (amount double) with ('connector' = 'print');
  
 
insert into t2 select sum(amount) from kafka_mall_order_info ;

But the result is not what I expect.
The result is shown in the attached image.




> flink sql consumes kafka canal-json messages then sum(amount)
> --
>
> Key: FLINK-22281
> URL: https://issues.apache.org/jira/browse/FLINK-22281
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0
> Environment: flink 1.12 local
>Reporter: xx chai
>Priority: Major
> Attachments: screenshot-1.png
>
>
> I use Flink SQL to consume Kafka canal-json messages. The SQL is:
> CREATE TABLE kafka_mall_order_info (
>   id int,
>   amount double,
>PRIMARY KEY (  id) NOT ENFORCED
>) WITH (
>   'connector' = 'kafka',
>   'topic' = 'topic_yx-dc-3-102_3306',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'kafka_to_hive',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'canal-json');
>   create table t2 (amount double) with ('connector' = 'print');
>   
>  
> insert into t2 select sum(amount) from kafka_mall_order_info ;
> But the result is not what I expect.
> The result is shown in the attached image.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22281) flink sql consumes kafka canal-json messages then sum(amount)

2021-04-14 Thread xx chai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xx chai updated FLINK-22281:

Description: 
I use Flink SQL to consume Kafka canal-json messages. The SQL is:
CREATE TABLE kafka_mall_order_info (
  id int,
  amount double,
   PRIMARY KEY (  id) NOT ENFORCED
   ) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_yx-dc-3-102_3306',
  'properties.bootstrap.servers' = 
'192.168.3.100:9092,192.168.3.101:9092,192.168.3.102:9092',
  'properties.group.id' = 'kafka_to_hive',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'canal-json');
  create table t2 (amount double) with ('connector' = 'print');
  
 
insert into t2 select sum(amount) from kafka_mall_order_info ;

But the result is not what I expect.
The result is shown in the attached image.



  was:
I use Flink SQL to consume Kafka canal-json messages. The SQL is:
CREATE TABLE kafka_mall_order_info (
  id int,
  amount double,
   PRIMARY KEY (  id) NOT ENFORCED
   ) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_yx-dc-3-102_3306',
  'properties.bootstrap.servers' = 
'192.168.3.100:9092,192.168.3.101:9092,192.168.3.102:9092',
  'properties.group.id' = 'kafka_to_hive',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'canal-json');
  create table t2 (amount double) with ('connector' = 'print');
  
 
insert into t2 select sum(amount) from kafka_mall_order_info ;

but the result is not i think 




> flink sql consumes kafka canal-json messages then sum(amount)
> --
>
> Key: FLINK-22281
> URL: https://issues.apache.org/jira/browse/FLINK-22281
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0
> Environment: flink 1.12 local
>Reporter: xx chai
>Priority: Major
> Attachments: screenshot-1.png
>
>
> I use Flink SQL to consume Kafka canal-json messages. The SQL is:
> CREATE TABLE kafka_mall_order_info (
>   id int,
>   amount double,
>PRIMARY KEY (  id) NOT ENFORCED
>) WITH (
>   'connector' = 'kafka',
>   'topic' = 'topic_yx-dc-3-102_3306',
>   'properties.bootstrap.servers' = 
> '192.168.3.100:9092,192.168.3.101:9092,192.168.3.102:9092',
>   'properties.group.id' = 'kafka_to_hive',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'canal-json');
>   create table t2 (amount double) with ('connector' = 'print');
>   
>  
> insert into t2 select sum(amount) from kafka_mall_order_info ;
> But the result is not what I expect.
> The result is shown in the attached image.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22281) flink sql consumes kafka canal-json messages then sum(amount)

2021-04-14 Thread xx chai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xx chai updated FLINK-22281:

Attachment: screenshot-1.png

> flink sql consumes kafka canal-json messages then sum(amount)
> --
>
> Key: FLINK-22281
> URL: https://issues.apache.org/jira/browse/FLINK-22281
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0
> Environment: flink 1.12 local
>Reporter: xx chai
>Priority: Major
> Attachments: screenshot-1.png
>
>
> I use Flink SQL to consume Kafka canal-json messages. The SQL is:
> CREATE TABLE kafka_mall_order_info (
>   id int,
>   amount double,
>PRIMARY KEY (  id) NOT ENFORCED
>) WITH (
>   'connector' = 'kafka',
>   'topic' = 'topic_yx-dc-3-102_3306',
>   'properties.bootstrap.servers' = 
> '192.168.3.100:9092,192.168.3.101:9092,192.168.3.102:9092',
>   'properties.group.id' = 'kafka_to_hive',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'canal-json');
>   create table t2 (amount double) with ('connector' = 'print');
>   
>  
> insert into t2 select sum(amount) from kafka_mall_order_info ;
> But the result is not what I expect.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22281) flink sql consumes kafka canal-json messages then sum(amount)

2021-04-14 Thread xx chai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xx chai updated FLINK-22281:

Description: 
I use Flink SQL to consume Kafka canal-json messages. The SQL is:
CREATE TABLE kafka_mall_order_info (
  id int,
  amount double,
   PRIMARY KEY (  id) NOT ENFORCED
   ) WITH (
  'connector' = 'kafka',
  'topic' = 'topic_yx-dc-3-102_3306',
  'properties.bootstrap.servers' = 
'192.168.3.100:9092,192.168.3.101:9092,192.168.3.102:9092',
  'properties.group.id' = 'kafka_to_hive',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'canal-json');
  create table t2 (amount double) with ('connector' = 'print');
  
 
insert into t2 select sum(amount) from kafka_mall_order_info ;

But the result is not what I expect.



  was: I use Flink SQL to consume Kafka canal-json messages.

Environment: flink 1.12 local

> flink sql consumes kafka canal-json messages then sum(amount)
> --
>
> Key: FLINK-22281
> URL: https://issues.apache.org/jira/browse/FLINK-22281
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0
> Environment: flink 1.12 local
>Reporter: xx chai
>Priority: Major
>
> I use Flink SQL to consume Kafka canal-json messages. The SQL is:
> CREATE TABLE kafka_mall_order_info (
>   id int,
>   amount double,
>PRIMARY KEY (  id) NOT ENFORCED
>) WITH (
>   'connector' = 'kafka',
>   'topic' = 'topic_yx-dc-3-102_3306',
>   'properties.bootstrap.servers' = 
> '192.168.3.100:9092,192.168.3.101:9092,192.168.3.102:9092',
>   'properties.group.id' = 'kafka_to_hive',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'canal-json');
>   create table t2 (amount double) with ('connector' = 'print');
>   
>  
> insert into t2 select sum(amount) from kafka_mall_order_info ;
> But the result is not what I expect.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread

2021-04-14 Thread Jiangjie Qin (Jira)
Jiangjie Qin created FLINK-22282:


 Summary: Move creation of SplitEnumerator to the SourceCoordinator 
thread
 Key: FLINK-22282
 URL: https://issues.apache.org/jira/browse/FLINK-22282
 Project: Flink
  Issue Type: Task
  Components: Connectors / Common
Affects Versions: 1.12.2
Reporter: Jiangjie Qin


Currently the creation of the SplitEnumerator happens in the JM main thread. In case 
the SplitEnumerator instantiation takes a long time, the job execution will time out. 
The fix is to move the SplitEnumerator creation to the coordinator thread.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22275) Datagen add a max lag option for a series of timestamp-related types

2021-04-14 Thread Yi Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321842#comment-17321842
 ] 

Yi Tang commented on FLINK-22275:
-

[~jark] You can assign it to me if no further discussion is needed.

> Datagen add a max lag option for a series of timestamp-related types
> 
>
> Key: FLINK-22275
> URL: https://issues.apache.org/jira/browse/FLINK-22275
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Ecosystem
>Reporter: Yi Tang
>Priority: Minor
>
> For now, the {{datagen}} connector always resolves to the current timestamp 
> for timestamp-related types.
> This issue proposes adding a max-lag option for these types, which would generate a 
> timestamp with a random lag (bounded by the configured maximum) behind the 
> current timestamp. The behavior is left unchanged if the option is not configured.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22281) flink sql consumes kafka canal-json messages then sum(amount)

2021-04-14 Thread xx chai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xx chai updated FLINK-22281:

  Component/s: Table SQL / API
Affects Version/s: 1.12.0
  Description: I use Flink SQL to consume Kafka canal-json messages.
   Issue Type: Bug  (was: Improvement)
  Summary: flink sql consumes kafka canal-json messages then sum(amount)  (was: flink sql)

> flink sql consumes kafka canal-json messages then sum(amount)
> --
>
> Key: FLINK-22281
> URL: https://issues.apache.org/jira/browse/FLINK-22281
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0
>Reporter: xx chai
>Priority: Major
>
> I use Flink SQL to consume Kafka canal-json messages.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22281) flink sql

2021-04-14 Thread xx chai (Jira)
xx chai created FLINK-22281:
---

 Summary: flink sql
 Key: FLINK-22281
 URL: https://issues.apache.org/jira/browse/FLINK-22281
 Project: Flink
  Issue Type: Improvement
Reporter: xx chai






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] KurtYoung commented on a change in pull request #15549: [FLINK-20761][hive] Escape the location path when creating input splits

2021-04-14 Thread GitBox


KurtYoung commented on a change in pull request #15549:
URL: https://github.com/apache/flink/pull/15549#discussion_r613693906



##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
##########
@@ -442,6 +448,41 @@ public void testDynamicPartWithOrderBy() throws Exception {
 }
 }
 
+    @Test
+    public void testLocationWithComma() throws Exception {
+        TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+        File location = tempFolder.newFolder(",tbl1,location,");
+        try {
+            // test table location
+            tableEnv.executeSql(
+                    String.format(
+                            "create table tbl1 (x int) location '%s'", location.getAbsolutePath()));
+            tableEnv.executeSql("insert into tbl1 values (1),(2)").await();
+            List<Row> results =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql("select * from tbl1").collect());
+            assertEquals("[+I[1], +I[2]]", results.toString());
+            // test partition location
+            tableEnv.executeSql("create table tbl2 (x int) partitioned by (p string)");
+            location = tempFolder.newFolder(",");
+            tableEnv.executeSql(
+                    String.format(
+                            "alter table tbl2 add partition (p='a') location '%s'",

Review comment:
   Should the partition path have a comma?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




  1   2   3   4   5   6   7   8   9   10   >