[jira] [Updated] (FLINK-22456) Support InitializeOnMaster and FinalizeOnMaster to be used in InputFormat

2021-04-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-22456:
---
Labels: pull-request-available  (was: )

> Support InitializeOnMaster and FinalizeOnMaster to be used in InputFormat
> -
>
> Key: FLINK-22456
> URL: https://issues.apache.org/jira/browse/FLINK-22456
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Li
>Priority: Minor
>  Labels: pull-request-available
>
>         In _InputOutputFormatVertex_, _initializeGlobal_ and _finalizeGlobal_ 
> are only called when the format is an _OutputFormat_; they are never called 
> for an _InputFormat_.
>         FLINK-1722 says _HadoopOutputFormats_ use them to do work before and 
> after the task, but it only added _initializeGlobal_ and _finalizeGlobal_ 
> support for _OutputFormat_.
>         I don't know why _InputFormat_ doesn't support this; can anyone tell 
> me why?
>         But I think _InitializeOnMaster_ and _FinalizeOnMaster_ should also 
> be supported in _InputFormat_.
>         For example, for an offline task using _JdbcInputFormat_, the user 
> could query the total record count in _initializeGlobal_ and then create the 
> InputSplits based on that count. While the task runs, a progress-indicator 
> metric can be derived by dividing the number of records read so far by the 
> total count, and the remaining time of the task can even be estimated. This 
> is very helpful for users who monitor task progress and remaining time 
> through external systems.
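>
>         For illustration, here is a minimal sketch (editor's example, not 
> from the ticket; {{queryTotalCount()}} is a hypothetical helper) of an 
> InputFormat that would benefit from the master-side hooks:
> {code:java}
> // Hypothetical InputFormat that computes the total row count once on the
> // JobManager before splits are generated, so progress can be reported later.
> public class ProgressAwareJdbcInputFormat extends JdbcInputFormat
>         implements InitializeOnMaster, FinalizeOnMaster {
>
>     private long totalCount;
>
>     @Override
>     public void initializeGlobal(int parallelism) throws IOException {
>         // e.g. run a SELECT COUNT(*) once on the master; the result can be
>         // used to size the InputSplits and to register a progress metric
>         this.totalCount = queryTotalCount();
>     }
>
>     @Override
>     public void finalizeGlobal(int parallelism) throws IOException {
>         // e.g. publish that all splits have been fully read
>     }
> }
> {code}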



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] kanata163 opened a new pull request #15775: [FLINK-22456][runtime] Support InitializeOnMaster and FinalizeOnMaster to be used in InputFormat

2021-04-26 Thread GitBox


kanata163 opened a new pull request #15775:
URL: https://github.com/apache/flink/pull/15775


   
   
   ## What is the purpose of the change
   
   *Support `InitializeOnMaster` and `FinalizeOnMaster` to be used in 
InputFormat*
   
   
   ## Brief change log
   
 - `InputOutputFormatVertex` calls `initializeGlobal()` when `inputFormat` is 
an instance of `InitializeOnMaster`
 - `InputOutputFormatVertex` calls `finalizeGlobal()` when `inputFormat` is an 
instance of `FinalizeOnMaster` (see the sketch below)
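
   A minimal sketch of the added dispatch (assumed shape, mirroring the 
existing `OutputFormat` branch in `InputOutputFormatVertex`):

   ```java
   // inside InputOutputFormatVertex#initializeOnMaster (assumed location)
   if (inputFormat instanceof InitializeOnMaster) {
       ((InitializeOnMaster) inputFormat).initializeGlobal(getParallelism());
   }
   ```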
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Extended test that validates the logic of `InputOutputFormatVertex`.*
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15746: [FLINK-17783][Table SQL/Ecosystem]Add array,map,row types support for orc row writer

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15746:
URL: https://github.com/apache/flink/pull/15746#issuecomment-826269766


   
   ## CI report:
   
   * 08cf1b3d72dbf729d6de14c8d508b10b5d1a20b1 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17154)
 
   * 8d6792043f6dcba404f1313d35d3c1f9a6ddd7a4 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17265)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15131: [FLINK-21700][yarn]Allow to disable fetching Hadoop delegation token on Yarn

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15131:
URL: https://github.com/apache/flink/pull/15131#issuecomment-794957907


   
   ## CI report:
   
   * 0500d84f78d8fdb967b077fb024fda0cd4ba2d6a Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15682)
 
   * b99e1a12c74051454d211b0094c986b8741dd28f Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17261)
 
   * e375a91dd1bd01ff1142c14a0f982eb7d5f94c14 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (FLINK-22085) KafkaSourceLegacyITCase hangs/fails on azure

2021-04-26 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332957#comment-17332957
 ] 

Dong Lin edited comment on FLINK-22085 at 4/27/21, 5:53 AM:


Thank you [~dwysakowicz] for the information.

For the first test failure [1], it could be because the Azure pipeline is very 
slow and it takes more than 60 seconds (due to long GC) to complete that test. 
Maybe we can see if increasing the timeout to 120 seconds could reduce the 
failure rate of this test.
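
As an editor's illustration (the exact timeout mechanism in 
KafkaSourceLegacyITCase is assumed, not verified), the change would be along 
these lines:

{code:java}
// hypothetical: raise the per-test timeout from 60s to 120s
@Test(timeout = 120_000L)
public void testOneToOneSources() throws Exception {
    // unchanged test body
}
{code}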

For the second test failure [2], it appears that the test failed due to 
"OperatorEvent from an OperatorCoordinator to a task was lost". This is related 
to https://github.com/apache/flink/pull/15605 which was committed recently. 

Given that KafkaSourceLegacyITCase no longer hangs and the comment history in 
this JIRA is already very long, I opened 
https://issues.apache.org/jira/browse/FLINK-22488 to track the issue of 
"OperatorEvent from an OperatorCoordinator to a task was lost". Maybe we can 
close this JIRA and continue the discussion in FLINK-22488.

[1] 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17206=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=6612
[2] 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17212=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=7062


was (Author: lindong):
Thank you [~dwysakowicz] for the information.

For the first test failure [1], it could be because the Azure pipeline is very 
slow and it takes more than 60 seconds (due to long GC) to complete that test. 
Maybe we can see if increasing the timeout to 120 seconds could reduce the 
failure rate of this test.

For the second test failure [2], it appears that the test failed due to 
"OperatorEvent from an OperatorCoordinator to a task was lost". This is related 
to https://github.com/apache/flink/pull/15605 which was committed recently. 

Since KafkaSourceLegacyITCase no longer hangs and the comments in this JIRA 
are already very long, I opened 
https://issues.apache.org/jira/browse/FLINK-22488 to track the issue of 
"OperatorEvent from an OperatorCoordinator to a task was lost". Maybe we can 
close this JIRA and continue the discussion in FLINK-22488.

[1] 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17206=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=6612
[2] 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17212=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=7062

> KafkaSourceLegacyITCase hangs/fails on azure
> 
>
> Key: FLINK-22085
> URL: https://issues.apache.org/jira/browse/FLINK-22085
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0, 1.14.0
>Reporter: Dawid Wysakowicz
>Assignee: Dong Lin
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.13.0
>
>
> 1) Observations
> a) The Azure pipeline would occasionally hang without printing any test error 
> information.
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15939=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=8219]
> b) By running the test KafkaSourceLegacyITCase::testBrokerFailure() with INFO 
> level logging, the the test would hang with the following error message 
> printed repeatedly:
> {code:java}
> 20451 [New I/O boss #50] ERROR 
> org.apache.flink.networking.NetworkFailureHandler [] - Closing communication 
> channel because of an exception
> java.net.ConnectException: Connection refused: localhost/127.0.0.1:50073
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
> ~[?:1.8.0_151]
> at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) 
> ~[?:1.8.0_151]
> at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
>  ~[flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> at 
> 

[GitHub] [flink] curcur edited a comment on pull request #15674: [FLINK-22311] [connector/jdbc] Validate maxRetries for XA Sink

2021-04-26 Thread GitBox


curcur edited a comment on pull request #15674:
URL: https://github.com/apache/flink/pull/15674#issuecomment-827331397


   doc checks seem to fail:
   
   
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17083=logs=c5d67f7d-375d-5407-4743-f9d0c4436a81=38411795-40c9-51fa-10b0-bd083cf9f5a5
   
   Would you please try
   `mvn package -Dgenerate-config-docs -pl flink-docs -am -nsu -DskipTests`
   
   to regenerate docs?
   
   Or I can simply put this into the doc follow-up:
   
https://github.com/apache/flink/pull/15720/commits/e69548aec17fdffe8a60dee7a4a58d3966ee06e7
   
   And then you could finish the pre-check before the 1.13 release?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-22085) KafkaSourceLegacyITCase hangs/fails on azure

2021-04-26 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332957#comment-17332957
 ] 

Dong Lin commented on FLINK-22085:
--

Thank you [~dwysakowicz] for the information.

For the first test failure [1], it could be because the Azure pipeline is very 
slow and it takes more than 60 seconds (due to long GC) to complete that test. 
Maybe we can see if increasing the timeout to 120 seconds could reduce the 
failure rate of this test.

For the second test failure [2], it appears that the test failed due to 
"OperatorEvent from an OperatorCoordinator to a task was lost". This is related 
to https://github.com/apache/flink/pull/15605 which was committed recently. 

Since KafkaSourceLegacyITCase no longer hangs and the comments in this JIRA 
are already very long, I opened 
https://issues.apache.org/jira/browse/FLINK-22488 to track the issue of 
"OperatorEvent from an OperatorCoordinator to a task was lost". Maybe we can 
close this JIRA and continue the discussion in FLINK-22488.

[1] 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17206=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=6612
[2] 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17212=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=7062

> KafkaSourceLegacyITCase hangs/fails on azure
> 
>
> Key: FLINK-22085
> URL: https://issues.apache.org/jira/browse/FLINK-22085
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.13.0, 1.14.0
>Reporter: Dawid Wysakowicz
>Assignee: Dong Lin
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.13.0
>
>
> 1) Observations
> a) The Azure pipeline would occasionally hang without printing any test error 
> information.
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15939=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=8219]
> b) By running the test KafkaSourceLegacyITCase::testBrokerFailure() with INFO 
> level logging, the test would hang with the following error message 
> printed repeatedly:
> {code:java}
> 20451 [New I/O boss #50] ERROR 
> org.apache.flink.networking.NetworkFailureHandler [] - Closing communication 
> channel because of an exception
> java.net.ConnectException: Connection refused: localhost/127.0.0.1:50073
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
> ~[?:1.8.0_151]
> at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) 
> ~[?:1.8.0_151]
> at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
>  ~[flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> at 
> org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> at 
> org.apache.flink.shaded.testutils.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> at 
> org.apache.flink.shaded.testutils.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>  [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_151]
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_151]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
> {code}
> *2) Root cause explanations*
> The test would hang because it enters the following loop:
>  - closeOnFlush() is called for a given channel
>  - closeOnFlush() calls channel.write(..)
>  - channel.write() triggers the exceptionCaught(...) callback
>  - closeOnFlush() is called for the same channel again.
> *3) Solution*
> Update closeOnFlush() so that, if a channel is being closed by this method, 
> then closeOnFlush() would not try to write to this channel if it is called on 
> 
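> An editor's sketch of the described guard (a fragment of the test's 
> NetworkFailureHandler; names are assumed, this is not the actual patch):
> {code:java}
> private final Set<Channel> channelsBeingClosed = ConcurrentHashMap.newKeySet();
>
> private void closeOnFlush(Channel channel) {
>     // if this channel is already being closed by closeOnFlush, do not write
>     // to it again: the write would re-trigger exceptionCaught, which would
>     // call closeOnFlush again, producing the loop described above
>     if (!channelsBeingClosed.add(channel)) {
>         return;
>     }
>     if (channel.isConnected()) {
>         channel.write(ChannelBuffers.EMPTY_BUFFER)
>                 .addListener(ChannelFutureListener.CLOSE);
>     } else {
>         channel.close();
>     }
> }
> {code}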

[GitHub] [flink] wuchong commented on a change in pull request #15755: [FLINK-22318][table] Support RENAME column name for ALTER TABLE state…

2021-04-26 Thread GitBox


wuchong commented on a change in pull request #15755:
URL: https://github.com/apache/flink/pull/15755#discussion_r620886208



##
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
##
@@ -144,6 +149,116 @@ public static Operation convertChangeColumn(
 // TODO: handle watermark and constraints
 }
 
+public static Operation convertRenameColumn(
+ObjectIdentifier tableIdentifier,
+String originColumnName,
+String newColumnName,
+CatalogTable catalogTable) {
+
+Schema modifiedTableSchema = catalogTable.getUnresolvedSchema();
+validateColumnName(originColumnName, newColumnName, 
modifiedTableSchema);
+
+Schema.Builder builder = Schema.newBuilder();
+// build column
+modifiedTableSchema.getColumns().stream()
+.forEach(
+column -> {
+if (StringUtils.equals(column.getName(), 
originColumnName)) {
+buildNewColumnFromOriginColumn(builder, 
column, newColumnName);
+} else {
+buildNewColumnFromOriginColumn(builder, 
column, column.getName());
+}
+});
+// build primary key column
+List originPrimaryKeyNames =
+modifiedTableSchema
+.getPrimaryKey()
+.map(Schema.UnresolvedPrimaryKey::getColumnNames)
+.orElseGet(Lists::newArrayList);
+
+List newPrimaryKeyNames =
+originPrimaryKeyNames.stream()
+.map(
+pkName ->
+StringUtils.equals(pkName, 
originColumnName)
+? newColumnName
+: pkName)
+.collect(Collectors.toList());
+
+if (newPrimaryKeyNames.size() > 0) {
+builder.primaryKey(newPrimaryKeyNames);
+}
+// build watermark
+modifiedTableSchema.getWatermarkSpecs().stream()
+.forEach(
+watermarkSpec -> {
+String watermarkRefColumnName = 
watermarkSpec.getColumnName();
+Expression watermarkExpression = 
watermarkSpec.getWatermarkExpression();
+if (StringUtils.equals(watermarkRefColumnName, 
originColumnName)) {
+String newWatermarkExpression =
+((SqlCallExpression) 
watermarkExpression)
+.getSqlExpression()
+
.replace(watermarkRefColumnName, newColumnName);

Review comment:
   Would be better to investigate solutions first, I'm not sure whether 
there are any other ways to do this. 
   cc @twalthr , do you have any ideas?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on pull request #15674: [FLINK-22311] [connector/jdbc] Validate maxRetries for XA Sink

2021-04-26 Thread GitBox


curcur commented on pull request #15674:
URL: https://github.com/apache/flink/pull/15674#issuecomment-827331397


   doc checks seem to fail:


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-22408) Flink Table Parser Hive Drop Partitions Syntax unparse is Error

2021-04-26 Thread Rui Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332945#comment-17332945
 ] 

Rui Li commented on FLINK-22408:


[~aidenma] OK. I think we can fix this issue in the release-1.12 branch. We 
don't need to fix it in 1.13 and later, since the Calcite parser is deprecated 
there. So please submit a PR against 1.12.

> Flink Table Parser Hive Drop Partitions Syntax unparse is Error
> --
>
> Key: FLINK-22408
> URL: https://issues.apache.org/jira/browse/FLINK-22408
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.11.3
>Reporter: Ma Jun
>Priority: Major
>  Labels: pull-request-available
>
> The Flink Table parser unparses this incorrectly:
> *Synopsis:*
>  
> *SQL:*
> {code:java}
> alter table tbl drop partition (p1='a',p2=1), partition(p1='b',p2=2);{code}
> *The Hive multi-partition unparse result from toSqlString is:*
> {code:java}
> ALTER TABLE `TBL`\n" +
>   "DROP\n" +
>   "PARTITION (`P1` = 'a', `P2` = 1)\n" +
>   "PARTITION (`P1` = 'b', `P2` = 2)
> {code}
> The comma between the PARTITION specs is missing in the partition SqlNodeList.
>  The Hive syntax is:
> {code:java}
> ALTER TABLE table_name DROP [IF EXISTS] PARTITION (partition_spec) [, 
> PARTITION (partition_spec)];
> {code}
>  
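> For reference, the corrected unparse result would include the comma between 
> the partition specs (editor's illustration, in the same form as the broken 
> output above):
> {code:java}
> ALTER TABLE `TBL`\n" +
>   "DROP\n" +
>   "PARTITION (`P1` = 'a', `P2` = 1),\n" +
>   "PARTITION (`P1` = 'b', `P2` = 2)
> {code}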



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22488) KafkaSourceLegacyITCase.testOneToOneSources failed due to "OperatorEvent from an OperatorCoordinator to a task was lost"

2021-04-26 Thread Dong Lin (Jira)
Dong Lin created FLINK-22488:


 Summary: KafkaSourceLegacyITCase.testOneToOneSources failed due to 
"OperatorEvent from an OperatorCoordinator to a task was lost"
 Key: FLINK-22488
 URL: https://issues.apache.org/jira/browse/FLINK-22488
 Project: Flink
  Issue Type: Improvement
Reporter: Dong Lin


According to [1], the test KafkaSourceLegacyITCase.testOneToOneSources failed 
because it runs a streaming job (which uses KafkaSource) with 
restartAttempts=1. In addition to the failover explicitly triggered by the 
FailingIdentityMapper, the job failed due to 
"org.apache.flink.util.FlinkException: An OperatorEvent from an 
OperatorCoordinator to a task was lost. Triggering task failover to ensure 
consistency", which is unexpected by the test.

Note that SubtaskGatewayImpl was updated by [2] on 4/14 to trigger task 
failover if any OperatorEvent is lost. This could explain why those Kafka 
tests started to fail with the exception described above.

In order to make this test stable, let's try to understand why there is such a 
high chance of losing an OperatorEvent in the Azure test pipeline. And if we 
cannot avoid losing OperatorEvents in the test pipeline, we probably need to 
update the test to allow the pipeline to be restarted an arbitrary number of 
times (and still be able to stop the test on the happy path).
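
For reference, "restartAttempts=1" refers to a fixed-delay restart strategy 
along these lines (editor's sketch; the actual test setup is assumed):

{code:java}
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        1,                      // restartAttempts: only one failover is budgeted
        Time.milliseconds(0))); // delay between restart attempts
{code}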


[1] 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17212=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=6960
[2] https://github.com/apache/flink/pull/15605




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #15763: [FLINK-22364][doc] Translate the page of "Data Sources" to Chinese.

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15763:
URL: https://github.com/apache/flink/pull/15763#issuecomment-826589720


   
   ## CI report:
   
   * 0dcb773365ec16118d2b4c0f55910441bc6d7d58 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17259)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15746: [FLINK-17783][Table SQL/Ecosystem]Add array,map,row types support for orc row writer

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15746:
URL: https://github.com/apache/flink/pull/15746#issuecomment-826269766


   
   ## CI report:
   
   * 08cf1b3d72dbf729d6de14c8d508b10b5d1a20b1 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17154)
 
   * 8d6792043f6dcba404f1313d35d3c1f9a6ddd7a4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rmetzger commented on pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-04-26 Thread GitBox


rmetzger commented on pull request #15599:
URL: https://github.com/apache/flink/pull/15599#issuecomment-827326084


   Yeah, I'm happy to help, I also think I know what's causing this, but I 
don't know how to resolve it  
   
   I pushed this PR:
   - onto my personal CI, where it passed without problems.
   - onto Flink's CI, where it passed without problems.
   - and I manually restarted this PR's CI, where it broke again.
   
   The only difference between all these runs is that the broken PR CI run is 
downloading data from the maven cache on Azure (e.g. the local .m2 directory 
contains stuff in the failure case).
   
   @galenwarren Could you do me a favor and add a "-U" argument here: 
https://github.com/apache/flink/blob/master/tools/ci/maven-utils.sh#L107. This 
COULD resolve the issue.
   
   > This can be confirmed from the maven log, where no "Downloading xxx" can 
be found until the failure.
   
   The reason for the missing log statements is that we've disabled the 
download messages from maven: 
https://github.com/apache/flink/blob/master/tools/ci/maven-utils.sh#L103


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] todd5167 commented on a change in pull request #15755: [FLINK-22318][table] Support RENAME column name for ALTER TABLE state…

2021-04-26 Thread GitBox


todd5167 commented on a change in pull request #15755:
URL: https://github.com/apache/flink/pull/15755#discussion_r620879361



##
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
##
@@ -144,6 +149,116 @@ public static Operation convertChangeColumn(
 // TODO: handle watermark and constraints
 }
 
+public static Operation convertRenameColumn(
+ObjectIdentifier tableIdentifier,
+String originColumnName,
+String newColumnName,
+CatalogTable catalogTable) {
+
+Schema modifiedTableSchema = catalogTable.getUnresolvedSchema();
+validateColumnName(originColumnName, newColumnName, 
modifiedTableSchema);
+
+Schema.Builder builder = Schema.newBuilder();
+// build column
+modifiedTableSchema.getColumns().stream()
+.forEach(
+column -> {
+if (StringUtils.equals(column.getName(), 
originColumnName)) {
+buildNewColumnFromOriginColumn(builder, 
column, newColumnName);
+} else {
+buildNewColumnFromOriginColumn(builder, 
column, column.getName());
+}
+});
+// build primary key column
+List originPrimaryKeyNames =
+modifiedTableSchema
+.getPrimaryKey()
+.map(Schema.UnresolvedPrimaryKey::getColumnNames)
+.orElseGet(Lists::newArrayList);
+
+List newPrimaryKeyNames =
+originPrimaryKeyNames.stream()
+.map(
+pkName ->
+StringUtils.equals(pkName, 
originColumnName)
+? newColumnName
+: pkName)
+.collect(Collectors.toList());
+
+if (newPrimaryKeyNames.size() > 0) {
+builder.primaryKey(newPrimaryKeyNames);
+}
+// build watermark
+modifiedTableSchema.getWatermarkSpecs().stream()
+.forEach(
+watermarkSpec -> {
+String watermarkRefColumnName = 
watermarkSpec.getColumnName();
+Expression watermarkExpression = 
watermarkSpec.getWatermarkExpression();
+if (StringUtils.equals(watermarkRefColumnName, 
originColumnName)) {
+String newWatermarkExpression =
+((SqlCallExpression) 
watermarkExpression)
+.getSqlExpression()
+
.replace(watermarkRefColumnName, newColumnName);

Review comment:
   OK. So should computed expressions and watermark expressions disallow 
renaming in the first version?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-22485) Support client attach on application mode

2021-04-26 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332939#comment-17332939
 ] 

Jark Wu commented on FLINK-22485:
-

Sorry, [~zuston], I misunderstand your problem. Flink CLI already supports 
attached mode, you can refer: 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#execution-attached
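
For example (editor's note; {{DeploymentOptions.ATTACHED}} is the programmatic 
counterpart of the {{execution.attached}} option linked above):

{code:java}
Configuration conf = new Configuration();
// equivalent to setting execution.attached: true in flink-conf.yaml
conf.set(DeploymentOptions.ATTACHED, true);
{code}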

> Support client attach on application mode
> -
>
> Key: FLINK-22485
> URL: https://issues.apache.org/jira/browse/FLINK-22485
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Junfan Zhang
>Priority: Major
>
> Now, the client does not wait until the job finishes when using application mode.
> Can we support client attach in application mode?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22485) Support client attach on application mode

2021-04-26 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332939#comment-17332939
 ] 

Jark Wu edited comment on FLINK-22485 at 4/27/21, 5:35 AM:
---

Sorry, [~zuston], I misunderstood your problem. Flink CLI already supports 
attached mode, you can refer: 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#execution-attached


was (Author: jark):
Sorry, [~zuston], I misunderstand your problem. Flink CLI already supports 
attached mode, you can refer: 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#execution-attached

> Support client attach on application mode
> -
>
> Key: FLINK-22485
> URL: https://issues.apache.org/jira/browse/FLINK-22485
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Junfan Zhang
>Priority: Major
>
> Now, the client does not wait until the job finishes when using application mode.
> Can we support client attach in application mode?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-22294) Hive reading fail when getting file numbers on different filesystem nameservices

2021-04-26 Thread Rui Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331442#comment-17331442
 ] 

Rui Li edited comment on FLINK-22294 at 4/27/21, 5:33 AM:
--

Fixed in master: 6071f9686d2a7bfb154b1e7b1682b2cfee190922
Fixed in release-1.13: 7a4a18da90521fe3c0758d4579441c50907f368b


was (Author: lirui):
Fixed in master: 6071f9686d2a7bfb154b1e7b1682b2cfee190922
Fixed in release-1.13: TBD

> Hive reading fail when getting file numbers on different filesystem 
> nameservices
> 
>
> Key: FLINK-22294
> URL: https://issues.apache.org/jira/browse/FLINK-22294
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.12.2
>Reporter: Junfan Zhang
>Priority: Major
>  Labels: pull-request-available
>
> The same problem like https://issues.apache.org/jira/browse/FLINK-20710



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] lirui-apache closed pull request #15750: [FLINK-22294][hive] Hive reading fail when getting file numbers on different filesystem nameservices

2021-04-26 Thread GitBox


lirui-apache closed pull request #15750:
URL: https://github.com/apache/flink/pull/15750


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-22485) Support client attach on application mode

2021-04-26 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-22485:

Component/s: (was: Table SQL / Client)
 Command Line Client

> Support client attach on application mode
> -
>
> Key: FLINK-22485
> URL: https://issues.apache.org/jira/browse/FLINK-22485
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client
>Reporter: Junfan Zhang
>Priority: Major
>
> Now, the client does not wait until the job finishes when using application mode.
> Can we support client attach in application mode?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] lirui-apache commented on pull request #15750: [FLINK-22294][hive] Hive reading fail when getting file numbers on different filesystem nameservices

2021-04-26 Thread GitBox


lirui-apache commented on pull request #15750:
URL: https://github.com/apache/flink/pull/15750#issuecomment-827324483


   Pushed to release-1.13 via 7a4a18da90521fe3c0758d4579441c50907f368b


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #15720: [FLINK-22289] Update JDBC XA sink docs

2021-04-26 Thread GitBox


curcur commented on a change in pull request #15720:
URL: https://github.com/apache/flink/pull/15720#discussion_r620850140



##
File path: docs/content/docs/connectors/datastream/jdbc.md
##
@@ -167,15 +177,24 @@ env
 ps.setDouble(4, t.price);
 ps.setInt(5, t.qty);
 },
-JdbcExecutionOptions.builder().build(),
+JdbcExecutionOptions.builder()
+.withMaxRetries(0)
+.build(),
 JdbcExactlyOnceOptions.defaults(),
 () -> {
 // create a driver-specific XA DataSource
+// The following example is for derby 
 EmbeddedXADataSource ds = new EmbeddedXADataSource();
 ds.setDatabaseName("my_db");
 return ds;
 });
 env.execute();
 ```
+Postgres XADataSource Example:
+```java
+PGXADataSource pgxaDataSource = new PGXADataSource();
+pgxaDataSource.setUrl(
+"jdbc:postgresql://localhost:5432/postgres");

Review comment:
   Yes, usually it should.
   
   My intention is to make this example runnable out of the box; that is, if a 
user copy-pastes the example into their IDE and has Postgres installed with the 
default setup, it would work.
   
   I installed Postgres and used its default database as the example. The 
default one can be accessed without a username and password (or with the 
default username and password).
   
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (FLINK-22092) HiveConf can cache hiveSiteURL from classpath and cause FileNotFoundException

2021-04-26 Thread Rui Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Li resolved FLINK-22092.

Resolution: Fixed

> HiveConf can cache hiveSiteURL from classpath and cause FileNotFoundException
> -
>
> Key: FLINK-22092
> URL: https://issues.apache.org/jira/browse/FLINK-22092
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Reporter: Rui Li
>Assignee: Rui Li
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.13.0
>
>
> It turns out that FLINK-19702 is incomplete and {{HiveConf}} may still 
> automatically load hive-site from classpath and set {{hiveSiteURL}} variable. 
> This can cause problems, e.g. create a HiveCatalog reading hive-site from 
> classpath, drop this catalog and also remove the hive-site file, create 
> another HiveCatalog with hive-conf-dir pointing to another location, the 2nd 
> HiveCatalog cannot be created because {{HiveConf}} has remembered the 
> hive-site location from the previous one and complains the file can no longer 
> be found.
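>
> An editor's sketch of the pattern behind the problem (simplified; the static 
> field lives in Hive's HiveConf, and resetting it is one possible way to 
> avoid the stale URL):
> {code:java}
> // HiveConf remembers the hive-site.xml location in a static field, so a
> // stale URL can survive across HiveCatalog instances:
> private static URL hiveSiteURL;
>
> // resetting the cached location before building a conf for a new catalog
> // avoids picking up the previous (possibly deleted) hive-site file:
> HiveConf.setHiveSiteLocation(null);
> {code}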



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink-web] loniecc commented on pull request #386: [FLINK-13679] Translate "Code Style - Preamble" page into Chinese

2021-04-26 Thread GitBox


loniecc commented on pull request #386:
URL: https://github.com/apache/flink-web/pull/386#issuecomment-827322217


   > @loniecc thanks for the contribution, will give it a review soon.
   
   Thanks, I have resolved all of them; please review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-web] loniecc commented on a change in pull request #386: [FLINK-13679] Translate "Code Style - Preamble" page into Chinese

2021-04-26 Thread GitBox


loniecc commented on a change in pull request #386:
URL: https://github.com/apache/flink-web/pull/386#discussion_r620874828



##
File path: contributing/code-style-and-quality-preamble.zh.md
##
@@ -1,25 +1,23 @@
 ---
-title:  "Apache Flink Code Style and Quality Guide — Preamble"
+title:  "Apache Flink 代码风格和质量指南 — 序言"
 ---
 
 {% include code-style-navbar.zh.md %}
 
+这是一次为了保证那些被我们维护的代码和质量标准的尝试
 
-This is an attempt to capture the code and quality standard that we want to 
maintain.
+一次代码贡献(或者任何代码片段)可以从很多角度进行评价:一组评判标准是代码是否正确和高效。这需要正确且良好的解决逻辑或者算法问题。
 
-A code contribution (or any piece of code) can be evaluated in various ways: 
One set of properties is whether the code is correct and efficient. This 
requires solving the _logical or algorithmic problem_ correctly and well.
+另一组评判标准是代码是否使用了简洁的设计和架构,不管是通过概念分割实现了良好的结构,还是使用了简单易懂的代码。该评判标准需要良好的解决软件工程问题。一个好的解决方案需要代码是容易被测试的,可以被除了原作者之外的其他人维护的(因为突然中断之后再维护是非常困难的),同时还需要能够高效的迭代演进的。

Review comment:
   Hmm, after going through it again, "破坏规范" ("breaking the convention") is indeed the better wording.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-web] loniecc commented on a change in pull request #386: [FLINK-13679] Translate "Code Style - Preamble" page into Chinese

2021-04-26 Thread GitBox


loniecc commented on a change in pull request #386:
URL: https://github.com/apache/flink-web/pull/386#discussion_r620874392



##
File path: contributing/code-style-and-quality-preamble.zh.md
##
@@ -1,25 +1,23 @@
 ---
-title:  "Apache Flink Code Style and Quality Guide — Preamble"
+title:  "Apache Flink 代码风格和质量指南 — 序言"
 ---
 
 {% include code-style-navbar.zh.md %}
 
+这是一次为了保证那些被我们维护的代码和质量标准的尝试
 
-This is an attempt to capture the code and quality standard that we want to 
maintain.
+一次代码贡献(或者任何代码片段)可以从很多角度进行评价:一组评判标准是代码是否正确和高效。这需要正确且良好的解决逻辑或者算法问题。
 
-A code contribution (or any piece of code) can be evaluated in various ways: 
One set of properties is whether the code is correct and efficient. This 
requires solving the _logical or algorithmic problem_ correctly and well.
+另一组评判标准是代码是否使用了简洁的设计和架构,不管是通过概念分割实现了良好的结构,还是使用了简单易懂的代码。该评判标准需要良好的解决软件工程问题。一个好的解决方案需要代码是容易被测试的,可以被除了原作者之外的其他人维护的(因为突然中断之后再维护是非常困难的),同时还需要能够高效的迭代演进的。
 
-Another set of properties is whether the code follows an intuitive design and 
architecture, whether it is well structured with right separation of concerns, 
and whether the code is easily understandable and makes its assumptions 
explicit. That set of properties requires solving the _software engineering 
problem_ well. A good solution implies that the code is easily testable, 
maintainable also by other people than the original authors (because it is 
harder to accidentally break), and efficient to evolve.
+不过第一组标准有相当客观的达成条件,相比之下要达到第二组评判标准更加困难,但是对于 Apache Flink 
这样的开源项目来说却非常重要。为了基础的代码能够邀请到更多开发者,为了的开源贡献能够更容易被开发者理解,同时也为了众多开发者同时开发时代码的健壮性,良好工程化的代码是至关重要的。对于良好的工程代码来说,更加容易保证代码的正确性和高效不会随着时间的推移受到影响
 
-While the first set of properties has rather objective approval criteria, the 
second set of properties is much harder to assess, but is of high importance 
for an open source project like Apache Flink. To make the code base inviting to 
many contributors, to make contributions easy to understand for developers that 
did not write the original code, and to make the code robust in the face of 
many contributions, well engineered code is crucial.[^1] For well engineered 
code, it is easier to keep it correct and fast over time.
+当然,本指南并不是一份如何写出良好的工程代码的全方位指导。有相当多的书籍尝试说明如何实现良好的代码。本指南仅仅尝试作为最佳实践的检查单,因为模式,反模式和常见错误我们都在开发Flink的时候遇见过。

Review comment:
   Indeed; fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (FLINK-22485) Support client attach on application mode

2021-04-26 Thread Junfan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332935#comment-17332935
 ] 

Junfan Zhang edited comment on FLINK-22485 at 4/27/21, 5:22 AM:


[~jark] Thanks for your reply.
Attaching detailed info:
version: 1.12.1
Component: Flink CLI
execution mode: Flink batch

I want to know whether application mode supports {{attach}} in the Flink CLI; I 
think it is not related to the SQL client.




was (Author: zuston):
Attaching detailed info:
version: 1.12.1
Component: Flink CLI
execution mode: Flink batch

I want to know whether application mode supports {{attach}} in the Flink CLI; I 
think it is not related to the SQL client.



> Support client attach on application mode
> -
>
> Key: FLINK-22485
> URL: https://issues.apache.org/jira/browse/FLINK-22485
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Reporter: Junfan Zhang
>Priority: Major
>
> Now, the client does not wait until the job finishes when using application mode.
> Can we support client attach in application mode?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22485) Support client attach on application mode

2021-04-26 Thread Junfan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332935#comment-17332935
 ] 

Junfan Zhang commented on FLINK-22485:
--

Attaching detailed info:
version: 1.12.1
Component: Flink CLI
execution mode: Flink batch

I want to know whether application mode supports {{attach}} in the Flink CLI; I 
think it is not related to the SQL client.



> Support client attach on application mode
> -
>
> Key: FLINK-22485
> URL: https://issues.apache.org/jira/browse/FLINK-22485
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Reporter: Junfan Zhang
>Priority: Major
>
> Now, the client does not wait until the job finishes when using application mode.
> Can we support client attach in application mode?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wangwei1025 commented on a change in pull request #15746: [FLINK-17783][Table SQL/Ecosystem]Add array,map,row types support for orc row writer

2021-04-26 Thread GitBox


wangwei1025 commented on a change in pull request #15746:
URL: https://github.com/apache/flink/pull/15746#discussion_r620868599



##
File path: 
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/RowDataVectorizer.java
##
@@ -151,8 +160,114 @@ private static void setColumn(
 vector.set(rowId, timestamp);
 break;
 }
+case ARRAY:
+{
+ListColumnVector listColumnVector = (ListColumnVector) 
column;
+setColumn(rowId, listColumnVector, type, row, columnId);
+break;
+}
+case MAP:
+{
+MapColumnVector mapColumnVector = (MapColumnVector) column;
+setColumn(rowId, mapColumnVector, type, row, columnId);
+break;
+}
+case ROW:
+{
+StructColumnVector structColumnVector = 
(StructColumnVector) column;
+setColumn(rowId, structColumnVector, type, row, columnId);
+break;
+}
 default:
 throw new UnsupportedOperationException("Unsupported type: " + 
type);
 }
 }
+
+private static void setColumn(
+int rowId,
+ListColumnVector listColumnVector,
+LogicalType type,
+RowData row,
+int columnId) {
+ArrayData arrayData = row.getArray(columnId);
+ArrayType arrayType = (ArrayType) type;
+listColumnVector.lengths[rowId] = arrayData.size();
+listColumnVector.offsets[rowId] = listColumnVector.childCount;
+listColumnVector.childCount += listColumnVector.lengths[rowId];
+listColumnVector.child.ensureSize(
+listColumnVector.childCount, listColumnVector.offsets[rowId] 
!= 0);
+for (int i = 0; i < arrayData.size(); i++) {
+setColumn(
+(int) listColumnVector.offsets[rowId] + i,
+listColumnVector.child,
+arrayType.getElementType(),
+convert(arrayData, arrayType.getElementType()),

Review comment:
   Yes, it's unnecessary. This happened in 
setColumn(int, MapColumnVector, LogicalType, RowData, int) too. I fixed both in 
the latest commit: [hotfix] move convert out of loop.
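
   An editor's sketch of the hoisting (the final arguments of `setColumn` are 
assumed from the surrounding diff, not verified):

   ```java
   // convert(...) does not depend on the loop index, so compute it once
   RowData convertedArray = convert(arrayData, arrayType.getElementType());
   for (int i = 0; i < arrayData.size(); i++) {
       setColumn(
               (int) listColumnVector.offsets[rowId] + i,
               listColumnVector.child,
               arrayType.getElementType(),
               convertedArray,
               i);
   }
   ```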




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15745: [FLINK-22304][table] Refactor some interfaces for TVF based window to…

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15745:
URL: https://github.com/apache/flink/pull/15745#issuecomment-826249839


   
   ## CI report:
   
   * 11130e794b07aa9215035e5c204766295a4f50fe Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17258)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] FuyaoLi2017 commented on pull request #15602: [FLINK-22264][docs] Fix misleading statement about Flink Job Cluster Kubernetes Support in Flink Architecture page

2021-04-26 Thread GitBox


FuyaoLi2017 commented on pull request #15602:
URL: https://github.com/apache/flink/pull/15602#issuecomment-827307615


   @tillrohrmann Hello Till, just one small question: why does the committer 
username of this PR appear as fuyli instead of @FuyaoLi2017? fuyli is my alias 
on my company's laptop, hhh. A little bit confused here. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #15720: [FLINK-22289] Update JDBC XA sink docs

2021-04-26 Thread GitBox


curcur commented on a change in pull request #15720:
URL: https://github.com/apache/flink/pull/15720#discussion_r620850140



##
File path: docs/content/docs/connectors/datastream/jdbc.md
##
@@ -167,15 +177,24 @@ env
 ps.setDouble(4, t.price);
 ps.setInt(5, t.qty);
 },
-JdbcExecutionOptions.builder().build(),
+JdbcExecutionOptions.builder()
+.withMaxRetries(0)
+.build(),
 JdbcExactlyOnceOptions.defaults(),
 () -> {
 // create a driver-specific XA DataSource
+// The following example is for derby 
 EmbeddedXADataSource ds = new EmbeddedXADataSource();
 ds.setDatabaseName("my_db");
 return ds;
 });
 env.execute();
 ```
+Postgres XADataSource Example:
+```java
+PGXADataSource pgxaDataSource = new PGXADataSource();
+pgxaDataSource.setUrl(
+"jdbc:postgresql://localhost:5432/postgres");

Review comment:
   Yes, usually it should.
   
   My intention for this example is to make this example "runnable out of box"; 
That says if user copy-paste the example to its IDE, and has postgres installed 
with default set up, it would work.
   
   I installed postgre and `postgres` is the default table can be accessed 
without username and password or default (username and password).
   
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #15720: [FLINK-22289] Update JDBC XA sink docs

2021-04-26 Thread GitBox


curcur commented on a change in pull request #15720:
URL: https://github.com/apache/flink/pull/15720#discussion_r620846546



##
File path: docs/content/docs/connectors/datastream/jdbc.md
##
@@ -32,8 +32,15 @@ To use it, add the following dependency to your project 
(along with your JDBC dr
 
 {{< artifact flink-connector-jdbc withScalaVersion >}}
 
-Note that the streaming connectors are currently __NOT__ part of the binary 
distribution. See how to link with them for cluster execution [here]({{< ref 
"docs/dev/datastream/project-configuration" >}}).
+A driver dependency is also required to connect to a specified database. Here 
are drivers currently supported:
+
+| Driver  |  Group Id  |  Artifact Id   |  JAR 
|
+| :---| :--| :--| 
:|
+| MySQL   |   `mysql`  | `mysql-connector-java` | 
[Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
+| PostgreSQL  |  `org.postgresql`  |  `postgresql`  | 
[Download](https://jdbc.postgresql.org/download.html) |

Review comment:
   It is using the same link as 
   
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/
   
   So I would say this link does not introduce any extra legal issues.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on a change in pull request #15720: [FLINK-22289] Update JDBC XA sink docs

2021-04-26 Thread GitBox


curcur commented on a change in pull request #15720:
URL: https://github.com/apache/flink/pull/15720#discussion_r620846546



##
File path: docs/content/docs/connectors/datastream/jdbc.md
##
@@ -32,8 +32,15 @@ To use it, add the following dependency to your project 
(along with your JDBC dr
 
 {{< artifact flink-connector-jdbc withScalaVersion >}}
 
-Note that the streaming connectors are currently __NOT__ part of the binary 
distribution. See how to link with them for cluster execution [here]({{< ref 
"docs/dev/datastream/project-configuration" >}}).
+A driver dependency is also required to connect to a specified database. Here 
are drivers currently supported:
+
+| Driver  |  Group Id  |  Artifact Id   |  JAR 
|
+| :---| :--| :--| 
:|
+| MySQL   |   `mysql`  | `mysql-connector-java` | 
[Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
+| PostgreSQL  |  `org.postgresql`  |  `postgresql`  | 
[Download](https://jdbc.postgresql.org/download.html) |

Review comment:
   It is using the same link as 
   
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/
   
   So my take is `yes`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on pull request #15720: [FLINK-22289] Update JDBC XA sink docs

2021-04-26 Thread GitBox


curcur commented on pull request #15720:
URL: https://github.com/apache/flink/pull/15720#issuecomment-827302091


   > Thanks for the PR @curcur.
   > 
   > Should we also remove this warning:
   > 
   > > Attention: In 1.13, Flink JDBC sink does not support exactly-once mode 
with MySQL or other databases that do not support
   > > multiple XA transaction per connection. We will improve the support in 
FLINK-22239.
   > > ?
   
   That's a good point.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] xintongsong commented on pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-04-26 Thread GitBox


xintongsong commented on pull request #15599:
URL: https://github.com/apache/flink/pull/15599#issuecomment-827297699


   @rmetzger, thanks for help looking into this. :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] xintongsong commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-04-26 Thread GitBox


xintongsong commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r620837563



##
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/** The committer for the GS recoverable writer. */
+class GSRecoverableWriterCommitter implements RecoverableFsDataOutputStream.Committer {
+
+    /** The underlying blob storage. */
+    private final BlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /** The recoverable writer instance. */
+    private final GSRecoverableWriter writer;
+
+    /** The recoverable writer state for the commit operation. */
+    private final GSRecoverableWriterState state;
+
+    GSRecoverableWriterCommitter(
+            BlobStorage storage,
+            GSFileSystemOptions options,
+            GSRecoverableWriter writer,
+            GSRecoverableWriterState state) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+        this.writer = Preconditions.checkNotNull(writer);
+        this.state = Preconditions.checkNotNull(state);
+    }
+
+    @Override
+    public void commit() throws IOException {
+
+        // compose all the component blob ids into the final blob id. if the component blob ids
+        // are in the same bucket as the final blob id, this can be done directly. otherwise, we
+        // must compose to a new temporary blob id in the same bucket as the component blob ids
+        // and then copy that blob to the final blob location
+        if (state.finalBlobId.getBucket().equals(state.getTemporaryBucketName(options))) {
+
+            // compose directly to final blob
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    state.finalBlobId,
+                    options.writerContentType);
+
+        } else {
+
+            // compose to a temporary blob id, then copy to final blob id
+            BlobId intermediateBlobId = state.createTemporaryBlobId(options);
+            composeBlobs(
+                    state.getComponentBlobIds(options),
+                    intermediateBlobId,
+                    options.writerContentType);
+            storage.copy(intermediateBlobId, state.finalBlobId);
+        }
+
+        // clean up after commit
+        writer.cleanupRecoverableState(state);

Review comment:
   On second thought, if we cannot clean anything in 
`cleanupRecoverableState`, it might be our only chance to clean the temporary 
blobs on committing. This might have a higher priority than supporting manually 
recover from an early checkpoint.
   
   I think the following issues are closely related and really need to be 
clarified consistently.
   - What are the relationships between snapshots (resumables) and temporary 
blobs
   - Actions for `cleanupRecoverableState`
   - Actions for `commit`
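
   For illustration, a rough sketch of what that could look like (method names follow the diff above; deleting the component blobs from `commit()` via `storage.delete` is an assumption, not the PR's actual commit logic):
   
   ```java
   @Override
   public void commit() throws IOException {
       // ... compose the component blobs into the final blob, as in the diff ...
   
       // sketch: if cleanupRecoverableState is not allowed to delete anything,
       // commit() may be the last safe point to remove the temporary component
       // blobs backing this write
       storage.delete(state.getComponentBlobIds(options)); // assumed usage
       writer.cleanupRecoverableState(state);
   }
   ```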








[jira] [Updated] (FLINK-22487) Support `print` to print logs in PyFlink

2021-04-26 Thread Huang Xingbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo updated FLINK-22487:
-
Description: 
Currently, if users want to print logs, they need to use the logging module.

{code:python}
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
    import logging
    logging.info("debug")
    return i + j
{code}
It will be more convenient to use `print` to print logs.

{code:python}
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
    print("debug")
    return i + j
{code}


  was:
Currently, if users want to print logs, they need to use the logging module.

{code:python}
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
    import logging
    logging.info("debug")
    return i + j
{code}

It will be more convenient to use `print` to print logs.
{code:python}
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
    print("debug")
    return i + j
{code}



> Support `print` to print logs in PyFlink
> 
>
> Key: FLINK-22487
> URL: https://issues.apache.org/jira/browse/FLINK-22487
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Affects Versions: 1.14.0
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Major
> Fix For: 1.14.0
>
>
> Currently, if users want to print logs, they need to use the logging module.
> {code:python}
> @udf(result_type=DataTypes.BIGINT())
> def add(i, j):
>     import logging
>     logging.info("debug")
>     return i + j
> {code}
> It will be more convenient to use `print` to print logs.
> {code:python}
> @udf(result_type=DataTypes.BIGINT())
> def add(i, j):
>     print("debug")
>     return i + j
> {code}





[GitHub] [flink] lirui-apache commented on a change in pull request #15746: [FLINK-17783][Table SQL/Ecosystem]Add array,map,row types support for orc row writer

2021-04-26 Thread GitBox


lirui-apache commented on a change in pull request #15746:
URL: https://github.com/apache/flink/pull/15746#discussion_r620826305



##
File path: 
flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/RowDataVectorizer.java
##
@@ -151,8 +160,114 @@ private static void setColumn(
                 vector.set(rowId, timestamp);
                 break;
             }
+            case ARRAY:
+                {
+                    ListColumnVector listColumnVector = (ListColumnVector) column;
+                    setColumn(rowId, listColumnVector, type, row, columnId);
+                    break;
+                }
+            case MAP:
+                {
+                    MapColumnVector mapColumnVector = (MapColumnVector) column;
+                    setColumn(rowId, mapColumnVector, type, row, columnId);
+                    break;
+                }
+            case ROW:
+                {
+                    StructColumnVector structColumnVector = (StructColumnVector) column;
+                    setColumn(rowId, structColumnVector, type, row, columnId);
+                    break;
+                }
             default:
                 throw new UnsupportedOperationException("Unsupported type: " + type);
         }
     }
+
+    private static void setColumn(
+            int rowId,
+            ListColumnVector listColumnVector,
+            LogicalType type,
+            RowData row,
+            int columnId) {
+        ArrayData arrayData = row.getArray(columnId);
+        ArrayType arrayType = (ArrayType) type;
+        listColumnVector.lengths[rowId] = arrayData.size();
+        listColumnVector.offsets[rowId] = listColumnVector.childCount;
+        listColumnVector.childCount += listColumnVector.lengths[rowId];
+        listColumnVector.child.ensureSize(
+                listColumnVector.childCount, listColumnVector.offsets[rowId] != 0);
+        for (int i = 0; i < arrayData.size(); i++) {
+            setColumn(
+                    (int) listColumnVector.offsets[rowId] + i,
+                    listColumnVector.child,
+                    arrayType.getElementType(),
+                    convert(arrayData, arrayType.getElementType()),

Review comment:
   Move this out of the loop so that we don't do this for each field?
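
   A rough sketch of the suggestion, assuming `convert` is side-effect free and, per the `setColumn` signature above, returns the `RowData` passed back into the recursive call (the trailing `i` argument is likewise inferred from the truncated diff):
   
   ```java
   // hoist the conversion out of the loop instead of redoing it per element
   RowData convertedElements = convert(arrayData, arrayType.getElementType()); // assumed return type
   for (int i = 0; i < arrayData.size(); i++) {
       setColumn(
               (int) listColumnVector.offsets[rowId] + i,
               listColumnVector.child,
               arrayType.getElementType(),
               convertedElements,
               i); // assumed remaining argument
   }
   ```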








[jira] [Created] (FLINK-22487) Support `print` to print logs in PyFlink

2021-04-26 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-22487:


 Summary: Support `print` to print logs in PyFlink
 Key: FLINK-22487
 URL: https://issues.apache.org/jira/browse/FLINK-22487
 Project: Flink
  Issue Type: New Feature
  Components: API / Python
Affects Versions: 1.14.0
Reporter: Huang Xingbo
Assignee: Huang Xingbo
 Fix For: 1.14.0


Currently, if users want to print logs, they need to use the logging module.

{code:python}
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
    import logging
    logging.info("debug")
    return i + j
{code}

It will be more convenient to use `print` to print logs.
{code:python}
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
    print("debug")
    return i + j
{code}






[jira] [Commented] (FLINK-22485) Support client attach on application mode

2021-04-26 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332912#comment-17332912
 ] 

Jark Wu commented on FLINK-22485:
-

Which version are you using [~zuston]? We just supported sync/async mode in the 
1.13 release, could you try the 1.13 rc2 [1]? And here is the documentation: 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-dml-statements-syncasync


[1]: https://dist.apache.org/repos/dist/dev/flink/flink-1.13.0-rc2/



> Support client attach on application mode
> -
>
> Key: FLINK-22485
> URL: https://issues.apache.org/jira/browse/FLINK-22485
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Reporter: Junfan Zhang
>Priority: Major
>
> Now, the client will not wait until the job finishes when using application mode.
> Can we support client attach on application mode?





[jira] [Updated] (FLINK-22485) Support client attach on application mode

2021-04-26 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-22485:

Component/s: Table SQL / Client

> Support client attach on application mode
> -
>
> Key: FLINK-22485
> URL: https://issues.apache.org/jira/browse/FLINK-22485
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Reporter: Junfan Zhang
>Priority: Major
>
> Now, the client will not wait until the job finishes when using application mode.
> Can we support client attach on application mode?





[GitHub] [flink] wuchong commented on a change in pull request #15755: [FLINK-22318][table] Support RENAME column name for ALTER TABLE state…

2021-04-26 Thread GitBox


wuchong commented on a change in pull request #15755:
URL: https://github.com/apache/flink/pull/15755#discussion_r620825538



##
File path: 
flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
##
@@ -478,6 +481,17 @@ SqlAlterTable SqlAlterTable() :
                 tableIdentifier,
                 newTableIdentifier);
         }
+    |
+        <RENAME>
+        originColumnName = SimpleIdentifier()
+        <TO>
+        newColumnName = SimpleIdentifier()
+        {
+            return new SqlAlterTableRenameColumn(
+                startPos.plus(getPos()),
+                tableIdentifier,
+                originColumnName,newColumnName);

Review comment:
   nit: Please make them in separate lines. 
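
   Applied, that would read:
   
   ```java
   return new SqlAlterTableRenameColumn(
       startPos.plus(getPos()),
       tableIdentifier,
       originColumnName,
       newColumnName);
   ```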

##
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
##
@@ -144,6 +149,116 @@ public static Operation convertChangeColumn(
 // TODO: handle watermark and constraints
 }
 
+    public static Operation convertRenameColumn(
+            ObjectIdentifier tableIdentifier,
+            String originColumnName,
+            String newColumnName,
+            CatalogTable catalogTable) {
+
+        Schema modifiedTableSchema = catalogTable.getUnresolvedSchema();

Review comment:
   `modifiedTableSchema` sounds like this schema has already been modified; it would be better to call it `originSchema`.

##
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
##
@@ -144,6 +149,116 @@ public static Operation convertChangeColumn(
 // TODO: handle watermark and constraints
 }
 
+    public static Operation convertRenameColumn(
+            ObjectIdentifier tableIdentifier,
+            String originColumnName,
+            String newColumnName,
+            CatalogTable catalogTable) {
+
+        Schema modifiedTableSchema = catalogTable.getUnresolvedSchema();
+        validateColumnName(originColumnName, newColumnName, modifiedTableSchema);
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        modifiedTableSchema.getColumns().stream()
+                .forEach(
+                        column -> {
+                            if (StringUtils.equals(column.getName(), originColumnName)) {
+                                buildNewColumnFromOriginColumn(builder, column, newColumnName);
+                            } else {
+                                buildNewColumnFromOriginColumn(builder, column, column.getName());
+                            }
+                        });
+        // build primary key column
+        List<String> originPrimaryKeyNames =
+                modifiedTableSchema
+                        .getPrimaryKey()
+                        .map(Schema.UnresolvedPrimaryKey::getColumnNames)
+                        .orElseGet(Lists::newArrayList);
+
+        List<String> newPrimaryKeyNames =
+                originPrimaryKeyNames.stream()
+                        .map(
+                                pkName ->
+                                        StringUtils.equals(pkName, originColumnName)
+                                                ? newColumnName
+                                                : pkName)
+                        .collect(Collectors.toList());
+
+        if (newPrimaryKeyNames.size() > 0) {
+            builder.primaryKey(newPrimaryKeyNames);
+        }
+        // build watermark
+        modifiedTableSchema.getWatermarkSpecs().stream()
+                .forEach(
+                        watermarkSpec -> {
+                            String watermarkRefColumnName = watermarkSpec.getColumnName();
+                            Expression watermarkExpression = watermarkSpec.getWatermarkExpression();
+                            if (StringUtils.equals(watermarkRefColumnName, originColumnName)) {
+                                String newWatermarkExpression =
+                                        ((SqlCallExpression) watermarkExpression)
+                                                .getSqlExpression()
+                                                .replace(watermarkRefColumnName, newColumnName);

Review comment:
   1. We can't guarantee this is always a `SqlCallExpression`.
   2. We can't use String `replace` for renaming; this is very error-prone. E.g. if the original expression is `f123 - f1` and `f1` is renamed to `f2`, the replaced result would be `f223 - f2`, which is wrong.
   
   If we don't have a good way to replace column names in an expression, we can disallow renaming columns that are referenced in expressions in the first version.
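
   For illustration, the pitfall with plain string replacement (a self-contained snippet, not code from this PR):
   
   ```java
   String expr = "f123 - f1";
   // naive rename of f1 -> f2 also corrupts the reference inside "f123"
   String renamed = expr.replace("f1", "f2");
   System.out.println(renamed); // prints "f223 - f2" instead of "f123 - f2"
   ```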

##
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
##
@@ -144,6 +149,116 @@ public static Operation convertChangeColumn(
 // 

[jira] [Comment Edited] (FLINK-22408) Flink Table Parser Hive Drop Partitions Syntax unparse is Error

2021-04-26 Thread Ma Jun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332910#comment-17332910
 ] 

Ma Jun edited comment on FLINK-22408 at 4/27/21, 3:54 AM:
--

Hi [~lirui] Thanks! I got it! If there is a subsequent release of version 1.13, I will switch the existing business to that version. I have a business requirement here: I use Flink's SQL translation module to do some front-end SQL verification and formatting, and I found that the syntax is inconsistent with Hive when converting dialects. There may be misunderstandings among users. So I think we should keep the syntax consistent with Hive.


was (Author: aidenma):
Hi [~lirui] Thanks! I got it! If there is a subsequent release of version 1.13, I will switch the existing business to that version. I have a business requirement here: I use Flink's SQL translation module to do some front-end SQL verification and formatting, and I found that the syntax is inconsistent with Hive when converting dialects. So I think we should keep the syntax consistent with Hive.

> Flink Table Parser Hive Drop Partitions Syntax unparse is Error
> --
>
> Key: FLINK-22408
> URL: https://issues.apache.org/jira/browse/FLINK-22408
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.11.3
>Reporter: Ma Jun
>Priority: Major
>  Labels: pull-request-available
>
> The Flink Table Parser unparses this incorrectly:
> *Synopsis:*
>  
> *SQL:*
> {code:java}
> alter table tbl drop partition (p1='a',p2=1), partition(p1='b',p2=2);{code}
> *The Hive multi-partition unparse toSqlString output is:*
> {code:java}
> ALTER TABLE `TBL`\n" +
>   "DROP\n" +
>   "PARTITION (`P1` = 'a', `P2` = 1)\n" +
>   "PARTITION (`P1` = 'b', `P2` = 2)
> {code}
> The comma between the PARTITION specs in the SqlNodeList is missing.
>  Hive syntax:
> {code:java}
> ALTER TABLE table_name DROP [IF EXISTS] PARTITION (partition_spec) [, 
> PARTITION (partition_spec)];
> {code}
>  
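
For reference, the expected unparse output with the separating comma would be (a sketch following the Hive syntax above, not actual tool output):

{code:java}
ALTER TABLE `TBL`
DROP
PARTITION (`P1` = 'a', `P2` = 1),
PARTITION (`P1` = 'b', `P2` = 2)
{code}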





[GitHub] [flink] flinkbot edited a comment on pull request #15131: [FLINK-21700][yarn]Allow to disable fetching Hadoop delegation token on Yarn

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15131:
URL: https://github.com/apache/flink/pull/15131#issuecomment-794957907


   
   ## CI report:
   
   * 0500d84f78d8fdb967b077fb024fda0cd4ba2d6a Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15682)
 
   * b99e1a12c74051454d211b0094c986b8741dd28f Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17261)
 
   
   
   






[GitHub] [flink] flinkbot edited a comment on pull request #15121: The method $(String) is undefined for the type TableExample

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15121:
URL: https://github.com/apache/flink/pull/15121#issuecomment-793480240


   
   ## CI report:
   
   * cf26ce895a7956d258acc073817f578558e78227 UNKNOWN
   * 709c4110370b845f42d916a06e56c6026cf2fac8 UNKNOWN
   * 4ea99332e1997eebca1f3f0a9d9229b8265fe32c UNKNOWN
   * 9a256ec6b644a0f795376076f4e3ff4e85aaad1e UNKNOWN
   * 76bbb8de288e2990974fe9ea7a09fb47e37daad8 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17220)
 
   * fbd38b53197c244af5ed3af54cebdee75b393a1d Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17257)
 
   * b7dca63a2e98f09c8c780a21091b5268d897b220 UNKNOWN
   
   
   






[jira] [Commented] (FLINK-22408) Flink Table Parser Hive Drop Partitions Syntax unparse is Error

2021-04-26 Thread Ma Jun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332910#comment-17332910
 ] 

Ma Jun commented on FLINK-22408:


Hi [~lirui] Thanks! I got it! If there is a subsequent release of version 1.13, I will switch the existing business to that version. I have a business requirement here: I use Flink's SQL translation module to do some front-end SQL verification and formatting, and I found that the syntax is inconsistent with Hive when converting dialects. So I think we should keep the syntax consistent with Hive.

> Flink Table Parser Hive Drop Partitions Syntax unparse is Error
> --
>
> Key: FLINK-22408
> URL: https://issues.apache.org/jira/browse/FLINK-22408
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.11.3
>Reporter: Ma Jun
>Priority: Major
>  Labels: pull-request-available
>
> The Flink Table Parser unparses this incorrectly:
> *Synopsis:*
>  
> *SQL:*
> {code:java}
> alter table tbl drop partition (p1='a',p2=1), partition(p1='b',p2=2);{code}
> *The Hive multi-partition unparse toSqlString output is:*
> {code:java}
> ALTER TABLE `TBL`\n" +
>   "DROP\n" +
>   "PARTITION (`P1` = 'a', `P2` = 1)\n" +
>   "PARTITION (`P1` = 'b', `P2` = 2)
> {code}
> The comma between the PARTITION specs in the SqlNodeList is missing.
>  Hive syntax:
> {code:java}
> ALTER TABLE table_name DROP [IF EXISTS] PARTITION (partition_spec) [, 
> PARTITION (partition_spec)];
> {code}
>  





[jira] [Comment Edited] (FLINK-21513) Rethink up-/down-/restartingTime metrics

2021-04-26 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17329947#comment-17329947
 ] 

Steven Zhen Wu edited comment on FLINK-21513 at 4/27/21, 3:41 AM:
--

[~trohrmann] thanks for tagging me here. 

Yeah, availability can be a slightly tricky concept for a Flink application. Users typically ask what the availability / uptime of their Flink application is (e.g. 4 nines). For microservices, availability can be measured naively as a success/failure rate (there are more sophisticated and probably more accurate ways). How do we define availability for Flink? The current uptime metric doesn't capture availability.

Also, availability can probably be captured at different time scales (last hour, last 4 hours, last 12 hours, last 24 hours, last week, etc.).


was (Author: stevenz3wu):
[~trohrmann] thanks for tagging me here. 

Yeah, availability can be a slightly tricky concept for a Flink application. Users typically ask what the availability / uptime of their Flink application is (4 nines). E.g., for microservices, availability can be measured naively as a success/failure rate (there are more sophisticated and probably more accurate ways). How do we define availability for Flink? Uptime doesn't capture availability.

Also, availability can probably be captured at different time scales (last hour, last 4 hours, last 12 hours, last 24 hours, last week, etc.).

> Rethink up-/down-/restartingTime metrics
> 
>
> Key: FLINK-21513
> URL: https://issues.apache.org/jira/browse/FLINK-21513
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Runtime / Metrics
>Reporter: Chesnay Schepler
>Priority: Major
>  Labels: stale-major
> Fix For: 1.13.0
>
>
> While thinking about FLINK-21510 I stumbled upon some issues in the the 
> semantics of these metrics, both from a user perspective and from our own, 
> and I think we need to clarify some things.
> h4. upTime
> This metric describes the time since the job transitioned RUNNING state.
> It is meant as a measure for how stably a deployment is.
> In the default scheduler this transitions happens before we do any actual 
> scheduling work, and as a result this also includes the time it takes for the 
> JM to request slots and deploy tasks. In practice this means we start the
> timer once the job has been submitted and the JobMaster/Scheduler/EG have 
> been initialized.
> For the adaptive scheduler this now puts us a bit into an odd situation 
> because it first acquires slots before actually transitioning the EG into a 
> RUNNING state, so as is we'd end up measuring 2 slightly different things.
> The question now is whether this is a problem.
> While we could certainly stick with the definition of "time since EG switched 
> to RUNNING", it raises the question what the semantics of this metric are 
> should a scheduler use a different data-structure than the EG.
> In other words, what I'm looking for is a definition that is independent from 
> existing data-structures; a crude example could be "The time since the job is 
> in a state where the deployment of a task is possible.".
> An alternative for the adaptive scheduler would be to measure the time since 
> we transitioned to WaitingForResources, with which we would also include the 
> slot acquisition, but it would be inconsistent with the logs and UI (because 
> they only display an INITIALIZING job).
> h4. restartingTime
> This metric describes the time since the job transitioned into a RESTARTING 
> state.
> It is meant as a measure for how long the recovery in case of a job failure 
> takes.
> In the default scheduler this in practice is the time between a failure 
> arriving at the JM and the cancellation of tasks being completed / restart 
> backoff (whichever is higher).
> This is consistent with the semantics of the upTime metric, because upTime 
> also includes the time required for acquiring slots and deploying tasks.
> For the adaptive scheduler we can follow similar semantics, by measuring the 
> time we spend in the {{Restarting}} state.
> However, if we stick to the definition of upTime as time spent in RUNNING, 
> then we will end up with a gap for the time spent in WaitingForResources.
> h4. downTime
> This metric describes the time between the job transitioning from FAILING to 
> RUNNING.
> It is meant as a measure for how long the recovery in case of a job failure 
> takes.
> You may be wondering what the difference between {{downTime}} and 
> {{restartingTime}} is meant to be. Unfortunately I do not have the answer to 
> that.
> Presumably, at the time they were added, they were covering different parts 
> of the recovery process, but since we never documented these steps explicitly 
> the exact semantics are 

[GitHub] [flink] ZhangChaoming removed a comment on pull request #15749: [hotfix][docs] Fix typo.

2021-04-26 Thread GitBox


ZhangChaoming removed a comment on pull request #15749:
URL: https://github.com/apache/flink/pull/15749#issuecomment-826467604


   @rmetzger Excuse me, the pipeline failed because `table.local-time-zone` does not exist, but I did not modify anything related to this option.






[GitHub] [flink] xintongsong commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-04-26 Thread GitBox


xintongsong commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r620812109



##
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link 
org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name for temporary files.");

Review comment:
   I'd be in favor of not introducing this option if it's not absolutely necessary. The more options we provide to the users, the more constraints we put on future development and maintenance.
   
   As for including the bucket names in temporary blob names, I think that's a good idea. Since the temporary blobs are not meant to be manipulated by users directly, it would be nice to carry more information with the blob names.
   
   Moreover, I wonder whether we should also include the index range of the temporary blobs, in addition to the UUID. That might be useful for cleaning up the resumables. If in the future we want to combine the temporary blobs in advance instead of all at the end, it would be helpful to know that a blob named `xxx-1-32-xxx` contains all the content from `xxx-1-1-xxx` to `xxx-32-32-xxx`.
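
   A rough illustration of that naming idea (a hypothetical helper, not the PR's actual API; the name layout is an assumption):
   
   ```java
   // encode the final bucket/object and the covered component index range
   // into the temporary blob name, alongside the UUID
   static String temporaryObjectName(
           String finalBucket, String finalObject, int firstIndex, int lastIndex, String uuid) {
       // e.g. ".inprogress/mybucket/path/file-1-32-<uuid>" would mean this blob
       // holds the content of components 1 through 32
       return ".inprogress/" + finalBucket + "/" + finalObject
               + "-" + firstIndex + "-" + lastIndex + "-" + uuid;
   }
   ```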








[GitHub] [flink] flinkbot edited a comment on pull request #15763: [FLINK-22364][doc] Translate the page of "Data Sources" to Chinese.

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15763:
URL: https://github.com/apache/flink/pull/15763#issuecomment-826589720


   
   ## CI report:
   
   * 03b1c6bc999ee92a23eb0a0b84a368f2643e841d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17201)
 
   * 0dcb773365ec16118d2b4c0f55910441bc6d7d58 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17259)
 
   
   
   






[jira] [Commented] (FLINK-22460) Conversion to relational algebra failed due to NOT NULL modifier

2021-04-26 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332902#comment-17332902
 ] 

Shengkai Fang commented on FLINK-22460:
---

It seems the input schema is not the same as the sink schema. You can declare the type of the column `number` in the source table as `BIGINT NOT NULL`.

> Conversion to relational algebra failed due to NOT NULL modifier
> 
>
> Key: FLINK-22460
> URL: https://issues.apache.org/jira/browse/FLINK-22460
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.1
>Reporter: Haiwei Zhou
>Priority: Major
>
> Flink complains that an INSERT statement doesn't match the table schema. The
> validated type is missing a "NOT NULL" modifier.
>  
>  
> {code:java}
> py4j.protocol.Py4JJavaError: An error occurred while calling o18.executeSql.
> : java.lang.AssertionError: Conversion to relational algebra failed to 
> preserve datatypes:
> validated type:
> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" request, CHAR(7) 
> CHARACTER SET "UTF-16LE" NOT NULL EXPR$1, BIGINT number, TIMESTAMP(3) 
> start_time, TIMESTAMP(3) end_time) NOT NULL
> converted type:
> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" request, CHAR(7) 
> CHARACTER SET "UTF-16LE" NOT NULL EXPR$1, BIGINT NOT NULL number, 
> TIMESTAMP(3) start_time, TIMESTAMP(3) end_time) NOT
>  NULL{code}
>  
>  
> {code:python}
> table_env.execute_sql('''
> CREATE TABLE preload_stats (
>  lineitems STRING,
>  itype STRING,
>  number BIGINT NOT NULL,
>  start_time TIMESTAMP(3),
>  end_time TIMESTAMP(3)
> )''')
>  
> table_env.execute_sql(
>  "SELECT request, 'request', number, start_time, end_time "
>  "FROM result_1 ").execute_insert('preload_stats')
> {code}
>  
>  
>  
>  
>  





[GitHub] [flink] xintongsong commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-04-26 Thread GitBox


xintongsong commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r620824056



##
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriter.java
##
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.fs.gs.storage.BlobStorage;
+import org.apache.flink.fs.gs.utils.BlobUtils;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+
+/** The recoverable writer implementation for Google storage. */
+public class GSRecoverableWriter implements RecoverableWriter {
+
+    /** The underlying blob storage. */
+    private final BlobStorage storage;
+
+    /** The GS file system options. */
+    private final GSFileSystemOptions options;
+
+    /**
+     * Construct a GS recoverable writer.
+     *
+     * @param storage The underlying blob storage instance
+     * @param options The GS file system options
+     */
+    public GSRecoverableWriter(BlobStorage storage, GSFileSystemOptions options) {
+        this.storage = Preconditions.checkNotNull(storage);
+        this.options = Preconditions.checkNotNull(options);
+    }
+
+    @Override
+    public boolean requiresCleanupOfRecoverableState() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsResume() {
+        return true;
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream open(Path path) throws IOException {
+        Preconditions.checkNotNull(path);
+
+        BlobId finalBlobId = BlobUtils.parseUri(path.toUri());
+        GSRecoverableWriterState state = new GSRecoverableWriterState(finalBlobId);
+        return new GSRecoverableFsDataOutputStream(storage, options, this, state);
+    }
+
+    @Override
+    public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException {
+        Preconditions.checkNotNull(resumable);
+
+        GSRecoverableWriterState state = (GSRecoverableWriterState) resumable;
+        return new GSRecoverableFsDataOutputStream(storage, options, this, state);
+    }
+
+    @Override
+    public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+        Preconditions.checkNotNull(resumable);
+
+        // determine the partial name for the temporary objects to be deleted
+        GSRecoverableWriterState state = (GSRecoverableWriterState) resumable;
+        String temporaryBucketName = state.getTemporaryBucketName(options);
+        String temporaryObjectPartialName = state.getTemporaryObjectPartialName(options);
+
+        // this will hold the set of blob ids that were actually deleted
+        HashSet<BlobId> deletedBlobIds = new HashSet<>();
+
+        // find all the temp blobs by looking for anything that starts with the temporary
+        // object partial name. doing it this way finds any orphaned temp blobs that might
+        // have come about when resuming
+        List<BlobId> foundTempBlobIds =
+                storage.list(temporaryBucketName, temporaryObjectPartialName);
+        if (!foundTempBlobIds.isEmpty()) {
+
+            // delete all the temp blobs, and populate the set with ones that were actually deleted
+            // normalize in case the blob came back with a generation populated
+            List<Boolean> deleteResults = storage.delete(foundTempBlobIds);
+            for (int i = 0; i < deleteResults.size(); i++) {
+                if (deleteResults.get(i)) {
+                    deletedBlobIds.add(BlobUtils.normalizeBlobId(foundTempBlobIds.get(i)));
+                }
+            }
+        }

Review comment:
   Contract wise, `cleanupRecoverableState` is expected to clean-up for a 
resumable rather than a target file being written. Once a checkpoint is 
successfully 

[jira] [Created] (FLINK-22486) Wrong results of the IN operator

2021-04-26 Thread Shengkai Fang (Jira)
Shengkai Fang created FLINK-22486:
-

 Summary: Wrong results of the IN operator
 Key: FLINK-22486
 URL: https://issues.apache.org/jira/browse/FLINK-22486
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.13.0
Reporter: Shengkai Fang


Please add the following test to the {{CalcITCase}}.
{code:scala}
@Test
def testSimpleProject(): Unit = {
  val myTableDataId = TestValuesTableFactory.registerData(Seq(row("HC809")))
  val ddl =
    s"""
       |CREATE TABLE SimpleTable (
       |  content String
       |) WITH (
       |  'connector' = 'values',
       |  'data-id' = '$myTableDataId',
       |  'bounded' = 'true'
       |)
       |""".stripMargin
  tEnv.executeSql(ddl)

  val sql =
    """
      |SELECT content from SimpleTable where content in (
      |'CTNBSmokeSensor','H388N',
      |'H389N',
      |'GHL-IRD','JY-BF-20YN','HC809',
      |'DH-9908N-AEP','DH-9908N'
      |)
      |""".stripMargin

  checkResult(
    sql,
    Seq(row("HC809"))
  )
}
{code}

It should return the row, but nothing is returned.





[GitHub] [flink] wuchong commented on a change in pull request #15768: [FLINK-22451][table] Support (*) as parameter of UDFs in Table API

2021-04-26 Thread GitBox


wuchong commented on a change in pull request #15768:
URL: https://github.com/apache/flink/pull/15768#discussion_r620820408



##
File path: docs/content/docs/dev/table/functions/udfs.md
##
@@ -171,6 +171,52 @@ env.createTemporarySystemFunction("SubstringFunction", new 
SubstringFunction(tru
 {{< /tab >}}
 {{< /tabs >}}
 
+You can use the star (*) expression as one argument of a function call to act as a wildcard in the Table API;
+all columns of the table will be passed to the function at the corresponding position.
+
+{{< tabs "64dd4129-6313-4904-b7e7-a1a0535822e9" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.ScalarFunction;
+import static org.apache.flink.table.api.Expressions.*;
+
+public static class JoinFunction extends ScalarFunction {
+
+  public String eval(String f0, String f1) {
+return f0 + f1;
+  }
+}
+
+TableEnvironment env = TableEnvironment.create(...);
+
+// call function with $("*"), if MyTable has two string fields,
+// all of them will be passed to JoinFunction.
+env.from("MyTable").select(call(JoinFunction.class, $("*")));

Review comment:
   I would suggest showing a more meaningful example here, e.g. `concat`:
   
   ```java
   public static class MyConcatFunction extends ScalarFunction {
   public String eval(Object... fields) {
   return Arrays.stream(fields)
   .map(Object::toString)
   .collect(Collectors.joining(","));
   }
   }
   TableEnvironment env = TableEnvironment.create(...);
   
   // call function with $("*"), if MyTable has 3 fields (a, b, c), 
   // all of them will be passed to JoinFunction.
   env.from("MyTable").select(call(MyConcatFunction.class, $("*")));
   
   // it's equal to call function with explicitly selecting all columns
   env.from("MyTable").select(call(MyConcatFunction.class, $("a"), $("b"), 
$("c")));
   ```

##
File path: docs/content/docs/dev/table/functions/udfs.md
##
@@ -171,6 +171,52 @@ env.createTemporarySystemFunction("SubstringFunction", new 
SubstringFunction(tru
 {{< /tab >}}
 {{< /tabs >}}
 
+You can use the star (*) expression as one argument of a function call to act as a wildcard in the Table API;
+all columns of the table will be passed to the function at the corresponding position.
+
+{{< tabs "64dd4129-6313-4904-b7e7-a1a0535822e9" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.ScalarFunction;
+import static org.apache.flink.table.api.Expressions.*;
+
+public static class JoinFunction extends ScalarFunction {
+
+  public String eval(String f0, String f1) {
+return f0 + f1;
+  }
+}
+
+TableEnvironment env = TableEnvironment.create(...);
+
+// call function with $("*"), if MyTable has two string fields,
+// all of them will be passed to JoinFunction.
+env.from("MyTable").select(call(JoinFunction.class, $("*")));

Review comment:
   And please also add an IT case for this to make sure this is supported 
end-to-end. 








[jira] [Commented] (FLINK-22485) Support client attach on application mode

2021-04-26 Thread Junfan Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332900#comment-17332900
 ] 

Junfan Zhang commented on FLINK-22485:
--

Any ideas on it? [~jark] [~lirui]

> Support client attach on application mode
> -
>
> Key: FLINK-22485
> URL: https://issues.apache.org/jira/browse/FLINK-22485
> Project: Flink
>  Issue Type: Improvement
>Reporter: Junfan Zhang
>Priority: Major
>
> Now, the client will not wait until the job finishes when using application mode.
> Can we support client attach on application mode?





[jira] [Created] (FLINK-22485) Support client attach on application mode

2021-04-26 Thread Junfan Zhang (Jira)
Junfan Zhang created FLINK-22485:


 Summary: Support client attach on application mode
 Key: FLINK-22485
 URL: https://issues.apache.org/jira/browse/FLINK-22485
 Project: Flink
  Issue Type: Improvement
Reporter: Junfan Zhang


Now, the client will not wait until the job finishes when using application mode.
Can we support client attach on application mode?






[GitHub] [flink] flinkbot edited a comment on pull request #15131: [FLINK-21700][yarn]Allow to disable fetching Hadoop delegation token on Yarn

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15131:
URL: https://github.com/apache/flink/pull/15131#issuecomment-794957907


   
   ## CI report:
   
   * 0500d84f78d8fdb967b077fb024fda0cd4ba2d6a Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15682)
 
   * b99e1a12c74051454d211b0094c986b8741dd28f UNKNOWN
   
   
   






[GitHub] [flink] flinkbot edited a comment on pull request #15121: The method $(String) is undefined for the type TableExample

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15121:
URL: https://github.com/apache/flink/pull/15121#issuecomment-793480240


   
   ## CI report:
   
   * cf26ce895a7956d258acc073817f578558e78227 UNKNOWN
   * 709c4110370b845f42d916a06e56c6026cf2fac8 UNKNOWN
   * 4ea99332e1997eebca1f3f0a9d9229b8265fe32c UNKNOWN
   * 9a256ec6b644a0f795376076f4e3ff4e85aaad1e UNKNOWN
   * 76bbb8de288e2990974fe9ea7a09fb47e37daad8 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17220)
 
   * fbd38b53197c244af5ed3af54cebdee75b393a1d Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17257)
 
   
   
   






[GitHub] [flink] wydhcws commented on a change in pull request #15742: [FLINK-22442][CEP] Fix Using scala api to change the TimeCharacteristic o…

2021-04-26 Thread GitBox


wydhcws commented on a change in pull request #15742:
URL: https://github.com/apache/flink/pull/15742#discussion_r620818295



##
File path: 
flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaApiPatternStreamTest.scala
##
@@ -0,0 +1,67 @@
+package org.apache.flink.cep.scala
+
+import java.lang.reflect.Field
+
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.Pattern
+import org.apache.flink.cep.pattern.conditions.SimpleCondition
+import org.apache.flink.streaming.api.datastream.DataStreamSource
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class CEPScalaApiPatternStreamTest {
+  /**
+   * These tests simply check that the Scala API can be used to update the
+   * TimeCharacteristic of the PatternStream.
+   */
+
+  @Test
+  def updateCepTimeCharacteristicByScalaApi(): Unit = {
+
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    val input: DataStreamSource[Event] = env.fromElements(Event(1, "barfoo", 1.0), Event(8, "end", 1.0))
+    val pattern: Pattern[Event, Event] = Pattern.begin("start").where(new SimpleCondition[Event]() {
+      override def filter(value: Event): Boolean = value.name == "start"
+    })
+
+    val jestream: cep.PatternStream[Event] = org.apache.flink.cep.CEP.pattern(input, pattern)
+
+    // get org.apache.flink.cep.scala.PatternStream
+    val sePstream = new PatternStream[Event](jestream)
+
+    // get TimeBehaviour
+    val time1: AnyRef = getTimeBehaviourFromScalaPatternStream(sePstream)
+
+    assertEquals(time1.toString, "EventTime")
+
+    // change TimeCharacteristic using the Scala API
+    val sPstream: PatternStream[Event] = sePstream.inProcessingTime()
+
+    // get TimeBehaviour
+    val time2: AnyRef = getTimeBehaviourFromScalaPatternStream(sPstream)
+
+    assertEquals(time2.toString, "ProcessingTime")
+  }
+
+  def getTimeBehaviourFromScalaPatternStream(seStream: org.apache.flink.cep.scala.PatternStream[Event]) = {
+    val field: Field = seStream.getClass.getDeclaredField("jPatternStream")
+    field.setAccessible(true)
+    val JPattern: AnyRef = field.get(seStream)
+    val stream: cep.PatternStream[Event] = JPattern.asInstanceOf[cep.PatternStream[Event]]
+    getTimeBehaviourFromJavaPatternStream(stream)
+  }
+
+  def getTimeBehaviourFromJavaPatternStream(jeStream: org.apache.flink.cep.PatternStream[Event]) = {
+    val builder: Field = jeStream.getClass.getDeclaredField("builder")
+    builder.setAccessible(true)
+    val o: AnyRef = builder.get(jeStream)
+    val timeBehaviour: Field = o.getClass.getDeclaredField("timeBehaviour")
+    timeBehaviour.setAccessible(true)

Review comment:
   Sir, thank you for your suggestion. Actually, I also think this is not good, but I am a little confused about how to verify that a private variable has changed without using reflection. Should I write a case proving that the matching result changes after switching the timeCharacteristic of the PatternStream, instead of proving that the private timeBehaviour field changes?
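
   A hedged, untested sketch of such a behavior-based check using the Java API (Event and its accessors are assumed to mirror the test's case class; the idea is that under event time with no watermarks the match is not emitted, while under processing time it is):
   
   ```java
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   DataStream<Event> input = env.fromElements(new Event(1, "start", 1.0));
   Pattern<Event, ?> pattern =
           Pattern.<Event>begin("start")
                   .where(new SimpleCondition<Event>() {
                       @Override
                       public boolean filter(Event value) {
                           return value.getName().equals("start");
                       }
                   });
   
   DataStream<String> result =
           CEP.pattern(input, pattern)
                   .inProcessingTime()
                   .select(new PatternSelectFunction<Event, String>() {
                       @Override
                       public String select(Map<String, List<Event>> match) {
                           return match.get("start").get(0).getName();
                       }
                   });
   // collect the result and assert that "start" was matched
   ```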








[GitHub] [flink] zuston commented on pull request #15131: [FLINK-21700][yarn]Allow to disable fetching Hadoop delegation token on Yarn

2021-04-26 Thread GitBox


zuston commented on pull request #15131:
URL: https://github.com/apache/flink/pull/15131#issuecomment-827278120


   @XComp Done






[GitHub] [flink] xintongsong commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-04-26 Thread GitBox


xintongsong commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r620815613



##
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterState.java
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs.writer;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.gs.GSFileSystemOptions;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.storage.BlobId;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/** The state of a recoverable write. */
+class GSRecoverableWriterState implements RecoverableWriter.ResumeRecoverable, Cloneable {
+
+    /** The blob id to which the recoverable write operation is writing. */
+    public final BlobId finalBlobId;
+
+    /** The number of bytes that have been written so far. */
+    public long bytesWritten;
+
+    /** Indicates if the write has been closed. */
+    public boolean closed;
+
+    /** The object ids for the temporary objects that should be composed to form the final blob. */
+    public final List<UUID> componentObjectIds;
+
+    GSRecoverableWriterState(
+            BlobId finalBlobId, long bytesWritten, boolean closed, List<UUID> componentObjectIds) {
+        this.finalBlobId = Preconditions.checkNotNull(finalBlobId);
+        Preconditions.checkArgument(bytesWritten >= 0);
+        this.bytesWritten = bytesWritten;
+        this.closed = closed;
+
+        // shallow copy the component object ids to ensure this state object exclusively
+        // manages the list of component object ids
+        this.componentObjectIds = new ArrayList<>(Preconditions.checkNotNull(componentObjectIds));
+    }
+
+    GSRecoverableWriterState(GSRecoverableWriterState state) {
+        this(state.finalBlobId, state.bytesWritten, state.closed, state.componentObjectIds);
+    }
+
+    GSRecoverableWriterState(BlobId finalBlobId) {
+        this(finalBlobId, 0, false, new ArrayList<>());
+    }
+
+    /**
+     * Returns the temporary bucket name. If options specifies a temporary bucket name, we use that
+     * one; otherwise, we use the bucket name of the final blob.
+     *
+     * @param options The GS file system options
+     * @return The temporary bucket name
+     */
+    String getTemporaryBucketName(GSFileSystemOptions options) {
+        return options.writerTemporaryBucketName.isEmpty()
+                ? finalBlobId.getBucket()
+                : options.writerTemporaryBucketName;
+    }

Review comment:
   I see. Thanks for the explanation. Then maybe also make this one a util method that takes `finalBlobId` as an argument?
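
   A minimal sketch of that suggestion (placing it in `BlobUtils` is an assumption):
   
   ```java
   // resolve the temporary bucket from the options, falling back to the
   // bucket of the final blob when no temporary bucket is configured
   static String getTemporaryBucketName(BlobId finalBlobId, GSFileSystemOptions options) {
       return options.writerTemporaryBucketName.isEmpty()
               ? finalBlobId.getBucket()
               : options.writerTemporaryBucketName;
   }
   ```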








[GitHub] [flink] flinkbot edited a comment on pull request #15763: [FLINK-22364][doc] Translate the page of "Data Sources" to Chinese.

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15763:
URL: https://github.com/apache/flink/pull/15763#issuecomment-826589720


   
   ## CI report:
   
   * 03b1c6bc999ee92a23eb0a0b84a368f2643e841d Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17201)
 
   * 0dcb773365ec16118d2b4c0f55910441bc6d7d58 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15745: [FLINK-22304][table] Refactor some interfaces for TVF based window to…

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15745:
URL: https://github.com/apache/flink/pull/15745#issuecomment-826249839


   
   ## CI report:
   
   * c9df5fedbcb6ce93f3a7576cf35753af3ffab6b4 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17223)
 
   * 11130e794b07aa9215035e5c204766295a4f50fe Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17258)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15121: The method $(String) is undefined for the type TableExample

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15121:
URL: https://github.com/apache/flink/pull/15121#issuecomment-793480240


   
   ## CI report:
   
   * cf26ce895a7956d258acc073817f578558e78227 UNKNOWN
   * 709c4110370b845f42d916a06e56c6026cf2fac8 UNKNOWN
   * 4ea99332e1997eebca1f3f0a9d9229b8265fe32c UNKNOWN
   * 9a256ec6b644a0f795376076f4e3ff4e85aaad1e UNKNOWN
   * 76bbb8de288e2990974fe9ea7a09fb47e37daad8 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17220)
 
   * fbd38b53197c244af5ed3af54cebdee75b393a1d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] xintongsong commented on a change in pull request #15599: [FLINK-11838][flink-gs-fs-hadoop] Create Google Storage file system with recoverable writer support

2021-04-26 Thread GitBox


xintongsong commented on a change in pull request #15599:
URL: https://github.com/apache/flink/pull/15599#discussion_r620812109



##
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.util.HadoopConfigLoader;
+import org.apache.flink.util.Preconditions;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * Implementation of the Flink {@link org.apache.flink.core.fs.FileSystemFactory} interface for
+ * Google Storage.
+ */
+public class GSFileSystemFactory implements FileSystemFactory {
+
+    private static final String SCHEME = "gs";
+
+    private static final String HADOOP_CONFIG_PREFIX = "fs.gs.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = {"gs.", HADOOP_CONFIG_PREFIX};
+
+    private static final String[][] MIRRORED_CONFIG_KEYS = {};
+
+    private static final String FLINK_SHADING_PREFIX = "";
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_BUCKET_NAME =
+            ConfigOptions.key("gs.writer.temporary.bucket.name")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_BUCKET_NAME)
+                    .withDescription(
+                            "This option sets the bucket name used by the recoverable writer to store temporary files. "
+                                    + "If empty, temporary files are stored in the same bucket as the final file being written.");
+
+    public static final ConfigOption<String> WRITER_TEMPORARY_OBJECT_PREFIX =
+            ConfigOptions.key("gs.writer.temporary.object.prefix")
+                    .stringType()
+                    .defaultValue(GSFileSystemOptions.DEFAULT_WRITER_TEMPORARY_OBJECT_PREFIX)
+                    .withDescription(
+                            "This option sets the prefix used by the recoverable writer when writing temporary files. This prefix is applied to the "
+                                    + "final object name to form the base name for temporary files.");

Review comment:
   I'd be in favor of not introducing this option if it's not absolutely
necessary. The more options we provide to users, the more constraints we put
on future development and maintenance.
   
   As for including the bucket names in temporary blob names, I think that's a 
good idea. Since the temporary blobs are not meant to be manipulated by users 
directly, it would be nice to carry more information with the blob names.
   
   Moreover, I wonder whether we should also include the index of temporary 
blobs, in addition to the UUID. That might be useful for cleaning up the 
resumables. I'll explain in the other comment.
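
   As a rough sketch, such a self-describing name could be assembled like this
(the layout and separators are assumptions for illustration, not a settled
design):

       // Hypothetical temporary blob name: final bucket + final object name
       // + component index + UUID, so temporary blobs can be traced back to
       // the write they belong to and cleaned up in order.
       static String temporaryObjectName(BlobId finalBlobId, int componentIndex, UUID componentId) {
           return String.format(
                   ".inprogress/%s/%s/%d.%s",
                   finalBlobId.getBucket(), finalBlobId.getName(), componentIndex, componentId);
       }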




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-22451) Support (*) as parameter of table UserDefinedFunction

2021-04-26 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332850#comment-17332850
 ] 

Jark Wu commented on FLINK-22451:
-

{{withColumns}} and {{withoutColumns}} are also shortcuts, similar to
{{$("*")}}, for selecting columns more easily.

> Support (*) as parameter of table UserDefinedFunction 
> --
>
> Key: FLINK-22451
> URL: https://issues.apache.org/jira/browse/FLINK-22451
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Yi Tang
>Assignee: Yi Tang
>Priority: Minor
>  Labels: pull-request-available
>
> For now, one can use star \(*) to act as a wild card, selecting all of the 
> columns in the table.
> {code:java}
> Table result = orders.select($("*"));
> {code}
> When one uses a star \(*) as a parameter of a UDF, it will fail in
> {{ReferenceResolverRule}} with
> {code:java}
> "Cannot resolve field [*], input field list:[...]."
> {code}
> The cause is that the parameter of a UDF is not expanded in
> {{StarReferenceFlatteningRule}}.
> I think we can support expanding the star parameter to the real field list
> if it is the only parameter (the last parameter is also OK) of the UDF.
> Then the parameters can be received by
> {code:java}
> eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object... row)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #15745: [FLINK-22304][table] Refactor some interfaces for TVF based window to…

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15745:
URL: https://github.com/apache/flink/pull/15745#issuecomment-826249839


   
   ## CI report:
   
   * c9df5fedbcb6ce93f3a7576cf35753af3ffab6b4 Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17223)
 
   * 11130e794b07aa9215035e5c204766295a4f50fe UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] tisonkun merged pull request #15756: [hotfix][docs] Removed duplicate 'is'

2021-04-26 Thread GitBox


tisonkun merged pull request #15756:
URL: https://github.com/apache/flink/pull/15756


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-22470) The root cause of the exception encountered during compiling the job was not exposed to users in certain cases

2021-04-26 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-22470.
---
Resolution: Fixed

Merged to
- master via bef98cf0ba168dd4f0b2c8614d2179cec8b54ff8
- release-1.13 via dfa6623f3d4a1fead2be88aa964ed69a1219d382
- release-1.12 via fbd38b53197c244af5ed3af54cebdee75b393a1d

> The root cause of the exception encountered during compiling the job was not 
> exposed to users in certain cases
> --
>
> Key: FLINK-22470
> URL: https://issues.apache.org/jira/browse/FLINK-22470
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0, 1.12.4
>
>
> For the following job:
> {code}
> def test():
>     from pyflink.table import DataTypes, BatchTableEnvironment, EnvironmentSettings
>     env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
>     table_env = BatchTableEnvironment.create(environment_settings=env_settings)
>     table_env \
>         .get_config() \
>         .get_configuration() \
>         .set_string(
>             "pipeline.jars",
>             "file:///Users/dianfu/code/src/alibaba/ververica-connectors/flink-sql-avro-1.12.0.jar"
>         )
>     table = table_env.from_elements(
>         [('111', '222')],
>         schema=DataTypes.ROW([
>             DataTypes.FIELD('text', DataTypes.STRING()),
>             DataTypes.FIELD('text1', DataTypes.STRING())
>         ])
>     )
>     sink_ddl = f"""
>         create table Results(
>             a STRING,
>             b STRING
>         ) with (
>             'connector' = 'filesystem',
>             'path' = '/Users/dianfu/tmp/',
>             'format' = 'avro'
>         )
>     """
>     table_env.execute_sql(sink_ddl)
>     table.execute_insert("Results").wait()
>
> if __name__ == "__main__":
>     test()
> {code}
> It throws the following exception:
> {code}
> pyflink.util.exceptions.TableException: Failed to execute sql
>at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)
>at 
> org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>at java.lang.reflect.Method.invoke(Method.java:498)
>at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>at java.lang.Thread.run(Thread.java:748)
> Process finished with exit code 1
> {code}
> The root cause isn't exposed and it's difficult for users to figure out what 
> happens.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] dianfu closed pull request #15765: [FLINK-22470][python] Make sure that the root cause of the exception encountered during compiling the job was exposed to users in all cases

2021-04-26 Thread GitBox


dianfu closed pull request #15765:
URL: https://github.com/apache/flink/pull/15765


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dianfu commented on pull request #15765: [FLINK-22470][python] Make sure that the root cause of the exception encountered during compiling the job was exposed to users in all cases

2021-04-26 Thread GitBox


dianfu commented on pull request #15765:
URL: https://github.com/apache/flink/pull/15765#issuecomment-827266624


   Closed via fbd38b53197c244af5ed3af54cebdee75b393a1d


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dianfu closed pull request #15766: [FLINK-22470][python] Make sure that the root cause of the exception encountered during compiling the job was exposed to users in all cases

2021-04-26 Thread GitBox


dianfu closed pull request #15766:
URL: https://github.com/apache/flink/pull/15766


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wydhcws commented on a change in pull request #15742: [FLINK-22442][CEP] Fix Using scala api to change the TimeCharacteristic o…

2021-04-26 Thread GitBox


wydhcws commented on a change in pull request #15742:
URL: https://github.com/apache/flink/pull/15742#discussion_r620795286



##
File path: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
##
@@ -447,17 +447,17 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
   }
 
  def sideOutputLateData(lateDataOutputTag: OutputTag[T]): PatternStream[T] = {
-   jPatternStream.sideOutputLateData(lateDataOutputTag)
+   jPatternStream = jPatternStream.sideOutputLateData(lateDataOutputTag)

Review comment:
   Thank you for your valuable suggestions. I will try to rewrite it in a 
more elegant way.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (FLINK-22074) testRequirementCheckOnlyTriggeredOnce(org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTest) failed

2021-04-26 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song closed FLINK-22074.

Resolution: Fixed

Fixed via
* master (1.14): 6236473f8fb90cbde9673d1fa0c51659bc9e0c8c
* release-1.13: ac90590061bdb21bac56247d998aeda1ba6c5e2c

> testRequirementCheckOnlyTriggeredOnce(org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTest)
>  failed
> -
>
> Key: FLINK-22074
> URL: https://issues.apache.org/jira/browse/FLINK-22074
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.13.0
>Reporter: Leonard Xu
>Assignee: Yangze Guo
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.13.0
>
>
> [ERROR] Tests run: 14, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 
> 1.654 s <<< FAILURE! - in 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTest
>  
> [ERROR] 
> testRequirementCheckOnlyTriggeredOnce(org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTest)
>  Time elapsed: 0.059 s <<< FAILURE! 
> java.lang.AssertionError: Expected to fail with a timeout. 
>  at org.junit.Assert.fail(Assert.java:88) 
>  at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase.assertFutureNotComplete(FineGrainedSlotManagerTestBase.java:126)
>  
>  at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTest$10.lambda$new$3(FineGrainedSlotManagerTest.java:605)
>  
>  at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase$Context.runTest(FineGrainedSlotManagerTestBase.java:197)
>  
>  at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTest$10.(FineGrainedSlotManagerTest.java:581)
>  
>  at 
> org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTest.testRequirementCheckOnlyTriggeredOnce(FineGrainedSlotManagerTest.java:565)
>  
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  
>  at java.lang.reflect.Method.invoke(Method.java:498) 
>  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>  
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>  
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  
>  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) 
>  at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) 
>  at org.junit.rules.RunRules.evaluate(RunRules.java:20) 
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>  
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>  
>  at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
>  at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
>  at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
>  
>  at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
>  
>  at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
>  
>  at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
>  
>  at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
>  
>  at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
>  
>  at 
> org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) 
>  at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] xintongsong closed pull request #15751: [FLINK-22074][test] Harden FineGrainedSlotManagerTest#testRequirement…

2021-04-26 Thread GitBox


xintongsong closed pull request #15751:
URL: https://github.com/apache/flink/pull/15751


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (FLINK-17726) Scheduler should take care of tasks directly canceled by TaskManager

2021-04-26 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332834#comment-17332834
 ] 

Zhu Zhu edited comment on FLINK-17726 at 4/27/21, 1:50 AM:
---

I think it is a potential issue and is not a real production problem yet. The
problem would happen only if a task is directly cancelled by the TM and no other
task in the same pipelined region has failed. So far I think this case will not
happen.


was (Author: zhuzh):
I think it is a potential issue and is not a real production problem yet. The
problem would happen only if a task is directly cancelled by the TM without
failing any other task in the same pipelined region. So far I think this case
will not happen.

> Scheduler should take care of tasks directly canceled by TaskManager
> 
>
> Key: FLINK-17726
> URL: https://issues.apache.org/jira/browse/FLINK-17726
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Runtime / Task
>Affects Versions: 1.11.0, 1.12.0
>Reporter: Zhu Zhu
>Priority: Critical
>  Labels: stale-critical
>
> JobManager will not trigger failure handling when receiving CANCELED task 
> update. 
> This is because CANCELED tasks are usually caused by another FAILED task. 
> These CANCELED tasks will be restarted by the failover process triggered by
> the FAILED task.
> However, if a task is directly CANCELED by TaskManager due to its own runtime 
> issue, the task will not be recovered by JM and thus the job would hang.
> This is a potential issue and we should avoid it.
> A possible solution is to let JobManager treat tasks transitioning to 
> CANCELED from all states except from CANCELING as failed tasks. 
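
A minimal sketch of the proposed check above (a hypothetical helper, not the
actual scheduler code):
{code:java}
import org.apache.flink.runtime.execution.ExecutionState;

// Treat a CANCELED transition as a failure unless the JobManager itself was
// already canceling the task (i.e. the previous state was CANCELING).
public static boolean shouldTreatAsFailed(ExecutionState previous, ExecutionState current) {
    return current == ExecutionState.CANCELED && previous != ExecutionState.CANCELING;
}
{code}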



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17726) Scheduler should take care of tasks directly canceled by TaskManager

2021-04-26 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332834#comment-17332834
 ] 

Zhu Zhu commented on FLINK-17726:
-

I think it is a potential issue and is not a real production problem yet. The
problem would happen only if a task is directly cancelled by the TM without
failing any other task in the same pipelined region. So far I think this case
will not happen.

> Scheduler should take care of tasks directly canceled by TaskManager
> 
>
> Key: FLINK-17726
> URL: https://issues.apache.org/jira/browse/FLINK-17726
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Runtime / Task
>Affects Versions: 1.11.0, 1.12.0
>Reporter: Zhu Zhu
>Priority: Critical
>  Labels: stale-critical
>
> JobManager will not trigger failure handling when receiving CANCELED task 
> update. 
> This is because CANCELED tasks are usually caused by another FAILED task. 
> These CANCELED tasks will be restarted by the failover process triggered by
> the FAILED task.
> However, if a task is directly CANCELED by TaskManager due to its own runtime 
> issue, the task will not be recovered by JM and thus the job would hang.
> This is a potential issue and we should avoid it.
> A possible solution is to let JobManager treat tasks transitioning to 
> CANCELED from all states except from CANCELING as failed tasks. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] xintongsong closed pull request #15748: [hotfix][coordination] Add log for slot allocation in FineGrainedSlot…

2021-04-26 Thread GitBox


xintongsong closed pull request #15748:
URL: https://github.com/apache/flink/pull/15748


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] JingsongLi commented on a change in pull request #15760: [FLINK-19606][table-runtime-blink] Introduce WindowJoinOperator and WindowJoinOperatorBuilder.

2021-04-26 Thread GitBox


JingsongLi commented on a change in pull request #15760:
URL: https://github.com/apache/flink/pull/15760#discussion_r620782487



##
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
##
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.window;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.JoinCondition;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
+import org.apache.flink.table.runtime.operators.window.state.WindowListState;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.RowKind;
+
+import java.util.IdentityHashMap;
+import java.util.List;
+
+/**
+ * Streaming window join operator.
+ *
+ * <p>Note: currently, {@link WindowJoinOperator} doesn't support early-fire and late-arrival. Thus
+ * late elements (elements belong to emitted windows) will be simply dropped.
+ */
+public abstract class WindowJoinOperator extends TableStreamOperator<RowData>
+        implements TwoInputStreamOperator<RowData, RowData, RowData>,
+                Triggerable<RowData, Long>,
+                KeyContext {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME =
+            "leftNumLateRecordsDropped";
+    private static final String LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
+            "leftLateRecordsDroppedRate";
+    private static final String RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME =
+            "rightNumLateRecordsDropped";
+    private static final String RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
+            "rightLateRecordsDroppedRate";
+    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
+    private static final String LEFT_RECORDS_STATE_NAME = "left-records";
+    private static final String RIGHT_RECORDS_STATE_NAME = "right-records";
+
+    protected final InternalTypeInfo<RowData> leftType;
+    protected final InternalTypeInfo<RowData> rightType;
+    private final GeneratedJoinCondition generatedJoinCondition;
+
+    private final int leftWindowEndIndex;
+    private final int rightWindowEndIndex;
+
+    private final boolean[] filterNullKeys;
+
+    /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
+    private transient boolean functionsClosed = false;
+
+    private transient InternalTimerService<Long> internalTimerService;
+
+    // ------------------------------------------------------------------------
+    protected transient JoinConditionWithNullFilters joinCondition;
+
+    /** This is used for emitting elements with a given timestamp. */
+    protected transient TimestampedCollector<RowData> collector;
+
+    private transient WindowListState<Long> leftWindowState;
+    private transient WindowListState<Long> rightWindowState;
+
+    // ------------------------------------------------------------------------

[GitHub] [flink] JingsongLi commented on a change in pull request #15760: [FLINK-19606][table-runtime-blink] Introduce WindowJoinOperator and WindowJoinOperatorBuilder.

2021-04-26 Thread GitBox


JingsongLi commented on a change in pull request #15760:
URL: https://github.com/apache/flink/pull/15760#discussion_r620781908



##
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
##
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.window;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.JoinCondition;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
+import org.apache.flink.table.runtime.operators.window.state.WindowListState;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.RowKind;
+
+import java.util.IdentityHashMap;
+import java.util.List;
+
+/**
+ * Streaming window join operator.
+ *
+ * <p>Note: currently, {@link WindowJoinOperator} doesn't support early-fire and late-arrival. Thus
+ * late elements (elements belong to emitted windows) will be simply dropped.
+ */
+public abstract class WindowJoinOperator extends TableStreamOperator<RowData>
+        implements TwoInputStreamOperator<RowData, RowData, RowData>,
+                Triggerable<RowData, Long>,
+                KeyContext {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME =
+            "leftNumLateRecordsDropped";
+    private static final String LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
+            "leftLateRecordsDroppedRate";
+    private static final String RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME =
+            "rightNumLateRecordsDropped";
+    private static final String RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
+            "rightLateRecordsDroppedRate";
+    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
+    private static final String LEFT_RECORDS_STATE_NAME = "left-records";
+    private static final String RIGHT_RECORDS_STATE_NAME = "right-records";
+
+    protected final InternalTypeInfo<RowData> leftType;
+    protected final InternalTypeInfo<RowData> rightType;
+    private final GeneratedJoinCondition generatedJoinCondition;
+
+    private final int leftWindowEndIndex;
+    private final int rightWindowEndIndex;
+
+    private final boolean[] filterNullKeys;
+
+    /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
+    private transient boolean functionsClosed = false;
+
+    private transient InternalTimerService<Long> internalTimerService;
+
+    // ------------------------------------------------------------------------
+    protected transient JoinConditionWithNullFilters joinCondition;
+
+    /** This is used for emitting elements with a given timestamp. */
+    protected transient TimestampedCollector<RowData> collector;
+
+    private transient WindowListState<Long> leftWindowState;
+    private transient WindowListState<Long> rightWindowState;
+
+    // ------------------------------------------------------------------------

[GitHub] [flink] JingsongLi commented on a change in pull request #15760: [FLINK-19606][table-runtime-blink] Introduce WindowJoinOperator and WindowJoinOperatorBuilder.

2021-04-26 Thread GitBox


JingsongLi commented on a change in pull request #15760:
URL: https://github.com/apache/flink/pull/15760#discussion_r620781485



##
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
##
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.window;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.JoinCondition;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
+import org.apache.flink.table.runtime.operators.window.state.WindowListState;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.RowKind;
+
+import java.util.IdentityHashMap;
+import java.util.List;
+
+/**
+ * Streaming window join operator.
+ *
+ * <p>Note: currently, {@link WindowJoinOperator} doesn't support early-fire and late-arrival. Thus
+ * late elements (elements belong to emitted windows) will be simply dropped.
+ */
+public abstract class WindowJoinOperator extends TableStreamOperator<RowData>
+        implements TwoInputStreamOperator<RowData, RowData, RowData>,
+                Triggerable<RowData, Long>,
+                KeyContext {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME =
+            "leftNumLateRecordsDropped";
+    private static final String LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
+            "leftLateRecordsDroppedRate";
+    private static final String RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME =
+            "rightNumLateRecordsDropped";
+    private static final String RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
+            "rightLateRecordsDroppedRate";
+    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
+    private static final String LEFT_RECORDS_STATE_NAME = "left-records";
+    private static final String RIGHT_RECORDS_STATE_NAME = "right-records";
+
+    protected final InternalTypeInfo<RowData> leftType;
+    protected final InternalTypeInfo<RowData> rightType;
+    private final GeneratedJoinCondition generatedJoinCondition;
+
+    private final int leftWindowEndIndex;
+    private final int rightWindowEndIndex;
+
+    private final boolean[] filterNullKeys;
+
+    /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
+    private transient boolean functionsClosed = false;
+
+    private transient InternalTimerService<Long> internalTimerService;
+
+    // ------------------------------------------------------------------------
+    protected transient JoinConditionWithNullFilters joinCondition;
+
+    /** This is used for emitting elements with a given timestamp. */
+    protected transient TimestampedCollector<RowData> collector;
+
+    private transient WindowListState<Long> leftWindowState;
+    private transient WindowListState<Long> rightWindowState;
+
+    // ------------------------------------------------------------------------

[GitHub] [flink] JingsongLi merged pull request #15752: [FLINK-19606][table-runtime-blink] Refactor JoinConditionWithFullFilters from an inner class in AbstractStreamingJoinOperator to a common utilit

2021-04-26 Thread GitBox


JingsongLi merged pull request #15752:
URL: https://github.com/apache/flink/pull/15752


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15774: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.14)

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15774:
URL: https://github.com/apache/flink/pull/15774#issuecomment-827133461


   
   ## CI report:
   
   * 5fbd84949babc1e127bc8d6d951ed75dffda4151 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17248)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15773: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.12)

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15773:
URL: https://github.com/apache/flink/pull/15773#issuecomment-827133382


   
   ## CI report:
   
   * ece7df21556a92a2711f39cc46d41aa094392eb7 Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17247)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-21938) Add documentation about how to test Python UDFs

2021-04-26 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-21938:
---
Labels: auto-unassigned pull-request-available  (was: 
pull-request-available stale-assigned)

> Add documentation about how to test Python UDFs
> ---
>
> Key: FLINK-21938
> URL: https://issues.apache.org/jira/browse/FLINK-21938
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Documentation
>Reporter: Dian Fu
>Assignee: Yik San Chan
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.13.0
>
>
> It should be similar to the Java UDFs:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-user-defined-functions



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21938) Add documentation about how to test Python UDFs

2021-04-26 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332797#comment-17332797
 ] 

Flink Jira Bot commented on FLINK-21938:


This issue was marked "stale-assigned" and has not received an update in 7 
days. It is now automatically unassigned. If you are still working on it, you 
can assign it to yourself again. Please also give an update about the status of 
the work.

> Add documentation about how to test Python UDFs
> ---
>
> Key: FLINK-21938
> URL: https://issues.apache.org/jira/browse/FLINK-21938
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Documentation
>Reporter: Dian Fu
>Assignee: Yik San Chan
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.13.0
>
>
> It should be similar to the Java UDFs:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-user-defined-functions



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] mans2singh commented on pull request #15756: [hotfix][docs] Removed duplicate 'is'

2021-04-26 Thread GitBox


mans2singh commented on pull request #15756:
URL: https://github.com/apache/flink/pull/15756#issuecomment-827188168


   Hello @wuchong @leonardBang - Can you please review this PR? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15772: [FLINK-21131][webui] Alignment timeout displayed in checkpoint config…

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15772:
URL: https://github.com/apache/flink/pull/15772#issuecomment-827092251


   
   ## CI report:
   
   * fbafed45312a6d55e2a65e666833c115c0a0f07a Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17242)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15774: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.14)

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15774:
URL: https://github.com/apache/flink/pull/15774#issuecomment-827133461


   
   ## CI report:
   
   * 5fbd84949babc1e127bc8d6d951ed75dffda4151 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17248)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #15773: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.12)

2021-04-26 Thread GitBox


flinkbot edited a comment on pull request #15773:
URL: https://github.com/apache/flink/pull/15773#issuecomment-827133382


   
   ## CI report:
   
   * ece7df21556a92a2711f39cc46d41aa094392eb7 Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=17247)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #15774: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.14)

2021-04-26 Thread GitBox


flinkbot commented on pull request #15774:
URL: https://github.com/apache/flink/pull/15774#issuecomment-827133461


   
   ## CI report:
   
   * 5fbd84949babc1e127bc8d6d951ed75dffda4151 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #15773: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.12)

2021-04-26 Thread GitBox


flinkbot commented on pull request #15773:
URL: https://github.com/apache/flink/pull/15773#issuecomment-827133382


   
   ## CI report:
   
   * ece7df21556a92a2711f39cc46d41aa094392eb7 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-22441) In Flink v1.11.3 contains netty(version:3.10.6) netty(version:4.1.60) . There are many vulnerabilities, like CVE-2021-21409 etc. please confirm these version and fix.

2021-04-26 Thread Austin Cawley-Edwards (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332718#comment-17332718
 ] 

Austin Cawley-Edwards commented on FLINK-22441:
---

Adding the "drop Scala 2.11 support" ticket as a blocker, as Konstantin 
mentioned the current Akka lib is the root of this issue, and Akka cannot be 
upgraded until Scala 2.11 is dropped. If there's a more accurate ticket for the 
Akka upgrade, we can update the blocker.

> In Flink v1.11.3 contains netty(version:3.10.6) netty(version:4.1.60) . There 
> are many vulnerabilities, like CVE-2021-21409 etc. please confirm these 
> version and fix. thx
> --
>
> Key: FLINK-22441
> URL: https://issues.apache.org/jira/browse/FLINK-22441
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.11.3, 1.12.2, 1.13.0
>Reporter: 张健
>Priority: Major
>
> Flink v1.11.3 contains netty (version 3.10.6) and netty (version 4.1.60). There
> are many vulnerabilities, like CVE-2021-21409, CVE-2021-21295, etc. Please
> confirm these versions and fix. Thx



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on pull request #15774: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.14)

2021-04-26 Thread GitBox


flinkbot commented on pull request #15774:
URL: https://github.com/apache/flink/pull/15774#issuecomment-827123991


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 5fbd84949babc1e127bc8d6d951ed75dffda4151 (Mon Apr 26 
20:24:55 UTC 2021)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied
according to the order of the review items. For consensus, approval by a Flink
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #15773: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.12)

2021-04-26 Thread GitBox


flinkbot commented on pull request #15773:
URL: https://github.com/apache/flink/pull/15773#issuecomment-827122947


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit ece7df21556a92a2711f39cc46d41aa094392eb7 (Mon Apr 26 
20:22:58 UTC 2021)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied
according to the order of the review items. For consensus, approval by a Flink
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-22479) [Kinesis][Consumer] Potential lock-up under error condition

2021-04-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-22479:
---
Labels: pull-request-available  (was: )

> [Kinesis][Consumer] Potential lock-up under error condition
> ---
>
> Key: FLINK-22479
> URL: https://issues.apache.org/jira/browse/FLINK-22479
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.12.0, 1.12.1, 1.12.2
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.13.0, 1.14.0, 1.12.3
>
>
> *Background*
> This connector has been 
> [forked|https://github.com/awslabs/amazon-kinesis-connector-flink] by AWS for 
> use on KDA with Flink 1.11. Bugs have been encountered:
> - Under high backpressure scenarios
> - When an error is thrown during tear down
> *Scope*
> Pull in the following fixes from AWS fork:
> * Fix issue where {{KinesisDataFetcher.shutdownFetcher()}} hangs 
> ([issue|https://github.com/awslabs/amazon-kinesis-connector-flink/issues/23], 
> [pull 
> request|https://github.com/awslabs/amazon-kinesis-connector-flink/pull/24])
>   
> * Log error when shutting down Kinesis Data Fetcher 
> ([issue|https://github.com/awslabs/amazon-kinesis-connector-flink/issues/22], 
> [pull 
> request|https://github.com/awslabs/amazon-kinesis-connector-flink/pull/25])
>   
> * Treat TimeoutException as a recoverable exception 
> ([issue|https://github.com/awslabs/amazon-kinesis-connector-flink/issues/21], 
> [pull 
> request|https://github.com/awslabs/amazon-kinesis-connector-flink/pull/28])
>   
> * Add time-out for acquiring subscription and passing events from network to 
> source thread to prevent deadlock ([pull 
> request|https://github.com/awslabs/amazon-kinesis-connector-flink/pull/18]) 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] dannycranmer opened a new pull request #15774: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.14)

2021-04-26 Thread GitBox


dannycranmer opened a new pull request #15774:
URL: https://github.com/apache/flink/pull/15774


   ## What is the purpose of the change
   
   Pull in the following fixes from AWS fork:
   * Fix issue where `KinesisDataFetcher.shutdownFetcher()` hangs 
([issue](https://github.com/awslabs/amazon-kinesis-connector-flink/issues/23), 
[pull 
request](https://github.com/awslabs/amazon-kinesis-connector-flink/pull/24))
 
   * Log error when shutting down Kinesis Data Fetcher 
([issue](https://github.com/awslabs/amazon-kinesis-connector-flink/issues/22), 
[pull 
request](https://github.com/awslabs/amazon-kinesis-connector-flink/pull/25))
 
   * Treat TimeoutException as a recoverable exception 
([issue](https://github.com/awslabs/amazon-kinesis-connector-flink/issues/21), 
[pull 
request](https://github.com/awslabs/amazon-kinesis-connector-flink/pull/28)); 
a sketch of this retry pattern follows the list
 
   * Add time-out for acquiring subscription and passing events from network to 
source thread to prevent deadlock ([pull 
request](https://github.com/awslabs/amazon-kinesis-connector-flink/pull/18)) 
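
   As context for the TimeoutException bullet above, the following is a 
minimal, hypothetical Java sketch of treating a timeout as recoverable by 
retrying with backoff. The names `RecoverableReadLoop`, `readWithRetries`, 
and `readOnce` are invented for illustration and are not the connector's API.

   ```java
   import java.util.concurrent.TimeoutException;

   // Hypothetical sketch: retry on TimeoutException instead of failing the job.
   public class RecoverableReadLoop {

       // Attempts a read up to maxAttempts times, backing off between tries.
       public String readWithRetries(int maxAttempts) throws Exception {
           Exception last = null;
           for (int attempt = 1; attempt <= maxAttempts; attempt++) {
               try {
                   return readOnce();
               } catch (TimeoutException e) {
                   // Recoverable: remember the cause, back off, and retry.
                   last = e;
                   Thread.sleep(100L * attempt);
               }
           }
           throw new Exception("Read failed after " + maxAttempts + " attempts", last);
       }

       private String readOnce() throws TimeoutException {
           // Placeholder for the actual network read; a real implementation
           // would call the Kinesis client here.
           throw new TimeoutException("simulated timeout");
       }
   }
   ```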
   
   ## Brief change log
   
   - Added a timeout when blocking on the queue between the source and network 
threads (see the sketch after this list)
   - Added exception handling during source teardown so the source can still 
interrupt the executor's threads
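
   The first change-log item refers to the sketch below: a minimal, 
hypothetical illustration of handing records from the network thread to the 
source thread through a bounded queue with a timed poll, so neither side 
blocks forever if the other dies. `TimedHandover`, `DEQUEUE_TIMEOUT_MS`, and 
the queue capacity are assumptions for this example, not the connector's 
actual code.

   ```java
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.TimeUnit;

   // Hypothetical sketch: a bounded handover queue with timed polling.
   public class TimedHandover {
       private static final long DEQUEUE_TIMEOUT_MS = 1000; // assumed value
       private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
       private volatile boolean running = true;

       // Called from the source thread. A plain take() could block forever if
       // the network thread dies; polling with a timeout lets the caller
       // re-check the shutdown flag instead of deadlocking.
       public String nextRecord() throws InterruptedException {
           while (running) {
               String record = queue.poll(DEQUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
               if (record != null) {
                   return record;
               }
               // Timed out: loop and re-check the running flag.
           }
           return null; // shut down before a record arrived
       }

       // Called from the network thread; offer() with a timeout similarly
       // bounds the producing side.
       public boolean handOver(String record) throws InterruptedException {
           return queue.offer(record, DEQUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
       }

       public void shutdown() {
           running = false;
       }
   }
   ```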
   
   ## Verifying this change
   
   - Added unit tests for changes
   - Verified on AWS KDA with test applications
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): yes
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] dannycranmer opened a new pull request #15773: [FLINK-22479][Kinesis][Consumer] Potential lock-up under error condition (Flink 1.12)

2021-04-26 Thread GitBox


dannycranmer opened a new pull request #15773:
URL: https://github.com/apache/flink/pull/15773


   ## What is the purpose of the change
   
   Pull in the following fixes from AWS fork:
   * Fix issue where `KinesisDataFetcher.shutdownFetcher()` hangs 
([issue](https://github.com/awslabs/amazon-kinesis-connector-flink/issues/23), 
[pull 
request](https://github.com/awslabs/amazon-kinesis-connector-flink/pull/24))
 
   * Log error when shutting down Kinesis Data Fetcher 
([issue](https://github.com/awslabs/amazon-kinesis-connector-flink/issues/22), 
[pull 
request](https://github.com/awslabs/amazon-kinesis-connector-flink/pull/25))
 
   * Treat TimeoutException as a recoverable exception 
([issue](https://github.com/awslabs/amazon-kinesis-connector-flink/issues/21), 
[pull 
request](https://github.com/awslabs/amazon-kinesis-connector-flink/pull/28))
 
   * Add time-out for acquiring subscription and passing events from network to 
source thread to prevent deadlock ([pull 
request](https://github.com/awslabs/amazon-kinesis-connector-flink/pull/18)) 
   
   ## Brief change log
   
   - Added a timeout when blocking on the queue between the source and network 
threads
   - Added exception handling during source teardown so the source can still 
interrupt the executor's threads (see the sketch after this list)
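
   The second change-log item refers to the sketch below: a minimal, 
hypothetical illustration of defensive teardown, where an exception in one 
cleanup step is logged rather than allowed to skip interrupting the 
executor's threads (the hang described in the linked issue). 
`DefensiveShutdown` and `cancelOutstandingWork` are invented names; only the 
`ExecutorService` and SLF4J calls are real APIs.

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   // Hypothetical sketch: keep tearing down even if a cleanup step throws.
   public class DefensiveShutdown {
       private static final Logger LOG =
               LoggerFactory.getLogger(DefensiveShutdown.class);
       private final ExecutorService shardConsumers = Executors.newFixedThreadPool(4);

       public void shutdownFetcher() {
           try {
               cancelOutstandingWork(); // may throw under error conditions
           } catch (Exception e) {
               // Without this catch, an exception here would skip the
               // shutdown below, leaving worker threads alive and hanging.
               LOG.error("Error while cancelling outstanding work", e);
           } finally {
               shardConsumers.shutdownNow(); // interrupts worker threads
               try {
                   shardConsumers.awaitTermination(10, TimeUnit.SECONDS);
               } catch (InterruptedException ie) {
                   Thread.currentThread().interrupt();
               }
           }
       }

       private void cancelOutstandingWork() {
           // Placeholder for connector-specific cleanup.
       }
   }
   ```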
   
   ## Verifying this change
   
   - Added unit tests for changes
   - Verified on AWS KDA with test applications
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): yes
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



