[jira] [Assigned] (FLINK-22113) UniqueKey constraint is lost with multiple sources join in SQL
[ https://issues.apache.org/jira/browse/FLINK-22113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-22113: --- Assignee: Xu Guangheng > UniqueKey constraint is lost with multiple sources join in SQL > -- > > Key: FLINK-22113 > URL: https://issues.apache.org/jira/browse/FLINK-22113 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.13.0 >Reporter: Fu Kai >Assignee: Xu Guangheng >Priority: Major > Fix For: 1.14.0 > > > Hi team, > > We have a use case that joins multiple data sources to generate a continuously > updated view. We defined primary key constraints on all the input sources, and > all the keys are subsets of the join condition. All joins are left joins. > > In our case, the first two inputs produce the *JoinKeyContainsUniqueKey* > input spec, which is good and performant. But when it comes to the third > input source, it is joined with the intermediate output table of the first two > input tables, and the intermediate table does not carry key constraint > information (although the third source input table does), so it results in a > *NoUniqueKey* input spec. Given that NoUniqueKey inputs have dramatic performance > implications per the [Force Join Unique > Key|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Force-Join-Unique-Key-td39521.html#a39651] > email thread, we want to know if there is any mitigation for this. 
> > Example: > Take the example from > [https://github.com/ververica/flink-sql-cookbook/blob/master/joins/05/05_star_schema.md] > {code:java} > CREATE TEMPORARY TABLE passengers ( > passenger_key STRING, > first_name STRING, > last_name STRING, > update_time TIMESTAMP(3), > PRIMARY KEY (passenger_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'passengers', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'raw', > 'value.format' = 'json' > ); > CREATE TEMPORARY TABLE stations ( > station_key STRING, > update_time TIMESTAMP(3), > city STRING, > PRIMARY KEY (station_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'stations', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'raw', > 'value.format' = 'json' > ); > CREATE TEMPORARY TABLE booking_channels ( > booking_channel_key STRING, > update_time TIMESTAMP(3), > channel STRING, > PRIMARY KEY (booking_channel_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'booking_channels', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'raw', > 'value.format' = 'json' > ); > CREATE TEMPORARY TABLE train_activities ( > scheduled_departure_time TIMESTAMP(3), > actual_departure_date TIMESTAMP(3), > passenger_key STRING, > origin_station_key STRING, > destination_station_key STRING, > booking_channel_key STRING, > PRIMARY KEY (booking_channel_key, origin_station_key, > destination_station_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'train_activities', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'json', > 'value.format' = 'json' > ); > SELECT > t.actual_departure_date, > p.first_name, > p.last_name, > b.channel, > os.city AS origin_station, > ds.city AS destination_station > FROM train_activities_1 t > LEFT JOIN booking_channels b > ON t.booking_channel_key = b.booking_channel_key > LEFT JOIN passengers p > ON t.passenger_key = p.passenger_key > 
LEFT JOIN stations os > ON t.origin_station_key = os.station_key > LEFT JOIN stations ds > ON t.destination_station_key = ds.station_key > {code} > > The query will generate the execution plan: > > {code:java} > Flink SQL> explain > > SELECT > >t.actual_departure_date, > >p.first_name, > >p.last_name, > >b.channel, > >os.city AS origin_station, > >ds.city AS destination_station > > FROM train_activities_1 t > > LEFT JOIN booking_channels b > > ON t.booking_channel_key = b.booking_channel_key > > LEFT JOIN passengers p > > ON t.passenger_key = p.passenger_key > > LEFT JOIN stations os > > ON t.origin_station_key = os.station_key > > LEFT JOIN stations ds > > ON t.destination_station_key = ds.station_key; > == Abstract Syntax Tree == > LogicalProject(actual_departure_date=[$1], first_name=[$10], last_name=[$11], > channel=[$8], origin_station=[$15], destination_station=[$18]) > +- LogicalJoin(condition=[=($4, $16)], joinType=[left]) >:- LogicalJoin(condition=[=($3, $13)], joinType=[left]) >: :- LogicalJoin(condition=[=($2, $9)], joinType=[left]) >: : :- LogicalJoin(condition=[=($5, $6)], joinType=[left]) >: : : :-
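The performance gap between the two input specs can be sketched outside Flink. The following is a hypothetical illustration (not Flink's actual operator code; all names are invented) of the state a streaming join keeps for one input side: with a unique key inside the join key, an update is a point replacement, while with no unique key every row must be retained and retractions require a linear scan.

```python
# Hypothetical sketch (NOT Flink internals) of per-side join state
# under the two input specs discussed in this issue.
from collections import defaultdict

# JoinKeyContainsUniqueKey: at most one row per join key.
unique_key_state = {}                    # join_key -> row

# NoUniqueKey: all rows kept; a retraction must scan them.
no_unique_key_state = defaultdict(list)  # join_key -> [row, ...]

def upsert_unique(key, row):
    unique_key_state[key] = row          # O(1) replacement

def upsert_no_unique(key, old_row, new_row):
    rows = no_unique_key_state[key]
    if old_row in rows:                  # linear scan to retract old row
        rows.remove(old_row)
    rows.append(new_row)

upsert_unique("k1", ("k1", "v1"))
upsert_unique("k1", ("k1", "v2"))        # old row replaced in place
upsert_no_unique("k1", None, ("k1", "v1"))
upsert_no_unique("k1", ("k1", "v1"), ("k1", "v2"))
```

This is why losing the unique-key derivation on the intermediate join result is so costly: the downstream join falls into the second, scan-heavy layout.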
[jira] [Closed] (FLINK-22267) Savepoint an application for source of upsert-kafka, then restart the application from the savepoint, state not be recovered.
[ https://issues.apache.org/jira/browse/FLINK-22267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-22267. --- Resolution: Not A Problem > Savepoint an application for source of upsert-kafka, then restart the > application from the savepoint, state not be recovered. > --- > > Key: FLINK-22267 > URL: https://issues.apache.org/jira/browse/FLINK-22267 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.12.2 >Reporter: Carl >Priority: Major > Attachments: image-2021-04-14-11-17-00-207.png > > > My operations are as follows: > 1. Take a savepoint of an application with an upsert-kafka source > 2. Delete the upsert-kafka topic data > 3. Restart the application from the savepoint > 4. The log shows that the offset has been restored, but the state has not been > restored > What I want to confirm is: does restarting the upsert-kafka source application from a > savepoint restore the state from the savepoint, or rebuild it from the earliest Kafka > messages? > If so, why reset the offset: > !image-2021-04-14-11-17-00-207.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-22054) Using a shared watcher for ConfigMap watching
[ https://issues.apache.org/jira/browse/FLINK-22054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321916#comment-17321916 ] hayden zhou edited comment on FLINK-22054 at 4/15/21, 5:53 AM: --- [~trohrmann] [~yittg] I can't submit more than about 60 batch jobs; I submitted one job every 5 minutes. Is it that the watching connections are exhausted? Was the connection not released back to the pool when a batch job closed? was (Author: hayden zhou): [~trohrmann] [~yittg] I can't submit more than about 60 batch jobs; I submitted one job every 5 minutes. Is it that the watching connections are exhausted? Was the connection not closed when a batch job closed? > Using a shared watcher for ConfigMap watching > - > > Key: FLINK-22054 > URL: https://issues.apache.org/jira/browse/FLINK-22054 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.12.2, 1.13.0 >Reporter: Yi Tang >Assignee: Yi Tang >Priority: Major > Labels: k8s-ha, pull-request-available > Fix For: 1.14.0 > > > While using the K8s HA service, the watching for ConfigMaps is separate for each > job. As the number of running jobs increases, this consumes a large number of > connections. > Here we propose to use a shared watcher for each FlinkKubeClient and > dispatch events to different listeners. At the same time, we should keep the > same semantics as watching separately.
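The proposal in this issue can be sketched as a toy model (illustrative only; class and method names are invented, not Flink's flink-kubernetes code): instead of one watch connection per job, a single shared watcher fans ConfigMap events out to per-job listeners, and closing a job removes only its listener while the one underlying connection stays open.

```python
# Toy model of the shared-watcher idea (names are illustrative).
class SharedConfigMapWatcher:
    def __init__(self):
        self.listeners = {}             # config_map_name -> [callbacks]

    def watch(self, name, callback):
        self.listeners.setdefault(name, []).append(callback)
        # The returned handle removes only this listener; the single
        # underlying watch connection keeps serving the other jobs.
        return lambda: self.listeners[name].remove(callback)

    def on_event(self, name, event):
        # Called once per event arriving on the one shared connection.
        for cb in list(self.listeners.get(name, [])):
            cb(event)

watcher = SharedConfigMapWatcher()
seen = []
stop = watcher.watch("job-1-leader", seen.append)
watcher.on_event("job-1-leader", "MODIFIED")   # delivered to the job
stop()                                          # job finished
watcher.on_event("job-1-leader", "MODIFIED")   # no longer delivered
```

Under this model the connection count is bounded by the number of FlinkKubeClients rather than the number of running jobs, which is the resource issue the reporter and hayden zhou are discussing.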
[jira] [Commented] (FLINK-22267) Savepoint an application for source of upsert-kafka, then restart the application from the savepoint, state not be recovered.
[ https://issues.apache.org/jira/browse/FLINK-22267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321920#comment-17321920 ] Jark Wu commented on FLINK-22267: - From the log information, it did restore the source offset state from the savepoint; however, the restored source offset no longer exists in the Kafka topic (it was cleaned by your step 2). In this case, Kafka will reset the offset to earliest by default; see the Kafka property {{auto.offset.reset}}. > Savepoint an application for source of upsert-kafka, then restart the > application from the savepoint, state not be recovered. > --- > > Key: FLINK-22267 > URL: https://issues.apache.org/jira/browse/FLINK-22267 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.12.2 >Reporter: Carl >Priority: Major > Attachments: image-2021-04-14-11-17-00-207.png > > > My operations are as follows: > 1. Take a savepoint of an application with an upsert-kafka source > 2. Delete the upsert-kafka topic data > 3. Restart the application from the savepoint > 4. The log shows that the offset has been restored, but the state has not been > restored > What I want to confirm is: does restarting the upsert-kafka source application from a > savepoint restore the state from the savepoint, or rebuild it from the earliest Kafka > messages? > If so, why reset the offset: > !image-2021-04-14-11-17-00-207.png!
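Jark's explanation can be illustrated with a small model (illustrative only, not the real Kafka consumer implementation): the source seeks to the offset restored from the savepoint, and only when that offset is out of the topic's current range does the `auto.offset.reset` policy apply, which is why deleting the topic data made the job fall back to the earliest remaining messages.

```python
# Illustrative model of the behavior described above: a restored offset
# that was deleted from the topic falls back per auto.offset.reset.
def resolve_start_offset(restored, beginning, end, auto_offset_reset="earliest"):
    if beginning <= restored <= end:
        return restored              # savepoint offset is still valid
    if auto_offset_reset == "earliest":
        return beginning             # what the reporter observed
    if auto_offset_reset == "latest":
        return end
    raise RuntimeError("offset out of range")  # policy 'none'

# Offsets below 100 were deleted (step 2); savepoint restored offset 42,
# so the consumer resets to the earliest remaining offset, 100.
print(resolve_start_offset(42, 100, 150))
```

The key point matching Jark's comment: the savepoint state *was* restored; it is the seek against the truncated topic that triggers the reset, not a failure of state recovery.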
[jira] [Closed] (FLINK-22169) Beautify the CliTableauResultView when print
[ https://issues.apache.org/jira/browse/FLINK-22169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-22169. -- Resolution: Fixed fixed: 326a564ebc0e397654ccd4e5a83a77ef811c6a76 > Beautify the CliTableauResultView when print > > > Key: FLINK-22169 > URL: https://issues.apache.org/jira/browse/FLINK-22169 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.13.0 >Reporter: Shengkai Fang >Assignee: Shengkai Fang >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > Attachments: print.png > > > In batch mode, the printed result is not the same as before. > !print.png!
[GitHub] [flink] KurtYoung merged pull request #15603: [FLINK-22169][sql-client] Beautify the CliTableauResultView when print
KurtYoung merged pull request #15603: URL: https://github.com/apache/flink/pull/15603 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] KurtYoung commented on a change in pull request #15603: [FLINK-22169][sql-client] Beautify the CliTableauResultView when print
KurtYoung commented on a change in pull request #15603: URL: https://github.com/apache/flink/pull/15603#discussion_r613735406 ## File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java ## @@ -233,22 +233,7 @@ public void testCancelBatchResult() throws Exception { furture.get(5, TimeUnit.SECONDS); Assert.assertEquals( - "+-+-+--++++" -+ System.lineSeparator() -+ "| boolean | int | bigint | varchar | decimal(10, 5) | timestamp |" -+ System.lineSeparator() -+ "+-+-+--++++" -+ System.lineSeparator() -+ "| (NULL) | 1 |2 | abc | 1.23 | 2020-03-01 18:39:14.0 |" -+ System.lineSeparator() -+ "| false | (NULL) |0 | | 1 | 2020-03-01 18:39:14.1 |" -+ System.lineSeparator() -+ "|true | 2147483647 | (NULL) | abcdefg | 1234567890 | 2020-03-01 18:39:14.12 |" -+ System.lineSeparator() -+ "| false | -2147483648 | 9223372036854775807 | (NULL) |12345.06789 |2020-03-01 18:39:14.123 |" -+ System.lineSeparator() -+ "Query terminated, received a total of 4 rows" -+ System.lineSeparator(), +"Query terminated, received a total of 0 row" + System.lineSeparator(), Review comment: I think this is unexpected. The test is trying to verify that we can actually receive part of the data until the user terminates the batch query.
[GitHub] [flink] flinkbot edited a comment on pull request #15607: [FLINK-20779][python][docs] Add documentation for row-based operation in Python Table API
flinkbot edited a comment on pull request #15607: URL: https://github.com/apache/flink/pull/15607#issuecomment-819349925 ## CI report: * 8a62da0c014fc75b3bad145c2fd46fc1809a9c28 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16580) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15595: [FLINK-22171][sql-client][docs] Update the SQL Client doc
flinkbot edited a comment on pull request #15595: URL: https://github.com/apache/flink/pull/15595#issuecomment-818681408 ## CI report: * 6e06c0cb3d52f23f31ab7855880da5177f7d1627 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16532) * b7a82f07683d5a76f6743d68cb4cd174c0453bca UNKNOWN * 0b78a5c66116de0181cc645da264115b4b1ee79c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16582) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-22054) Using a shared watcher for ConfigMap watching
[ https://issues.apache.org/jira/browse/FLINK-22054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321916#comment-17321916 ] hayden zhou edited comment on FLINK-22054 at 4/15/21, 5:48 AM: --- [~trohrmann] [~yittg] I can't submit more than about 60 batch jobs; I submitted one job every 5 minutes. Is it that the watching connections are exhausted? Was the connection not closed when a batch job closed? was (Author: hayden zhou): [~trohrmann] [~yittg] I can't submit more than about 60 batch jobs; I submitted one job every 5 minutes. Is it that the watching connections are exhausted? Was the connection not closed when a job closed? > Using a shared watcher for ConfigMap watching > - > > Key: FLINK-22054 > URL: https://issues.apache.org/jira/browse/FLINK-22054 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.12.2, 1.13.0 >Reporter: Yi Tang >Assignee: Yi Tang >Priority: Major > Labels: k8s-ha, pull-request-available > Fix For: 1.14.0 > > > While using the K8s HA service, the watching for ConfigMaps is separate for each > job. As the number of running jobs increases, this consumes a large number of > connections. > Here we propose to use a shared watcher for each FlinkKubeClient and > dispatch events to different listeners. At the same time, we should keep the > same semantics as watching separately.
[GitHub] [flink] flinkbot edited a comment on pull request #15438: [FLINK-22000] Set a default character set in InputStreamReader to sol…
flinkbot edited a comment on pull request #15438: URL: https://github.com/apache/flink/pull/15438#issuecomment-810469495 ## CI report: * d16ff5596d58f30aa2752007efc8f7908059596d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16353) * 6459e87724e47a8c6cf7c0478f498c48d860c809 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-22281) flink sql consumer kafka canal-json message then sum(amount)
[ https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321917#comment-17321917 ] xx chai commented on FLINK-22281: - Yes, I have added this parameter correctly > flink sql consumer kafka canal-json message then sum(amount) > -- > > Key: FLINK-22281 > URL: https://issues.apache.org/jira/browse/FLINK-22281 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0 > Environment: flink 1.12 local >Reporter: xx chai >Priority: Major > Attachments: screenshot-1.png > > > I use Flink SQL to consume Kafka canal-json messages. The SQL is: > CREATE TABLE kafka_mall_order_info ( > id int, > amount double, >PRIMARY KEY ( id) NOT ENFORCED >) WITH ( > 'connector' = 'kafka', > 'topic' = 'topic_yx-dc-3-102_3306', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'kafka_to_hive', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'canal-json'); > create table t2 (amount double) with ('connector' = 'print'); > > > insert into t2 select sum(amount) from kafka_mall_order_info ; > but the result is not what I expect > the result is in the attached image
[jira] [Commented] (FLINK-22054) Using a shared watcher for ConfigMap watching
[ https://issues.apache.org/jira/browse/FLINK-22054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321916#comment-17321916 ] hayden zhou commented on FLINK-22054: - [~trohrmann] [~yittg] I can't submit more than about 60 batch jobs; I submitted one job every 5 minutes. Is it that the watching connections are exhausted? Was the connection not closed when a job closed? > Using a shared watcher for ConfigMap watching > - > > Key: FLINK-22054 > URL: https://issues.apache.org/jira/browse/FLINK-22054 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.12.2, 1.13.0 >Reporter: Yi Tang >Assignee: Yi Tang >Priority: Major > Labels: k8s-ha, pull-request-available > Fix For: 1.14.0 > > > While using the K8s HA service, the watching for ConfigMaps is separate for each > job. As the number of running jobs increases, this consumes a large number of > connections. > Here we propose to use a shared watcher for each FlinkKubeClient and > dispatch events to different listeners. At the same time, we should keep the > same semantics as watching separately.
[jira] [Commented] (FLINK-22281) flink sql consumer kafka canal-json message then sum(amount)
[ https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321914#comment-17321914 ] Jark Wu commented on FLINK-22281: - [~chaixiaoxue] you should add the configuration via {{TableConfig#getConfiguration#set(...)}}. > flink sql consumer kafka canal-json message then sum(amount) > -- > > Key: FLINK-22281 > URL: https://issues.apache.org/jira/browse/FLINK-22281 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0 > Environment: flink 1.12 local >Reporter: xx chai >Priority: Major > Attachments: screenshot-1.png > > > I use Flink SQL to consume Kafka canal-json messages. The SQL is: > CREATE TABLE kafka_mall_order_info ( > id int, > amount double, >PRIMARY KEY ( id) NOT ENFORCED >) WITH ( > 'connector' = 'kafka', > 'topic' = 'topic_yx-dc-3-102_3306', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'kafka_to_hive', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'canal-json'); > create table t2 (amount double) with ('connector' = 'print'); > > > insert into t2 select sum(amount) from kafka_mall_order_info ; > but the result is not what I expect > the result is in the attached image
[GitHub] [flink] flinkbot edited a comment on pull request #15595: [FLINK-22171][sql-client][docs] Update the SQL Client doc
flinkbot edited a comment on pull request #15595: URL: https://github.com/apache/flink/pull/15595#issuecomment-818681408 ## CI report: * 6e06c0cb3d52f23f31ab7855880da5177f7d1627 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16532) * b7a82f07683d5a76f6743d68cb4cd174c0453bca UNKNOWN * 0b78a5c66116de0181cc645da264115b4b1ee79c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] kezhuw commented on a change in pull request #15605: [FLINK-21996][coordination] - Part 3&4: Ensure OperatorEvent transport losses are handled
kezhuw commented on a change in pull request #15605: URL: https://github.com/apache/flink/pull/15605#discussion_r613758218 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java ## @@ -64,6 +69,17 @@ subtaskAccess.createEventSendAction(serializedEvent); final CompletableFuture result = new CompletableFuture<>(); +result.whenCompleteAsync( Review comment: It took me some time to reach this conclusion; I hope I am not wrong somewhere (please forgive me if so). It is just hard to evaluate everything with multiple futures. But thanks to all the existing work, almost everything is single-threaded now, which is much better than the asynchronous counterparts.
[GitHub] [flink] kezhuw commented on a change in pull request #15605: [FLINK-21996][coordination] - Part 3&4: Ensure OperatorEvent transport losses are handled
kezhuw commented on a change in pull request #15605: URL: https://github.com/apache/flink/pull/15605#discussion_r613755154 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java ## @@ -64,6 +69,17 @@ subtaskAccess.createEventSendAction(serializedEvent); final CompletableFuture result = new CompletableFuture<>(); +result.whenCompleteAsync( Review comment: I think I see the correctness in my mental model. * `SubtaskGateway` is eagerly bound to `Execution`, so there will be no mis-targeted events. * Both checkpoint completion and event sending are ordered onto the main thread for execution, so the rendezvous in checkpointing on the user side is preserved on the holder side. This guarantees that "the events that are sent later cannot overtake that action". * `createEventSendAction` constructs a lazy sending action to execute in the main thread. This sending action inspects the execution state (`Execution.state` or `IncompleteFuturesTracker.failureCause`) to **fail immediately rather than become pending**. This might be nice from a design standpoint (e.g. for correctness evaluation), but I would say it is pretty unintuitive in the implementation, at least for me. Here is what I found unintuitive: * The `Execution.state` inspection in the sending action is important here, as it is what makes the action fail immediately. Normally it should not have to be: a delayed failure should not affect correctness. I could not even tell whether `taskExecution.getState()` is a deliberate duplication of the similar inspection in `sendOperatorEvent`. * Based on the above (the delayed failure), `taskExecution.getTerminalStateFuture().thenAccept()` becomes crucial. It is not a safeguard to "speed things up", but a safeguard for correctness. Without it, a delayed failure will leak into the next checkpoint. I think the key point is that `OperatorEventValve` is neutral to (i.e. has no knowledge of) `Execution`, so some work has to be done outside it to prevent an outdated sending action from contributing incomplete futures that are inspected in the next checkpoint. I have no strong objection to the current approach, but I would suggest adjusting the docs a bit in `ExecutionSubtaskAccess` and/or other places to emphasize that this immediate failure is crucial to correctness.
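The delayed-failure concern in this review can be sketched with plain futures. The following is an illustrative toy model (invented names, not the actual `SubtaskGatewayImpl`): a pending event-send future must be failed eagerly once the task reaches a terminal state, otherwise it lingers as an incomplete future that the next checkpoint would wait on.

```python
from concurrent.futures import Future

class ToySubtaskGateway:
    """Illustrative only: event-send futures are failed eagerly when
    the task's terminal state is observed."""
    def __init__(self):
        self.pending = []
        self.terminated = False

    def send_event(self, event):
        result = Future()
        if self.terminated:          # inspect state: fail immediately,
            result.set_exception(RuntimeError("task not running"))
            return result            # never become pending
        self.pending.append(result)
        return result

    def on_terminal_state(self):
        self.terminated = True
        for f in self.pending:       # without this safeguard a stale
            if not f.done():         # send would stay incomplete into
                f.set_exception(RuntimeError("task reached terminal state"))
        self.pending.clear()         # the next checkpoint's inspection

gw = ToySubtaskGateway()
f = gw.send_event("op-event")        # still pending
gw.on_terminal_state()               # fails it eagerly
```

The sketch shows both inspection points the reviewer discusses: the state check inside the send path and the terminal-state hook that cleans up already-pending futures.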
[GitHub] [flink] flinkbot edited a comment on pull request #15595: [FLINK-22171][sql-client][docs] Update the SQL Client doc
flinkbot edited a comment on pull request #15595: URL: https://github.com/apache/flink/pull/15595#issuecomment-818681408 ## CI report: * 6e06c0cb3d52f23f31ab7855880da5177f7d1627 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16532) * b7a82f07683d5a76f6743d68cb4cd174c0453bca UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15546: [FLINK-21346][build system] Adding timeout to all tasks
flinkbot edited a comment on pull request #15546: URL: https://github.com/apache/flink/pull/15546#issuecomment-816538766 ## CI report: * e16ddc3fb9157cc0b70ea07023202a2ebdd9fe15 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16569) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15549: [FLINK-20761][hive] Escape the location path when creating input splits
flinkbot edited a comment on pull request #15549: URL: https://github.com/apache/flink/pull/15549#issuecomment-816597084 ## CI report: * 6bc0d4aa007b6fe629822142a3e6eb1d6c6fe7d9 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16293) * 387e94130b66c6fb2dd5ac36133b2c317d83060e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16581) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-22280) Add XML output format for Flink Table
[ https://issues.apache.org/jira/browse/FLINK-22280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321901#comment-17321901 ] Kurt Young commented on FLINK-22280: I also think this would be helpful. [~flacombe] > Add XML output format for Flink Table > - > > Key: FLINK-22280 > URL: https://issues.apache.org/jira/browse/FLINK-22280 > Project: Flink > Issue Type: New Feature > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / Ecosystem >Affects Versions: 1.12.1 >Reporter: François Lacombe >Priority: Major > > Dear maintainers > I'm looking for the ability to output XML files from the Flink Table API, just > like the already supported CSV and JSON formats. > To me, a new format could be required to make the appropriate serialization. > Am I missing any existing feature (or duplicate issue) that could allow it > without a dedicated format? > Depending on your feedback, and if it makes sense, I could get involved in > writing the appropriate format based on the same logic as > `JsonRowDataSerializationSchema`. > Best regards
[GitHub] [flink] flinkbot edited a comment on pull request #15605: [FLINK-21996][coordination] - Part 3&4: Ensure OperatorEvent transport losses are handled
flinkbot edited a comment on pull request #15605: URL: https://github.com/apache/flink/pull/15605#issuecomment-819212302 ## CI report: * e9f7f5526078a9ed65d6403eab5cde7fc82b177b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16570) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15607: [FLINK-20779][python][docs] Add documentation for row-based operation in Python Table API
flinkbot edited a comment on pull request #15607: URL: https://github.com/apache/flink/pull/15607#issuecomment-819349925 ## CI report: * 58d729cf2d2ecb4239cd3a2ee380e64f484e7df0 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16512) * 8a62da0c014fc75b3bad145c2fd46fc1809a9c28 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16580) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15549: [FLINK-20761][hive] Escape the location path when creating input splits
flinkbot edited a comment on pull request #15549: URL: https://github.com/apache/flink/pull/15549#issuecomment-816597084 ## CI report: * 6bc0d4aa007b6fe629822142a3e6eb1d6c6fe7d9 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16293) * 387e94130b66c6fb2dd5ac36133b2c317d83060e UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] lirui-apache commented on a change in pull request #15549: [FLINK-20761][hive] Escape the location path when creating input splits
lirui-apache commented on a change in pull request #15549: URL: https://github.com/apache/flink/pull/15549#discussion_r613742399 ## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java ## @@ -442,6 +448,41 @@ public void testDynamicPartWithOrderBy() throws Exception { } } +@Test +public void testLocationWithComma() throws Exception { +TableEnvironment tableEnv = getTableEnvWithHiveCatalog(); +File location = tempFolder.newFolder(",tbl1,location,"); +try { +// test table location +tableEnv.executeSql( +String.format( +"create table tbl1 (x int) location '%s'", location.getAbsolutePath())); +tableEnv.executeSql("insert into tbl1 values (1),(2)").await(); +List<Row> results = +CollectionUtil.iteratorToList( +tableEnv.executeSql("select * from tbl1").collect()); +assertEquals("[+I[1], +I[2]]", results.toString()); +// test partition location +tableEnv.executeSql("create table tbl2 (x int) partitioned by (p string)"); +location = tempFolder.newFolder(","); +tableEnv.executeSql( +String.format( +"alter table tbl2 add partition (p='a') location '%s'", Review comment: It's the location path that causes the issue. But I added a test case where both the partition value and the path contain a comma anyway. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
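The escaping being tested matters because Hadoop packs all input paths for a job into a single comma-separated configuration value, so a literal `,` inside a location must be escaped before joining (Hadoop uses backslash escaping for this). A minimal Python sketch of the round trip — the function names here are illustrative, not the actual Flink/Hadoop API:

```python
# Sketch: why a comma in a table location breaks input-split creation.
# Hadoop stores all input paths in one comma-separated config value, so a
# literal ',' in a path must be escaped (backslash escaping, as Hadoop does).

def escape_path(path: str) -> str:
    """Escape commas (and backslashes) so the path survives comma-joining."""
    return path.replace("\\", "\\\\").replace(",", "\\,")

def join_paths(paths):
    return ",".join(escape_path(p) for p in paths)

def split_paths(joined: str):
    """Split on unescaped commas and undo the escaping."""
    parts, cur, i = [], [], 0
    while i < len(joined):
        c = joined[i]
        if c == "\\" and i + 1 < len(joined):
            cur.append(joined[i + 1])  # escaped character: keep it literally
            i += 2
        elif c == ",":
            parts.append("".join(cur))  # unescaped comma: path boundary
            cur = []
            i += 1
        else:
            cur.append(c)
            i += 1
    parts.append("".join(cur))
    return parts

paths = ["/warehouse/,tbl1,location,", "/warehouse/plain"]
assert split_paths(join_paths(paths)) == paths
# Without escaping, the first path shatters into bogus fragments:
assert len(",".join(paths).split(",")) == 5
```

This mirrors the failure mode in the PR: a folder named `,tbl1,location,` splits into several nonexistent paths unless escaped first.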
[jira] [Commented] (FLINK-22262) Flink on Kubernetes ConfigMaps are created without OwnerReference
[ https://issues.apache.org/jira/browse/FLINK-22262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321885#comment-17321885 ] Yang Wang commented on FLINK-22262: --- Could you please share the JobManager logs when you cancel the Flink job successfully and still have the residual ConfigMaps? I think you could use {{kubectl logs podname}} to get the logs. I have used the following steps to start/stop Flink applications on K8s with HA enabled in my minikube, and it works well. 1. Start the native Flink K8s application {code:java} $FLINK_HOME/bin/flink run-application -d -t kubernetes-application \ -Dkubernetes.cluster-id=$CLUSTER_ID \ -Dkubernetes.namespace=$NAMESPACE \ -Dkubernetes.container.image=wangyang09180523/flink:1.13.0-rc0 \ -Dkubernetes.container.image.pull-policy=Always \ -Dkubernetes.rest-service.exposed.type=NodePort \ -Dkubernetes.jobmanager.cpu=0.5 -Djobmanager.memory.process.size=1700m \ -Dkubernetes.jobmanager.service-account=default \ -Dkubernetes.taskmanager.cpu=0.5 -Dtaskmanager.memory.process.size=1500m -Dtaskmanager.numberOfTaskSlots=4 \ -Dstate.checkpoints.dir=$HA_STORAGE \ -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \ -Dhigh-availability.storageDir=$HA_STORAGE \ -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=1 \ -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.13.0.jar -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.13.0.jar \ -Dstate.savepoints.dir=$HA_STORAGE \ local:///opt/flink/examples/streaming/StateMachineExample.jar {code} 2. Cancel the Flink job with savepoint. All the K8s resources will be deleted. I do not find residual HA ConfigMaps after the job is cancelled successfully. {code:java} ./bin/flink cancel --target kubernetes-application --withSavepoint -Dkubernetes.cluster-id=k8s-app-ha-1-113-rc1 -Dkubernetes.namespace=default ... ... Cancelled job . 
Savepoint stored in oss://flink-debug-yiqi/flink-ha/savepoint-00-8741523cb1d1. {code} 3. Optionally change the user code and resubmit the Flink application with the stored savepoint {code:java} $FLINK_HOME/bin/flink run-application -d -t kubernetes-application \ --fromSavepoint oss://flink-debug-yiqi/flink-ha/savepoint-00-8741523cb1d1 \ ... ... local:///opt/flink/examples/streaming/StateMachineExample.jar{code} > Flink on Kubernetes ConfigMaps are created without OwnerReference > - > > Key: FLINK-22262 > URL: https://issues.apache.org/jira/browse/FLINK-22262 > Project: Flink > Issue Type: Bug >Affects Versions: 1.13.0 >Reporter: Andrea Peruffo >Priority: Major > > According to the documentation: > [https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#manual-resource-cleanup] > The ConfigMaps created along with the Flink deployment are supposed to have an > OwnerReference pointing to the Deployment itself; unfortunately, this doesn't > happen and causes all sorts of issues when the classpath and the jars of the > job are updated. > i.e.: > Without manually removing the ConfigMap of the Job I cannot update the Jars > of the Job. > Can you please give guidance if there are additional caveats on manually > removing the ConfigMap? Any other workaround that can be used? > Thanks in advance. 
> Example ConfigMap: > {{apiVersion: v1}} > {{data:}} > {{ address: akka.tcp://flink@10.0.2.13:6123/user/rpc/jobmanager_2}} > {{ checkpointID-049: > rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAABOEtzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAUC9tbnQvZmxpbmsvc3RvcmFnZS9rc2hhL3RheGktcmlkZS1mYXJlLXByb2Nlc3Nvci9jb21wbGV0ZWRDaGVja3BvaW50MDQ0YTc2OWRkNDgxeA==}} > {{ counter: "50"}} > {{ sessionId: 0c2b69ee-6b41-48d3-b7fd-1bf2eda94f0f}} > {{kind: ConfigMap}} > {{metadata:}} > {{ annotations:}} > {{ control-plane.alpha.kubernetes.io/leader: > '\{"holderIdentity":"0f25a2cc-e212-46b0-8ba9-faac0732a316","leaseDuration":15.0,"acquireTime":"2021-04-13T14:30:51.439000Z","renewTime":"2021-04-13T14:39:32.011000Z","leaderTransitions":105}'}} > {{ creationTimestamp: "2021-04-13T14:30:51Z"}} > {{ labels:}} > {{ app: taxi-ride-fare-processor}} > {{ configmap-type: high-availability}} > {{ type:
[jira] [Updated] (FLINK-22262) Flink on Kubernetes ConfigMaps are created without OwnerReference
[ https://issues.apache.org/jira/browse/FLINK-22262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yang Wang updated FLINK-22262: -- Component/s: Deployment / Kubernetes > Flink on Kubernetes ConfigMaps are created without OwnerReference > - > > Key: FLINK-22262 > URL: https://issues.apache.org/jira/browse/FLINK-22262 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.13.0 >Reporter: Andrea Peruffo >Priority: Major > > According to the documentation: > [https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#manual-resource-cleanup] > The ConfigMaps created along with the Flink deployment are supposed to have an > OwnerReference pointing to the Deployment itself; unfortunately, this doesn't > happen and causes all sorts of issues when the classpath and the jars of the > job are updated. > i.e.: > Without manually removing the ConfigMap of the Job I cannot update the Jars > of the Job. > Can you please give guidance if there are additional caveats on manually > removing the ConfigMap? Any other workaround that can be used? > Thanks in advance. 
> Example ConfigMap: > {{apiVersion: v1}} > {{data:}} > {{ address: akka.tcp://flink@10.0.2.13:6123/user/rpc/jobmanager_2}} > {{ checkpointID-049: > rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAABOEtzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAUC9tbnQvZmxpbmsvc3RvcmFnZS9rc2hhL3RheGktcmlkZS1mYXJlLXByb2Nlc3Nvci9jb21wbGV0ZWRDaGVja3BvaW50MDQ0YTc2OWRkNDgxeA==}} > {{ counter: "50"}} > {{ sessionId: 0c2b69ee-6b41-48d3-b7fd-1bf2eda94f0f}} > {{kind: ConfigMap}} > {{metadata:}} > {{ annotations:}} > {{ control-plane.alpha.kubernetes.io/leader: > '\{"holderIdentity":"0f25a2cc-e212-46b0-8ba9-faac0732a316","leaseDuration":15.0,"acquireTime":"2021-04-13T14:30:51.439000Z","renewTime":"2021-04-13T14:39:32.011000Z","leaderTransitions":105}'}} > {{ creationTimestamp: "2021-04-13T14:30:51Z"}} > {{ labels:}} > {{ app: taxi-ride-fare-processor}} > {{ configmap-type: high-availability}} > {{ type: flink-native-kubernetes}} > {{ name: > taxi-ride-fare-processor--jobmanager-leader}} > {{ namespace: taxi-ride-fare}} > {{ resourceVersion: "64100"}} > {{ selfLink: > /api/v1/namespaces/taxi-ride-fare/configmaps/taxi-ride-fare-processor--jobmanager-leader}} > {{ uid: 9f912495-382a-45de-a789-fd5ad2a2459d}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
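The missing piece the issue describes is an `ownerReferences` entry in the ConfigMap's metadata; with it present, deleting the owning Deployment lets Kubernetes garbage-collect the ConfigMap automatically. A hedged sketch of what such an entry looks like — the name and uid below are placeholders, not values from a real cluster:

```python
# Sketch: the ownerReferences metadata Kubernetes uses for cascading deletion.
# If the HA ConfigMap carried an entry like this pointing at the JobManager
# Deployment, it would be removed together with the Deployment.

def owner_reference(deployment_name: str, deployment_uid: str) -> dict:
    # Fields follow the Kubernetes OwnerReference schema.
    return {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "name": deployment_name,
        "uid": deployment_uid,          # must be the owner's actual uid
        "controller": True,
        "blockOwnerDeletion": True,
    }

configmap = {
    "apiVersion": "v1",
    "kind": "ConfigMap",
    "metadata": {
        "name": "taxi-ride-fare-processor--jobmanager-leader",
        "ownerReferences": [
            owner_reference("taxi-ride-fare-processor", "placeholder-uid"),
        ],
    },
}
assert configmap["metadata"]["ownerReferences"][0]["kind"] == "Deployment"
```

Note that cross-namespace owners are not allowed, and the `uid` must match the live Deployment, which is why such references have to be attached at creation time or patched afterwards.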
[GitHub] [flink] flinkbot edited a comment on pull request #15607: [FLINK-20779][python][docs] Add documentation for row-based operation in Python Table API
flinkbot edited a comment on pull request #15607: URL: https://github.com/apache/flink/pull/15607#issuecomment-819349925 ## CI report: * 58d729cf2d2ecb4239cd3a2ee380e64f484e7df0 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16512) * 8a62da0c014fc75b3bad145c2fd46fc1809a9c28 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15525: [FLINK-22154][table-planner-blink] Fix bug where PushFilterIntoTableSourceScanRule fails to deal with IN expressions
flinkbot edited a comment on pull request #15525: URL: https://github.com/apache/flink/pull/15525#issuecomment-815836876 ## CI report: * 00492ee3748f2830378d9f7c73afa8b6e3e48fcb Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16504) * 98d1ce0ed7ea00fd3cc89854ae38e02522926cf4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16579) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15020: [FLINK-21445][kubernetes] Application mode does not set the configuration when building PackagedProgram
flinkbot edited a comment on pull request #15020: URL: https://github.com/apache/flink/pull/15020#issuecomment-785804760 ## CI report: * 4a4f8a643c603007519ebefc4c1e732347b6d3d2 UNKNOWN * 6ebcd56d7080224fcd21f0f22ee467a951feb2e0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16545) * de3bd6e46144fc21952f2ea1035d278a6dc1076f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16578) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #14484: [FLINK-20722][hive] HiveTableSink should copy the record when convert…
flinkbot edited a comment on pull request #14484: URL: https://github.com/apache/flink/pull/14484#issuecomment-750730068 ## CI report: * adaac76c0ae0a62ca7f156340653128a43bf7b20 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12645) * af484144ed52eb8074b78710915bff0fb049dc97 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16577) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] KurtYoung commented on a change in pull request #15603: [FLINK-22169][sql-client] Beautify the CliTableauResultView when print
KurtYoung commented on a change in pull request #15603: URL: https://github.com/apache/flink/pull/15603#discussion_r613735406 ## File path: flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliTableauResultViewTest.java ## @@ -233,22 +233,7 @@ public void testCancelBatchResult() throws Exception { furture.get(5, TimeUnit.SECONDS); Assert.assertEquals( - "+-+-+--++++" -+ System.lineSeparator() -+ "| boolean | int | bigint | varchar | decimal(10, 5) | timestamp |" -+ System.lineSeparator() -+ "+-+-+--++++" -+ System.lineSeparator() -+ "| (NULL) | 1 |2 | abc | 1.23 | 2020-03-01 18:39:14.0 |" -+ System.lineSeparator() -+ "| false | (NULL) |0 | | 1 | 2020-03-01 18:39:14.1 |" -+ System.lineSeparator() -+ "|true | 2147483647 | (NULL) | abcdefg | 1234567890 | 2020-03-01 18:39:14.12 |" -+ System.lineSeparator() -+ "| false | -2147483648 | 9223372036854775807 | (NULL) |12345.06789 |2020-03-01 18:39:14.123 |" -+ System.lineSeparator() -+ "Query terminated, received a total of 4 rows" -+ System.lineSeparator(), +"Query terminated, received a total of 0 row" + System.lineSeparator(), Review comment: I think this is unexpected. The test is trying to verify that we can actually receive part of the data until users terminates the batch query. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] KurtYoung commented on a change in pull request #15549: [FLINK-20761][hive] Escape the location path when creating input splits
KurtYoung commented on a change in pull request #15549: URL: https://github.com/apache/flink/pull/15549#discussion_r613734122 ## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java ## @@ -442,6 +448,41 @@ public void testDynamicPartWithOrderBy() throws Exception { } } +@Test +public void testLocationWithComma() throws Exception { +TableEnvironment tableEnv = getTableEnvWithHiveCatalog(); +File location = tempFolder.newFolder(",tbl1,location,"); +try { +// test table location +tableEnv.executeSql( +String.format( +"create table tbl1 (x int) location '%s'", location.getAbsolutePath())); +tableEnv.executeSql("insert into tbl1 values (1),(2)").await(); +List results = +CollectionUtil.iteratorToList( +tableEnv.executeSql("select * from tbl1").collect()); +assertEquals("[+I[1], +I[2]]", results.toString()); +// test partition location +tableEnv.executeSql("create table tbl2 (x int) partitioned by (p string)"); +location = tempFolder.newFolder(","); +tableEnv.executeSql( +String.format( +"alter table tbl2 add partition (p='a') location '%s'", Review comment: According to the jira description, I was assuming that you want to address both of these ## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java ## @@ -442,6 +448,41 @@ public void testDynamicPartWithOrderBy() throws Exception { } } +@Test +public void testLocationWithComma() throws Exception { +TableEnvironment tableEnv = getTableEnvWithHiveCatalog(); +File location = tempFolder.newFolder(",tbl1,location,"); +try { +// test table location +tableEnv.executeSql( +String.format( +"create table tbl1 (x int) location '%s'", location.getAbsolutePath())); +tableEnv.executeSql("insert into tbl1 values (1),(2)").await(); +List results = +CollectionUtil.iteratorToList( +tableEnv.executeSql("select * from tbl1").collect()); +assertEquals("[+I[1], +I[2]]", results.toString()); 
+// test partition location +tableEnv.executeSql("create table tbl2 (x int) partitioned by (p string)"); +location = tempFolder.newFolder(","); +tableEnv.executeSql( +String.format( +"alter table tbl2 add partition (p='a') location '%s'", Review comment: According to the jira description, I was assuming that you want to address both of them -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-22140) Test the unified binary savepoint
[ https://issues.apache.org/jira/browse/FLINK-22140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321879#comment-17321879 ] Yun Gao commented on FLINK-22140: - Tested the unified binary savepoint with [an artificial job|https://github.com/gaoyunhaii/flink1.13test]: # The job has a source that generates the tuple (index % numberOfKeys, index / numberOfKeys) for index in [0, numberOfRecords). The input is fed into operators with different types of keyed state. There are 5 types of state in total, namely value state, reducing state, aggregating state, list state and map state. # During the execution of the job, we execute stop-with-savepoint to create a savepoint. # Then we start another job from the savepoint to continue execution. The new job continues to run until the expected number of records is emitted. Then the operators write their final state content into files, and the files are compared with the expected content. We tested the cases that: # Start the first job with one state backend in (HashMap, RocksDB and incremental RocksDB), and start the second job with another state backend. We test all 9 combinations. # For the 9 cases in the first item, we start the second job with a larger parallelism. # For the 9 cases in the first item, we start the second job with a smaller parallelism. # For all the above three items, we change the key type and value type to customized user types. We verified that: # The savepoint is taken successfully, and both jobs finish normally. # The resulting state backend content is as expected. # The checkpoints of the two jobs are taken successfully. # There is no unexpected behavior during the test process. The test result is good for all cases, with only two minor issues: # The Web UI does not clearly show the configured state backend type and storage type, so it is not easy for users to verify which state backend is in use. 
# Do not print the stack trace if checkpoints fail because not all tasks are running: https://issues.apache.org/jira/browse/FLINK-22117; otherwise the log would be overwhelmed with this kind of exception. > Test the unified binary savepoint > - > > Key: FLINK-22140 > URL: https://issues.apache.org/jira/browse/FLINK-22140 > Project: Flink > Issue Type: Task > Components: Runtime / State Backends >Affects Versions: 1.13.0 >Reporter: Dawid Wysakowicz >Assignee: Yun Gao >Priority: Blocker > Labels: release-testing > Fix For: 1.13.0 > > > With https://issues.apache.org/jira/browse/FLINK-20976 we introduced a > unified binary savepoint format which should let you switch between different > state backends. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #15618: [FLINK-22125][python] Returns None when a key doesn't exist when calling MapState.get()
flinkbot edited a comment on pull request #15618: URL: https://github.com/apache/flink/pull/15618#issuecomment-820024114 ## CI report: * a5030e29742e58a6df2dbca7d7e1ce27a90e8722 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16576) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15525: [FLINK-22154][table-planner-blink] Fix bug where PushFilterIntoTableSourceScanRule fails to deal with IN expressions
flinkbot edited a comment on pull request #15525: URL: https://github.com/apache/flink/pull/15525#issuecomment-815836876 ## CI report: * 00492ee3748f2830378d9f7c73afa8b6e3e48fcb Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16504) * 98d1ce0ed7ea00fd3cc89854ae38e02522926cf4 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-22113) UniqueKey constraint is lost with multiple sources join in SQL
[ https://issues.apache.org/jira/browse/FLINK-22113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321868#comment-17321868 ] Xu Guangheng edited comment on FLINK-22113 at 4/15/21, 3:25 AM: Hello [~jark], Thanks for your reply. Yes, {{FlinkRelMdColumnUniqueness}} has implemented {{areColumnsUnique}} for the TableScan node, but there is a {{TODO}} inside the implementation. It seems the implementation is not complete. Could you assign the issue to me? I can try to fix it. was (Author: xuguangheng): Hello [~jark], Thanks for your reply. Yes, FlinkRelMdColumnUniqueness has implemented areColumnsUnique for the TableScan node, but there is a TODO inside the implementation. It seems the implementation is not complete. Could you assign the issue to me? I can try to fix it. > UniqueKey constraint is lost with multiple sources join in SQL > -- > > Key: FLINK-22113 > URL: https://issues.apache.org/jira/browse/FLINK-22113 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.13.0 >Reporter: Fu Kai >Priority: Major > Fix For: 1.14.0 > > > Hi team, > > We have a use case to join multiple data sources to generate a continuously > updated view. We defined primary key constraints on all the input sources and > all the keys are subsets of the join condition. All joins are left joins. > > In our case, the first two inputs can produce a *JoinKeyContainsUniqueKey* > input spec, which is good and performant. However, when it comes to the third > input source, it's joined with the intermediate output table of the first two > input tables, and the intermediate table does not carry key constraint > information (although the third source input table does), so it results in a > *NoUniqueKey* input spec. 
Given NoUniqueKey inputs have dramatic performance > implications per the [Force Join Unique > Key|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Force-Join-Unique-Key-td39521.html#a39651] > email thread, we want to know if there is any mitigation solution for this. > > Example: > Take the example from > [https://github.com/ververica/flink-sql-cookbook/blob/master/joins/05/05_star_schema.md] > {code:java} > CREATE TEMPORARY TABLE passengers ( > passenger_key STRING, > first_name STRING, > last_name STRING, > update_time TIMESTAMP(3), > PRIMARY KEY (passenger_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'passengers', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'raw', > 'value.format' = 'json' > ); > CREATE TEMPORARY TABLE stations ( > station_key STRING, > update_time TIMESTAMP(3), > city STRING, > PRIMARY KEY (station_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'stations', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'raw', > 'value.format' = 'json' > ); > CREATE TEMPORARY TABLE booking_channels ( > booking_channel_key STRING, > update_time TIMESTAMP(3), > channel STRING, > PRIMARY KEY (booking_channel_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'booking_channels', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'raw', > 'value.format' = 'json' > ); > CREATE TEMPORARY TABLE train_activities ( > scheduled_departure_time TIMESTAMP(3), > actual_departure_date TIMESTAMP(3), > passenger_key STRING, > origin_station_key STRING, > destination_station_key STRING, > booking_channel_key STRING, > PRIMARY KEY (booking_channel_key, origin_station_key, > destination_station_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'train_activities', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'json', > 'value.format' = 'json' > ); > SELECT > 
t.actual_departure_date, > p.first_name, > p.last_name, > b.channel, > os.city AS origin_station, > ds.city AS destination_station > FROM train_activities_1 t > LEFT JOIN booking_channels b > ON t.booking_channel_key = b.booking_channel_key > LEFT JOIN passengers p > ON t.passenger_key = p.passenger_key > LEFT JOIN stations os > ON t.origin_station_key = os.station_key > LEFT JOIN stations ds > ON t.destination_station_key = ds.station_key > {code} > > The query will generate the execution plan of: > > {code:java} > Flink SQL> explain > > SELECT > >t.actual_departure_date, > >p.first_name, > >p.last_name, > >b.channel, > >os.city AS origin_station, > >ds.city AS destination_station > > FROM train_activities_1 t > > LEFT JOIN booking_channels b > > ON t.booking_channel_key = b.booking_channel_key > > LEFT JOIN passengers p > > ON t.passenger_key =
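The performance concern behind the *NoUniqueKey* vs *JoinKeyContainsUniqueKey* distinction described above can be illustrated with a toy model of join state (this is not Flink's actual implementation): when a side's unique key is contained in the join key, the operator can keep exactly one row per key and replace it in O(1); without a known unique key it must buffer every row per join key and scan to find the row to retract.

```python
# Illustrative sketch (not Flink internals) of the two join-state shapes.

unique_key_state = {}      # join_key -> single latest row (O(1) upsert)
no_unique_key_state = {}   # join_key -> list of all buffered rows (O(n) update)

def upsert_with_unique_key(key, row):
    unique_key_state[key] = row  # replace: at most one row per key

def upsert_without_unique_key(key, old_row, new_row):
    rows = no_unique_key_state.setdefault(key, [])
    if old_row in rows:          # must scan to retract the previous version
        rows.remove(old_row)
    rows.append(new_row)

upsert_with_unique_key("ch1", ("ch1", "web"))
upsert_with_unique_key("ch1", ("ch1", "mobile"))
assert unique_key_state["ch1"] == ("ch1", "mobile")

upsert_without_unique_key("ch1", None, ("ch1", "web"))
upsert_without_unique_key("ch1", ("ch1", "web"), ("ch1", "mobile"))
assert no_unique_key_state["ch1"] == [("ch1", "mobile")]
```

Once the intermediate join result loses the key constraint, every downstream join falls into the second, list-buffering shape, which is the slowdown the mailing-list thread reports.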
[jira] [Commented] (FLINK-22281) flink sql consumer kakfa canal-json message then sum(amount)
[ https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321878#comment-17321878 ] xx chai commented on FLINK-22281: - Thanks [~jark], I solved the issue. > flink sql consumer kakfa canal-json message then sum(amount) > -- > > Key: FLINK-22281 > URL: https://issues.apache.org/jira/browse/FLINK-22281 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0 > Environment: flink 1.12 local >Reporter: xx chai >Priority: Major > Attachments: screenshot-1.png > > > I use flink sql to consume kafka canal-json messages; the sql is > CREATE TABLE kafka_mall_order_info ( > id int, > amount double, >PRIMARY KEY ( id) NOT ENFORCED >) WITH ( > 'connector' = 'kafka', > 'topic' = 'topic_yx-dc-3-102_3306', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'kafka_to_hive', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'canal-json'); > create table t2 (amount double) with ('connector' = 'print'); > > > insert into t2 select sum(amount) from kafka_mall_order_info ; > but the result is not what I expected > the result is in the attached image -- This message was sent by Atlassian Jira (v8.3.4#803005)
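The surprise in the issue above usually comes from changelog semantics: a canal-json source emits each upstream change as insert (+I), update-before/update-after (-U/+U) or delete (-D) rows, and a streaming SUM applies them incrementally, printing a running value rather than a one-shot total. A minimal sketch of that behavior (illustrative, not Flink's aggregate operator):

```python
# Sketch: incremental SUM over a changelog stream. Updates arrive as a
# retraction (-U) of the old row followed by the new row (+U), and deletes
# (-D) subtract their amount, so the running total stays consistent.

def apply_change(total: float, op: str, amount: float) -> float:
    if op in ("+I", "+U"):
        return total + amount
    if op in ("-U", "-D"):
        return total - amount
    raise ValueError(f"unknown changelog op: {op}")

changes = [
    ("+I", 10.0),               # INSERT amount=10
    ("-U", 10.0), ("+U", 25.0), # UPDATE amount 10 -> 25
    ("+I", 5.0),                # INSERT amount=5
    ("-D", 5.0),                # DELETE the 5
]
total = 0.0
for op, amount in changes:
    total = apply_change(total, op, amount)
assert total == 25.0
```

Each intermediate total is also emitted downstream (to the `print` sink as retract pairs), which is why the printed output shows a sequence of values instead of a single final sum.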
[GitHub] [flink] flinkbot edited a comment on pull request #14484: [FLINK-20722][hive] HiveTableSink should copy the record when convert…
flinkbot edited a comment on pull request #14484: URL: https://github.com/apache/flink/pull/14484#issuecomment-750730068 ## CI report: * adaac76c0ae0a62ca7f156340653128a43bf7b20 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12645) * af484144ed52eb8074b78710915bff0fb049dc97 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-22275) Datagen add a max lag option for a series of timestamp-related types
[ https://issues.apache.org/jira/browse/FLINK-22275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321872#comment-17321872 ] Yi Tang edited comment on FLINK-22275 at 4/15/21, 3:23 AM: --- Of course. How about introduce a {{fields.#.max-lag}} integer option with default value as {{0}} (expected to nonnegative)? It is obvious that this option can only be applied to {{random}} kind and for {{TimestampType}} / {{ZonedTimestampType}} / {{LocalZonedTimestampType}} types which all return the current timestamp for now. If one user configured the max-lag, then the generated timestamp will be {{currentTimeMillis - random(0, max-lag)}} instead of the original {{currentTimeMillis}} only. For example fields.atime.max-lag = 5000, then the generated timestamp maybe consists of (one per second) |timestamp|1618455622000|1618455623000|1618455624000|1618455625000| |lag|0|1000|3000|2000| |generated|1618455622000|1618455622000|1618455621000|1618455623000| was (Author: yittg): Of course. How about introduce a {{fields.#.max-lag}} integer option with default value as {{0}} (expected to nonnegative)? It is obvious that this option can only be applied to {{random}} kind and for {{TimestampType}} / {{ZonedTimestampType}} / {{LocalZonedTimestampType}} types which all return the current timestamp for now. If one user configured the max-lag, then the generated timestamp will be {{currentTimeMillis - random(0, max-lag)}} instead of the original {{currentTimeMillis}} only. 
For example, with fields.atime.max-lag = 5000, the generated timestamps (one per second) may look like: |timestamp|1618455622000|1618455623000|1618455624000|1618455625000| |delta|0|1000|3000|2000| |generated|1618455622000|1618455622000|1618455621000|1618455623000| > Datagen add a max lag option for a series of timestamp-related types > > > Key: FLINK-22275 > URL: https://issues.apache.org/jira/browse/FLINK-22275 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Ecosystem >Reporter: Yi Tang >Priority: Minor > > For now, the {{datagen}} connector always resolves to the current timestamp > for timestamp-related types. > This proposes adding a max-lag option for these types, which will generate a > timestamp with a random lag (the max lag being the option value) compared with the > current timestamp. The behavior is left as before if the option is not configured. -- This message was sent by Atlassian Jira (v8.3.4#803005)
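The generation rule proposed in the comment above can be sketched in a few lines. This is a hedged Python sketch, not Flink's datagen implementation: the helper name and the nonnegativity check are illustrative; only the {{currentTimeMillis - random(0, max-lag)}} formula and the fields.atime.max-lag = 5000 example come from the discussion.

```python
import random

def generate_event_time(current_ms: int, max_lag_ms: int = 0) -> int:
    """Sketch of the proposed datagen behavior: subtract a random lag in
    [0, max_lag_ms] from the current timestamp. With max_lag_ms == 0 this
    degenerates to today's behavior (always the current timestamp)."""
    if max_lag_ms < 0:
        raise ValueError("max-lag is expected to be nonnegative")
    return current_ms - random.randint(0, max_lag_ms)

# fields.atime.max-lag = 5000: each generated value lies within 5s of "now"
now = 1618455622000
value = generate_event_time(now, 5000)
assert now - 5000 <= value <= now
```

With the default of 0 the option is effectively off, matching the "leave it as before if the option is not configured" intent.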
[jira] [Comment Edited] (FLINK-22275) Datagen add a max lag option for a series of timestamp-related types
[ https://issues.apache.org/jira/browse/FLINK-22275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321872#comment-17321872 ] Yi Tang edited comment on FLINK-22275 at 4/15/21, 3:22 AM: --- Of course. How about introducing a {{fields.#.max-lag}} integer option with a default value of {{0}} (expected to be nonnegative)? This option can obviously only be applied to the {{random}} kind, and to the {{TimestampType}} / {{ZonedTimestampType}} / {{LocalZonedTimestampType}} types, which all return the current timestamp for now. If a user configures max-lag, the generated timestamp will be {{currentTimeMillis - random(0, max-lag)}} instead of the original {{currentTimeMillis}}. For example, with fields.atime.max-lag = 5000, the generated timestamps (one per second) may look like: |timestamp|1618455622000|1618455623000|1618455624000|1618455625000| |delta|0|1000|3000|2000| |generated|1618455622000|1618455622000|1618455621000|1618455623000| was (Author: yittg): Of course. First of all, it is obvious that this option can only be applied to the \{{ random }} kind. How about introducing a {{fields.#.max-lag}} integer option with a default value of {{0}} (expected to be nonnegative)? The option can be applied to the {{TimestampType}} / {{ZonedTimestampType}} / {{LocalZonedTimestampType}} types, which all return the current timestamp for now. If a user configures max-lag, the generated timestamp will be {{currentTimeMillis - random(0, max-lag)}} instead of the original {{currentTimeMillis}}.
For example, with fields.atime.max-lag = 5000, the generated timestamps (one per second) may look like: |timestamp|1618455622000|1618455623000|1618455624000|1618455625000| |delta|0|1000|3000|2000| |generated|1618455622000|1618455622000|1618455621000|1618455623000| > Datagen add a max lag option for a series of timestamp-related types > > > Key: FLINK-22275 > URL: https://issues.apache.org/jira/browse/FLINK-22275 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Ecosystem >Reporter: Yi Tang >Priority: Minor > > For now, the {{datagen}} connector always resolves to the current timestamp > for timestamp-related types. > This proposes adding a max-lag option for these types, which will generate a > timestamp with a random lag (the max lag being the option value) compared with the > current timestamp. The behavior is left as before if the option is not configured. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-6113) Implement split/select with Side Outputs
[ https://issues.apache.org/jira/browse/FLINK-6113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321877#comment-17321877 ] Chen Qin commented on FLINK-6113: - It seems the community has already removed the split and select transformations in the master branch, so this Jira no longer makes sense. Attaching the patch for curious minds. [^split_select.patch] > Implement split/select with Side Outputs > > > Key: FLINK-6113 > URL: https://issues.apache.org/jira/browse/FLINK-6113 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.3.0 >Reporter: Chen Qin >Priority: Minor > Labels: stale-minor > Attachments: split_select.patch > > > With the completion of FLINK-4460 (side outputs), this is one of the follow-up items > toward replacing the string-tag-based split/select with OutputTag-based > split/select. > In Flink 2.0, we might consider eventually deprecating split/select -- This message was sent by Atlassian Jira (v8.3.4#803005)
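For readers landing on this thread: the side-output style that replaced split/select routes each element to a tagged output in a single pass instead of re-selecting named sub-streams. The following is a plain-Python sketch of those semantics only, not the Flink OutputTag/ProcessFunction API; the function and tag names are illustrative.

```python
from collections import defaultdict

def process_with_side_outputs(elements, router):
    """Emulate ProcessFunction-style side outputs: `router` maps each
    element to an output tag; each element lands in exactly one tagged
    output, replacing the old string-tag-based split/select pair."""
    outputs = defaultdict(list)
    for e in elements:
        outputs[router(e)].append(e)
    return outputs

# What split("even"/"odd") + select(...) used to do becomes one pass:
outs = process_with_side_outputs(range(6), lambda x: "even" if x % 2 == 0 else "odd")
assert outs["even"] == [0, 2, 4]
assert outs["odd"] == [1, 3, 5]
```

In Flink itself the tags are typed OutputTag objects and the main output stays separate from the side outputs, which is what made the OutputTag-based approach preferable to string tags.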
[jira] [Closed] (FLINK-6113) Implement split/select with Side Outputs
[ https://issues.apache.org/jira/browse/FLINK-6113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin closed FLINK-6113. --- Resolution: Fixed > Implement split/select with Side Outputs > > > Key: FLINK-6113 > URL: https://issues.apache.org/jira/browse/FLINK-6113 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.3.0 >Reporter: Chen Qin >Priority: Minor > Labels: stale-minor > Attachments: split_select.patch > > > With the completion of FLINK-4460 (side outputs), this is one of the follow-up items > toward replacing the string-tag-based split/select with OutputTag-based > split/select. > In Flink 2.0, we might consider eventually deprecating split/select -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-6113) Implement split/select with Side Outputs
[ https://issues.apache.org/jira/browse/FLINK-6113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-6113: Attachment: split_select.patch > Implement split/select with Side Outputs > > > Key: FLINK-6113 > URL: https://issues.apache.org/jira/browse/FLINK-6113 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.3.0 >Reporter: Chen Qin >Priority: Minor > Labels: stale-minor > Attachments: split_select.patch > > > With the completion of FLINK-4460 (side outputs), this is one of the follow-up items > toward replacing the string-tag-based split/select with OutputTag-based > split/select. > In Flink 2.0, we might consider eventually deprecating split/select -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22284) Null address will be logged when channel is closed in NettyPartitionRequestClient
[ https://issues.apache.org/jira/browse/FLINK-22284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhilong Hong updated FLINK-22284: - Attachment: exception.png > Null address will be logged when channel is closed in > NettyPartitionRequestClient > - > > Key: FLINK-22284 > URL: https://issues.apache.org/jira/browse/FLINK-22284 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.13.0 >Reporter: Zhilong Hong >Priority: Minor > Fix For: 1.13.0 > > Attachments: exception.png > > > In NettyPartitionRequestClient#requestSubpartition, when a channel is closed, > the channel will throw a LocalTransportException with the error message > "Sending the partition request to 'null' failed.". The message is confusing > since we don't know where the remote client connected to this channel > is located, and we can't track down that TaskExecutor to find out what > happened. > > Also, I'm wondering whether we should use TransportException instead of > LocalTransportException here, because it's a little confusing to see a > LocalTransportException thrown when a remote channel is closed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22284) Null address will be logged when channel is closed in NettyPartitionRequestClient
[ https://issues.apache.org/jira/browse/FLINK-22284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhilong Hong updated FLINK-22284: - Description: In NettyPartitionRequestClient#requestSubpartition, when a channel is closed, the channel will throw a LocalTransportException with the error message "Sending the partition request to 'null' failed.". The message is confusing since we don't know where the remote client connected to this channel is located, and we can't track down that TaskExecutor to find out what happened. Also, I'm wondering whether we should use TransportException instead of LocalTransportException here, because it's a little confusing to see a LocalTransportException thrown when a remote channel is closed. !exception.png! was: In NettyPartitionRequestClient#requestSubpartition, when a channel is closed, the channel will throw a LocalTransportException with the error message "Sending the partition request to 'null' failed.". The message is confusing since we don't know where the remote client connected to this channel is located, and we can't track down that TaskExecutor to find out what happened. Also, I'm wondering whether we should use TransportException instead of LocalTransportException here, because it's a little confusing to see a LocalTransportException thrown when a remote channel is closed. > Null address will be logged when channel is closed in > NettyPartitionRequestClient > - > > Key: FLINK-22284 > URL: https://issues.apache.org/jira/browse/FLINK-22284 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.13.0 >Reporter: Zhilong Hong >Priority: Minor > Fix For: 1.13.0 > > Attachments: exception.png > > > In NettyPartitionRequestClient#requestSubpartition, when a channel is closed, > the channel will throw a LocalTransportException with the error message > "Sending the partition request to 'null' failed.". 
The message is confusing > since we don't know where the remote client connected to this channel > is located, and we can't track down that TaskExecutor to find out what > happened. > > Also, I'm wondering whether we should use TransportException instead of > LocalTransportException here, because it's a little confusing to see a > LocalTransportException thrown when a remote channel is closed. > > !exception.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] kezhuw commented on a change in pull request #15601: [FLINK-18071][FLINK-21996][coordination] - Part two: Ensure reliable OperatorEvent to running Task matching
kezhuw commented on a change in pull request #15601: URL: https://github.com/apache/flink/pull/15601#discussion_r613727138 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java ## @@ -285,41 +348,130 @@ public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exc String.format("Don't recognize event '%s' from task %d.", event, subtask)); } -if (periodicTask != null) { -throw new Exception("periodic already running"); +synchronized (recoveredTaskRunning) { +// signal the previous task that its recovered task is now running +final CompletableFuture prevTaskFuture = recoveredTaskRunning.peekLast(); +if (prevTaskFuture != null) { +prevTaskFuture.complete(null); +} +// add a future for this task +recoveredTaskRunning.addLast(new CompletableFuture<>()); } -periodicTask = -executor.scheduleWithFixedDelay(this, delay, delay, TimeUnit.MILLISECONDS); + +// first, we hand this over to the mailbox thread, so we preserve order on operations, +// even if the action is only to do a thread safe scheduling into the scheduledExecutor +runInMailbox( +() -> { +checkState(!workLoopRunning); +checkState(subtaskGateway != null); + +workLoopRunning = true; +scheduleSingleAction(); +}); } @Override public void subtaskFailed(int subtask, @Nullable Throwable reason) { Review comment: It should require some minor changes to `subtaskReset`, `notifyCheckpointComplete` and `notifyCheckpointAborted`. Personally, I think `SubtaskGateway` is more vital to subtask failure than job failure. For solely job failure, recreating a brand new coordinator on `resetToCheckpoint` should pass this test also. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-22284) Null address will be logged when channel is closed in NettyPartitionRequestClient
[ https://issues.apache.org/jira/browse/FLINK-22284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhilong Hong updated FLINK-22284: - Description: In NettyPartitionRequestClient#requestSubpartition, when a channel is closed, the channel will throw a LocalTransportException with the error message "Sending the partition request to 'null' failed.". The message is confusing since we don't know where the remote client connected to this channel is located, and we can't track down that TaskExecutor to find out what happened. Also, I'm wondering whether we should use TransportException instead of LocalTransportException here, because it's a little confusing to see a LocalTransportException thrown when a remote channel is closed. was: In NettyPartitionRequestClient#requestSubpartition, when channel is closed, the channel will throw a LocalTransportException with the error message "Sending the partition request to 'null' failed.". The message is confusing since we don't know where the remote client connected to this channel is located, and we can't track down that TaskExecutor to find out what happened. Also, I'm wondering whether we should use TransportException instead of LocalTransportException here, because it's a little confusing to see a LocalTransportException thrown when a remote channel is closed. > Null address will be logged when channel is closed in > NettyPartitionRequestClient > - > > Key: FLINK-22284 > URL: https://issues.apache.org/jira/browse/FLINK-22284 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Affects Versions: 1.13.0 >Reporter: Zhilong Hong >Priority: Minor > Fix For: 1.13.0 > > > In NettyPartitionRequestClient#requestSubpartition, when a channel is closed, > the channel will throw a LocalTransportException with the error message > "Sending the partition request to 'null' failed.". 
The message is confusing > since we don't know where the remote client connected to this channel > is located, and we can't track down that TaskExecutor to find out what > happened. > > Also, I'm wondering whether we should use TransportException instead of > LocalTransportException here, because it's a little confusing to see a > LocalTransportException thrown when a remote channel is closed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot commented on pull request #15618: [FLINK-22125][python] Returns None when a key doesn't exist when calling MapState.get()
flinkbot commented on pull request #15618: URL: https://github.com/apache/flink/pull/15618#issuecomment-820024114 ## CI report: * a5030e29742e58a6df2dbca7d7e1ce27a90e8722 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-22284) Null address will be logged when channel is closed in NettyPartitionRequestClient
Zhilong Hong created FLINK-22284: Summary: Null address will be logged when channel is closed in NettyPartitionRequestClient Key: FLINK-22284 URL: https://issues.apache.org/jira/browse/FLINK-22284 Project: Flink Issue Type: Improvement Components: Runtime / Network Affects Versions: 1.13.0 Reporter: Zhilong Hong Fix For: 1.13.0 In NettyPartitionRequestClient#requestSubpartition, when a channel is closed, the channel will throw a LocalTransportException with the error message "Sending the partition request to 'null' failed.". The message is confusing since we don't know where the remote client connected to this channel is located, and we can't track down that TaskExecutor to find out what happened. Also, I'm wondering whether we should use TransportException instead of LocalTransportException here, because it's a little confusing to see a LocalTransportException thrown when a remote channel is closed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22275) Datagen add a max lag option for a series of timestamp-related types
[ https://issues.apache.org/jira/browse/FLINK-22275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321872#comment-17321872 ] Yi Tang commented on FLINK-22275: - Of course. First of all, it is obvious that this option can only be applied to the {{random}} kind. How about introducing a {{fields.#.max-lag}} integer option with a default value of {{0}} (expected to be nonnegative)? The option can be applied to the {{TimestampType}} / {{ZonedTimestampType}} / {{LocalZonedTimestampType}} types, which all return the current timestamp for now. If a user configures max-lag, the generated timestamp will be {{currentTimeMillis - random(0, max-lag)}} instead of the original {{currentTimeMillis}}. For example, with fields.atime.max-lag = 5000, the generated timestamps (one per second) may look like: |timestamp|1618455622000|1618455623000|1618455624000|1618455625000| |delta|0|1000|3000|2000| |generated|1618455622000|1618455622000|1618455621000|1618455623000| > Datagen add a max lag option for a series of timestamp-related types > > > Key: FLINK-22275 > URL: https://issues.apache.org/jira/browse/FLINK-22275 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Ecosystem >Reporter: Yi Tang >Priority: Minor > > For now, the {{datagen}} connector always resolves to the current timestamp > for timestamp-related types. > This proposes adding a max-lag option for these types, which will generate a > timestamp with a random lag (the max lag being the option value) compared with the > current timestamp. The behavior is left as before if the option is not configured. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #15020: [FLINK-21445][kubernetes] Application mode does not set the configuration when building PackagedProgram
flinkbot edited a comment on pull request #15020: URL: https://github.com/apache/flink/pull/15020#issuecomment-785804760 ## CI report: * 4a4f8a643c603007519ebefc4c1e732347b6d3d2 UNKNOWN * 6ebcd56d7080224fcd21f0f22ee467a951feb2e0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=16545) * de3bd6e46144fc21952f2ea1035d278a6dc1076f UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-22283) GraphiteReporter Metrics named xxx already exists
HideOnBush created FLINK-22283: -- Summary: GraphiteReporter Metrics named xxx already exists Key: FLINK-22283 URL: https://issues.apache.org/jira/browse/FLINK-22283 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.9.0 Reporter: HideOnBush When I was using the GraphiteReporter for monitoring with Flink 1.9, an error was reported, which caused the relevant metrics of my TaskManager not to be collected. Error message: a metric named xxx already exists -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-22113) UniqueKey constraint is lost with multiple sources join in SQL
[ https://issues.apache.org/jira/browse/FLINK-22113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321868#comment-17321868 ] Xu Guangheng edited comment on FLINK-22113 at 4/15/21, 3:00 AM: Hello [~jark], Thanks for your reply. Yes, FlinkRelMdColumnUniqueness has implemented areColumnsUnique for the TableScan node, but there is a TODO inside the implementation. It seems the implementation is not complete. Can you assign the issue to me? I can try to fix it. was (Author: xuguangheng): Hello [~jark], Thanks for your reply. Yes, FlinkRelMdColumnUniqueness has implemented areColumnsUnique for the TableScan node, but there is a TODO inside the implementation. It seems the implementation is not complete. Can you assign the issue to me? I cam try to fix it. > UniqueKey constraint is lost with multiple sources join in SQL > -- > > Key: FLINK-22113 > URL: https://issues.apache.org/jira/browse/FLINK-22113 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.13.0 >Reporter: Fu Kai >Priority: Major > Fix For: 1.14.0 > > > Hi team, > > We have a use case to join multiple data sources to generate a continuously > updated view. We defined primary key constraints on all the input sources and > all the keys are subsets of the join conditions. All joins are left joins. > > In our case, the first two inputs can produce the *JoinKeyContainsUniqueKey* > input spec, which is good and performant. However, when it comes to the third > input source, it's joined with the intermediate output table of the first two > input tables, and the intermediate table does not carry key constraint > information (although the third source input table does), so it results in a > *NoUniqueKey* input spec.
Given that NoUniqueKey inputs have dramatic performance > implications per the [Force Join Unique > Key|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Force-Join-Unique-Key-td39521.html#a39651] > email thread, we want to know if there is any mitigation solution for this. > > Example: > Take the example from > [https://github.com/ververica/flink-sql-cookbook/blob/master/joins/05/05_star_schema.md] > {code:java} > CREATE TEMPORARY TABLE passengers ( > passenger_key STRING, > first_name STRING, > last_name STRING, > update_time TIMESTAMP(3), > PRIMARY KEY (passenger_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'passengers', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'raw', > 'value.format' = 'json' > ); > CREATE TEMPORARY TABLE stations ( > station_key STRING, > update_time TIMESTAMP(3), > city STRING, > PRIMARY KEY (station_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'stations', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'raw', > 'value.format' = 'json' > ); > CREATE TEMPORARY TABLE booking_channels ( > booking_channel_key STRING, > update_time TIMESTAMP(3), > channel STRING, > PRIMARY KEY (booking_channel_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'booking_channels', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'raw', > 'value.format' = 'json' > ); > CREATE TEMPORARY TABLE train_activities ( > scheduled_departure_time TIMESTAMP(3), > actual_departure_date TIMESTAMP(3), > passenger_key STRING, > origin_station_key STRING, > destination_station_key STRING, > booking_channel_key STRING, > PRIMARY KEY (booking_channel_key, origin_station_key, > destination_station_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'train_activities', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'json', > 'value.format' = 'json' > ); > SELECT >
t.actual_departure_date, > p.first_name, > p.last_name, > b.channel, > os.city AS origin_station, > ds.city AS destination_station > FROM train_activities_1 t > LEFT JOIN booking_channels b > ON t.booking_channel_key = b.booking_channel_key > LEFT JOIN passengers p > ON t.passenger_key = p.passenger_key > LEFT JOIN stations os > ON t.origin_station_key = os.station_key > LEFT JOIN stations ds > ON t.destination_station_key = ds.station_key > {code} > > The query will generate an execution plan of: > > {code:java} > Flink SQL> explain > > SELECT > >t.actual_departure_date, > >p.first_name, > >p.last_name, > >b.channel, > >os.city AS origin_station, > >ds.city AS destination_station > > FROM train_activities_1 t > > LEFT JOIN booking_channels b > > ON t.booking_channel_key = b.booking_channel_key > > LEFT JOIN passengers p > > ON t.passenger_key = p.passenger_key > >
[jira] [Commented] (FLINK-22113) UniqueKey constraint is lost with multiple sources join in SQL
[ https://issues.apache.org/jira/browse/FLINK-22113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321868#comment-17321868 ] Xu Guangheng commented on FLINK-22113: -- Hello [~jark], Thanks for your reply. Yes, FlinkRelMdColumnUniqueness has implemented areColumnsUnique for the TableScan node, but there is a TODO inside the implementation. It seems the implementation is not complete. Can you assign the issue to me? I can try to fix it. > UniqueKey constraint is lost with multiple sources join in SQL > -- > > Key: FLINK-22113 > URL: https://issues.apache.org/jira/browse/FLINK-22113 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.13.0 >Reporter: Fu Kai >Priority: Major > Fix For: 1.14.0 > > > Hi team, > > We have a use case to join multiple data sources to generate a continuously > updated view. We defined primary key constraints on all the input sources and > all the keys are subsets of the join conditions. All joins are left joins. > > In our case, the first two inputs can produce the *JoinKeyContainsUniqueKey* > input spec, which is good and performant. However, when it comes to the third > input source, it's joined with the intermediate output table of the first two > input tables, and the intermediate table does not carry key constraint > information (although the third source input table does), so it results in a > *NoUniqueKey* input spec. Given that NoUniqueKey inputs have dramatic performance > implications per the [Force Join Unique > Key|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Force-Join-Unique-Key-td39521.html#a39651] > email thread, we want to know if there is any mitigation solution for this. 
> > Example: > Take the example from > [https://github.com/ververica/flink-sql-cookbook/blob/master/joins/05/05_star_schema.md] > {code:java} > CREATE TEMPORARY TABLE passengers ( > passenger_key STRING, > first_name STRING, > last_name STRING, > update_time TIMESTAMP(3), > PRIMARY KEY (passenger_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'passengers', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'raw', > 'value.format' = 'json' > ); > CREATE TEMPORARY TABLE stations ( > station_key STRING, > update_time TIMESTAMP(3), > city STRING, > PRIMARY KEY (station_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'stations', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'raw', > 'value.format' = 'json' > ); > CREATE TEMPORARY TABLE booking_channels ( > booking_channel_key STRING, > update_time TIMESTAMP(3), > channel STRING, > PRIMARY KEY (booking_channel_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'booking_channels', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'raw', > 'value.format' = 'json' > ); > CREATE TEMPORARY TABLE train_activities ( > scheduled_departure_time TIMESTAMP(3), > actual_departure_date TIMESTAMP(3), > passenger_key STRING, > origin_station_key STRING, > destination_station_key STRING, > booking_channel_key STRING, > PRIMARY KEY (booking_channel_key, origin_station_key, > destination_station_key) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'train_activities', > 'properties.bootstrap.servers' = 'localhost:9092', > 'key.format' = 'json', > 'value.format' = 'json' > ); > SELECT > t.actual_departure_date, > p.first_name, > p.last_name, > b.channel, > os.city AS origin_station, > ds.city AS destination_station > FROM train_activities_1 t > LEFT JOIN booking_channels b > ON t.booking_channel_key = b.booking_channel_key > LEFT JOIN passengers p > ON t.passenger_key = p.passenger_key > 
LEFT JOIN stations os > ON t.origin_station_key = os.station_key > LEFT JOIN stations ds > ON t.destination_station_key = ds.station_key > {code} > > The query will generate an execution plan of: > > {code:java} > Flink SQL> explain > > SELECT > >t.actual_departure_date, > >p.first_name, > >p.last_name, > >b.channel, > >os.city AS origin_station, > >ds.city AS destination_station > > FROM train_activities_1 t > > LEFT JOIN booking_channels b > > ON t.booking_channel_key = b.booking_channel_key > > LEFT JOIN passengers p > > ON t.passenger_key = p.passenger_key > > LEFT JOIN stations os > > ON t.origin_station_key = os.station_key > > LEFT JOIN stations ds > > ON t.destination_station_key = ds.station_key; > == Abstract Syntax Tree == > LogicalProject(actual_departure_date=[$1], first_name=[$10], last_name=[$11], > channel=[$8], origin_station=[$15], destination_station=[$18]) > +-
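The way the constraint gets lost can be illustrated with a toy unique-key derivation: a left join preserves the left side's unique key only if the planner still knows one, and once an intermediate result reports no unique key, every downstream join degrades to the NoUniqueKey input spec. This is a conceptual Python sketch of that propagation, not the planner's actual FlinkRelMdColumnUniqueness logic; the function name and key sets are illustrative.

```python
def left_join_unique_key(left_key, right_key, join_keys):
    """Toy metadata derivation for a left join: the output keeps the left
    input's unique key if the right side is joined on its full unique key
    (so each left row matches at most one right row). Returns None for
    'no unique key known', mirroring the NoUniqueKey input spec."""
    if left_key is None:
        return None  # already lost upstream; cannot be recovered here
    if right_key is not None and set(right_key) <= set(join_keys):
        return left_key
    return None

# t LEFT JOIN booking_channels b ON t.booking_channel_key = b.booking_channel_key:
# b's primary key is contained in the join keys, so t's key survives.
k1 = left_join_unique_key({"t_pk"}, {"booking_channel_key"}, {"booking_channel_key"})
assert k1 == {"t_pk"}

# Once an intermediate result reports no unique key, the next join is NoUniqueKey,
# even though the newly joined source table itself has a primary key.
assert left_join_unique_key(None, {"station_key"}, {"station_key"}) is None
```

This is why completing the TableScan case in areColumnsUnique matters: if the scan never reports a unique key, the very first derivation step already returns None and everything downstream degrades.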
[jira] [Updated] (FLINK-22125) Revisit the return value of MapState.get when a key doesn't exist
[ https://issues.apache.org/jira/browse/FLINK-22125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-22125: Summary: Revisit the return value of MapState.get when a key doesn't exist (was: Revisit the return value of MapState.get when the key doesn't exist) > Revisit the return value of MapState.get when a key doesn't exist > - > > Key: FLINK-22125 > URL: https://issues.apache.org/jira/browse/FLINK-22125 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.13.0 >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > > Currently, it will throw a KeyError if the key doesn't exist for MapState in > the Python DataStream API. However, it returns null in the Java DataStream API. > Maybe we should keep the behavior the same across the Python DataStream API and > Java DataStream API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
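The harmonization discussed in this ticket, returning None (Python's analogue of Java's null) for a missing key instead of raising KeyError, can be sketched with a dict-backed stand-in. This is not the actual PyFlink MapState class; the stub name and storage are illustrative.

```python
class MapStateStub:
    """Dict-backed stand-in illustrating the proposed get() semantics."""

    def __init__(self):
        self._store = {}

    def put(self, key, value):
        self._store[key] = value

    def get(self, key):
        # Proposed behavior: mirror Java's MapState.get(), which returns
        # null for a missing key, instead of raising KeyError.
        return self._store.get(key)

state = MapStateStub()
state.put("a", 1)
assert state.get("a") == 1
assert state.get("missing") is None  # no KeyError anymore
```

Using dict.get (rather than indexing with []) is exactly the Python idiom that maps a missing key to None, which is what makes the Python and Java APIs behave the same.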
[jira] [Assigned] (FLINK-22125) Revisit the return value of MapState.get when the key doesn't exist
[ https://issues.apache.org/jira/browse/FLINK-22125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-22125: --- Assignee: Dian Fu > Revisit the return value of MapState.get when the key doesn't exist > --- > > Key: FLINK-22125 > URL: https://issues.apache.org/jira/browse/FLINK-22125 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.13.0 >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > > Currently, it will throw a KeyError if the key doesn't exist for MapState in > the Python DataStream API. However, it returns null in the Java DataStream API. > Maybe we should keep the behavior the same across the Python DataStream API and > Java DataStream API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-22273) Add documentation for General Python Group Window Aggregation in Python Table API
[ https://issues.apache.org/jira/browse/FLINK-22273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu closed FLINK-22273. --- Resolution: Fixed Merged to master via 00a92b4fb188a8c1504279c07504b8fa10ca46e6 > Add documentation for General Python Group Window Aggregation in Python Table > API > - > > Key: FLINK-22273 > URL: https://issues.apache.org/jira/browse/FLINK-22273 > Project: Flink > Issue Type: Improvement > Components: API / Python, Documentation >Affects Versions: 1.13.0 >Reporter: Huang Xingbo >Assignee: Huang Xingbo >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] dianfu closed pull request #15609: [FLINK-22273][python][docs] Add documentation for General Python Group Window Aggregation in Python Table API
dianfu closed pull request #15609: URL: https://github.com/apache/flink/pull/15609 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #15618: [FLINK-22125][python] Returns None when a key doesn't exist when calling MapState.get()
flinkbot commented on pull request #15618: URL: https://github.com/apache/flink/pull/15618#issuecomment-820020073 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit a5030e29742e58a6df2dbca7d7e1ce27a90e8722 (Thu Apr 15 02:55:52 UTC 2021) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-22125).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. 
For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-22125) Revisit the return value of MapState.get when the key doesn't exist
[ https://issues.apache.org/jira/browse/FLINK-22125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-22125: --- Labels: pull-request-available (was: ) > Revisit the return value of MapState.get when the key doesn't exist > --- > > Key: FLINK-22125 > URL: https://issues.apache.org/jira/browse/FLINK-22125 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.13.0 >Reporter: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > > Currently, it throws a KeyError if the key doesn't exist for MapState in > Python DataStream API. However, it returns null in the Java DataStream API. > Maybe we should keep the behavior the same across Python DataStream API and > Java DataStream API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] dianfu opened a new pull request #15618: [FLINK-22125][python] Returns None when a key doesn't exist when calling MapState.get()
dianfu opened a new pull request #15618: URL: https://github.com/apache/flink/pull/15618 ## What is the purpose of the change *Currently, MapState.get throws a KeyError when a key doesn't exist. This pull request changes MapState.get to return None when a key doesn't exist, matching the behavior of the Java DataStream API.* ## Verifying this change This change is a trivial rework without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
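The proposed semantics can be illustrated with a minimal pure-Python sketch. This is not the actual PyFlink implementation; `MapStateSketch` is a made-up stand-in that models state on a plain dict to contrast the old KeyError behavior with the proposed None-returning behavior:

```python
class MapStateSketch:
    """Hypothetical stand-in for pyflink.datastream.state.MapState (illustration only)."""

    def __init__(self):
        self._map = {}

    def put(self, key, value):
        self._map[key] = value

    def get(self, key):
        # Old behavior: `self._map[key]` raised KeyError for a missing key.
        # Proposed behavior: return None, mirroring Java's MapState.get().
        return self._map.get(key)

state = MapStateSketch()
state.put("a", 1)
print(state.get("a"))        # 1
print(state.get("missing"))  # None instead of raising KeyError
```

With dict-style indexing the second lookup would have raised; with `dict.get` semantics the caller checks for None, which is how the Java API signals an absent key.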
[jira] [Comment Edited] (FLINK-22125) Revisit the return value of MapState.get when the key doesn't exist
[ https://issues.apache.org/jira/browse/FLINK-22125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321843#comment-17321843 ] Roc Marshal edited comment on FLINK-22125 at 4/15/21, 2:42 AM: --- When judging whether the MapState is empty or not, there may be some logical loopholes, such as using the `is_empty()` method instead of the `_is_empty` variable was (Author: rocmarshal): When we judge whether the MapState is empty or not, there may be some logical loopholes, such as using the `is_empty()` method instead of the `_is_empty` variable > Revisit the return value of MapState.get when the key doesn't exist > --- > > Key: FLINK-22125 > URL: https://issues.apache.org/jira/browse/FLINK-22125 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.13.0 >Reporter: Dian Fu >Priority: Major > Fix For: 1.13.0 > > > Currently, it throws a KeyError if the key doesn't exist for MapState in > Python DataStream API. However, it returns null in the Java DataStream API. > Maybe we should keep the behavior the same across Python DataStream API and > Java DataStream API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22125) Revisit the return value of MapState.get when the key doesn't exist
[ https://issues.apache.org/jira/browse/FLINK-22125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321863#comment-17321863 ] Dian Fu commented on FLINK-22125: - Hi [~RocMarshal], there is a method named is_empty() in MapState. You could refer to https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/state.py#L262 for more details. Not sure what you mean by _is_empty()? > Revisit the return value of MapState.get when the key doesn't exist > --- > > Key: FLINK-22125 > URL: https://issues.apache.org/jira/browse/FLINK-22125 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.13.0 >Reporter: Dian Fu >Priority: Major > Fix For: 1.13.0 > > > Currently, it throws a KeyError if the key doesn't exist for MapState in > Python DataStream API. However, it returns null in the Java DataStream API. > Maybe we should keep the behavior the same across Python DataStream API and > Java DataStream API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22281) flink sql consumer kakfa canal-json message then sum(amount)
[ https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321856#comment-17321856 ] xx chai commented on FLINK-22281: - I added the parameter streamTableEnvironment.getConfig().addJobParameter("table.exec.source.cdc-events-duplicate","true"); but the result did not change > flink sql consumer kakfa canal-json message then sum(amount) > -- > > Key: FLINK-22281 > URL: https://issues.apache.org/jira/browse/FLINK-22281 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0 > Environment: flink 1.12 local >Reporter: xx chai >Priority: Major > Attachments: screenshot-1.png > > > I use flink sql to consumer kafka canal-json message the sql is > CREATE TABLE kafka_mall_order_info ( > id int, > amount double, >PRIMARY KEY ( id) NOT ENFORCED >) WITH ( > 'connector' = 'kafka', > 'topic' = 'topic_yx-dc-3-102_3306', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'kafka_to_hive', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'canal-json'); > create table t2 (amount double) with ('connector' = 'print'); > > > insert into t2 select sum(amount) from kafka_mall_order_info ; > but the result is not i think > the result in image -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-16325) A connection check is required, and it needs to be reopened when the JDBC connection is interrupted
[ https://issues.apache.org/jira/browse/FLINK-16325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] todd closed FLINK-16325. Resolution: Invalid > A connection check is required, and it needs to be reopened when the JDBC > connection is interrupted > > > Key: FLINK-16325 > URL: https://issues.apache.org/jira/browse/FLINK-16325 > Project: Flink > Issue Type: Improvement > Components: Connectors / JDBC >Affects Versions: 1.10.0 >Reporter: todd >Priority: Minor > > JDBCOutputFormat#writeRecord. > When writing data, if the JDBC connection has been disconnected, the data > will be lost. Therefore, a connectivity check is required in the > writeRecord method. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-22279) Upload the pipeline.classpaths file in yarn application mode
[ https://issues.apache.org/jira/browse/FLINK-22279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] todd closed FLINK-22279. Resolution: Invalid > Upload the pipeline.classpaths file in yarn application mode > > > Key: FLINK-22279 > URL: https://issues.apache.org/jira/browse/FLINK-22279 > Project: Flink > Issue Type: Bug > Components: Command Line Client >Affects Versions: 1.12.0 >Reporter: todd >Priority: Major > > pipeline.classpaths is a local resource package. If this file is used, all > nodes must include this file. I think that pipeline.jars should be uploaded > from the client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22279) Upload the pipeline.classpaths file in yarn application mode
[ https://issues.apache.org/jira/browse/FLINK-22279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321854#comment-17321854 ] todd commented on FLINK-22279: -- Thank you very much, what you said is right. I did not pay attention to this parameter. [~fly_in_gis] > Upload the pipeline.classpaths file in yarn application mode > > > Key: FLINK-22279 > URL: https://issues.apache.org/jira/browse/FLINK-22279 > Project: Flink > Issue Type: Bug > Components: Command Line Client >Affects Versions: 1.12.0 >Reporter: todd >Priority: Major > > pipeline.classpaths is a local resource package. If this file is used, all > nodes must include this file. I think that pipeline.jars should be uploaded > from the client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14927) Remove LegacyTimestampTypeInfo/LegacyLocalDateTimeTypeInfo/LegacyInstantTypeInfo when the conversion is not needed
[ https://issues.apache.org/jira/browse/FLINK-14927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-14927: Priority: Major (was: Minor) > Remove > LegacyTimestampTypeInfo/LegacyLocalDateTimeTypeInfo/LegacyInstantTypeInfo > when the conversion is not needed > -- > > Key: FLINK-14927 > URL: https://issues.apache.org/jira/browse/FLINK-14927 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Zhenghua Gao >Priority: Major > Labels: stale-minor > > 1) PhysicalTableSourceScan.getSourceTransformation use > TypeInfoDataTypeConverter.fromDataTypeToTypeInfo, which bypass > TypeConversions to support precision > 2) Conversion from DateType to TypeInformation (and back) exists in > TableSourceUtil.computeIndexMapping -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-14927) Remove LegacyTimestampTypeInfo/LegacyLocalDateTimeTypeInfo/LegacyInstantTypeInfo when the conversion is not needed
[ https://issues.apache.org/jira/browse/FLINK-14927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-14927: Labels: (was: stale-minor) > Remove > LegacyTimestampTypeInfo/LegacyLocalDateTimeTypeInfo/LegacyInstantTypeInfo > when the conversion is not needed > -- > > Key: FLINK-14927 > URL: https://issues.apache.org/jira/browse/FLINK-14927 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Zhenghua Gao >Priority: Major > > 1) PhysicalTableSourceScan.getSourceTransformation use > TypeInfoDataTypeConverter.fromDataTypeToTypeInfo, which bypass > TypeConversions to support precision > 2) Conversion from DateType to TypeInformation (and back) exists in > TableSourceUtil.computeIndexMapping -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16120) Update the hive section of the connectors doc
[ https://issues.apache.org/jira/browse/FLINK-16120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321852#comment-17321852 ] Rui Li commented on FLINK-16120: Hive connector doc has been moved to the {{Table & SQL Connectors}}. Closing this one. > Update the hive section of the connectors doc > - > > Key: FLINK-16120 > URL: https://issues.apache.org/jira/browse/FLINK-16120 > Project: Flink > Issue Type: Task > Components: Connectors / Hive, Documentation >Reporter: Rui Li >Priority: Minor > Labels: stale-minor > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16120) Update the hive section of the connectors doc
[ https://issues.apache.org/jira/browse/FLINK-16120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Li updated FLINK-16120: --- Release Note: (was: Hive connector doc has been moved to the {{Table & SQL Connectors}}. Closing this one.) > Update the hive section of the connectors doc > - > > Key: FLINK-16120 > URL: https://issues.apache.org/jira/browse/FLINK-16120 > Project: Flink > Issue Type: Task > Components: Connectors / Hive, Documentation >Reporter: Rui Li >Priority: Minor > Labels: stale-minor > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (FLINK-16120) Update the hive section of the connectors doc
[ https://issues.apache.org/jira/browse/FLINK-16120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Li resolved FLINK-16120. Release Note: Hive connector doc has been moved to the {{Table & SQL Connectors}}. Closing this one. Resolution: Not A Problem > Update the hive section of the connectors doc > - > > Key: FLINK-16120 > URL: https://issues.apache.org/jira/browse/FLINK-16120 > Project: Flink > Issue Type: Task > Components: Connectors / Hive, Documentation >Reporter: Rui Li >Priority: Minor > Labels: stale-minor > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22275) Datagen add a max lag option for a series of timestamp-related types
[ https://issues.apache.org/jira/browse/FLINK-22275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321851#comment-17321851 ] Jark Wu commented on FLINK-22275: - We should discuss about the API/options and behaviors first. > Datagen add a max lag option for a series of timestamp-related types > > > Key: FLINK-22275 > URL: https://issues.apache.org/jira/browse/FLINK-22275 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Ecosystem >Reporter: Yi Tang >Priority: Minor > > For now, the {{datagen}} connector always resolves to the current timestamp > for timestamp-related types. > Here proposals to add a max lag option for these types, which will generate a > timestamp with a random lag (with max lag as option) compared with the > current timestamp. Leave it as before if the option is not configured. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-22275) Datagen add a max lag option for a series of timestamp-related types
[ https://issues.apache.org/jira/browse/FLINK-22275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-22275: --- Assignee: Yi Tang > Datagen add a max lag option for a series of timestamp-related types > > > Key: FLINK-22275 > URL: https://issues.apache.org/jira/browse/FLINK-22275 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Ecosystem >Reporter: Yi Tang >Assignee: Yi Tang >Priority: Minor > > For now, the {{datagen}} connector always resolves to the current timestamp > for timestamp-related types. > Here proposals to add a max lag option for these types, which will generate a > timestamp with a random lag (with max lag as option) compared with the > current timestamp. Leave it as before if the option is not configured. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-22275) Datagen add a max lag option for a series of timestamp-related types
[ https://issues.apache.org/jira/browse/FLINK-22275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-22275: --- Assignee: (was: Yi Tang) > Datagen add a max lag option for a series of timestamp-related types > > > Key: FLINK-22275 > URL: https://issues.apache.org/jira/browse/FLINK-22275 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Ecosystem >Reporter: Yi Tang >Priority: Minor > > For now, the {{datagen}} connector always resolves to the current timestamp > for timestamp-related types. > Here proposals to add a max lag option for these types, which will generate a > timestamp with a random lag (with max lag as option) compared with the > current timestamp. Leave it as before if the option is not configured. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-22281) flink sql consumer kakfa canal-json message then sum(amount)
[ https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-22281. --- Resolution: Not A Problem > flink sql consumer kakfa canal-json message then sum(amount) > -- > > Key: FLINK-22281 > URL: https://issues.apache.org/jira/browse/FLINK-22281 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0 > Environment: flink 1.12 local >Reporter: xx chai >Priority: Major > Attachments: screenshot-1.png > > > I use flink sql to consumer kafka canal-json message the sql is > CREATE TABLE kafka_mall_order_info ( > id int, > amount double, >PRIMARY KEY ( id) NOT ENFORCED >) WITH ( > 'connector' = 'kafka', > 'topic' = 'topic_yx-dc-3-102_3306', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'kafka_to_hive', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'canal-json'); > create table t2 (amount double) with ('connector' = 'print'); > > > insert into t2 select sum(amount) from kafka_mall_order_info ; > but the result is not i think > the result in image -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22281) flink sql consumer kakfa canal-json message then sum(amount)
[ https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321850#comment-17321850 ] Jark Wu commented on FLINK-22281: - If you don't have the full historical data, only the incremental binlog data, then aggregating over it will produce wrong results, because the input data is incorrect (updates to non-existing rows). You can try to turn on {{table.exec.source.cdc-events-duplicate=true}} to convert the incomplete changelog into a normalized changelog, e.g. a non-existing update will be converted into an insert. See https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/canal.html#duplicate-change-events > flink sql consumer kakfa canal-json message then sum(amount) > -- > > Key: FLINK-22281 > URL: https://issues.apache.org/jira/browse/FLINK-22281 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0 > Environment: flink 1.12 local >Reporter: xx chai >Priority: Major > Attachments: screenshot-1.png > > > I use flink sql to consumer kafka canal-json message the sql is > CREATE TABLE kafka_mall_order_info ( > id int, > amount double, >PRIMARY KEY ( id) NOT ENFORCED >) WITH ( > 'connector' = 'kafka', > 'topic' = 'topic_yx-dc-3-102_3306', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'kafka_to_hive', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'canal-json'); > create table t2 (amount double) with ('connector' = 'print'); > > > insert into t2 select sum(amount) from kafka_mall_order_info ; > but the result is not i think > the result in image -- This message was sent by Atlassian Jira (v8.3.4#803005)
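Why an incomplete changelog corrupts the aggregate can be shown with a conceptual pure-Python sketch (not Flink code): a retracting sum that sees an update-before/update-after pair for a row whose original insert was never consumed (as happens with 'scan.startup.mode' = 'latest-offset') retracts a value that was never added:

```python
# Conceptual sketch of summing over a changelog stream.
# Ops follow Flink's RowKind notation: +I insert, -U update-before,
# +U update-after, -D delete.

def apply(total, event):
    op, amount = event
    if op in ("+I", "+U"):   # insert / update-after: add the value
        return total + amount
    if op in ("-D", "-U"):   # delete / update-before: retract the value
        return total - amount
    return total

# Full changelog: insert 10.0, then update it to 12.0 -> correct sum is 12.0.
full = [("+I", 10.0), ("-U", 10.0), ("+U", 12.0)]
# Incomplete changelog: the initial +I happened before we subscribed.
partial = [("-U", 10.0), ("+U", 12.0)]

total_full = 0.0
for e in full:
    total_full = apply(total_full, e)

total_partial = 0.0
for e in partial:
    total_partial = apply(total_partial, e)

print(total_full)     # 12.0
print(total_partial)  # 2.0 -- wrong: retracted a row that was never inserted
```

Normalization (what table.exec.source.cdc-events-duplicate enables via an upstream materialized state) turns the unmatched -U/+U into a plain insert, which is what makes the sum correct again.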
[jira] [Updated] (FLINK-12085) Improve Mesos job cluster entry point 'MesosJobClusterEntrypoint'
[ https://issues.apache.org/jira/browse/FLINK-12085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacky Yin updated FLINK-12085: -- Labels: (was: stale-minor) > Improve Mesos job cluster entry point 'MesosJobClusterEntrypoint' > -- > > Key: FLINK-12085 > URL: https://issues.apache.org/jira/browse/FLINK-12085 > Project: Flink > Issue Type: Improvement > Components: Deployment / Mesos >Affects Versions: 1.7.2 >Reporter: Jacky Yin >Assignee: Jacky Yin >Priority: Minor > Original Estimate: 336h > Remaining Estimate: 336h > > Currently, the Mesos job cluster is not that easy to use. You have to > manually serialize the job graph and specify the job graph file path. And the > only way to include the user code jars is to put them in the `lib` folder. > `StandaloneJobClusterEntrypoint` is a good example to learn. It can include > the user code jars and allow to specify the entry class. That way we would > not need to generate the JobGraph yourself and then serialize it. I would > like to enhance `MesosJobClusterEntrypoint` similarly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] lirui-apache commented on a change in pull request #15549: [FLINK-20761][hive] Escape the location path when creating input splits
lirui-apache commented on a change in pull request #15549: URL: https://github.com/apache/flink/pull/15549#discussion_r613709681 ## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java ## @@ -442,6 +448,41 @@ public void testDynamicPartWithOrderBy() throws Exception { } } +@Test +public void testLocationWithComma() throws Exception { +TableEnvironment tableEnv = getTableEnvWithHiveCatalog(); +File location = tempFolder.newFolder(",tbl1,location,"); +try { +// test table location +tableEnv.executeSql( +String.format( +"create table tbl1 (x int) location '%s'", location.getAbsolutePath())); +tableEnv.executeSql("insert into tbl1 values (1),(2)").await(); +List results = +CollectionUtil.iteratorToList( +tableEnv.executeSql("select * from tbl1").collect()); +assertEquals("[+I[1], +I[2]]", results.toString()); +// test partition location +tableEnv.executeSql("create table tbl2 (x int) partitioned by (p string)"); +location = tempFolder.newFolder(","); +tableEnv.executeSql( +String.format( +"alter table tbl2 add partition (p='a') location '%s'", Review comment: The location path has comma and hive's partition value doesn't have to be the same as the location path. But I can also add a test case where the partition value also contains comma. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-22279) Upload the pipeline.classpaths file in yarn application mode
[ https://issues.apache.org/jira/browse/FLINK-22279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yang Wang updated FLINK-22279: -- Priority: Major (was: Blocker) > Upload the pipeline.classpaths file in yarn application mode > > > Key: FLINK-22279 > URL: https://issues.apache.org/jira/browse/FLINK-22279 > Project: Flink > Issue Type: Bug > Components: Command Line Client >Affects Versions: 1.12.0 >Reporter: renjianxu >Priority: Major > > pipeline.classpaths is a local resource package. If this file is used, all > nodes must include this file. I think that pipeline.jars should be uploaded > from the client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22279) Upload the pipeline.classpaths file in yarn application mode
[ https://issues.apache.org/jira/browse/FLINK-22279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321846#comment-17321846 ] Yang Wang commented on FLINK-22279: --- I do not think this is a bug in Flink. The config option {{pipeline.classpaths}} is designed to work the same as the CLI option {{-C,--classpath}}. If you want to ship files, then you should use {{yarn.ship-files}}. {code:java} -C,--classpath Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple times for specifying more than one URL. The protocol must be supported by the {@link java.net.URLClassLoader}. {code} > Upload the pipeline.classpaths file in yarn application mode > > > Key: FLINK-22279 > URL: https://issues.apache.org/jira/browse/FLINK-22279 > Project: Flink > Issue Type: Bug > Components: Command Line Client >Affects Versions: 1.12.0 >Reporter: renjianxu >Priority: Blocker > > pipeline.classpaths is a local resource package. If this file is used, all > nodes must include this file. I think that pipeline.jars should be uploaded > from the client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22280) Add XML output format for Flink Table
[ https://issues.apache.org/jira/browse/FLINK-22280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-22280: Component/s: (was: Table SQL / API) Table SQL / Ecosystem Formats (JSON, Avro, Parquet, ORC, SequenceFile) > Add XML output format for Flink Table > - > > Key: FLINK-22280 > URL: https://issues.apache.org/jira/browse/FLINK-22280 > Project: Flink > Issue Type: New Feature > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / Ecosystem >Affects Versions: 1.12.1 >Reporter: François Lacombe >Priority: Major > > Dear maintainers > I'm looking for the ability to output xml files from Flink Table API, just > like csv and json already supported formats. > To me, a new format could be required to make the appropriate serialization. > Am I missing any existing feature (or duplicate issue) that could allow it > without a dedicated format? > Depending on your returns and if it makes sense, I could get involved in > writing the appropriate format based on the same logic as > `JsonRowDataSerializationSchema`. > Best regards -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22280) Add XML output format for Flink Table
[ https://issues.apache.org/jira/browse/FLINK-22280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321845#comment-17321845 ] Jark Wu commented on FLINK-22280: - Yes. I think this can be an XML format. > Add XML output format for Flink Table > - > > Key: FLINK-22280 > URL: https://issues.apache.org/jira/browse/FLINK-22280 > Project: Flink > Issue Type: New Feature > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table > SQL / Ecosystem >Affects Versions: 1.12.1 >Reporter: François Lacombe >Priority: Major > > Dear maintainers > I'm looking for the ability to output xml files from Flink Table API, just > like csv and json already supported formats. > To me, a new format could be required to make the appropriate serialization. > Am I missing any existing feature (or duplicate issue) that could allow it > without a dedicated format? > Depending on your returns and if it makes sense, I could get involved in > writing the appropriate format based on the same logic as > `JsonRowDataSerializationSchema`. > Best regards -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22125) Revisit the return value of MapState.get when the key doesn't exist
[ https://issues.apache.org/jira/browse/FLINK-22125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321843#comment-17321843 ] Roc Marshal commented on FLINK-22125: - When we judge whether the MapState is empty or not, there may be some logical loopholes, such as using the `is_empty()` method instead of the `_is_empty` variable > Revisit the return value of MapState.get when the key doesn't exist > --- > > Key: FLINK-22125 > URL: https://issues.apache.org/jira/browse/FLINK-22125 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.13.0 >Reporter: Dian Fu >Priority: Major > Fix For: 1.13.0 > > > Currently, it throws a KeyError if the key doesn't exist for MapState in > Python DataStream API. However, it returns null in the Java DataStream API. > Maybe we should keep the behavior the same across Python DataStream API and > Java DataStream API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread
[ https://issues.apache.org/jira/browse/FLINK-22282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated FLINK-22282: - Issue Type: Bug (was: Task) > Move creation of SplitEnumerator to the SourceCoordinator thread > > > Key: FLINK-22282 > URL: https://issues.apache.org/jira/browse/FLINK-22282 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.12.2 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Critical > Fix For: 1.13.0, 1.12.3 > > > Currently the creation of the SplitEnumerator is in the JM main thread. In > case the SplitEnumerator instantiation takes long, the job execution will > timeout. The fix is moving the SplitEnumerator creation to the coordinator > thread. -- This message was sent by Atlassian Jira (v8.3.4#803005)
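The fix described above can be sketched with a conceptual Python analogy (not Flink's actual SourceCoordinator code): construction of a potentially slow object is submitted to a dedicated single-threaded executor, so the caller's thread — the JM main thread in Flink's case — is never blocked by the constructor:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def create_split_enumerator():
    # Stand-in for a slow SplitEnumerator constructor (e.g. one that
    # contacts external systems to discover splits before returning).
    time.sleep(0.1)
    return "enumerator"

# A single-threaded executor plays the role of the coordinator thread.
coordinator = ThreadPoolExecutor(max_workers=1, thread_name_prefix="SourceCoordinator")

# Instead of constructing on the caller's (main) thread, hand the work
# to the coordinator thread; the main thread stays responsive meanwhile.
future = coordinator.submit(create_split_enumerator)
# ... the main thread can keep processing other requests here ...
enumerator = future.result()  # join only when the result is actually needed
print(enumerator)             # enumerator
coordinator.shutdown()
```

The design choice is the usual one for single-threaded event loops: any operation whose latency is unbounded must be moved off the loop thread and rejoined asynchronously.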
[jira] [Updated] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread
[ https://issues.apache.org/jira/browse/FLINK-22282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated FLINK-22282: - Priority: Critical (was: Major) > Move creation of SplitEnumerator to the SourceCoordinator thread > > > Key: FLINK-22282 > URL: https://issues.apache.org/jira/browse/FLINK-22282 > Project: Flink > Issue Type: Task > Components: Connectors / Common >Affects Versions: 1.12.2 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Critical > Fix For: 1.13.0, 1.12.3 > > > Currently the creation of the SplitEnumerator is in the JM main thread. In > case the SplitEnumerator instantiation takes long, the job execution will > timeout. The fix is moving the SplitEnumerator creation to the coordinator > thread. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread
[ https://issues.apache.org/jira/browse/FLINK-22282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated FLINK-22282: - Issue Type: Improvement (was: Bug) > Move creation of SplitEnumerator to the SourceCoordinator thread > > > Key: FLINK-22282 > URL: https://issues.apache.org/jira/browse/FLINK-22282 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.12.2 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Critical > Fix For: 1.13.0, 1.12.3 > > > Currently the creation of the SplitEnumerator is in the JM main thread. In > case the SplitEnumerator instantiation takes a long time, the job execution will > time out. The fix is moving the SplitEnumerator creation to the coordinator > thread. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread
[ https://issues.apache.org/jira/browse/FLINK-22282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated FLINK-22282: - Fix Version/s: 1.12.3 1.13.0 > Move creation of SplitEnumerator to the SourceCoordinator thread > > > Key: FLINK-22282 > URL: https://issues.apache.org/jira/browse/FLINK-22282 > Project: Flink > Issue Type: Task > Components: Connectors / Common >Affects Versions: 1.12.2 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Major > Fix For: 1.13.0, 1.12.3 > > > Currently the creation of the SplitEnumerator is in the JM main thread. In > case the SplitEnumerator instantiation takes a long time, the job execution will > time out. The fix is moving the SplitEnumerator creation to the coordinator > thread. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread
[ https://issues.apache.org/jira/browse/FLINK-22282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin reassigned FLINK-22282: Assignee: Jiangjie Qin > Move creation of SplitEnumerator to the SourceCoordinator thread > > > Key: FLINK-22282 > URL: https://issues.apache.org/jira/browse/FLINK-22282 > Project: Flink > Issue Type: Task > Components: Connectors / Common >Affects Versions: 1.12.2 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Major > > Currently the creation of the SplitEnumerator is in the JM main thread. In > case the SplitEnumerator instantiation takes a long time, the job execution will > time out. The fix is moving the SplitEnumerator creation to the coordinator > thread. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22281) flink sql consumes kafka canal-json messages then sum(amount)
[ https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xx chai updated FLINK-22281: Description: I use flink sql to consume kafka canal-json messages. The sql is CREATE TABLE kafka_mall_order_info ( id int, amount double, PRIMARY KEY ( id) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = 'topic_yx-dc-3-102_3306', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'kafka_to_hive', 'scan.startup.mode' = 'latest-offset', 'format' = 'canal-json'); create table t2 (amount double) with ('connector' = 'print'); insert into t2 select sum(amount) from kafka_mall_order_info ; but the result is not what I expect; the result is in the image was: I use flink sql to consume kafka canal-json messages. The sql is CREATE TABLE kafka_mall_order_info ( id int, amount double, PRIMARY KEY ( id) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = 'topic_yx-dc-3-102_3306', 'properties.bootstrap.servers' = '192.168.3.100:9092,192.168.3.101:9092,192.168.3.102:9092', 'properties.group.id' = 'kafka_to_hive', 'scan.startup.mode' = 'latest-offset', 'format' = 'canal-json'); create table t2 (amount double) with ('connector' = 'print'); insert into t2 select sum(amount) from kafka_mall_order_info ; but the result is not what I expect; the result is in the image > flink sql consumes kafka canal-json messages then sum(amount) > -- > > Key: FLINK-22281 > URL: https://issues.apache.org/jira/browse/FLINK-22281 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0 > Environment: flink 1.12 local >Reporter: xx chai >Priority: Major > Attachments: screenshot-1.png > > > I use flink sql to consume kafka canal-json messages. The sql is > CREATE TABLE kafka_mall_order_info ( > id int, > amount double, >PRIMARY KEY ( id) NOT ENFORCED >) WITH ( > 'connector' = 'kafka', > 'topic' = 'topic_yx-dc-3-102_3306', > 'properties.bootstrap.servers' = 'localhost:9092', > 'properties.group.id' = 'kafka_to_hive', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'canal-json'); > create table t2 (amount double) with ('connector' = 'print'); > > > insert into t2 select sum(amount) from kafka_mall_order_info ; > but the result is not what I expect > the result is in the image -- This message was sent by Atlassian Jira (v8.3.4#803005)
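A plausible explanation for "the result is not what I expect" is that canal-json decodes each database change into changelog rows, and an unbounded `sum(...)` over them emits retraction pairs, so the print sink shows `-U`/`+U` rows rather than a single final value. The Python sketch below is an approximation of retract-sum behavior for illustration (not Flink internals); it folds a `(kind, amount)` changelog into a running sum and records what a retract sink would print.

```python
# Approximation of how an unbounded sum over changelog input behaves:
# every change first retracts the old aggregate (-U) and then emits the
# new one (+U). This is an illustration, not Flink's implementation.

def apply_changelog(rows):
    """Fold +I/-U/+U/-D changelog rows of amounts into a running sum,
    recording what a retract sink would print after each change."""
    total = 0.0
    printed = []
    for kind, amount in rows:
        if kind in ("+I", "+U"):
            if printed:
                printed.append(("-U", total))  # retract the old aggregate
            total += amount
            printed.append(("+U" if len(printed) > 1 else "+I", total))
        elif kind in ("-U", "-D"):
            printed.append(("-U", total))
            total -= amount
            printed.append(("+U", total))
    return total, printed


# An insert of 10.0, then a canal-json update of that row to 15.0
# (which arrives as an UPDATE_BEFORE/UPDATE_AFTER pair):
total, printed = apply_changelog([("+I", 10.0), ("-U", 10.0), ("+U", 15.0)])
print(total)    # 15.0
print(printed)  # the retraction pairs a print sink would display
```

The final sum is correct (15.0), but the print sink faithfully shows every intermediate retraction, which is easy to misread as a wrong result.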
[jira] [Updated] (FLINK-22281) flink sql consumes kafka canal-json messages then sum(amount)
[ https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xx chai updated FLINK-22281: Description: I use flink sql to consume kafka canal-json messages. The sql is CREATE TABLE kafka_mall_order_info ( id int, amount double, PRIMARY KEY ( id) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = 'topic_yx-dc-3-102_3306', 'properties.bootstrap.servers' = '192.168.3.100:9092,192.168.3.101:9092,192.168.3.102:9092', 'properties.group.id' = 'kafka_to_hive', 'scan.startup.mode' = 'latest-offset', 'format' = 'canal-json'); create table t2 (amount double) with ('connector' = 'print'); insert into t2 select sum(amount) from kafka_mall_order_info ; but the result is not what I expect; the result is in the image was: I use flink sql to consume kafka canal-json messages. The sql is CREATE TABLE kafka_mall_order_info ( id int, amount double, PRIMARY KEY ( id) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = 'topic_yx-dc-3-102_3306', 'properties.bootstrap.servers' = '192.168.3.100:9092,192.168.3.101:9092,192.168.3.102:9092', 'properties.group.id' = 'kafka_to_hive', 'scan.startup.mode' = 'latest-offset', 'format' = 'canal-json'); create table t2 (amount double) with ('connector' = 'print'); insert into t2 select sum(amount) from kafka_mall_order_info ; but the result is not what I expect > flink sql consumes kafka canal-json messages then sum(amount) > -- > > Key: FLINK-22281 > URL: https://issues.apache.org/jira/browse/FLINK-22281 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0 > Environment: flink 1.12 local >Reporter: xx chai >Priority: Major > Attachments: screenshot-1.png > > > I use flink sql to consume kafka canal-json messages. The sql is > CREATE TABLE kafka_mall_order_info ( > id int, > amount double, >PRIMARY KEY ( id) NOT ENFORCED >) WITH ( > 'connector' = 'kafka', > 'topic' = 'topic_yx-dc-3-102_3306', > 'properties.bootstrap.servers' = > '192.168.3.100:9092,192.168.3.101:9092,192.168.3.102:9092', > 'properties.group.id' = 'kafka_to_hive', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'canal-json'); > create table t2 (amount double) with ('connector' = 'print'); > > > insert into t2 select sum(amount) from kafka_mall_order_info ; > but the result is not what I expect > the result is in the image -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22281) flink sql consumes kafka canal-json messages then sum(amount)
[ https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xx chai updated FLINK-22281: Attachment: screenshot-1.png > flink sql consumes kafka canal-json messages then sum(amount) > -- > > Key: FLINK-22281 > URL: https://issues.apache.org/jira/browse/FLINK-22281 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0 > Environment: flink 1.12 local >Reporter: xx chai >Priority: Major > Attachments: screenshot-1.png > > > I use flink sql to consume kafka canal-json messages. The sql is > CREATE TABLE kafka_mall_order_info ( > id int, > amount double, >PRIMARY KEY ( id) NOT ENFORCED >) WITH ( > 'connector' = 'kafka', > 'topic' = 'topic_yx-dc-3-102_3306', > 'properties.bootstrap.servers' = > '192.168.3.100:9092,192.168.3.101:9092,192.168.3.102:9092', > 'properties.group.id' = 'kafka_to_hive', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'canal-json'); > create table t2 (amount double) with ('connector' = 'print'); > > > insert into t2 select sum(amount) from kafka_mall_order_info ; > but the result is not what I expect -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22281) flink sql consumes kafka canal-json messages then sum(amount)
[ https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xx chai updated FLINK-22281: Description: I use flink sql to consume kafka canal-json messages. The sql is CREATE TABLE kafka_mall_order_info ( id int, amount double, PRIMARY KEY ( id) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = 'topic_yx-dc-3-102_3306', 'properties.bootstrap.servers' = '192.168.3.100:9092,192.168.3.101:9092,192.168.3.102:9092', 'properties.group.id' = 'kafka_to_hive', 'scan.startup.mode' = 'latest-offset', 'format' = 'canal-json'); create table t2 (amount double) with ('connector' = 'print'); insert into t2 select sum(amount) from kafka_mall_order_info ; but the result is not what I expect was: I use flink sql to consume kafka canal-json messages Environment: flink 1.12 local > flink sql consumes kafka canal-json messages then sum(amount) > -- > > Key: FLINK-22281 > URL: https://issues.apache.org/jira/browse/FLINK-22281 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0 > Environment: flink 1.12 local >Reporter: xx chai >Priority: Major > > I use flink sql to consume kafka canal-json messages. The sql is > CREATE TABLE kafka_mall_order_info ( > id int, > amount double, >PRIMARY KEY ( id) NOT ENFORCED >) WITH ( > 'connector' = 'kafka', > 'topic' = 'topic_yx-dc-3-102_3306', > 'properties.bootstrap.servers' = > '192.168.3.100:9092,192.168.3.101:9092,192.168.3.102:9092', > 'properties.group.id' = 'kafka_to_hive', > 'scan.startup.mode' = 'latest-offset', > 'format' = 'canal-json'); > create table t2 (amount double) with ('connector' = 'print'); > > > insert into t2 select sum(amount) from kafka_mall_order_info ; > but the result is not what I expect -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22282) Move creation of SplitEnumerator to the SourceCoordinator thread
Jiangjie Qin created FLINK-22282: Summary: Move creation of SplitEnumerator to the SourceCoordinator thread Key: FLINK-22282 URL: https://issues.apache.org/jira/browse/FLINK-22282 Project: Flink Issue Type: Task Components: Connectors / Common Affects Versions: 1.12.2 Reporter: Jiangjie Qin Currently the creation of the SplitEnumerator is in the JM main thread. In case the SplitEnumerator instantiation takes a long time, the job execution will time out. The fix is moving the SplitEnumerator creation to the coordinator thread. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22275) Datagen add a max lag option for a series of timestamp-related types
[ https://issues.apache.org/jira/browse/FLINK-22275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321842#comment-17321842 ] Yi Tang commented on FLINK-22275: - [~jark] You can assign it to me, if no further discussion is needed. > Datagen add a max lag option for a series of timestamp-related types > > > Key: FLINK-22275 > URL: https://issues.apache.org/jira/browse/FLINK-22275 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Ecosystem >Reporter: Yi Tang >Priority: Minor > > For now, the {{datagen}} connector always resolves to the current timestamp > for timestamp-related types. > This proposes to add a max lag option for these types, which would generate a > timestamp with a random lag (bounded by the configured max lag) relative to the > current timestamp. The behavior is left as before if the option is not configured. -- This message was sent by Atlassian Jira (v8.3.4#803005)
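The proposal above can be sketched in a few lines. This is a hypothetical illustration of the semantics only — the function name and parameter are made up here, and no such datagen option exists yet: a timestamp-typed field would resolve to "now minus a random lag in [0, max_lag]", with a max lag of zero preserving today's behavior.

```python
# Hypothetical sketch of the proposed datagen behavior: resolve a
# timestamp field to the current time minus a random lag bounded by the
# configured maximum. Names and defaults are assumptions for illustration.

import random
import time


def generate_timestamp(max_lag_seconds=0.0, now=None):
    """Return an epoch timestamp lagging the current time by a random
    amount in [0, max_lag_seconds]; with max_lag_seconds=0 this keeps
    today's behavior (always exactly the current timestamp)."""
    if now is None:
        now = time.time()
    return now - random.uniform(0.0, max_lag_seconds)


now = 1_700_000_000.0
ts = generate_timestamp(max_lag_seconds=60.0, now=now)
print(now - 60.0 <= ts <= now)  # True: always within the configured lag
```

Keeping zero as the default makes the option purely additive, as the issue suggests.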
[jira] [Updated] (FLINK-22281) flink sql consumes kafka canal-json messages then sum(amount)
[ https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xx chai updated FLINK-22281: Component/s: Table SQL / API Affects Version/s: 1.12.0 Description: I use flink sql to consume kafka canal-json messages Issue Type: Bug (was: Improvement) Summary: flink sql consumes kafka canal-json messages then sum(amount) (was: flink sql) > flink sql consumes kafka canal-json messages then sum(amount) > -- > > Key: FLINK-22281 > URL: https://issues.apache.org/jira/browse/FLINK-22281 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.12.0 >Reporter: xx chai >Priority: Major > > I use flink sql to consume kafka canal-json messages -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22281) flink sql
xx chai created FLINK-22281: --- Summary: flink sql Key: FLINK-22281 URL: https://issues.apache.org/jira/browse/FLINK-22281 Project: Flink Issue Type: Improvement Reporter: xx chai -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] KurtYoung commented on a change in pull request #15549: [FLINK-20761][hive] Escape the location path when creating input splits
KurtYoung commented on a change in pull request #15549: URL: https://github.com/apache/flink/pull/15549#discussion_r613693906 ## File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java ## @@ -442,6 +448,41 @@ public void testDynamicPartWithOrderBy() throws Exception { } } +@Test +public void testLocationWithComma() throws Exception { +TableEnvironment tableEnv = getTableEnvWithHiveCatalog(); +File location = tempFolder.newFolder(",tbl1,location,"); +try { +// test table location +tableEnv.executeSql( +String.format( +"create table tbl1 (x int) location '%s'", location.getAbsolutePath())); +tableEnv.executeSql("insert into tbl1 values (1),(2)").await(); +List<Row> results = +CollectionUtil.iteratorToList( +tableEnv.executeSql("select * from tbl1").collect()); +assertEquals("[+I[1], +I[2]]", results.toString()); +// test partition location +tableEnv.executeSql("create table tbl2 (x int) partitioned by (p string)"); +location = tempFolder.newFolder(","); +tableEnv.executeSql( +String.format( +"alter table tbl2 add partition (p='a') location '%s'", Review comment: Should the partition path contain a comma? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
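For background on why an unescaped comma breaks input-split creation — an assumption based on the PR title, since Hadoop's FileInputFormat joins multiple input paths into one comma-separated configuration string — here is a minimal Python illustration of the escape/split round trip. The helper names are hypothetical; the escaping style (backslash before comma) mirrors Hadoop's `StringUtils.escapeString`.

```python
# Illustration of the failure mode this PR guards against: paths joined
# with "," are ambiguous if a path itself contains a comma, so commas
# must be escaped before joining. escape_path/split_input_dirs are
# hypothetical helpers, not Hadoop or Flink APIs.

def escape_path(path):
    # StringUtils.escapeString-style escaping: backslash-escape the comma
    return path.replace("\\", "\\\\").replace(",", "\\,")


def split_input_dirs(value):
    """Split a comma-joined input-dir string, honoring escaped commas."""
    parts, cur, i = [], [], 0
    while i < len(value):
        c = value[i]
        if c == "\\" and i + 1 < len(value):
            cur.append(value[i + 1])  # keep the escaped character literally
            i += 2
        elif c == ",":
            parts.append("".join(cur))  # unescaped comma = path separator
            cur = []
            i += 1
        else:
            cur.append(c)
            i += 1
    parts.append("".join(cur))
    return parts


dirs = ["/warehouse/,tbl1,location,", "/warehouse/plain"]
joined = ",".join(escape_path(d) for d in dirs)
print(split_input_dirs(joined))
# ['/warehouse/,tbl1,location,', '/warehouse/plain']
```

Without the escaping step, the first directory would be split into several bogus paths, which is exactly the kind of location the test above (`",tbl1,location,"`) exercises.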