[jira] [Updated] (FLINK-11411) Failover regions number of RestartPipelinedRegionStrategy not show in LOG due to incorrect parameter

2019-01-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11411:
---
Labels: pull-request-available  (was: )

> Failover regions number of RestartPipelinedRegionStrategy not show in LOG due 
> to incorrect parameter
> 
>
> Key: FLINK-11411
> URL: https://issues.apache.org/jira/browse/FLINK-11411
> Project: Flink
>  Issue Type: Improvement
>Reporter: BoWang
>Assignee: BoWang
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] eaglewatcherwb opened a new pull request #7561: [FLINK-11411][log] show correct failover region number

2019-01-22 Thread GitBox
eaglewatcherwb opened a new pull request #7561: [FLINK-11411][log] show correct 
failover region number
URL: https://github.com/apache/flink/pull/7561
 
 
   Change-Id: I118b7ea28d36edccbb3007fdf946b197b9f88865
   
   
   
   ## What is the purpose of the change
   
   Show the correct failover region number in the log.
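   A minimal, hypothetical sketch of the kind of logging-parameter mistake this fixes (illustrative variable names, not the actual RestartPipelinedRegionStrategy code): with SLF4J's parameterized logging, the number of regions only shows up if the computed count is actually passed for the `{}` placeholder.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FailoverRegionLoggingSketch {

    private static final Logger LOG = LoggerFactory.getLogger(FailoverRegionLoggingSketch.class);

    public static void main(String[] args) {
        int numberOfFailoverRegions = 4; // hypothetical value computed by the strategy

        // Broken pattern: the placeholder has no matching argument (or the wrong one),
        // so the region count never appears in the log output.
        LOG.info("Created {} failover regions.");

        // Fixed pattern: pass the computed count as the placeholder argument.
        LOG.info("Created {} failover regions.", numberOfFailoverRegions);
    }
}
```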
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency):  no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no 
 - The runtime per-record code paths (performance sensitive): no 
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no 
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11411) Failover regions number of RestartPipelinedRegionStrategy not show in LOG due to incorrect parameter

2019-01-22 Thread BoWang (JIRA)
BoWang created FLINK-11411:
--

 Summary: Failover regions number of RestartPipelinedRegionStrategy 
not show in LOG due to incorrect parameter
 Key: FLINK-11411
 URL: https://issues.apache.org/jira/browse/FLINK-11411
 Project: Flink
  Issue Type: Improvement
Reporter: BoWang
Assignee: BoWang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] eaglewatcherwb opened a new pull request #7560: [FLINK-11344][webfrontend] display all execution attempts on web

2019-01-22 Thread GitBox
eaglewatcherwb opened a new pull request #7560: [FLINK-11344][webfrontend] 
display all execution attempts on web
URL: https://github.com/apache/flink/pull/7560
 
 
   dashboard
   
   Change-Id: I859cf593ca567006778330b5a22f3b8af5acc965
   
   
   
   ## What is the purpose of the change
   
   display all execution attempts on web
   
   
   ## Brief change log
   
 - display all execution attempts on web
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers:  no
 - The runtime per-record code paths (performance sensitive):  no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature?  no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] eaglewatcherwb closed pull request #7503: [FLINK-11344][webfrontend] display all execution attempts on web

2019-01-22 Thread GitBox
eaglewatcherwb closed pull request #7503: [FLINK-11344][webfrontend] display 
all execution attempts on web
URL: https://github.com/apache/flink/pull/7503
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11336) Flink HA didn't remove ZK metadata

2019-01-22 Thread shengjk1 (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16749548#comment-16749548
 ] 

shengjk1 commented on FLINK-11336:
--

Yarn (per job or as a session)

> Flink HA didn't remove ZK metadata
> --
>
> Key: FLINK-11336
> URL: https://issues.apache.org/jira/browse/FLINK-11336
> Project: Flink
>  Issue Type: Improvement
>Reporter: shengjk1
>Priority: Major
> Attachments: image-2019-01-15-19-42-21-902.png
>
>
> Flink HA didn't remove ZK metadata
> such as 
> go to zk cli  : ls /flinkone
> !image-2019-01-15-19-42-21-902.png!
>  
> I suggest we delete this metadata when the application is cancelled or
> throws an exception
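
A minimal sketch of how the leftover HA metadata can be inspected programmatically (the {{/flinkone}} root and a local ZooKeeper are taken from the report; everything else is illustrative):

{code:java}
import org.apache.zookeeper.ZooKeeper;

import java.util.List;

public class InspectHaZnodes {
    public static void main(String[] args) throws Exception {
        // Assumed local ZooKeeper; adjust the connect string for the actual cluster.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });

        // Same check the reporter does with `ls /flinkone` in the zk cli:
        // any children left here after the job was cancelled are the leftover metadata.
        List<String> children = zk.getChildren("/flinkone", false);
        children.forEach(System.out::println);

        zk.close();
    }
}
{code}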



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11336) Flink HA didn't remove ZK metadata

2019-01-22 Thread shengjk1 (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16749548#comment-16749548
 ] 

shengjk1 edited comment on FLINK-11336 at 1/23/19 5:53 AM:
---

Yarn (per job or as a session)


was (Author: shengjk1):
Yarn (per job or as a session)

> Flink HA didn't remove ZK metadata
> --
>
> Key: FLINK-11336
> URL: https://issues.apache.org/jira/browse/FLINK-11336
> Project: Flink
>  Issue Type: Improvement
>Reporter: shengjk1
>Priority: Major
> Attachments: image-2019-01-15-19-42-21-902.png
>
>
> Flink HA didn't remove ZK metadata
> such as 
> go to zk cli  : ls /flinkone
> !image-2019-01-15-19-42-21-902.png!
>  
> I suggest we delete this metadata when the application is cancelled or
> throws an exception



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zhijiangW commented on issue #7438: [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter

2019-01-22 Thread GitBox
zhijiangW commented on issue #7438: [FLINK-11282][network] Merge 
StreamRecordWriter into RecordWriter
URL: https://github.com/apache/flink/pull/7438#issuecomment-456673652
 
 
   @pnowojski, I have updated the code to address the comment. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhijiangW commented on a change in pull request #7438: [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter

2019-01-22 Thread GitBox
zhijiangW commented on a change in pull request #7438: [FLINK-11282][network] 
Merge StreamRecordWriter into RecordWriter
URL: https://github.com/apache/flink/pull/7438#discussion_r250035752
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
 ##
 @@ -71,7 +71,7 @@ public void setUp(long flushTimeout) throws Exception {
 *
 * @param flushTimeout
 *  output flushing interval of the
-*  {@link 
org.apache.flink.streaming.runtime.io.StreamRecordWriter}'s output flusher 
thread
+*  {@link 
org.apache.flink.runtime.io.network.api.writer}'s output flusher thread
 
 Review comment:
   I would add it in a fixup commit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] allenxwang commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2019-01-22 Thread GitBox
allenxwang commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r250026480
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
 ##
 @@ -55,4 +57,13 @@
 * @return null or the target topic
 */
String getTargetTopic(T element);
+
+   /**
+*
+* @param element The incoming element to be serialized
+* @return collection of headers (maybe empty)
+*/
+   default Iterable<Map.Entry<String, byte[]>> headers(T element) {
 
 Review comment:
   What's the reason to use `Map.Entry` vs. `Tuple2`? `Tuple2` seems easier to construct than `Map.Entry` in the implementation.
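
   For illustration, constructing a single header both ways (a hypothetical sketch; the `String`/`byte[]` types are assumed from the Kafka header model, not taken from this PR):

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;

public class HeaderConstructionSketch {
    public static void main(String[] args) {
        byte[] value = "abc".getBytes();

        // Map.Entry needs a concrete implementation, e.g. AbstractMap.SimpleImmutableEntry.
        Map.Entry<String, byte[]> asEntry = new SimpleImmutableEntry<>("trace-id", value);

        // Tuple2 can be constructed directly.
        Tuple2<String, byte[]> asTuple = new Tuple2<>("trace-id", value);

        System.out.println(asEntry.getKey() + " / " + asTuple.f0);
    }
}
```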


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249800524
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
 
 Review comment:
   The reason I use `getInputFromOutputName` is that the return type can be an `Option[String]`, because no two output names will ever be the same. If we used `mapInputToOutputName`, we would need to return a `Seq[String]`. Both methods can solve the problem; all we have to do is define the method clearly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] KurtYoung commented on a change in pull request #6873: [hotfix] Add Review Progress section to PR description template.

2019-01-22 Thread GitBox
KurtYoung commented on a change in pull request #6873: [hotfix] Add Review 
Progress section to PR description template.
URL: https://github.com/apache/flink/pull/6873#discussion_r250022810
 
 

 ##
 File path: .github/PULL_REQUEST_TEMPLATE.md
 ##
 @@ -70,3 +70,15 @@ This change added tests and can be verified as follows:
 
   - Does this pull request introduce a new feature? (yes / no)
   - If yes, how is the feature documented? (not applicable / docs / JavaDocs / 
not documented)
+
+
+
+# Review Progress 
+
+* [ ] 1. The contribution is well-described.
+* [ ] 2. There is consensus that the contribution should go into to Flink.
 
 Review comment:
   Can you give some examples of what constitutes consensus, so that developers have a clear understanding?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tweise opened a new pull request #7559: [FLINK-11048] Mark new RemoteStreamEnvironment constructor PublicEvolving

2019-01-22 Thread GitBox
tweise opened a new pull request #7559: [FLINK-11048] Mark new 
RemoteStreamEnvironment constructor PublicEvolving
URL: https://github.com/apache/flink/pull/7559
 
 
   Trivial follow-up for #7249 to not lock down this constructor, in case we 
want to add any more settings.
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11410) Kubernetes Setup page gives incorrect url of Flink UI

2019-01-22 Thread Frank Huang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Huang closed FLINK-11410.
---
Resolution: Invalid

> Kubernetes Setup page gives incorrect url of Flink UI
> -
>
> Key: FLINK-11410
> URL: https://issues.apache.org/jira/browse/FLINK-11410
> Project: Flink
>  Issue Type: Bug
>Reporter: Frank Huang
>Priority: Major
>
> in this 
> [section|https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/kubernetes.html#deploy-flink-session-cluster-on-kubernetes],
>  url should be 
> [http://localhost:8081/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy|http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy].
>  The port should be 8081 instead of 8001. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11402) User code can fail with an UnsatisfiedLinkError in the presence of multiple classloaders

2019-01-22 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16749170#comment-16749170
 ] 

Chesnay Schepler commented on FLINK-11402:
--

[~uce] What happens if snappy-java is placed in /lib?

Judging from FLINK-5408, this sounds like something we can't fix in general or detect easily.

As such, I'd say we add a section on native libraries to the classloading-debugging docs, stating that in order to use system-provided native libraries the user must ensure that they are loaded through the system classloader, which requires the loading classes to be placed in /lib.
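
For reference, a small sketch of the underlying JVM constraint (illustrative, not code from this issue): a native library can only be loaded by one classloader per JVM, so when user code triggers the load again from a fresh user-code classloader while the previous one is still live, the second load fails.

{code:java}
public class NativeLoadSketch {
    static {
        // snappy-java effectively does this on first use. If another (still live)
        // classloader in the same JVM has already loaded the library, this throws
        // UnsatisfiedLinkError: Native Library ... already loaded in another classloader.
        System.loadLibrary("snappyjava");
    }

    public static void main(String[] args) {
        System.out.println("native library loaded");
    }
}
{code}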

> User code can fail with an UnsatisfiedLinkError in the presence of multiple 
> classloaders
> 
>
> Key: FLINK-11402
> URL: https://issues.apache.org/jira/browse/FLINK-11402
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Local Runtime
>Affects Versions: 1.7.0
>Reporter: Ufuk Celebi
>Priority: Major
> Attachments: hello-snappy-1.0-SNAPSHOT.jar, hello-snappy.tgz
>
>
> As reported on the user mailing list thread "[`env.java.opts` not persisting 
> after job canceled or failed and then 
> restarted|https://lists.apache.org/thread.html/37cc1b628e16ca6c0bacced5e825de8057f88a8d601b90a355b6a291@%3Cuser.flink.apache.org%3E];,
>  there can be issues with using native libraries and user code class loading.
> h2. Steps to reproduce
> I was able to reproduce the issue reported on the mailing list using 
> [snappy-java|https://github.com/xerial/snappy-java] in a user program. 
> Running the attached user program works fine on initial submission, but 
> results in a failure when re-executed.
> I'm using Flink 1.7.0 using a standalone cluster started via 
> {{bin/start-cluster.sh}}.
> 0. Unpack attached Maven project and build using {{mvn clean package}} *or* 
> directly use attached {{hello-snappy-1.0-SNAPSHOT.jar}}
> 1. Download 
> [snappy-java-1.1.7.2.jar|http://central.maven.org/maven2/org/xerial/snappy/snappy-java/1.1.7.2/snappy-java-1.1.7.2.jar]
>  and unpack libsnappyjava for your system:
> {code}
> jar tf snappy-java-1.1.7.2.jar | grep libsnappy
> ...
> org/xerial/snappy/native/Linux/x86_64/libsnappyjava.so
> ...
> org/xerial/snappy/native/Mac/x86_64/libsnappyjava.jnilib
> ...
> {code}
> 2. Configure system library path to {{libsnappyjava}} in {{flink-conf.yaml}} 
> (path needs to be adjusted for your system):
> {code}
> env.java.opts: -Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64
> {code}
> 3. Run attached {{hello-snappy-1.0-SNAPSHOT.jar}}
> {code}
> bin/flink run hello-snappy-1.0-SNAPSHOT.jar
> Starting execution of program
> Program execution finished
> Job with JobID ae815b918dd7bc64ac8959e4e224f2b4 has finished.
> Job Runtime: 359 ms
> {code}
> 4. Rerun attached {{hello-snappy-1.0-SNAPSHOT.jar}}
> {code}
> bin/flink run hello-snappy-1.0-SNAPSHOT.jar
> Starting execution of program
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: Job failed. 
> (JobID: 7d69baca58f33180cb9251449ddcd396)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at com.github.uce.HelloSnappy.main(HelloSnappy.java:18)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>   at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>   at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution 

[GitHub] casidiablo opened a new pull request #7558: Fix typo in RocksDB error

2019-01-22 Thread GitBox
casidiablo opened a new pull request #7558: Fix typo in RocksDB error
URL: https://github.com/apache/flink/pull/7558
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11410) Kubernetes Setup page gives incorrect url of Flink UI

2019-01-22 Thread Frank Huang (JIRA)
Frank Huang created FLINK-11410:
---

 Summary: Kubernetes Setup page gives incorrect url of Flink UI
 Key: FLINK-11410
 URL: https://issues.apache.org/jira/browse/FLINK-11410
 Project: Flink
  Issue Type: Bug
Reporter: Frank Huang


in this 
[section|https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/kubernetes.html#deploy-flink-session-cluster-on-kubernetes],
 url should be 
[http://localhost:8081/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy|http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy].
 The port should be 8081 instead of 8001. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11409) Make `ProcessFunction`, `ProcessWindowFunction` and etc. pure interfaces

2019-01-22 Thread Kezhu Wang (JIRA)
Kezhu Wang created FLINK-11409:
--

 Summary: Make `ProcessFunction`, `ProcessWindowFunction` and etc. 
pure interfaces
 Key: FLINK-11409
 URL: https://issues.apache.org/jira/browse/FLINK-11409
 Project: Flink
  Issue Type: Improvement
Reporter: Kezhu Wang


These functions express no opinionated demands on their implementing classes. It would be nice to define them as interfaces rather than abstract classes, since an abstract class is intrusive and constrains callers' use cases. For example, a client can't write an `AbstractFlinkRichFunction` to unify lifecycle management for all data processing functions in an easy way.

I dug into the history of some of these functions and found that several were converted from interfaces to abstract classes in order to provide default method implementations; for example, `ProcessFunction` and `CoProcessFunction` were converted to abstract classes in FLINK-4460, which predates FLINK-7274. After FLINK-7274, [Java 8 default methods|https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html] would be a better solution.

I also notice that some functions introduced after FLINK-7274, such as `ProcessJoinFunction`, are implemented as abstract classes. I think it would be better to establish a well-known principle to guide both API authors and callers of data processing functions.

Personally, I prefer interfaces for all exported function callbacks, for the reason given in the first paragraph.

Besides this, with `AbstractRichFunction` and interfaces for the data processing functions, I think many of the rich data processing function classes could be eliminated, since they are plain classes that extend `AbstractRichFunction` and implement the data processing interfaces; clients could write this in one line of code with a clear intention of both data processing and lifecycle management (see the sketch below).
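
A hypothetical sketch of what this could look like (the interface and class names below are made up for illustration; {{Collector}} and {{AbstractRichFunction}} are existing Flink types):

{code:java}
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.util.Collector;

public class InterfaceProposalSketch {

    /** Hypothetical interface variant of a process function, using a Java 8 default method. */
    interface ProcessFunctionInterface<I, O> {

        void processElement(I value, Collector<O> out) throws Exception;

        // The no-op behavior the abstract class provides today becomes a default method.
        default void onTimer(long timestamp, Collector<O> out) throws Exception {}
    }

    /** A single class combining lifecycle management and processing logic. */
    static class MyFunction extends AbstractRichFunction implements ProcessFunctionInterface<String, String> {
        @Override
        public void processElement(String value, Collector<String> out) {
            out.collect(value.toUpperCase());
        }
    }
}
{code}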

The following is a possibly incomplete list of data processing functions currently implemented as abstract classes:
 * `ProcessFunction`, `KeyedProcessFunction`, `CoProcessFunction` and 
`ProcessJoinFunction`
 * `ProcessWindowFunction` and `ProcessAllWindowFunction`
 * `BaseBroadcastProcessFunction`, `BroadcastProcessFunction` and 
`KeyedBroadcastProcessFunction`

All of the above functions are annotated with `@PublicEvolving`, so making them interfaces would not break Flink's compatibility guarantee, but compatibility is still a big consideration in evaluating this proposal.

Any thoughts on this proposal? Please do comment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun closed pull request #7528: [hotfix] [utils] Implement utils without cost of "enum"

2019-01-22 Thread GitBox
TisonKun closed pull request #7528: [hotfix] [utils] Implement utils without 
cost of "enum"
URL: https://github.com/apache/flink/pull/7528
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] TisonKun commented on issue #7529: [hotfix] [blob] Generalize usage of NoOpTransientBlobService

2019-01-22 Thread GitBox
TisonKun commented on issue #7529: [hotfix] [blob] Generalize usage of 
NoOpTransientBlobService
URL: https://github.com/apache/flink/pull/7529#issuecomment-456481455
 
 
   @GJL alright, we can defer such a generalization until we actually want to use it somewhere else. As for using `enum` as a strategy to implement singletons, I've learned from zentol in #7528 how the Flink community thinks of it. I'll close it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] TisonKun closed pull request #7529: [hotfix] [blob] Generalize usage of NoOpTransientBlobService

2019-01-22 Thread GitBox
TisonKun closed pull request #7529: [hotfix] [blob] Generalize usage of 
NoOpTransientBlobService
URL: https://github.com/apache/flink/pull/7529
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11406) Return TypeSerializerSchemaCompatibility.incompatible() when arity of nested serializers don't match in CompositeTypeSerializerSnapshot's compatibility check

2019-01-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11406:
---
Labels: pull-request-available  (was: )

> Return TypeSerializerSchemaCompatibility.incompatible() when arity of nested 
> serializers don't match in CompositeTypeSerializerSnapshot's compatibility 
> check
> -
>
> Key: FLINK-11406
> URL: https://issues.apache.org/jira/browse/FLINK-11406
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Right now, in {{CompositeTypeSerializerSnapshot.resolveSchemaCompatibility(...)}}, if the arity of nested serializers doesn't match between the snapshot and the provided new serializer, a runtime exception is thrown.
> Ideally, this should return {{TypeSerializerSchemaCompatibility.incompatible()}}.
> We should also make it clearer in the Javadocs that {{CompositeTypeSerializerSnapshot}} is intended for product serializers that have a fixed arity of nested serializers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tzulitai opened a new pull request #7557: [FLINK-11406] [core] Return INCOMPATIBLE when nested serializers arity don't match in CompositeTypeSerializerSnapshot

2019-01-22 Thread GitBox
tzulitai opened a new pull request #7557: [FLINK-11406] [core] Return 
INCOMPATIBLE when nested serializers arity don't match in 
CompositeTypeSerializerSnapshot
URL: https://github.com/apache/flink/pull/7557
 
 
   ## What is the purpose of the change
   
   Right now, in `CompositeTypeSerializerSnapshot.resolveSchemaCompatibility(...)`, if the arity of nested serializers doesn't match between the snapshot and the provided new serializer, a runtime exception is thrown.
   
   Ideally, this should return `TypeSerializerSchemaCompatibility.incompatible()`.
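
   A rough sketch of the intended behavior (illustrative and simplified, not the actual `CompositeTypeSerializerSnapshot` code; the method and parameter names are assumptions):

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;

public class ArityCheckSketch {

    // Report incompatibility instead of throwing when the nested-serializer arity
    // recorded in the snapshot differs from the arity of the new serializer.
    static <T> TypeSerializerSchemaCompatibility<T> resolveNestedArity(
            TypeSerializer<?>[] nestedSnapshotSerializers,
            TypeSerializer<?>[] newNestedSerializers) {

        if (nestedSnapshotSerializers.length != newNestedSerializers.length) {
            // previously: a runtime exception was thrown here
            return TypeSerializerSchemaCompatibility.incompatible();
        }
        // ... the per-nested-serializer compatibility checks would follow here ...
        return TypeSerializerSchemaCompatibility.compatibleAsIs();
    }
}
```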
   
   ## Verifying this change
   
   A new unit test 
`CompositeTypeSerializerSnapshotTest.testNestedFieldSerializerArityMismatchPrecedence`
 covers this change.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL edited a comment on issue #7529: [hotfix] [blob] Generalize usage of NoOpTransientBlobService

2019-01-22 Thread GitBox
GJL edited a comment on issue #7529: [hotfix] [blob] Generalize usage of 
NoOpTransientBlobService
URL: https://github.com/apache/flink/pull/7529#issuecomment-456470455
 
 
   Are you planning to use the `NoOpTransientBlobService` somewhere else? If 
not, I would not like to merge this PR. I find it hard to believe that an 
implementation that throws an exception for almost every method can be used for 
testing in a sensible way. Also `enum` is a common strategy to implement 
singletons.
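
   For reference, a minimal sketch of the enum-based singleton pattern referred to here (illustrative, not the actual `NoOpTransientBlobService` code):

```java
public enum NoOpSingletonSketch {
    INSTANCE;

    // Every method of the no-op implementation would simply do nothing or throw.
    public void doNothing() {}
}

// usage: NoOpSingletonSketch.INSTANCE.doNothing();
```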


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on issue #7529: [hotfix] [blob] Generalize usage of NoOpTransientBlobService

2019-01-22 Thread GitBox
GJL commented on issue #7529: [hotfix] [blob] Generalize usage of 
NoOpTransientBlobService
URL: https://github.com/apache/flink/pull/7529#issuecomment-456470455
 
 
   Are you planning to use the `NoOpTransientBlobService` somewhere else? If 
not, I would not like to merge this PR. I find it hard to believe that an 
implementation that throws an exception for almost every method can be used for 
testing in a sensible way. Also `enum` is a common strategy to implement 
singletons.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-22 Thread GitBox
pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 
can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-456466692
 
 
   Thanks for showing the interest @tvielgouarin in this topic! 
   
   > But for migration tests of universal FlinkKafkaProducer between Flink 
versions : I read in the doc that this connector is supported since flink 1.7 . 
So currently, no migration test should be passing 
   
   Both for the universal and `FlinkKafkaProducer011` the most important test 
to catch regressions is to test restoring from savepoint that was taken using 
the latest stable version (`1.7`). It would test if current code in the master 
(future `1.8` release) has state compatibility with `1.7` savepoints.
   
   > Also, do we just want to check that the connector can start from a 
previous version checkpoint ?
   
   I think at least for now only checking the compatibility with `1.7` is 
enough.
   
   > Most of the time the test fail with `The producer attempted to use a 
producer id which is not currently assigned to its transactional id`

   I guess your test is failing because you are restoring Flink's state from 
savepoint correctly, but you are ignoring the state of Kafka brokers. I haven't 
thought about this issue before and it looks like nobody else did it as well... 
By the way, thanks for discovering this issue. I guess this is the first 
migration test that must take care of some external state (besides Flink's 
internal state). Probably Kafka migration tests must store not only "savepoint" 
in the resources but also in one way or another store the state of kafka 
cluster as it was just after completing the savepoint from which we want to 
restore (transaction log? topics content?) Whatever that means... Couple of 
random thoughts:
   
   1. it might be possible to identify the list of files that define the 
internal state of Kafka that we need to archive, place it in the resources 
alongside of savepoint and use it during `KafkaTestBase` initialisation
   2. maybe it will be better to express this logic in end to end test?
   
   CC @tzulitai 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-22 Thread GitBox
pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 
can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-456466692
 
 
   Thanks for showing the interest @tvielgouarin in this topic! 
   
   > But for migration tests of universal FlinkKafkaProducer between Flink 
versions : I read in the doc that this connector is supported since flink 1.7 . 
So currently, no migration test should be passing 
   
   Both for the universal and `FlinkKafkaProducer011` the most important test 
to catch regressions is to test restoring from savepoint that was taken using 
the latest stable version (`1.7`). It would test if current code in the master 
(future `1.8` release) has state compatibility with `1.7` savepoints.
   
   > Also, do we just want to check that the connector can start from a 
previous version checkpoint ?
   
   I think at least for now only checking the compatibility with `1.7` is 
enough.
   
   > Most of the time the test fail with `The producer attempted to use a 
producer id which is not currently assigned to its transactional id`

   I guess your test is failing because you are restoring Flink's state from 
savepoint correctly, but you are ignoring the state of Kafka brokers. I haven't 
thought about this issue before and it looks like nobody else did it as well... 
By the way, thanks for discovering this issue. I guess this is the first 
migration test that must take care of some external state (besides Flink's 
internal state). Probably Kafka migration tests must store not only "savepoint" 
in the resources but also in one way or another store the state of kafka 
cluster as it was just after completing the savepoint from which we want to 
restore (transaction log? topics content?) Whatever that means... Couple of 
random thoughts:
   
   1. it might be possible to identify the list of files that define the 
internal state of Kafka that we need to archive, place it in the resources 
alongside of savepoint and use it during `KafkaTestBase` initialisation
   2. maybe it will be better to express this logic in end to end tests?
   
   CC @tzulitai 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-22 Thread GitBox
pnowojski commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not 
be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-456466692
 
 
   Thanks for showing the interest @tvielgouarin in this topic!
   
   > But for migration tests of universal FlinkKafkaProducer between Flink 
versions : I read in the doc that this connector is supported since flink 1.7 . 
So currently, no migration test should be passing 
   
   Both for the universal and `FlinkKafkaProducer011` the most important test 
to catch regressions is to test restoring from savepoint that was taken using 
the latest stable version (`1.7`). It would test if current code in the master 
(future `1.8` release) has state compatibility with `1.7` savepoints.
   
   > Also, do we just want to check that the connector can start from a 
previous version checkpoint ?
   
   I think at least for now only checking the compatibility with `1.7` is 
enough.
   
   > Most of the time the test fail with `The producer attempted to use a 
producer id which is not currently assigned to its transactional id`

   I guess your test is failing because you are restoring Flink's state from 
savepoint correctly, but you are ignoring the state of Kafka brokers. I haven't 
thought about this issue before and it looks like nobody else did it as well... 
I guess this is the first migration test that must take care of some external 
state (besides Flink's internal state). Probably Kafka migration tests must 
store not only "savepoint" in the resources but also in one way or another 
store the state of kafka cluster as it was just after completing the savepoint 
from which we want to restore (transaction log? topics content?) Whatever that 
means... Couple of random thoughts:
   
   1. it might be possible to identify the list of files that define the 
internal state of Kafka that we need to archive, place it in the resources 
alongside of savepoint and use it during `KafkaTestBase` initialisation
   2. maybe it will be better to express this logic in end to end tests?
   
   CC @tzulitai 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11408) ContinuousProcessingTimeTrigger: NPE on clear() and state is lost on merge

2019-01-22 Thread Cristian (JIRA)
Cristian created FLINK-11408:


 Summary: ContinuousProcessingTimeTrigger: NPE on clear() and state 
is lost on merge
 Key: FLINK-11408
 URL: https://issues.apache.org/jira/browse/FLINK-11408
 Project: Flink
  Issue Type: Bug
 Environment: Put both bugs in 
[https://github.com/casidiablo/flink-continuous-processing-trigger-bugs]

This is running Flink 1.7.1 locally.
Reporter: Cristian


I was testing session windows using processing time and found a couple of problems with the ContinuousProcessingTimeTrigger. The first one is an NPE in the clear method:

[https://github.com/casidiablo/flink-continuous-processing-trigger-bugs/blob/master/src/main/java/flink/bug/Bug1.java]

The second one, which is most likely related and the root cause of the first, is that the way state is merged for merged windows confuses the trigger so that it stops firing:

[https://github.com/casidiablo/flink-continuous-processing-trigger-bugs/blob/master/src/main/java/flink/bug/Bug2.java]

 

I managed to solve both of these by using a modified version of ContinuousProcessingTimeTrigger which does NOT call `ctx.mergePartitionedState(stateDesc);` in the onMerge method.

This is what I understand happens at the trigger level:
 * The first element in the stream sets an initial fire time (the logic is in ContinuousProcessingTimeTrigger#onElement()), if no trigger is set yet.
 * From then on, ContinuousProcessingTimeTrigger#onProcessingTime() is 
responsible for scheduling the next trigger. 
 * When the windows are merged (in the case of session windows), somehow the 
clear and merge methods are called using the wrong window namespace (I think 
this is the root cause of the bug, but I'm not too familiar with that code).
 * Because the state is not cleared properly in the new window namespace, the 
previously scheduled trigger gets executed against the window that was cleared.
 * Moreover, the new window has the state of the previous window, which means 
that:
 ## onElement will NOT schedule a fire trigger
 ## onProcessingTime will never be called at all
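
A minimal sketch of the kind of job described above (illustrative; see the linked Bug1/Bug2 classes for the actual reproducers): a processing-time session window with a ContinuousProcessingTimeTrigger that should keep firing within the session.

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;

public class ContinuousTriggerSessionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("key", 1), Tuple2.of("key", 2))
            .keyBy(0)
            // Processing-time session windows, so windows get merged as elements arrive.
            .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
            // Continuous trigger that should fire every second within the session.
            .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
            .sum(1)
            .print();

        env.execute("continuous-processing-time-trigger-sketch");
    }
}
{code}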



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tzulitai commented on issue #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API

2019-01-22 Thread GitBox
tzulitai commented on issue #7553: [FLINK-11328] [core] Migrate all 
parameterless serializers to use new serialization compatibility API
URL: https://github.com/apache/flink/pull/7553#issuecomment-456460449
 
 
   Thanks! Will merge after Travis confirms green ..


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11407) Allow providing reason messages for TypeSerializerSchemaCompatibility.incompatible()

2019-01-22 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-11407:
---

 Summary: Allow providing reason messages for 
TypeSerializerSchemaCompatibility.incompatible()
 Key: FLINK-11407
 URL: https://issues.apache.org/jira/browse/FLINK-11407
 Project: Flink
  Issue Type: Improvement
  Components: Type Serialization System
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: 1.8.0


There are a few different scenarios where a new serializer can be determined 
incompatible in a compatibility check.

Allowing the incompatible result to be accompanied by a message indicating why 
the new serializer is incompatible would be beneficial, and allows the state 
backends to throw more meaningful exceptions when they do encounter an 
incompatible new serializer.
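
One hypothetical shape this could take (purely illustrative; only the no-argument {{incompatible()}} exists at the time of writing, so the helper below just logs the reason alongside the existing result):

{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;

public class IncompatibilityReasonSketch {

    // Hypothetical helper: attach a human-readable reason to an incompatible result,
    // e.g. "nested serializer arity changed from 3 to 2", so a state backend can
    // surface it in its exception message during restore.
    static <T> TypeSerializerSchemaCompatibility<T> incompatibleBecause(String reason) {
        System.err.println("serializer incompatible: " + reason);
        return TypeSerializerSchemaCompatibility.incompatible();
    }
}
{code}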



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dawidwys commented on issue #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API

2019-01-22 Thread GitBox
dawidwys commented on issue #7553: [FLINK-11328] [core] Migrate all 
parameterless serializers to use new serialization compatibility API
URL: https://github.com/apache/flink/pull/7553#issuecomment-456450150
 
 
   Thanks @tzulitai for the update. It looks good to me now. +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11336) Flink HA didn't remove ZK metadata

2019-01-22 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16748851#comment-16748851
 ] 

Stephan Ewen commented on FLINK-11336:
--

What way did you start Flink?

  - standalone
  - Yarn (per job or as a session)
  - Mesos
  - Container

> Flink HA didn't remove ZK metadata
> --
>
> Key: FLINK-11336
> URL: https://issues.apache.org/jira/browse/FLINK-11336
> Project: Flink
>  Issue Type: Improvement
>Reporter: shengjk1
>Priority: Major
> Attachments: image-2019-01-15-19-42-21-902.png
>
>
> Flink HA didn't remove ZK metadata
> such as 
> go to zk cli  : ls /flinkone
> !image-2019-01-15-19-42-21-902.png!
>  
> I suggest we delete this metadata when the application is cancelled or
> throws an exception



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11406) Return TypeSerializerSchemaCompatibility.incompatible() when arity of nested serializers don't match in CompositeTypeSerializerSnapshot's compatibility check

2019-01-22 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-11406:
---

 Summary: Return TypeSerializerSchemaCompatibility.incompatible() 
when arity of nested serializers don't match in 
CompositeTypeSerializerSnapshot's compatibility check
 Key: FLINK-11406
 URL: https://issues.apache.org/jira/browse/FLINK-11406
 Project: Flink
  Issue Type: Improvement
  Components: Type Serialization System
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: 1.8.0


Right now, in {{CompositeTypeSerializerSnapshot.resolveSchemaCompatibility(...)}}, if the arity of nested serializers doesn't match between the snapshot and the provided new serializer, a runtime exception is thrown.

Ideally, this should return {{TypeSerializerSchemaCompatibility.incompatible()}}.
We should also make it clearer in the Javadocs that {{CompositeTypeSerializerSnapshot}} is intended for product serializers that have a fixed arity of nested serializers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tzulitai commented on issue #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API

2019-01-22 Thread GitBox
tzulitai commented on issue #7553: [FLINK-11328] [core] Migrate all 
parameterless serializers to use new serialization compatibility API
URL: https://github.com/apache/flink/pull/7553#issuecomment-456445785
 
 
   @dawidwys I've addressed your comments in fixup commit 6c7a3fe.
   Could you take a look and let me know if there are other issues, or a +1 if 
it looks good?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StephanEwen commented on a change in pull request #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.

2019-01-22 Thread GitBox
StephanEwen commented on a change in pull request #7313: 
[FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.
URL: https://github.com/apache/flink/pull/7313#discussion_r249832214
 
 

 ##
 File path: 
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
 ##
 @@ -121,24 +127,14 @@ public boolean supportsResume() {
return true;
}
 
-   @VisibleForTesting
-   static org.apache.hadoop.fs.Path generateStagingTempFilePath(
-   org.apache.hadoop.fs.FileSystem fs,
-   org.apache.hadoop.fs.Path targetFile) throws 
IOException {
-
+   private static org.apache.hadoop.fs.Path 
generateStagingTempFilePath(org.apache.hadoop.fs.Path targetFile) {
checkArgument(targetFile.isAbsolute(), "targetFile must be 
absolute");
 
final org.apache.hadoop.fs.Path parent = targetFile.getParent();
final String name = targetFile.getName();
 
checkArgument(parent != null, "targetFile must not be the root 
directory");
 
-   while (true) {
-   org.apache.hadoop.fs.Path candidate = new 
org.apache.hadoop.fs.Path(
-   parent, "." + name + ".inprogress." + 
UUID.randomUUID().toString());
-   if (!fs.exists(candidate)) {
-   return candidate;
-   }
-   }
+   return new org.apache.hadoop.fs.Path(parent, "." + name + 
".inprogress");
 
 Review comment:
   I added this as a simple way to avoid collisions, without relying on the 
semantics of the code that invokes this.
   
   I would try and think through what happens after a failure/recovery when an 
old task (running on a network-partitioned TM that has not yet cancelled) is 
still writing to the same file as a recovered task. Can that happen? If yes, 
what is the behavior then? Do we reduce the guarantees in that case? The 
partitioned TM should not be able to commit a checkpoint (to not roll over to a 
proper file), but can it mess up / overwrite the staging contents? My initial 
thought is that it probably could - not 100% sure though how HDFS leases would 
behave then, could they prevent that?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2019-01-22 Thread GitBox
azagrebin commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r249829200
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ##
 @@ -370,8 +370,8 @@ else if (partitionsRemoved) {

keyPayload.get(keyBytes);
}
 
-   final T value = 
deserializer.deserialize(keyBytes, valueBytes,
-   
currentPartition.getTopic(), currentPartition.getPartition(), offset);
+   final T value = 
deserializer.deserialize(
 
 Review comment:
   I would be ok with having `Exception` in the signature of `deserialize`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tvielgouarin edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-22 Thread GitBox
tvielgouarin edited a comment on issue #7405: [FLINK-11249] 
FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-456391275
 
 
   hi @pnowojski , @yanghua ,
   I come from @cjolif 's team and I am currently working on the implementation 
of the Migration test in order to accelerate the merge of this PR. I came here 
with several questions in mind. I do understand that it needs to implement:
   
   - migration tests of `FlinkKafkaProducer011` between Flink versions
   - migration test from `FlinkKafkaProducer011` to universal 
`FlinkKafkaProducer`
   
   But for migration tests of universal `FlinkKafkaProducer` between Flink 
versions :  I read in the doc that this connector is supported since flink 1.7 
. So currently,  no migration test should be passing ? 
   
   Also, do we just want to check that the connector can start from a previous 
version checkpoint ?
   
   Furthermore I've worked on a simple implementation of Migration test for 
`FlinkKafkaProducer`: 
   
https://github.com/tvielgouarin/flink/commit/973756430c226a0cd5011fedc1eab1345a27cc2a
   
   Most of the time the test fails with `The producer attempted to use a producer id which is not currently assigned to its transactional id`. I'm not sure, but I guess it's because there is a different KafkaServer instance between the snapshot version and the test, and the transactional id assignments are lost (?). If so, do you have an idea of the best workaround?
   
   Anyway if you can have a look at what I've done and criticize it, it would 
be much appreciated. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on issue #6873: [hotfix] Add Review Progress section to PR description template.

2019-01-22 Thread GitBox
fhueske commented on issue #6873: [hotfix] Add Review Progress section to PR 
description template.
URL: https://github.com/apache/flink/pull/6873#issuecomment-456436978
 
 
   Sorry, did not read your comment before I sent out the mail. 
   You can just reply to the thread. Maybe, we can even leave out this PR and 
go directly with the bot.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tvielgouarin edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-22 Thread GitBox
tvielgouarin edited a comment on issue #7405: [FLINK-11249] 
FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-456391275
 
 
   Hi @pnowojski, @yanghua,
   I come from @cjolif's team and I am currently working on the implementation 
of the migration tests in order to accelerate the merge of this PR. I came here 
with several questions in mind. I understand that this needs to implement:
   
   - migration tests of `FlinkKafkaProducer011` between Flink versions
   - a migration test from `FlinkKafkaProducer011` to the universal 
`FlinkKafkaProducer`
   
   For migration tests of the universal `FlinkKafkaProducer` between Flink 
versions: I read in the docs that this connector is only supported since Flink 
1.7, so currently no such migration test can pass yet, right?
   
   Also, what should the validation conditions be (that the producerConfig 
stays the same (?))?
   
   Furthermore, I have worked on a simple implementation of a migration test 
for `FlinkKafkaProducer`: 
   
https://github.com/tvielgouarin/flink/commit/973756430c226a0cd5011fedc1eab1345a27cc2a
   
   Most of the time the test fails with `The producer attempted to use a 
producer id which is not currently assigned to its transactional id`. I am not 
sure, but I guess it is because the snapshot and the test run against different 
KafkaServer instances, so the transactional id assignments are lost (?). If so, 
do you have an idea of the best workaround?
   
   Anyway, if you can have a look at what I have done and criticize it, it 
would be much appreciated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tvielgouarin edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-22 Thread GitBox
tvielgouarin edited a comment on issue #7405: [FLINK-11249] 
FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-456391275
 
 
   Hi @pnowojski, @yanghua,
   I come from @cjolif's team and I am currently working on the implementation 
of the migration tests in order to accelerate the merge of this PR. I came here 
with several questions in mind. I understand that this needs to implement:
   
   - migration tests of `FlinkKafkaProducer011` between Flink versions
   - a migration test from `FlinkKafkaProducer011` to the universal 
`FlinkKafkaProducer`
   
   For migration tests of the universal `FlinkKafkaProducer` between Flink 
versions: I read in the docs that this connector is only supported since Flink 
1.7, so currently no such migration test applies yet, right?
   
   However, what should the validation conditions be (that the producerConfig 
stays the same (?))?
   
   Furthermore, I have worked on a simple implementation of a migration test 
for `FlinkKafkaProducer`: 
   
https://github.com/tvielgouarin/flink/commit/973756430c226a0cd5011fedc1eab1345a27cc2a
   
   Most of the time the test fails with `The producer attempted to use a 
producer id which is not currently assigned to its transactional id`. I am not 
sure, but I guess it is because the snapshot and the test run against different 
KafkaServer instances, so the transactional id assignments are lost (?). If so, 
do you have an idea of the best workaround?
   
   Anyway, if you can have a look at what I have done and criticize it, it 
would be much appreciated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11401) Allow compression on ParquetBulkWriter

2019-01-22 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16748795#comment-16748795
 ] 

Stephan Ewen commented on FLINK-11401:
--

I can see that being useful.

Please bear in mind that bulk writers currently come with the implication that 
they need to roll on checkpoint, because many formats (like Parquet) don't make 
it easy to intermediately persist and resume writes.
Avro's row-by-row append nature makes it possible to write part files across 
checkpoints.

One could think of letting the row formats add a header when opening a part 
file. That would allow the Avro writer to keep the property of writing part 
files across checkpoints.
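
For illustration, on the Parquet side the compression codec is just one more builder option; a rough sketch of the writer creation (sketched in Scala against the plain parquet-avro API; wiring the codec through Flink's bulk writer factory is the part this ticket would add):
```
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// Sketch: an Avro-backed ParquetWriter with an explicit compression codec (e.g. SNAPPY).
def createWriter(path: Path, schema: Schema, codec: CompressionCodecName): ParquetWriter[GenericRecord] =
  AvroParquetWriter.builder[GenericRecord](path)
    .withSchema(schema)
    .withCompressionCodec(codec)
    .build()
```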

> Allow compression on ParquetBulkWriter
> --
>
> Key: FLINK-11401
> URL: https://issues.apache.org/jira/browse/FLINK-11401
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.7.1
>Reporter: Fokko Driesprong
>Assignee: Fokko Driesprong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] kezhuw opened a new pull request #7556: [hotfix][javadocs] Fix typos in javadoc

2019-01-22 Thread GitBox
kezhuw opened a new pull request #7556: [hotfix][javadocs] Fix typos in javadoc
URL: https://github.com/apache/flink/pull/7556
 
 
   Multiple typo fixes collected while reading the Flink docs.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2019-01-22 Thread GitBox
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r249798168
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ##
 @@ -370,8 +370,8 @@ else if (partitionsRemoved) {

keyPayload.get(keyBytes);
}
 
-   final T value = 
deserializer.deserialize(keyBytes, valueBytes,
-   
currentPartition.getTopic(), currentPartition.getPartition(), offset);
+   final T value = 
deserializer.deserialize(
 
 Review comment:
   Methods of the 0.8 _ConsumerRecord_ have different signatures compared to 
modern Kafka - namely, they throw the base `Exception`. Two options:
   
   1. Change the signature of `deserialize(ConsumerRecord)` to throw `Exception` 
instead of `IOException`.
   2. Wrap and re-throw in every `deserialize(ConsumerRecord)` implementation 
that is 0.8-specific or in kafka-base (KeyedDeserializationSchemaWrapper, 
JSONKeyValueDeserializationSchema, NonContinousOffsetsDeserializationSchema and 
TypeInformationKeyValueSerializationSchema).
   
   @azagrebin, @stevenzwu, which approach do you prefer?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2019-01-22 Thread GitBox
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r249798168
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ##
 @@ -370,8 +370,8 @@ else if (partitionsRemoved) {

keyPayload.get(keyBytes);
}
 
-   final T value = 
deserializer.deserialize(keyBytes, valueBytes,
-   
currentPartition.getTopic(), currentPartition.getPartition(), offset);
+   final T value = 
deserializer.deserialize(
 
 Review comment:
   Methods of the 0.8 _ConsumerRecord_ have different signatures compared to 
modern Kafka - namely, they throw the base `Exception`. Two options:
   
   1. Change the signature of `deserialize(ConsumerRecord)` to throw `Exception` 
instead of `IOException`.
   2. Wrap and re-throw in every `deserialize(ConsumerRecord)` implementation 
that is 0.8-specific or in kafka-base (KeyedDeserializationSchemaWrapper, 
JSONKeyValueDeserializationSchema, NonContinousOffsetsDeserializationSchema and 
TypeInformationKeyValueSerializationSchema).
   
   @azagrebin, @stevenzwu, which approach do you prefer?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rmetzger commented on issue #6873: [hotfix] Add Review Progress section to PR description template.

2019-01-22 Thread GitBox
rmetzger commented on issue #6873: [hotfix] Add Review Progress section to PR 
description template.
URL: https://github.com/apache/flink/pull/6873#issuecomment-456420688
 
 
   Nice. I'm almost done with the bot (famous last words in software 
engineering).
   I might be able to finish it by tomorrow. Should we hold off on the 
announcement until then?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249801381
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
+val inNames = calc.getInput.getRowType.getFieldNames
+calc.getProgram.getNamedProjects
+  .map(p => {
+calc.getProgram.expandLocalRef(p.left) match {
+  // output field is forwarded input field
+  case r: RexInputRef => (r.getIndex, p.right)
+  // output field is renamed input field
+  case a: RexCall if a.getKind.equals(SqlKind.AS) =>
+a.getOperands.get(0) match {
+  case ref: RexInputRef =>
+(ref.getIndex, p.right)
+  case _ =>
+(-1, p.right)
 
 Review comment:
   Yes, I am trying to refactor it as you suggested. :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249795484
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+child: RelNode,
+val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, child)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+new FlinkLogicalUpsertToRetraction(cluster, traitSet, inputs.get(0), 
keyNames)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): 
RelOptCost = {
+val child = this.getInput
+val rowCnt = mq.getRowCount(child)
+// take rowCnt and fieldCnt into account, so that cost will be smaller 
when generate
+// UpsertToRetractionConverter after Calc.
+planner.getCostFactory.makeCost(rowCnt, rowCnt * 
child.getRowType.getFieldCount, 0)
 
 Review comment:
   Shouldn't you use `RelMetadataQuery#getAverageRowSize()` or at least 
duplicate the logic of `FlinkLogicalJoinBase#computeSelfCost` instead of using 
`getFieldCount`?
   
   Also, isn't the io cost missing? To be consistent with the join cost this 
should return:
   ```
   planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
estimateRowSize(child.getRowType))
   ```
   
   edit: probably a better idea is to use `estimateRowSize` and maybe update it 
in the future to use `RelMetadataQuery#getAverageRowSize()`
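   
   To make that concrete, here is a sketch of what I have in mind for 
`FlinkLogicalUpsertToRetraction#computeSelfCost` (it assumes `estimateRowSize` 
from the `FlinkRelNode` trait is available in this class, as in the other 
logical nodes; a sketch only, not the final code):
   ```
   override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
     val child = this.getInput
     val rowCnt = mq.getRowCount(child)
     // cpu grows with the row count, io with the estimated bytes flowing through the node,
     // mirroring how the join nodes compute their self cost
     planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(child.getRowType))
   }
   ```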


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249800524
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
 
 Review comment:
   The reason I use `getInputFromOutputName` is that the return type can be an 
`Option[String]`, because no two output names can ever be the same. If we used 
`mapInputToOutputName` we would need to return an `Option[Seq[String]]`. Both 
methods can solve the problem; all we have to do is define the method clearly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249797875
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
 
 Review comment:
   Isn't this class missing a proper implementation of the 
`AbstractRelNode#accept(org.apache.calcite.rex.RexShuttle)` method? It is used 
by Calcite, for example, to gather the columns "used" by this node. If 
`FlinkLogicalUpsertToRetraction#accept` does not visit the primary key, I would 
be afraid that the primary key might be pruned from the input of 
`FlinkLogicalUpsertToRetraction`.
   
   It is also used for renaming fields etc. After optimisation your `keyNames: 
Seq[String]` may no longer be valid.
   
   As far as I (vaguely) recall from when I was implementing Temporal Table 
Joins, storing fields in the form of `String` references was not very useful and 
using `RexNode` was a better option.
   
   Probably this needs more investigation and some additional unit test(s).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249795484
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+child: RelNode,
+val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, child)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+new FlinkLogicalUpsertToRetraction(cluster, traitSet, inputs.get(0), 
keyNames)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): 
RelOptCost = {
+val child = this.getInput
+val rowCnt = mq.getRowCount(child)
+// take rowCnt and fieldCnt into account, so that cost will be smaller 
when generate
+// UpsertToRetractionConverter after Calc.
+planner.getCostFactory.makeCost(rowCnt, rowCnt * 
child.getRowType.getFieldCount, 0)
 
 Review comment:
   Shouldn't you use `RelMetadataQuery#getAverageRowSize()` or at least 
duplicate the logic of `FlinkLogicalJoinBase#computeSelfCost` instead of using 
`getFieldCount`?
   
   Also, isn't the io cost missing? To be consistent with the join cost this 
should return:
   ```
   planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
estimateRowSize(child.getRowType))
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on issue #6873: [hotfix] Add Review Progress section to PR description template.

2019-01-22 Thread GitBox
fhueske commented on issue #6873: [hotfix] Add Review Progress section to PR 
description template.
URL: https://github.com/apache/flink/pull/6873#issuecomment-456417079
 
 
   I've updated the PR.
   
   Before merging I'll post to the dev mailing list to remind everybody about 
the upcoming change and the new review process.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2019-01-22 Thread GitBox
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r249798168
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ##
 @@ -370,8 +370,8 @@ else if (partitionsRemoved) {

keyPayload.get(keyBytes);
}
 
-   final T value = 
deserializer.deserialize(keyBytes, valueBytes,
-   
currentPartition.getTopic(), currentPartition.getPartition(), offset);
+   final T value = 
deserializer.deserialize(
 
 Review comment:
   Methods of the 0.8 Consumer have different signatures compared to modern 
Kafka - namely, they throw the base `Exception`. Two options:
   
   1. Change the signature of `deserialize(ConsumerRecord)` to throw `Exception` 
instead of `IOException`.
   2. Wrap and re-throw in every `deserialize(ConsumerRecord)` implementation 
that is 0.8-specific or in kafka-base (KeyedDeserializationSchemaWrapper, 
JSONKeyValueDeserializationSchema, NonContinousOffsetsDeserializationSchema and 
TypeInformationKeyValueSerializationSchema).
   
   @azagrebin, @stevenzwu, which approach do you prefer?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249790833
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, 
unaryNode}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import java.lang.{Boolean => JBool}
+
+class FromUpsertStreamTest extends TableTestBase {
+
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+  @Test
+  def testRemoveUpsertToRetraction() = {
+streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))](
+  "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+UpsertTableNode(0),
+term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime",
+  "CAST(rowtime) AS rowtime")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = {
 
 Review comment:
   Yes, that was my point. If you drop the first condition it will also 
simplify this test.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249788943
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
 
 Review comment:
   Also ditto :) I think here you shouldn't care whether the field is duplicated 
or not, so if you decide not to use the `mapInputToOutputName` method, 
implementing a `CommonCalc#getInputToOutputNamesMapping()` that returns a map or 
multimap is probably a better option than `Seq[String]`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249790503
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
+val inNames = calc.getInput.getRowType.getFieldNames
+calc.getProgram.getNamedProjects
+  .map(p => {
+calc.getProgram.expandLocalRef(p.left) match {
+  // output field is forwarded input field
+  case r: RexInputRef => (r.getIndex, p.right)
+  // output field is renamed input field
+  case a: RexCall if a.getKind.equals(SqlKind.AS) =>
+a.getOperands.get(0) match {
+  case ref: RexInputRef =>
+(ref.getIndex, p.right)
+  case _ =>
+(-1, p.right)
 
 Review comment:
   What do you mean?
   
   Maybe to rephrase my comment: I think that instead of having this `match 
case` construct, a better idea might be to implement its logic as a 
`RexVisitorImpl`, probably as a private static nested class inside 
`FlinkLogicalCalc` or `CommonCalc`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249786791
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
 
 Review comment:
   Good point with `CommonCalc`. It is probably a better place (btw, doesn't 
Calcite have some util method for this that works on `Calc`?).
   
   Regarding `getInputFromOutputName`: I don't mind which method you add, both 
`getInputFromOutputName` and `mapInputToOutputNames` are valuable additions. 
However, will `getInputFromOutputName` help you? Don't you need the mapping in 
the opposite direction? With `calc` following the `UpsertToRetraction` node, you 
want to convert the key names after transposing those two nodes. So you need to 
perform an operation like this:
   ```
   val keyNameAfterTransposition = calc.mapInputToOutputName(upsertToRetraction.key)
   ```
   or more precisely:
   ```
   val newKeys = upsertToRetraction.keyNames.map(calc.mapInputToOutputName) // plus code to check that each `Option` is present
   ```
   Right?
   
   Secondly:
   > A field may contain multi-output. For example, `select key as key1, key as 
key2 from T`
   
   1. Does it matter? I guess one possibility would be to return the first of 
those fields that is encountered.
   2. `def mapInputToOutputNames(input: String): Seq[String]` is also an option. 
It might be more general. You could even implement:
   ```
   def mapInputToOutputName(input: String): Option[String] =
     mapInputToOutputNames(input).headOption
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

[GitHub] GJL commented on a change in pull request #7524: [FLINK-11351][tests] Port JobManagerCleanupITCase to new code base

2019-01-22 Thread GitBox
GJL commented on a change in pull request #7524:  [FLINK-11351][tests] Port 
JobManagerCleanupITCase to new code base
URL: https://github.com/apache/flink/pull/7524#discussion_r249786009
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
 ##
 @@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Small test to check that the {@link 
org.apache.flink.runtime.blob.BlobServer} cleanup is executed
+ * after job termination.
+ */
+public class BlobsCleanupITCase extends TestLogger {
+
+   private static final long RETRY_INTERVAL = 100L;
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static MiniClusterResource miniClusterResource;
+
+   private static Configuration configuration;
+
+   private static File blobBaseDir;
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   blobBaseDir = TEMPORARY_FOLDER.newFolder();
+
+   configuration = new Configuration();
+   configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, 
blobBaseDir.getAbsolutePath());
+   configuration.setString(ConfigConstants.RESTART_STRATEGY, 
"fixeddelay");
+   
configuration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 
1);
+   // BLOBs are deleted from BlobCache between 1s and 2s after 
last reference
+   // -> the BlobCache may still have the BLOB or not (let's test 
both cases randomly)
+   configuration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
 
 Review comment:
   I guess it would be wrong to allow modifications of `configuration` in 
tests. I would use `UnmodifiableConfiguration` here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2019-01-22 Thread GitBox
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] 
[flink-connectors] Add ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r249787929
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ##
 @@ -82,93 +84,110 @@
  */
 @Internal
 public abstract class FlinkKafkaConsumerBase extends 
RichParallelSourceFunction implements
-   CheckpointListener,
-   ResultTypeQueryable,
-   CheckpointedFunction {
+   CheckpointListener,
 
 Review comment:
   
   522589e


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys commented on issue #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API

2019-01-22 Thread GitBox
dawidwys commented on issue #7553: [FLINK-11328] [core] Migrate all 
parameterless serializers to use new serialization compatibility API
URL: https://github.com/apache/flink/pull/7553#issuecomment-456406468
 
 
   Hi @tzulitai, I had a look just at the commit you linked and put some 
suggestions there. Most of it is just nitpicking; the biggest concern I have is 
that maybe we should still return a fixed size for `EventIdSerializer`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249770611
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
 
 Review comment:
   An input field may map to multiple output fields, for example `select key as key1, key as 
key2 from T`.
   
   Maybe refactor it into something like this:
   class CommonCalc:
   ```
  /**
   * Returns None if the output field is not forwarded from the input of the calc.
   */
  def getInputFromOutputName(calc: Calc, output: String): Option[String]
   ```
   Put it in `CommonCalc` so that this method can be used by both logical and 
physical calc nodes. What do you think?
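   For readers following the thread, a minimal sketch of what such a helper could look like, reusing the `RexInputRef` / `AS` pattern matching already present in this PR (the object name `CalcNameMapping` and its placement are purely illustrative, not merged code):
   ```
   import org.apache.calcite.rex.{RexCall, RexInputRef}
   import org.apache.calcite.sql.SqlKind
   import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCalc

   import scala.collection.JavaConverters._

   object CalcNameMapping {

     // Illustrative sketch only, not part of the PR; the signature follows the reviewer's suggestion.
     // Returns None if the given output field is not a plain forward (or rename) of an input field.
     def getInputFromOutputName(calc: FlinkLogicalCalc, output: String): Option[String] = {
       val program = calc.getProgram
       val inputNames = calc.getInput.getRowType.getFieldNames
       program.getNamedProjects.asScala.find(_.right == output).flatMap { p =>
         program.expandLocalRef(p.left) match {
           // output field is a forwarded input field
           case r: RexInputRef => Some(inputNames.get(r.getIndex))
           // output field is a renamed input field
           case c: RexCall if c.getKind == SqlKind.AS =>
             c.getOperands.get(0) match {
               case r: RexInputRef => Some(inputNames.get(r.getIndex))
               case _ => None
             }
           // any other expression is not a simple forward
           case _ => None
         }
       }
     }
   }
   ```
   With such a helper, the rule's key check boils down to: every key name must still be reachable as the input behind some output field of the calc.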


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249775473
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249775396
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
+val inNames = calc.getInput.getRowType.getFieldNames
+calc.getProgram.getNamedProjects
+  .map(p => {
+calc.getProgram.expandLocalRef(p.left) match {
+  // output field is forwarded input field
+  case r: RexInputRef => (r.getIndex, p.right)
+  // output field is renamed input field
+  case a: RexCall if a.getKind.equals(SqlKind.AS) =>
+a.getOperands.get(0) match {
+  case ref: RexInputRef =>
+(ref.getIndex, p.right)
+  case _ =>
+(-1, p.right)
 
 Review comment:
   Maybe we can use `RexVisitorImpl` in  `FlinkLogicalCalc`?
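   A rough sketch of that idea, assuming a visitor that resolves the input index behind an output expression and unwraps an `AS` rename (the class name is made up for illustration and this is not the PR's code):
   ```
   import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexVisitorImpl}
   import org.apache.calcite.sql.SqlKind

   class ForwardedInputRefVisitor extends RexVisitorImpl[Option[Int]](false) {

     // a plainly forwarded input field
     override def visitInputRef(inputRef: RexInputRef): Option[Int] =
       Some(inputRef.getIndex)

     // unwrap an AS rename and look at its first operand; anything else is not a forward
     override def visitCall(call: RexCall): Option[Int] =
       if (call.getKind == SqlKind.AS) call.getOperands.get(0).accept(this) else None
   }

   object ForwardedInputRefVisitor {
     // node kinds not overridden above fall back to RexVisitorImpl's default (null), hence the wrap
     def forwardedIndex(expr: RexNode): Option[Int] =
       Option(expr.accept(new ForwardedInputRefVisitor)).flatten
   }
   ```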


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tvielgouarin edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-22 Thread GitBox
tvielgouarin edited a comment on issue #7405: [FLINK-11249] 
FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-456391275
 
 
   Hi @pnowojski, @yanghua,
   I come from @cjolif's team and I am currently working on the implementation 
of the migration tests in order to accelerate the merge of this PR. I came here 
with several questions in mind. I understand that it needs to implement:
   
   - migration tests of `FlinkKafkaProducer011` between Flink versions
   - migration tests of the universal `FlinkKafkaProducer` between Flink versions
   - a migration test from `FlinkKafkaProducer011` to the universal 
`FlinkKafkaProducer`
   
   However, what should the validation conditions be (that the producerConfig stays the 
same?)
   
   Furthermore, I've worked on a simple implementation of a migration test for 
`FlinkKafkaProducer`: 
   
https://github.com/tvielgouarin/flink/commit/973756430c226a0cd5011fedc1eab1345a27cc2a
   
   Most of the time the test fails with `The producer attempted to use a 
producer id which is not currently assigned to its transactional id`. I'm not 
sure, but I guess it's because there is a different KafkaServer instance 
between the snapshot version and the test, so the transactional id 
assignments are lost (?). If so, do you have an idea of the best workaround? 
   
   Anyway, if you can have a look at what I've done and criticize it, it would 
be much appreciated. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249770611
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
 
 Review comment:
   An input field may map to multiple output fields, for example `select key as key1, key as 
key2 from T`.
   Maybe refactor it into something like this:
   class FlinkLogicalCalc:
   ```
  /**
   * Returns None if the output field is not forwarded from the input.
   */
  def getInputFromOutputName(output: String): Option[String]
   ```
   What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tvielgouarin commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-22 Thread GitBox
tvielgouarin commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can 
not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-456391275
 
 
   Hi @pnowojski, @yanghua,
   I come from @cjolif's team and I am currently working on the implementation 
of the migration tests in order to accelerate the merge of this PR. I came here 
with several questions in mind. I understand that it needs to implement:
   
   - migration tests of `FlinkKafkaProducer011` between Flink versions
   - migration tests of the universal `FlinkKafkaProducer` between Flink versions
   - a migration test from `FlinkKafkaProducer011` to the universal 
`FlinkKafkaProducer`
   
   However, what should the validation conditions be (that the producerConfig stays the 
same?)
   
   Furthermore, I've worked on a simple implementation of a migration test for 
`FlinkKafkaProducer`: 
   
https://github.com/tvielgouarin/flink/commit/973756430c226a0cd5011fedc1eab1345a27cc2a
   
   Most of the time the test fails with `The producer attempted to use a 
producer id which is not currently assigned to its transactional id`. I'm not 
sure, but I guess it's because there is a different KafkaServer instance 
between the snapshot version and the test, so the transactional id 
assignments are lost (?). If so, do you have an idea of the best workaround? 
   
   Anyway, if you can have a look at what I've done and criticize it, it would be 
much appreciated. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249762211
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
 
 Review comment:
   Good point! It is better to remove `calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount` and keep `fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)`. The first condition 
should be handled by the cost model, while the second one guards correctness.
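   Concretely, dropping the field-count condition would leave `matches` with only the key check, roughly like this (a sketch against the code quoted above, not the final merged version):
   ```
   override def matches(call: RelOptRuleCall): Boolean = {
     val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
     val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]

     // the transposition is only correct if the key fields survive the calc;
     // whether it is beneficial is left to the cost model
     fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
   }
   ```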


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249760569
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
 
 Review comment:
   I will remove the comments. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249759883
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, 
unaryNode}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import java.lang.{Boolean => JBool}
+
+class FromUpsertStreamTest extends TableTestBase {
+
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+  @Test
+  def testRemoveUpsertToRetraction() = {
+streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))](
+  "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+UpsertTableNode(0),
+term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime",
+  "CAST(rowtime) AS rowtime")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = {
 
 Review comment:
   > But testCalcTransposeUpsertToRetraction is a subset of 
testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose. 
   
   OK, I will remove `testCalcTransposeUpsertToRetraction()`.
   
   > On the other hand you are not testing for the condition:
   
   I think both conditions have been tested, as they are combined with `&&`?
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11373) CliFrontend cuts off reason for error messages

2019-01-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11373:
---
Labels: pull-request-available starter  (was: starter)

> CliFrontend cuts off reason for error messages
> --
>
> Key: FLINK-11373
> URL: https://issues.apache.org/jira/browse/FLINK-11373
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.5.6, 1.6.3, 1.7.1
>Reporter: Maximilian Michels
>Assignee: leesf
>Priority: Minor
>  Labels: pull-request-available, starter
>
> The CliFrontend seems to only print the first message in the stack trace and 
> not any of its causes.
> {noformat}
> bin/flink run /non-existing/path
> Could not build the program from JAR file.
> Use the help option (-h or --help) to get help on the command.
> {noformat}
> Notice that the underlying cause of this message is a FileNotFoundException.
> Consider changing 
> a) the error message for this particular case 
> b) the way the stack trace messages are trimmed
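A small illustration of the difference (a generic sketch, not Flink's actual CliFrontend code): printing only the top-level message hides the FileNotFoundException, whereas walking the cause chain, or printing the stack trace, keeps it visible.
```
// walks t, t.getCause, t.getCause.getCause, ... and joins their messages
def describeCauses(t: Throwable): String =
  Iterator.iterate(t)(_.getCause)
    .takeWhile(_ != null)
    .take(20) // guard against pathological cause cycles
    .map(c => s"${c.getClass.getSimpleName}: ${c.getMessage}")
    .mkString(" -> ")

// println(e.getMessage)      -> "Could not build the program from JAR file."
// println(describeCauses(e)) -> also surfaces the underlying FileNotFoundException
```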



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] leesf opened a new pull request #7555: [FLINK-11373] CliFrontend cuts off reason for error messages

2019-01-22 Thread GitBox
leesf opened a new pull request #7555: [FLINK-11373] CliFrontend cuts off 
reason for error messages
URL: https://github.com/apache/flink/pull/7555
 
 
   
   ## What is the purpose of the change
   
   Show the error message of CliArgsException in CliFrontend.
   
   
   ## Brief change log
   
   Change `System.out.println(e.getMessage())` to `e.printStackTrace()`.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #7438: [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #7438: [FLINK-11282][network] 
Merge StreamRecordWriter into RecordWriter
URL: https://github.com/apache/flink/pull/7438#discussion_r249741465
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
 ##
 @@ -71,7 +71,7 @@ public void setUp(long flushTimeout) throws Exception {
 *
 * @param flushTimeout
 *  output flushing interval of the
-*  {@link 
org.apache.flink.streaming.runtime.io.StreamRecordWriter}'s output flusher 
thread
+*  {@link 
org.apache.flink.runtime.io.network.api.writer}'s output flusher thread
 
 Review comment:
   nit: missing `RecordWriter` reference?
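   (Presumably the link should point at the class itself, e.g. `{@link org.apache.flink.runtime.io.network.api.writer.RecordWriter}`, rather than at the package.)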


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249726595
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
 
 Review comment:
   Either rename the method or drop the `// key fields should not be changed` 
comment. Usually something is wrong if you need a comment to explain a 
method's name.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249729097
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249729279
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
 
 Review comment:
   `getInputToOutputNamesMapping` and drop the comment? This method seems more 
generic, and it could be used in places other than this rule 
(`CalcUpsertToRetractionTransposeRule`). What do you think about placing it 
inside `FlinkLogicalCalc`? Maybe even refactoring it into something like this:
   ```
   class FlinkLogicalCalc:
  /**
   * Returns None if the field is not forwarded.
   */
  def mapInputToOutputName(input: String): Option[String]
   ```
   
   A `def mapInputToOutputNames(inputs: Seq[String]): Seq[Option[String]]` would 
probably be more efficient, but maybe less readable?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249735236
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
 
 Review comment:
   Some kind of `Map` usually expresses a mapping between two things better 
than a list of tuples does. Is there a reason, which I might be missing, why you 
chose a list of tuples? 
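   For comparison, a keyed structure in the direction output name -> input name stays well defined even when one input field feeds several outputs; a sketch using the PR's names (`getInOutNames`, `keyNames`), purely for illustration:
   ```
   // assuming getInOutNames(calc) yields (inputName, outputName) pairs as in the PR
   val outputToInput: Map[String, String] =
     getInOutNames(calc).map { case (in, out) => out -> in }.toMap

   // the key fields survive the calc iff every key still shows up as a mapped input
   val keysSurvive = upsertToRetraction.keyNames.forall(outputToInput.values.toSet)
   ```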


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249728531
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
 
 Review comment:
   drop this comment?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249735123
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
+val inNames = calc.getInput.getRowType.getFieldNames
+calc.getProgram.getNamedProjects
+  .map(p => {
+calc.getProgram.expandLocalRef(p.left) match {
+  // output field is forwarded input field
+  case r: RexInputRef => (r.getIndex, p.right)
+  // output field is renamed input field
+  case a: RexCall if a.getKind.equals(SqlKind.AS) =>
+a.getOperands.get(0) match {
+  case ref: RexInputRef =>
+(ref.getIndex, p.right)
+  case _ =>
+(-1, p.right)
 
 Review comment:
   Why do you create tuples of `(int, string)` and then filter based on the 
integers, instead of, for example, conditionally inserting into an array or a 
map? Another remark: I think that implementing a `RexVisitorImpl` (like 
`InputRefVisitor`) would be a better, more reusable approach, which would, for 
example, avoid duplicating the code for `case r: RexInputRef`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249729068
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
 
 Review comment:
   Drop the comment; it duplicates the method name.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249731317
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+
+call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+// get input output names
+val inOutNames = getInOutNames(calc)
+// contains all fields
+inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+* Get related output field names for input field names for Calc.
+*/
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
+val inNames = calc.getInput.getRowType.getFieldNames
+calc.getProgram.getNamedProjects
+  .map(p => {
 
 Review comment:
   Please do not use single-letter/abbreviated variables such as `io`, `p`, `r`,
`a`, `ref`; they make the code cryptic. It would also be useful to name what
`p.right` and `p.left` mean by assigning those values to named local variables.
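
   For illustration only, the projection pairs could be given descriptive names
along these lines (a sketch assuming the `FlinkLogicalCalc`/`getNamedProjects`
structure quoted above; it only handles plainly forwarded fields):
   ```scala
   private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {
     val inputFieldNames = calc.getInput.getRowType.getFieldNames
     calc.getProgram.getNamedProjects
       .flatMap { namedProject =>
         val projectExpression = calc.getProgram.expandLocalRef(namedProject.left)
         val outputFieldName = namedProject.right
         projectExpression match {
           // a plainly forwarded field: pair its input name with its output name
           case inputRef: RexInputRef =>
             Some(inputFieldNames.get(inputRef.getIndex) -> outputFieldName)
           // casts, calls, literals etc. do not forward an input field as-is
           case _ => None
         }
       }
   }
   ```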


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249728818
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// get new calc
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+// get new upsertToRetraction
 
 Review comment:
   Extract to method?
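
   One possible shape of such an extraction (a sketch; the helper name is made
up, the statements are the ones quoted above):
   ```scala
   override def onMatch(call: RelOptRuleCall) {
     val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
     val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]

     call.transformTo(transposedUpsertToRetraction(calc, upsertToRetraction))
   }

   private def transposedUpsertToRetraction(
       calc: FlinkLogicalCalc,
       upsertToRetraction: FlinkLogicalUpsertToRetraction): FlinkLogicalUpsertToRetraction = {
     // push the calc below the upsertToRetraction and remap its key names
     val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram)
     new FlinkLogicalUpsertToRetraction(
       upsertToRetraction.getCluster,
       upsertToRetraction.getTraitSet,
       newCalc,
       getNamesAfterCalc(upsertToRetraction.keyNames, calc))
   }
   ```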


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249726595
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
+// key fields should not be changed
+  fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
 
 Review comment:
   Either rename the method or drop the `// key fields should not be changed`
comment. Something is wrong if you need to write a comment explaining a
method's name.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249727908
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
+calc.getRowType.getFieldCount <= 
upsertToRetraction.getRowType.getFieldCount &&
 
 Review comment:
   However, I see one more issue here. Does this condition make sense? Shouldn't
this be resolved/decided by the cost-based optimiser? Why do we need this
heuristic? What if the calc adds column(s) while also being a very selective
filter? IMO this condition should be dropped completely, and only conditions
that guard correctness should be tested here. Whether `calc` should or shouldn't
be pushed down through `UpsertToRetraction` should be decided by the cost of the
plan. The cost of `UpsertToRetraction` should reflect both the number of rows
and the size of the rows. Is this happening?
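
   To make the cost-based alternative concrete, the kind of cost function being
asked about might look roughly like this (a hedged sketch inside the relnode,
not the actual implementation; `estimateRowSize` is a hypothetical helper, and
the exact `computeSelfCost`/cost-factory signatures depend on the Calcite
version in use):
   ```scala
   // Sketch: weigh UpsertToRetraction by both the number of rows it keeps in
   // state and the width of those rows, so the planner can decide whether
   // pushing the calc below it is worthwhile instead of relying on the
   // field-count heuristic above.
   override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
     val rowCount = mq.getRowCount(this)
     val bytesPerRow = estimateRowSize(getRowType) // hypothetical helper
     planner.getCostFactory.makeCost(rowCount, rowCount, rowCount * bytesPerRow)
   }
   ```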


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249725951
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, 
unaryNode}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import java.lang.{Boolean => JBool}
+
+class FromUpsertStreamTest extends TableTestBase {
+
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+  @Test
+  def testRemoveUpsertToRetraction() = {
+streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))](
+  "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+UpsertTableNode(0),
+term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime",
+  "CAST(rowtime) AS rowtime")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = {
 
 Review comment:
   Generally speaking, the naming of those tests is not perfect (I'm really
struggling to understand what they are doing), but I first wanted to understand
what you are testing for.
   
   But `testCalcTransposeUpsertToRetraction` is a subset of
`testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose`. They both test
exactly the same code paths/branches, don't they? On the other hand, you are
not testing for the condition:
   ```
   calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount
   ```
   The last test checks only for:
   ```
   // key fields should not be changed
 fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
   ```
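
   A hypothetical test for that untested condition could look roughly like this
(a sketch only; the table setup mirrors the surrounding tests, and the expected
plan string still has to be written against the real optimizer output):
   ```scala
   @Test
   def testCalcNotTransposedWhenItAddsFields() = {
     streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))](
       "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime)

     // the projection emits more fields than UpsertToRetraction, so
     // calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount
     // does not hold and the rule must not fire
     val sql = "SELECT a, b, c, c AS c2, a + 1 AS a1 FROM MyTable"

     // expected plan: DataStreamCalc stays above the upsert-to-retraction node
     // streamUtil.verifySql(sql, expectedPlan)  // expectedPlan omitted in this sketch
   }
   ```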


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249728417
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+// column pruning or push Filter down
 
 Review comment:
   Same here. Instead of writing a comment explaining
`calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount`,
encapsulate this logic inside a simple method whose name explains it.
   
   And ditto in other places: there are comments that do not explain anything,
or comments that should be replaced by extracted method names.
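
   For example (a sketch; `calcDoesNotAddFields` is just an illustrative name,
and the second condition reuses the existing key-field check):
   ```scala
   override def matches(call: RelOptRuleCall): Boolean = {
     val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
     val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]

     calcDoesNotAddFields(calc, upsertToRetraction) &&
       fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
   }

   private def calcDoesNotAddFields(
       calc: FlinkLogicalCalc,
       upsertToRetraction: FlinkLogicalUpsertToRetraction): Boolean = {
     calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount
   }
   ```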


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11357) Check and port LeaderChangeJobRecoveryTest to new code base if necessary

2019-01-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11357:
---
Labels: pull-request-available  (was: )

> Check and port LeaderChangeJobRecoveryTest to new code base if necessary
> 
>
> Key: FLINK-11357
> URL: https://issues.apache.org/jira/browse/FLINK-11357
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: lining
>Priority: Major
>  Labels: pull-request-available
>
> Check and port {{LeaderChangeJobRecoveryTest}} to new code base if necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] jinglining opened a new pull request #7554: [FLINK-11357][test]Check and port LeaderChangeJobRecoveryTest to new …

2019-01-22 Thread GitBox
jinglining opened a new pull request #7554: [FLINK-11357][test]Check and port 
LeaderChangeJobRecoveryTest to new …
URL: https://github.com/apache/flink/pull/7554
 
 
   ## What is the purpose of the change
   
   Port `LeaderChangeJobRecoveryTest` to the new code base. Having checked the
code, this test is still necessary.
   
   
   ## Brief change log
   
   - Update LeaderChangeJobRecoveryTest so that it passes CheckStyle. Delete
code that is no longer needed.
   
   
   ## Verifying this change
   
   This change is itself a test.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on issue #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API

2019-01-22 Thread GitBox
tzulitai commented on issue #7553: [FLINK-11328] [core] Migrate all 
parameterless serializers to use new serialization compatibility API
URL: https://github.com/apache/flink/pull/7553#issuecomment-456357940
 
 
   @dawidwys 
   Regarding the changes to the CEP serializers, could you take a look at 
898f6c54035206a97cfbb04bdad4f7568264dab2?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11328) Migrate all parameterless serializers / TypeSerializerSingleton's to use new serializer snapshot abstractions

2019-01-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11328:
---
Labels: pull-request-available  (was: )

> Migrate all parameterless serializers / TypeSerializerSingleton's to use new 
> serializer snapshot abstractions
> -
>
> Key: FLINK-11328
> URL: https://issues.apache.org/jira/browse/FLINK-11328
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> This subtask covers migration of:
> * All subclasses of TypeSerializerSingleton
> * All serializers that are still using the 
> ParameterlessTypeSerializerConfigSnapshot
> to use the new serialization compatibility APIs ({{TypeSerializerSnapshot}} 
> and {{TypeSerializerSchemaCompatibility).
> Serializers are only considered to have completed migration according to the 
> defined list of things to check in FLINK-11327.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tzulitai opened a new pull request #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API

2019-01-22 Thread GitBox
tzulitai opened a new pull request #7553: [FLINK-11328] [core] Migrate all 
parameterless serializers to use new serialization compatibility API
URL: https://github.com/apache/flink/pull/7553
 
 
   ## What is the purpose of the change
   
   This PR migrates all parameterless serializers (in Flink, they happen to all 
be subclasses of `TypeSerializerSingleton`) to use the new serialization 
compatibility APIs, so that the serializers themselves are no longer 
Java-serialized into savepoints.
   
   ## Brief change log
   
   - edf6d59: This commit essentially does 2 things: 1) improve usability of 
the new `SimpleTypeSerializerSnapshot` base class, so that it may be used by 
serializers that have varying ways for instantiation, and 2) let all subclasses 
of `TypeSerializerSingleton` return their own implementation of a 
`SimpleTypeSerializerSnapshot`, while removing the base implementation of 
`snapshotConfiguration` in `TypeSerializerSingleton`.
   - 898f6c5: Some CEP serializers for NFA data structures use nested 
serializers, so they should actually use the `CompositeTypeSerializerSnapshot` 
snapshot class. This commit addresses that.
   - 74adb84 to b4910d4: Some cleanup in tests.
   - 79c85ca to 18187c0: Commits that add migration tests for the touched 
serializers.
   
   ## Verifying this change
   
   All new subclasses of `TypeSerializerSnapshotMigrationTestBase` should pass.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on issue #7486: [FLINK-11316] [tests] Drop JarFileCreator

2019-01-22 Thread GitBox
GJL commented on issue #7486: [FLINK-11316] [tests] Drop JarFileCreator
URL: https://github.com/apache/flink/pull/7486#issuecomment-456354600
 
 
   @aljoscha `ClassLoaderUtilsTest#testWithURLClassLoader` passes with Java 9. 
If you think this PR is fine, please merge. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL edited a comment on issue #7486: [FLINK-11316] [tests] Drop JarFileCreator

2019-01-22 Thread GitBox
GJL edited a comment on issue #7486: [FLINK-11316] [tests] Drop JarFileCreator
URL: https://github.com/apache/flink/pull/7486#issuecomment-456354600
 
 
   @aljoscha `ClassLoaderUtilsTest#testWithURLClassLoader` passes with Java 9 
now. If you think this PR is fine, please merge. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-01-22 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249725255
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
 ##
 @@ -183,6 +183,17 @@ object UpdatingPlanChecker {
   lJoinKeys.zip(rJoinKeys)
 )
   }
+
+case l: DataStreamUpsertToRetraction =>
+  val uniqueKeyNames = l.getRowType.getFieldNames.zipWithIndex
+.filter(e => l.keyIndexes.contains(e._2))
+.map(_._1)
+  Some(uniqueKeyNames.map(e => (e, e)))
+
+case scan: UpsertStreamScan =>
 
 Review comment:
   Agree with you. I think we can use `MetadataHandler` to solve it later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on a change in pull request #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base

2019-01-22 Thread GitBox
GJL commented on a change in pull request #7541: [FLINK-11356][tests] Port 
JobManagerStartupTest to new code base
URL: https://github.com/apache/flink/pull/7541#discussion_r249718882
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerTest.java
 ##
 @@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for the {@link BlobServer}.
+ */
+public class BlobServerTest extends TestLogger {
+
+   @ClassRule
+   public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Tests that the {@link BlobServer} fails if the blob storage directory
+* cannot be created.
+*/
+   @Test
+   public void testFailureIfStorageDirectoryCannotBeCreated() throws 
IOException {
+   assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't 
work on Windows.
 
 Review comment:
   I would prefer to move the _assumption_ closer to 
`createNonWritableDirectory`. This makes the comment clearer. Also consider 
`assumeFalse` as `!` can be overlooked.
   ```suggestion
assumeFalse(OperatingSystem.isWindows()); //setWritable doesn't 
work on Windows.
final File blobStorageDirectory = createNonWritableDirectory();
   
final Configuration configuration = new Configuration();
final String nonExistDirectory = new File(blobStorageDirectory, 
"does_not_exist_for_sure").getAbsolutePath();
configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, 
nonExistDirectory);
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on a change in pull request #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base

2019-01-22 Thread GitBox
GJL commented on a change in pull request #7541: [FLINK-11356][tests] Port 
JobManagerStartupTest to new code base
URL: https://github.com/apache/flink/pull/7541#discussion_r249714551
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
 ##
 @@ -360,4 +368,32 @@ public void testConcurrentActorSystemCreation() throws 
Exception {
ExecutorUtils.gracefulShutdown(1L, 
TimeUnit.MILLISECONDS, executorService);
}
}
+
+   /**
+* Tests that the {@link ActorSystem} fails with an expressive 
exception if it cannot be
+* instantiated due to an occupied port.
+*/
+   @Test
+   public void testActorSystemInstantiationFailureWhenPortOccupied() 
throws Exception {
+   ServerSocket portOccupier;
+   final int port;
+
+   try {
+   port = NetUtils.getAvailablePort();
+   portOccupier = new ServerSocket(port, 10, 
InetAddress.getByName("0.0.0.0"));
+   }
+   catch (Throwable t) {
+   // could not find free port, or open a connection there
 
 Review comment:
   Is it acceptable to silently fail if we cannot open a socket?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10819) The instability problem of CI, JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure test fail.

2019-01-22 Thread Gary Yao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16748563#comment-16748563
 ] 

Gary Yao commented on FLINK-10819:
--

It happened again: https://api.travis-ci.org/v3/job/482518482/log.txt

> The instability problem of CI, 
> JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure test 
> fail.
> ---
>
> Key: FLINK-10819
> URL: https://issues.apache.org/jira/browse/FLINK-10819
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.8.0
>
>
> Found the following error in the process of CI:
> Results :
> Tests in error: 
>  JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:331 » 
> IllegalArgument
> Tests run: 1463, Failures: 0, Errors: 1, Skipped: 29
> 18:40:55.828 [INFO] 
> 
> 18:40:55.829 [INFO] BUILD FAILURE
> 18:40:55.829 [INFO] 
> 
> 18:40:55.830 [INFO] Total time: 30:19 min
> 18:40:55.830 [INFO] Finished at: 2018-11-07T18:40:55+00:00
> 18:40:56.294 [INFO] Final Memory: 92M/678M
> 18:40:56.294 [INFO] 
> 
> 18:40:56.294 [WARNING] The requested profile "include-kinesis" could not be 
> activated because it does not exist.
> 18:40:56.295 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test 
> (integration-tests) on project flink-tests_2.11: There are test failures.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] Please refer to 
> /home/travis/build/sunjincheng121/flink/flink-tests/target/surefire-reports 
> for the individual test results.
> 18:40:56.295 [ERROR] -> [Help 1]
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] To see the full stack trace of the errors, re-run Maven 
> with the -e switch.
> 18:40:56.295 [ERROR] Re-run Maven using the -X switch to enable full debug 
> logging.
> 18:40:56.295 [ERROR] 
> 18:40:56.295 [ERROR] For more information about the errors and possible 
> solutions, please read the following articles:
> 18:40:56.295 [ERROR] [Help 1] 
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> MVN exited with EXIT CODE: 1.
> Trying to KILL watchdog (11329).
> ./tools/travis_mvn_watchdog.sh: line 269: 11329 Terminated watchdog
> PRODUCED build artifacts.
> But after the rerun, the error disappeared. 
> Currently,no specific reasons are found, and will continue to pay attention.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on issue #6445: [FLINK-8302] [table] Add SHIFT_LEFT and SHIFT_RIGHT

2019-01-22 Thread GitBox
twalthr commented on issue #6445: [FLINK-8302] [table] Add SHIFT_LEFT and 
SHIFT_RIGHT
URL: https://github.com/apache/flink/pull/6445#issuecomment-456328423
 
 
   Thanks for updating the PR @xueyumusic and @pnowojski. I will take a final 
look today and merge it if everything is ok.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11405) rest api can see task and task attempt exception by start end time filter

2019-01-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11405:
---
Labels: pull-request-available  (was: )

> rest api can see task and task attempt exception by start end time filter
> -
>
> Key: FLINK-11405
> URL: https://issues.apache.org/jira/browse/FLINK-11405
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Reporter: lining
>Assignee: lining
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11374) See more failover and can filter by time range

2019-01-22 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16748362#comment-16748362
 ] 

lining edited comment on FLINK-11374 at 1/22/19 9:03 AM:
-

Hi, [~till.rohrmann]. I have added some details about this. What do you think 
about it?

I have updated this task. I will update the REST API, and [~vthinkxie] will
update the web UI.


was (Author: lining):
Hi, [~till.rohrmann]. I have added some details about this. What do you think 
about it?

> See more failover and can filter by time range
> --
>
> Key: FLINK-11374
> URL: https://issues.apache.org/jira/browse/FLINK-11374
> Project: Flink
>  Issue Type: Improvement
>  Components: REST, Webfrontend
>Reporter: lining
>Assignee: lining
>Priority: Major
> Attachments: image-2019-01-22-11-40-53-135.png, 
> image-2019-01-22-11-42-33-808.png
>
>
> Currently, the failover view only shows a limited number of the most recent
> task failovers. If a task has failed many times, we cannot see the earlier
> failovers. Can we add a filter by time so that we can see failovers, including
> the task attempt failure messages?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] jinglining opened a new pull request #7552: [FLINK-11405][rest]can see task and task attempt exception by start e…

2019-01-22 Thread GitBox
jinglining opened a new pull request #7552: [FLINK-11405][rest]can see task and 
task attempt exception by start e…
URL: https://github.com/apache/flink/pull/7552
 
 
   ## What is the purpose of the change
   
   - This pull request lets the REST API's JobExceptionsHandler return more
exception messages.
   - Adds the display of task attempt exceptions.
   - Exceptions can be filtered by start and end time.
   
   
   ## Brief change log
  - JobExceptionsHandler now includes task attempt exceptions when building the
exceptions response
  - JobExceptionsHandler gains `start` and `end` query parameters
  - JobExceptionsInfo.ExecutionExceptionInfo gains the fields vertexID,
vertexName, subtaskIndex, and attemptNum
  - Add FixedSortedSetTest to verify that the fixed-size exception set keeps its
order
   
   
   ## Verifying this change
   
   
   - Updated JobExceptionsInfoNoRootTest and JobExceptionsInfoTest.
   - Added FixedSortedSetTest.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (don't know)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no )
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

