[jira] [Updated] (FLINK-11411) Failover regions number of RestartPipelinedRegionStrategy not show in LOG due to incorrect parameter
[ https://issues.apache.org/jira/browse/FLINK-11411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11411: --- Labels: pull-request-available (was: ) > Failover regions number of RestartPipelinedRegionStrategy not show in LOG due > to incorrect parameter > > > Key: FLINK-11411 > URL: https://issues.apache.org/jira/browse/FLINK-11411 > Project: Flink > Issue Type: Improvement >Reporter: BoWang >Assignee: BoWang >Priority: Minor > Labels: pull-request-available
[GitHub] eaglewatcherwb opened a new pull request #7561: [FLINK-11411][log] show correct failover region number
eaglewatcherwb opened a new pull request #7561: [FLINK-11411][log] show correct failover region number URL: https://github.com/apache/flink/pull/7561 Change-Id: I118b7ea28d36edccbb3007fdf946b197b9f88865

## What is the purpose of the change
Show the correct failover region number in the log.

## Verifying this change
This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
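For context, the class of bug this PR fixes can be illustrated with a short sketch (illustrative only, not the actual RestartPipelinedRegionStrategy diff): with SLF4J, a `{}` placeholder is rendered only when a matching argument is supplied, so a missing or wrong parameter makes the region count vanish from the log line.

```java
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch only -- not the actual Flink change.
public class FailoverRegionLoggingSketch {

	private static final Logger LOG = LoggerFactory.getLogger(FailoverRegionLoggingSketch.class);

	static void logRegionCount(Map<?, ?> vertexToRegion) {
		// Buggy pattern: no argument supplied for the placeholder, so the
		// line is logged with a literal "{}" instead of the region count.
		LOG.info("Created {} failover regions.");

		// Fixed pattern: pass the actual count for the "{}" placeholder.
		LOG.info("Created {} failover regions.", vertexToRegion.size());
	}
}
```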
[jira] [Created] (FLINK-11411) Failover regions number of RestartPipelinedRegionStrategy not show in LOG due to incorrect parameter
BoWang created FLINK-11411: -- Summary: Failover regions number of RestartPipelinedRegionStrategy not show in LOG due to incorrect parameter Key: FLINK-11411 URL: https://issues.apache.org/jira/browse/FLINK-11411 Project: Flink Issue Type: Improvement Reporter: BoWang Assignee: BoWang
[GitHub] eaglewatcherwb opened a new pull request #7560: [FLINK-11344][webfrontend] display all execution attempts on web
eaglewatcherwb opened a new pull request #7560: [FLINK-11344][webfrontend] display all execution attempts on web dashboard URL: https://github.com/apache/flink/pull/7560 Change-Id: I859cf593ca567006778330b5a22f3b8af5acc965

## What is the purpose of the change
Display all execution attempts on the web dashboard.

## Brief change log
- display all execution attempts on the web dashboard

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] eaglewatcherwb closed pull request #7503: [FLINK-11344][webfrontend] display all execution attempts on web
eaglewatcherwb closed pull request #7503: [FLINK-11344][webfrontend] display all execution attempts on web URL: https://github.com/apache/flink/pull/7503
[jira] [Commented] (FLINK-11336) Flink HA didn't remove ZK metadata
[ https://issues.apache.org/jira/browse/FLINK-11336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16749548#comment-16749548 ] shengjk1 commented on FLINK-11336: -- Yarn (per job or as a session) > Flink HA didn't remove ZK metadata > -- > > Key: FLINK-11336 > URL: https://issues.apache.org/jira/browse/FLINK-11336 > Project: Flink > Issue Type: Improvement >Reporter: shengjk1 >Priority: Major > Attachments: image-2019-01-15-19-42-21-902.png > > > Flink HA didn't remove ZK metadata > such as > go to zk cli : ls /flinkone > !image-2019-01-15-19-42-21-902.png! > > I suggest we should delete this metadata when the application is cancelled or throws an exception
[GitHub] zhijiangW commented on issue #7438: [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter
zhijiangW commented on issue #7438: [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter URL: https://github.com/apache/flink/pull/7438#issuecomment-456673652 @pnowojski, I have updated the code to address the comment. :)
[GitHub] zhijiangW commented on a change in pull request #7438: [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter
zhijiangW commented on a change in pull request #7438: [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter URL: https://github.com/apache/flink/pull/7438#discussion_r250035752

## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
##
@@ -71,7 +71,7 @@ public void setUp(long flushTimeout) throws Exception {
 	 *
 	 * @param flushTimeout
 	 *            output flushing interval of the
-	 *            {@link org.apache.flink.streaming.runtime.io.StreamRecordWriter}'s output flusher thread
+	 *            {@link org.apache.flink.runtime.io.network.api.writer}'s output flusher thread

Review comment: I would add it in a fixup commit.
[GitHub] allenxwang commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
allenxwang commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#discussion_r250026480

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
##
@@ -55,4 +57,13 @@
 	 * @return null or the target topic
 	 */
	String getTargetTopic(T element);
+
+	/**
+	 *
+	 * @param element The incoming element to be serialized
+	 * @return collection of headers (maybe empty)
+	 */
+	default Iterable<Map.Entry<String, byte[]>> headers(T element) {

Review comment: What's the reason to use `Map.Entry` vs. `Tuple2`? `Tuple2` seems to be easier to construct than `Map.Entry` in implementation.
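To make the reviewer's point concrete, here is a hedged sketch (the `String`/`byte[]` header types are inferred from the snippet above, not confirmed API of this PR) comparing how the two candidate types are constructed:

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;

// Sketch of the Map.Entry-vs-Tuple2 trade-off; names are illustrative.
public class HeaderConstructionSketch {

	// Map.Entry is an interface, so a concrete implementation class
	// (e.g. AbstractMap.SimpleImmutableEntry) must be named explicitly.
	static Iterable<Map.Entry<String, byte[]>> headersAsEntries() {
		Map.Entry<String, byte[]> header = new AbstractMap.SimpleImmutableEntry<>(
			"trace-id", "abc".getBytes(StandardCharsets.UTF_8));
		return Collections.singletonList(header);
	}

	// Tuple2 is constructible directly via its static factory,
	// which is the reviewer's argument for it.
	static Iterable<Tuple2<String, byte[]>> headersAsTuples() {
		return Collections.singletonList(
			Tuple2.of("trace-id", "abc".getBytes(StandardCharsets.UTF_8)));
	}
}
```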
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249800524

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+    operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+    // column pruning or push Filter down
+    calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount &&
+      // key fields should not be changed
+      fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+    val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+    // get new calc
+    val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram)
+    // get new upsertToRetraction
+    val oldKeyNames = upsertToRetraction.keyNames
+    val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+
+    val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction(
+      upsertToRetraction.getCluster,
+      upsertToRetraction.getTraitSet,
+      newCalc,
+      newKeyNames)
+
+    call.transformTo(newUpsertToRetraction)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = {
+    // get input output names
+    val inOutNames = getInOutNames(calc)
+    // contains all fields
+    inOutNames.map(_._1).containsAll(fields)
+  }
+
+  /**
+    * Get related output field names for input field names for Calc.
+    */
+  private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = {

Review comment: The reason I use `getInputFromOutputName` is that the return type can be an `Option[String]`, because no two output names will be the same. If we use `mapInputToOutputName` then we would need to return a `Seq[String]`. Both methods can solve the problem; all we have to do is define the method clearly.
[GitHub] KurtYoung commented on a change in pull request #6873: [hotfix] Add Review Progress section to PR description template.
KurtYoung commented on a change in pull request #6873: [hotfix] Add Review Progress section to PR description template. URL: https://github.com/apache/flink/pull/6873#discussion_r250022810

## File path: .github/PULL_REQUEST_TEMPLATE.md
##
@@ -70,3 +70,15 @@ This change added tests and can be verified as follows:
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
+
+
+
+# Review Progress
+
+* [ ] 1. The contribution is well-described.
+* [ ] 2. There is consensus that the contribution should go into Flink.

Review comment: Can you give some examples of what constitutes consensus, so that developers have a clear understanding?
[GitHub] tweise opened a new pull request #7559: [FLINK-11048] Mark new RemoteStreamEnvironment constructor PublicEvolving
tweise opened a new pull request #7559: [FLINK-11048] Mark new RemoteStreamEnvironment constructor PublicEvolving URL: https://github.com/apache/flink/pull/7559 Trivial follow-up for #7249 to not lock down this constructor, in case we want to add any more settings.
[jira] [Closed] (FLINK-11410) Kubernetes Setup page gives incorrect url of Flink UI
[ https://issues.apache.org/jira/browse/FLINK-11410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Frank Huang closed FLINK-11410. --- Resolution: Invalid > Kubernetes Setup page gives incorrect url of Flink UI > - > > Key: FLINK-11410 > URL: https://issues.apache.org/jira/browse/FLINK-11410 > Project: Flink > Issue Type: Bug >Reporter: Frank Huang >Priority: Major > > in this > [section|https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/kubernetes.html#deploy-flink-session-cluster-on-kubernetes], > url should be > [http://localhost:8081/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy|http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy]. > The port should be 8081 instead of 8001.
[jira] [Commented] (FLINK-11402) User code can fail with an UnsatisfiedLinkError in the presence of multiple classloaders
[ https://issues.apache.org/jira/browse/FLINK-11402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16749170#comment-16749170 ] Chesnay Schepler commented on FLINK-11402: -- [~uce] What happens if snappy-java is placed in /lib? Judging from FLINK-5408 this sounds like something we can't fix in general nor detect easily. As such I'd say we add a native section to the classloading-debugging docs, stating that in order to use system-provided native libraries the user must ensure that they are loaded through the system-classloader, requiring the loading classes to be placed in /lib.

> User code can fail with an UnsatisfiedLinkError in the presence of multiple classloaders
>
> Key: FLINK-11402
> URL: https://issues.apache.org/jira/browse/FLINK-11402
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination, Local Runtime
> Affects Versions: 1.7.0
> Reporter: Ufuk Celebi
> Priority: Major
> Attachments: hello-snappy-1.0-SNAPSHOT.jar, hello-snappy.tgz
>
> As reported on the user mailing list thread "[`env.java.opts` not persisting after job canceled or failed and then restarted|https://lists.apache.org/thread.html/37cc1b628e16ca6c0bacced5e825de8057f88a8d601b90a355b6a291@%3Cuser.flink.apache.org%3E]", there can be issues with using native libraries and user code class loading.
> h2. Steps to reproduce
> I was able to reproduce the issue reported on the mailing list using [snappy-java|https://github.com/xerial/snappy-java] in a user program. Running the attached user program works fine on initial submission, but results in a failure when re-executed.
> I'm using Flink 1.7.0 using a standalone cluster started via {{bin/start-cluster.sh}}.
> 0. Unpack attached Maven project and build using {{mvn clean package}} *or* directly use attached {{hello-snappy-1.0-SNAPSHOT.jar}}
> 1. Download [snappy-java-1.1.7.2.jar|http://central.maven.org/maven2/org/xerial/snappy/snappy-java/1.1.7.2/snappy-java-1.1.7.2.jar] and unpack libsnappyjava for your system:
> {code}
> jar tf snappy-java-1.1.7.2.jar | grep libsnappy
> ...
> org/xerial/snappy/native/Linux/x86_64/libsnappyjava.so
> ...
> org/xerial/snappy/native/Mac/x86_64/libsnappyjava.jnilib
> ...
> {code}
> 2. Configure system library path to {{libsnappyjava}} in {{flink-conf.yaml}} (path needs to be adjusted for your system):
> {code}
> env.java.opts: -Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64
> {code}
> 3. Run attached {{hello-snappy-1.0-SNAPSHOT.jar}}
> {code}
> bin/flink run hello-snappy-1.0-SNAPSHOT.jar
> Starting execution of program
> Program execution finished
> Job with JobID ae815b918dd7bc64ac8959e4e224f2b4 has finished.
> Job Runtime: 359 ms
> {code}
> 4. Rerun attached {{hello-snappy-1.0-SNAPSHOT.jar}}
> {code}
> bin/flink run hello-snappy-1.0-SNAPSHOT.jar
> Starting execution of program
>
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 7d69baca58f33180cb9251449ddcd396)
> at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
> at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> at com.github.uce.HelloSnappy.main(HelloSnappy.java:18)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution
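A minimal sketch of what the attached reproducer presumably does (the actual hello-snappy source is in the attached archive and not shown here): any job whose user code touches snappy-java's native loader is enough to trigger the UnsatisfiedLinkError on re-submission, because the native library can only be loaded by one classloader.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.xerial.snappy.Snappy;

// Hedged sketch of a reproducer, assuming snappy-java is bundled in the user
// jar and java.library.path points at the system libsnappyjava.
public class HelloSnappy {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.fromElements("hello", "snappy")
			// Snappy.compress forces loading of the native library; the second
			// job submission uses a fresh user classloader and fails.
			.map(s -> Snappy.compress(s).length)
			.print();
		env.execute("hello-snappy");
	}
}
```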
[GitHub] casidiablo opened a new pull request #7558: Fix typo in RocksDB error
casidiablo opened a new pull request #7558: Fix typo in RocksDB error URL: https://github.com/apache/flink/pull/7558
[jira] [Created] (FLINK-11410) Kubernetes Setup page gives incorrect url of Flink UI
Frank Huang created FLINK-11410: --- Summary: Kubernetes Setup page gives incorrect url of Flink UI Key: FLINK-11410 URL: https://issues.apache.org/jira/browse/FLINK-11410 Project: Flink Issue Type: Bug Reporter: Frank Huang in this [section|https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/kubernetes.html#deploy-flink-session-cluster-on-kubernetes], url should be [http://localhost:8081/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy|http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy]. The port should be 8081 instead of 8001.
[jira] [Created] (FLINK-11409) Make `ProcessFunction`, `ProcessWindowFunction` and etc. pure interfaces
Kezhu Wang created FLINK-11409: -- Summary: Make `ProcessFunction`, `ProcessWindowFunction` and etc. pure interfaces Key: FLINK-11409 URL: https://issues.apache.org/jira/browse/FLINK-11409 Project: Flink Issue Type: Improvement Reporter: Kezhu Wang

I found these functions express no opinionated demands on implementing classes. It would be nice to implement them as interfaces rather than abstract classes, since an abstract class is intrusive and hampers callers' use cases. For example, a client can't easily write an `AbstractFlinkRichFunction` to unify lifecycle management for all data processing functions.

I dove into the history of some of these functions and found that some were converted from interfaces to abstract classes because of default method implementations; for example, `ProcessFunction` and `CoProcessFunction` were converted to abstract classes in FLINK-4460, which predates FLINK-7274. After FLINK-7274, [Java 8 default methods|https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html] would be a better solution. I also notice that some functions introduced after FLINK-7274, such as `ProcessJoinFunction`, are implemented as abstract classes.

I think it would be better to establish a well-known principle to guide both API authors and callers of data processing functions. Personally, I prefer interfaces for all exported function callbacks, for the reason expressed in the first paragraph. Besides this, with `AbstractRichFunction` and interfaces for data processing functions, I think lots of rich data processing functions can be eliminated, as they are plain classes extending `AbstractRichFunction` and implementing the data processing interfaces; clients can write this in one line of code with a clear intention of both data processing and lifecycle management.

Following is a possibly incomplete list of data processing functions currently implemented as abstract classes:
* `ProcessFunction`, `KeyedProcessFunction`, `CoProcessFunction` and `ProcessJoinFunction`
* `ProcessWindowFunction` and `ProcessAllWindowFunction`
* `BaseBroadcastProcessFunction`, `BroadcastProcessFunction` and `KeyedBroadcastProcessFunction`

All of the above functions are annotated with `@PublicEvolving`, so making them interfaces won't break Flink's compatibility guarantee, but compatibility is still a big consideration in evaluating this proposal. Any thoughts on this proposal? Please do comment.
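As an illustration of the proposal's direction (the names here are hypothetical, not actual Flink API), a callback defined as a pure interface with Java 8 default methods keeps lifecycle hooks optional while leaving implementors free to extend any base class they like:

```java
import java.io.Serializable;

import org.apache.flink.util.Collector;

// Hypothetical sketch, not Flink API: a process-function-style callback as a
// pure interface. Lifecycle hooks are overridable default methods, so a
// single helper class could unify lifecycle management across all such
// functions, which an abstract base class prevents.
public interface ProcessFunctionLike<I, O> extends Serializable {

	// The one required callback.
	void processElement(I value, Collector<O> out) throws Exception;

	// Optional lifecycle hooks with no-op defaults.
	default void open() throws Exception {}

	default void close() throws Exception {}
}
```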
[GitHub] TisonKun closed pull request #7528: [hotfix] [utils] Implement utils without cost of "enum"
TisonKun closed pull request #7528: [hotfix] [utils] Implement utils without cost of "enum" URL: https://github.com/apache/flink/pull/7528
[GitHub] TisonKun commented on issue #7529: [hotfix] [blob] Generalize usage of NoOpTransientBlobService
TisonKun commented on issue #7529: [hotfix] [blob] Generalize usage of NoOpTransientBlobService URL: https://github.com/apache/flink/pull/7529#issuecomment-456481455 @GJL alright, we can defer such a generalization until we actually want to use it somewhere else. As for using `enum` as a strategy to implement singletons, I've learned from zentol how the Flink community thinks of it in #7528. I'll close this one as well.
[GitHub] TisonKun closed pull request #7529: [hotfix] [blob] Generalize usage of NoOpTransientBlobService
TisonKun closed pull request #7529: [hotfix] [blob] Generalize usage of NoOpTransientBlobService URL: https://github.com/apache/flink/pull/7529
[jira] [Updated] (FLINK-11406) Return TypeSerializerSchemaCompatibility.incompatible() when arity of nested serializers don't match in CompositeTypeSerializerSnapshot's compatibility check
[ https://issues.apache.org/jira/browse/FLINK-11406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11406: --- Labels: pull-request-available (was: ) > Return TypeSerializerSchemaCompatibility.incompatible() when arity of nested > serializers don't match in CompositeTypeSerializerSnapshot's compatibility > check > - > > Key: FLINK-11406 > URL: https://issues.apache.org/jira/browse/FLINK-11406 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > Right now, in > {{CompositeTypeSerializerSnapshot.resolveSchemaCompatibility(...)}}, if arity > of nested serializers don't match between the snapshot and the provided new > serializer, a runtime exception is thrown. > More ideally, this should return > {{TypeSerializerSchemaCompatibility.incompatible()}}. > We should also make it clearer in the Javadocs that the > {{CompositeTypeSerializerSnapshot}} is intended for product serializers that > have a fixed arity of nested serializers.
[GitHub] tzulitai opened a new pull request #7557: [FLINK-11406] [core] Return INCOMPATIBLE when nested serializers arity don't match in CompositeTypeSerializerSnapshot
tzulitai opened a new pull request #7557: [FLINK-11406] [core] Return INCOMPATIBLE when nested serializers arity don't match in CompositeTypeSerializerSnapshot URL: https://github.com/apache/flink/pull/7557

## What is the purpose of the change
Right now, in `CompositeTypeSerializerSnapshot.resolveSchemaCompatibility(...)`, if arity of nested serializers don't match between the snapshot and the provided new serializer, a runtime exception is thrown. More ideally, this should return `TypeSerializerSchemaCompatibility.incompatible()`.

## Verifying this change
A new unit test `CompositeTypeSerializerSnapshotTest.testNestedFieldSerializerArityMismatchPrecedence` covers this change.
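A hedged sketch of the behavioral change (illustrative only, not the actual CompositeTypeSerializerSnapshot source): an arity mismatch between the snapshotted nested serializers and the new ones now resolves to incompatible() instead of throwing.

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;

// Sketch of the arity precedence check described above; the helper name and
// structure are assumptions for illustration.
public class ArityCheckSketch {

	static <T> TypeSerializerSchemaCompatibility<T> resolveArity(
			TypeSerializer<?>[] newNestedSerializers,
			TypeSerializerSnapshot<?>[] snapshottedNested) {

		if (newNestedSerializers.length != snapshottedNested.length) {
			// Previously a runtime exception; now an incompatible() result.
			return TypeSerializerSchemaCompatibility.incompatible();
		}

		// The per-nested-serializer compatibility checks would follow here.
		return TypeSerializerSchemaCompatibility.compatibleAsIs();
	}
}
```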
[GitHub] GJL edited a comment on issue #7529: [hotfix] [blob] Generalize usage of NoOpTransientBlobService
GJL edited a comment on issue #7529: [hotfix] [blob] Generalize usage of NoOpTransientBlobService URL: https://github.com/apache/flink/pull/7529#issuecomment-456470455 Are you planning to use the `NoOpTransientBlobService` somewhere else? If not, I would not like to merge this PR. I find it hard to believe that an implementation that throws an exception for almost every method can be used for testing in a sensible way. Also `enum` is a common strategy to implement singletons.
[GitHub] pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer URL: https://github.com/apache/flink/pull/7405#issuecomment-456466692 Thanks for showing the interest @tvielgouarin in this topic!

> But for migration tests of universal FlinkKafkaProducer between Flink versions : I read in the doc that this connector is supported since flink 1.7 . So currently, no migration test should be passing

Both for the universal and `FlinkKafkaProducer011` the most important test to catch regressions is to test restoring from a savepoint that was taken using the latest stable version (`1.7`). It would test whether the current code in master (the future `1.8` release) has state compatibility with `1.7` savepoints.

> Also, do we just want to check that the connector can start from a previous version checkpoint ?

I think at least for now only checking the compatibility with `1.7` is enough.

> Most of the time the test fail with `The producer attempted to use a producer id which is not currently assigned to its transactional id`

I guess your test is failing because you are restoring Flink's state from the savepoint correctly, but you are ignoring the state of the Kafka brokers. I haven't thought about this issue before and it looks like nobody else did either... By the way, thanks for discovering this issue. I guess this is the first migration test that must take care of some external state (besides Flink's internal state). Probably the Kafka migration tests must store not only the "savepoint" in the resources but also, in one way or another, the state of the Kafka cluster as it was just after completing the savepoint from which we want to restore (transaction log? topics content?). Whatever that means... A couple of random thoughts:
1. it might be possible to identify the list of files that define the internal state of Kafka that we need to archive, place them in the resources alongside the savepoint, and use them during `KafkaTestBase` initialisation
2. maybe it will be better to express this logic in an end to end test?

CC @tzulitai
[jira] [Created] (FLINK-11408) ContinuousProcessingTimeTrigger: NPE on clear() and state is lost on merge
Cristian created FLINK-11408: Summary: ContinuousProcessingTimeTrigger: NPE on clear() and state is lost on merge Key: FLINK-11408 URL: https://issues.apache.org/jira/browse/FLINK-11408 Project: Flink Issue Type: Bug Environment: Both bugs are reproduced in [https://github.com/casidiablo/flink-continuous-processing-trigger-bugs] This is running Flink 1.7.1 locally. Reporter: Cristian

I was testing session windows using processing time and found a couple of problems with the ContinuousProcessingTimeTrigger. The first one is an NPE in the clear method: [https://github.com/casidiablo/flink-continuous-processing-trigger-bugs/blob/master/src/main/java/flink/bug/Bug1.java] The second one, which is most likely related and the root cause of the first, is that the way state is merged for merged windows confuses the trigger so that it stops firing: [https://github.com/casidiablo/flink-continuous-processing-trigger-bugs/blob/master/src/main/java/flink/bug/Bug2.java]

I managed to solve both of these by using a modified version of ContinuousProcessingTimeTrigger which does NOT call `ctx.mergePartitionedState(stateDesc);` in the onMerge method. This is what I understand happens at the trigger level:
* The first element in the stream sets an initial fire time (the logic is in ContinuousProcessingTimeTrigger#onElement()), if there is no trigger set.
* From then on, ContinuousProcessingTimeTrigger#onProcessingTime() is responsible for scheduling the next trigger.
* When windows are merged (in the case of session windows), somehow the clear and merge methods are called using the wrong window namespace (I think this is the root cause of the bug, but I'm not too familiar with that code).
* Because the state is not cleared properly in the new window namespace, the previously scheduled trigger gets executed against the window that was cleared.
* Moreover, the new window has the state of the previous window, which means that:
## onElement will NOT schedule a fire trigger
## onProcessingTime will never be called at all
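For reference, a simplified, stateless sketch of the workaround described above. This is NOT the actual ContinuousProcessingTimeTrigger (which tracks the next fire time in reducing state and registers only one timer); the per-element timer registration here is a deliberate simplification. The key difference is that onMerge() starts a fresh timer instead of calling ctx.mergePartitionedState(stateDesc).

```java
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Simplified sketch of the reporter's modified trigger, under the
// assumptions stated in the lead-in.
public class SimpleContinuousProcessingTimeTrigger extends Trigger<Object, TimeWindow> {

	private final long intervalMillis;

	public SimpleContinuousProcessingTimeTrigger(long intervalMillis) {
		this.intervalMillis = intervalMillis;
	}

	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
		// Simplification: the real trigger registers only one pending timer,
		// tracked in partitioned state.
		ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + intervalMillis);
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
		// Schedule the next periodic firing, then fire for this window.
		ctx.registerProcessingTimeTimer(time + intervalMillis);
		return TriggerResult.FIRE;
	}

	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
		return TriggerResult.CONTINUE;
	}

	@Override
	public void clear(TimeWindow window, TriggerContext ctx) {
		// Stateless: nothing to clear, so the NPE from Bug1 cannot occur here.
	}

	@Override
	public boolean canMerge() {
		return true;
	}

	@Override
	public void onMerge(TimeWindow window, OnMergeContext ctx) {
		// Deliberately no ctx.mergePartitionedState(...): just register a
		// fresh timer for the merged window so periodic firing continues.
		ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + intervalMillis);
	}
}
```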
[GitHub] tzulitai commented on issue #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API
tzulitai commented on issue #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API URL: https://github.com/apache/flink/pull/7553#issuecomment-456460449 Thanks! Will merge after Travis confirms green ..
[jira] [Created] (FLINK-11407) Allow providing reason messages for TypeSerializerSchemaCompatibility.incompatible()
Tzu-Li (Gordon) Tai created FLINK-11407: --- Summary: Allow providing reason messages for TypeSerializerSchemaCompatibility.incompatible() Key: FLINK-11407 URL: https://issues.apache.org/jira/browse/FLINK-11407 Project: Flink Issue Type: Improvement Components: Type Serialization System Reporter: Tzu-Li (Gordon) Tai Assignee: Tzu-Li (Gordon) Tai Fix For: 1.8.0 There are a few different scenarios where a new serializer can be determined incompatible in a compatibility check. Allowing the incompatible result to be accompanied by a message indicating why the new serializer is incompatible would be beneficial, and allows the state backends to throw more meaningful exceptions when they do encounter an incompatible new serializer.
[GitHub] dawidwys commented on issue #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API
dawidwys commented on issue #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API URL: https://github.com/apache/flink/pull/7553#issuecomment-456450150 Thanks @tzulitai for the update. It looks good to me now. +1
[jira] [Commented] (FLINK-11336) Flink HA didn't remove ZK metadata
[ https://issues.apache.org/jira/browse/FLINK-11336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16748851#comment-16748851 ] Stephan Ewen commented on FLINK-11336: -- How did you start Flink?
- standalone
- Yarn (per job or as a session)
- Mesos
- Container

> Flink HA didn't remove ZK metadata > -- > > Key: FLINK-11336 > URL: https://issues.apache.org/jira/browse/FLINK-11336 > Project: Flink > Issue Type: Improvement >Reporter: shengjk1 >Priority: Major > Attachments: image-2019-01-15-19-42-21-902.png > > > Flink HA didn't remove ZK metadata > such as > go to zk cli : ls /flinkone > !image-2019-01-15-19-42-21-902.png! > > I suggest we should delete this metadata when the application is cancelled or throws an exception
[jira] [Created] (FLINK-11406) Return TypeSerializerSchemaCompatibility.incompatible() when arity of nested serializers don't match in CompositeTypeSerializerSnapshot's compatibility check
Tzu-Li (Gordon) Tai created FLINK-11406: --- Summary: Return TypeSerializerSchemaCompatibility.incompatible() when arity of nested serializers don't match in CompositeTypeSerializerSnapshot's compatibility check Key: FLINK-11406 URL: https://issues.apache.org/jira/browse/FLINK-11406 Project: Flink Issue Type: Improvement Components: Type Serialization System Reporter: Tzu-Li (Gordon) Tai Assignee: Tzu-Li (Gordon) Tai Fix For: 1.8.0 Right now, in {{CompositeTypeSerializerSnapshot.resolveSchemaCompatibility(...)}}, if arity of nested serializers don't match between the snapshot and the provided new serializer, a runtime exception is thrown. More ideally, this should return {{TypeSerializerSchemaCompatibility.incompatible()}}. We should also make it clearer in the Javadocs that the {{CompositeTypeSerializerSnapshot}} is intended for product serializers that have a fixed arity of nested serializers.
[GitHub] tzulitai commented on issue #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API
tzulitai commented on issue #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API URL: https://github.com/apache/flink/pull/7553#issuecomment-456445785 @dawidwys I've addressed your comments in fixup commit 6c7a3fe. Could you take a look and let me know if there are other issues, or a +1 if it looks good?
[GitHub] StephanEwen commented on a change in pull request #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.
StephanEwen commented on a change in pull request #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers. URL: https://github.com/apache/flink/pull/7313#discussion_r249832214

## File path: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
##
@@ -121,24 +127,14 @@ public boolean supportsResume() {
 		return true;
 	}
 
-	@VisibleForTesting
-	static org.apache.hadoop.fs.Path generateStagingTempFilePath(
-			org.apache.hadoop.fs.FileSystem fs,
-			org.apache.hadoop.fs.Path targetFile) throws IOException {
-
+	private static org.apache.hadoop.fs.Path generateStagingTempFilePath(org.apache.hadoop.fs.Path targetFile) {
 		checkArgument(targetFile.isAbsolute(), "targetFile must be absolute");
 
 		final org.apache.hadoop.fs.Path parent = targetFile.getParent();
 		final String name = targetFile.getName();
 
 		checkArgument(parent != null, "targetFile must not be the root directory");
 
-		while (true) {
-			org.apache.hadoop.fs.Path candidate = new org.apache.hadoop.fs.Path(
-					parent, "." + name + ".inprogress." + UUID.randomUUID().toString());
-			if (!fs.exists(candidate)) {
-				return candidate;
-			}
-		}
+		return new org.apache.hadoop.fs.Path(parent, "." + name + ".inprogress");

Review comment: I added this as a simple way to avoid collisions, without relying on the semantics of the code that invokes this. I would try and think through what happens after a failure/recovery when an old task (running on a network-partitioned TM that has not yet cancelled) is still writing to the same file as a recovered task. Can that happen? If yes, what is the behavior then? Do we reduce the guarantees in that case? The partitioned TM should not be able to commit a checkpoint (to not roll over to a proper file), but can it mess up / overwrite the staging contents? My initial thought is that it probably could - not 100% sure though how HDFS leases would behave then, could they prevent that?
[GitHub] azagrebin commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
azagrebin commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#discussion_r249829200

## File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
##
@@ -370,8 +370,8 @@ else if (partitionsRemoved) {
 							keyPayload.get(keyBytes);
 						}
 
-						final T value = deserializer.deserialize(keyBytes, valueBytes,
-							currentPartition.getTopic(), currentPartition.getPartition(), offset);
+						final T value = deserializer.deserialize(

Review comment: I would be ok with having `Exception` in the signature of `deserialize`.
[GitHub] tvielgouarin edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
tvielgouarin edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer URL: https://github.com/apache/flink/pull/7405#issuecomment-456391275 hi @pnowojski, @yanghua, I come from @cjolif's team and I am currently working on the implementation of the migration tests in order to accelerate the merge of this PR. I came here with several questions in mind. I understand that we need to implement:
- migration tests of `FlinkKafkaProducer011` between Flink versions
- a migration test from `FlinkKafkaProducer011` to the universal `FlinkKafkaProducer`

But for migration tests of the universal `FlinkKafkaProducer` between Flink versions: I read in the doc that this connector is supported since Flink 1.7, so currently no migration test should be passing? Also, do we just want to check that the connector can start from a previous version's checkpoint? Furthermore, I've worked on a simple implementation of a migration test for `FlinkKafkaProducer`: https://github.com/tvielgouarin/flink/commit/973756430c226a0cd5011fedc1eab1345a27cc2a Most of the time the test fails with `The producer attempted to use a producer id which is not currently assigned to its transactional id`. I'm not sure, but I guess it's because there is a different KafkaServer instance between the snapshot version and the test, and the transactional id assignments are lost(?). If so, do you have an idea of the best workaround? Anyway, if you can have a look at what I've done and criticize it, it would be much appreciated.
[GitHub] fhueske commented on issue #6873: [hotfix] Add Review Progress section to PR description template.
fhueske commented on issue #6873: [hotfix] Add Review Progress section to PR description template. URL: https://github.com/apache/flink/pull/6873#issuecomment-456436978 Sorry, did not read your comment before I sent out the mail. You can just reply to the thread. Maybe we can even leave out this PR and go directly with the bot.
[jira] [Commented] (FLINK-11401) Allow compression on ParquetBulkWriter
[ https://issues.apache.org/jira/browse/FLINK-11401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16748795#comment-16748795 ] Stephan Ewen commented on FLINK-11401: -- I can see that being useful. Please bear in mind that bulk writers currently need to roll on checkpoint, because many formats (like Parquet) don't make it easy to intermediately persist and resume writes. Avro's row-by-row append nature makes it possible to write part files across checkpoints. One could think of letting the row formats add a header when opening a part file. That would allow the Avro writers to keep the property of writing part files across checkpoints. > Allow compression on ParquetBulkWriter > -- > > Key: FLINK-11401 > URL: https://issues.apache.org/jira/browse/FLINK-11401 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.7.1 >Reporter: Fokko Driesprong >Assignee: Fokko Driesprong >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
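For illustration, a rough sketch of what a compression hook could look like, assuming flink-parquet's `ParquetWriterFactory`/`ParquetBuilder` pair and Parquet's `AvroParquetWriter` builder; the helper class and method here are made up, not the actual patch.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// Hypothetical helper: builds a ParquetWriterFactory whose writers compress
// with the given codec (e.g. SNAPPY). The codec parameter is the new part;
// the schema is passed around as a String because Avro's Schema is not
// serializable.
public final class CompressedParquetWriters {

    public static ParquetWriterFactory<GenericRecord> forGenericRecord(
            Schema schema, CompressionCodecName codec) {
        final String schemaString = schema.toString();
        return new ParquetWriterFactory<>(out ->
                AvroParquetWriter.<GenericRecord>builder(out)
                        .withSchema(new Schema.Parser().parse(schemaString))
                        .withCompressionCodec(codec) // the compression hook under discussion
                        .build());
    }
}
```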
[GitHub] kezhuw opened a new pull request #7556: [hotfix][javadocs] Fix typos in javadoc
kezhuw opened a new pull request #7556: [hotfix][javadocs] Fix typos in javadoc URL: https://github.com/apache/flink/pull/7556 Multiple typo fixes found while reading the Flink docs. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#discussion_r249798168 ## File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ## @@ -370,8 +370,8 @@ else if (partitionsRemoved) { keyPayload.get(keyBytes); } - final T value = deserializer.deserialize(keyBytes, valueBytes, - currentPartition.getTopic(), currentPartition.getPartition(), offset); + final T value = deserializer.deserialize( Review comment: Methods of the 0.8 _ConsumerRecord_ have different signatures compared to modern Kafka - namely, they throw the base Exception. I see two options: 1. Change the signature of deserialize(ConsumerRecord) to throw Exception instead of IOException. 2. Wrap and re-throw in every deserialize(ConsumerRecord) implementation, both in 0.8 and in kafka-base (KeyedDeserializationSchemaWrapper, JSONKeyValueDeserializationSchema, NonContinousOffsetsDeserializationSchema and TypeInformationKeyValueSerializationSchema). @azagrebin, @stevenzwu which approach do you prefer? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
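For illustration, a minimal sketch of option 2 above; the wrapper and delegate types are hypothetical stand-ins for the 0.8-era schema classes, not the connector's actual code.

```java
import java.io.IOException;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Illustrative wrapper for option 2: catch the broad Exception thrown by the
// 0.8 code paths and re-throw it as the IOException the interface declares.
public class WrappingDeserializationSchema<T> {

    // Hypothetical interface standing in for the 0.8-era schema being wrapped.
    public interface LegacyDeserializer<T> {
        T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
    }

    private final LegacyDeserializer<T> delegate;

    public WrappingDeserializationSchema(LegacyDeserializer<T> delegate) {
        this.delegate = delegate;
    }

    public T deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {
        try {
            return delegate.deserialize(record); // declared to throw the base Exception
        } catch (IOException e) {
            throw e; // already the declared type, pass through unchanged
        } catch (Exception e) {
            throw new IOException("Failed to deserialize Kafka record", e);
        }
    }
}
```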
[GitHub] rmetzger commented on issue #6873: [hotfix] Add Review Progress section to PR description template.
rmetzger commented on issue #6873: [hotfix] Add Review Progress section to PR description template. URL: https://github.com/apache/flink/pull/6873#issuecomment-456420688 Nice. I'm almost done with the bot (famous last words in software engineering). I might be able to finish it by tomorrow. Should we hold the announcement until then? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249801381 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. 
+*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { +val inNames = calc.getInput.getRowType.getFieldNames +calc.getProgram.getNamedProjects + .map(p => { +calc.getProgram.expandLocalRef(p.left) match { + // output field is forwarded input field + case r: RexInputRef => (r.getIndex, p.right) + // output field is renamed input field + case a: RexCall if a.getKind.equals(SqlKind.AS) => +a.getOperands.get(0) match { + case ref: RexInputRef => +(ref.getIndex, p.right) + case _ => +(-1, p.right) Review comment: Yes, I am trying to refactor it as you suggested. :-) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249795484 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.logical + +import java.util.{List => JList} + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.{RelNode, SingleRel} +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction +import org.apache.flink.table.plan.nodes.FlinkConventions + +class FlinkLogicalUpsertToRetraction( +cluster: RelOptCluster, +traitSet: RelTraitSet, +child: RelNode, +val keyNames: Seq[String]) + extends SingleRel(cluster, traitSet, child) + with FlinkLogicalRel { + + override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = { +new FlinkLogicalUpsertToRetraction(cluster, traitSet, inputs.get(0), keyNames) + } + + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { +val child = this.getInput +val rowCnt = mq.getRowCount(child) +// take rowCnt and fieldCnt into account, so that cost will be smaller when generate +// UpsertToRetractionConverter after Calc. +planner.getCostFactory.makeCost(rowCnt, rowCnt * child.getRowType.getFieldCount, 0) Review comment: Shouldn't you use `RelMetadataQuery#getAverageRowSize()` or at least duplicate the logic of `FlinkLogicalJoinBase#computeSelfCost` instead of using `getFieldCount`? Also, isn't the IO cost missing? To be consistent with the join cost, this should return: ``` planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(child.getRowType)) ``` Edit: probably a better idea is to use `estimateRowSize` and maybe in the future update it to use `RelMetadataQuery#getAverageRowSize()` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249800524 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. 
+*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { Review comment: The reason I use `getInputFromOutputName` is that the return type can be an `Option[String]`, because no two output names can be the same. If we use `mapInputToOutputName` then we need to return an `Option[Seq[String]]`. Both methods can solve the problem; all we have to do is define the method clearly. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249797875 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.logical + +import java.util.{List => JList} + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.{RelNode, SingleRel} +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction +import org.apache.flink.table.plan.nodes.FlinkConventions + +class FlinkLogicalUpsertToRetraction( Review comment: Isn't this class missing a proper implementation of the `AbstractRelNode#accept(org.apache.calcite.rex.RexShuttle)` method? It is used by Calcite, for example, for gathering the columns "used" by this node. If `FlinkLogicalUpsertToRetraction#accept` does not visit the primary key, I would be afraid that the primary key might be pruned from the input of `FlinkLogicalUpsertToRetraction`. It is also used for renaming fields etc. After optimisation, your `keyNames: Seq[String]` may no longer be valid. As far as I (vaguely) recall from when I was implementing Temporal Table Joins, storing fields in the form of `String` references was not very useful and using `RexNode` was a better option. Probably this needs more investigation and some additional unit test(s). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
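For illustration, a minimal sketch of the `RexNode`-based alternative mentioned above, assuming the node kept its keys as `RexNode`s (e.g. `RexInputRef`s) instead of `String`s; the helper below is hypothetical, not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;

// Illustrative helper: if keys were stored as RexNodes, an accept(RexShuttle)
// override could rewrite them like this, so that column pruning and field
// renaming during optimisation keep the key references valid.
final class KeyRefRewriter {

    static List<RexNode> rewriteKeys(List<RexNode> keyRefs, RexShuttle shuttle) {
        List<RexNode> rewritten = new ArrayList<>(keyRefs.size());
        for (RexNode key : keyRefs) {
            rewritten.add(key.accept(shuttle)); // the shuttle maps old refs to new ones
        }
        return rewritten;
    }
}
```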
[GitHub] fhueske commented on issue #6873: [hotfix] Add Review Progress section to PR description template.
fhueske commented on issue #6873: [hotfix] Add Review Progress section to PR description template. URL: https://github.com/apache/flink/pull/6873#issuecomment-456417079 I've updated the PR. Before merging I'll post to the dev mailing list to remind everybody about the upcoming change and the new review process. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249790833 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.stream.sql + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo} +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Types +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, unaryNode} +import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} +import org.apache.flink.types.Row +import org.junit.Test +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import java.lang.{Boolean => JBool} + +class FromUpsertStreamTest extends TableTestBase { + + private val streamUtil: StreamTableTestUtil = streamTestUtil() + + @Test + def testRemoveUpsertToRetraction() = { +streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))]( + "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime) + +val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable" + +val expected = + unaryNode( +"DataStreamCalc", +UpsertTableNode(0), +term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime", + "CAST(rowtime) AS rowtime") + ) +streamUtil.verifySql(sql, expected) + } + + @Test + def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = { Review comment: Yes, that was my point. If you drop the first condition it will also simplify this test. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249788943 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. 
+*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { Review comment: Also ditto :) I think here you shouldn't care whether the field is duplicated or not, so if you decide not to use the `mapInputToOutputName` method, implementing a `CommonCalc#getInputToOutputNamesMapping()` that returns a map or multimap is probably a better option than `Seq[String]`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249790503 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. 
+*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { +val inNames = calc.getInput.getRowType.getFieldNames +calc.getProgram.getNamedProjects + .map(p => { +calc.getProgram.expandLocalRef(p.left) match { + // output field is forwarded input field + case r: RexInputRef => (r.getIndex, p.right) + // output field is renamed input field + case a: RexCall if a.getKind.equals(SqlKind.AS) => +a.getOperands.get(0) match { + case ref: RexInputRef => +(ref.getIndex, p.right) + case _ => +(-1, p.right) Review comment: What do you mean? Maybe to rephrase my comment: I think that instead of having this `match case` construct, a better idea might be to implement its logic as a `RexVisitorImpl`, probably as a private static nested class inside `FlinkLogicalCalc` or `CommonCalc`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
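For illustration, a minimal sketch of that `RexVisitorImpl` idea against Calcite's visitor API; the class name is made up, and it only covers the two cases handled by the original `match`.

```java
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.sql.SqlKind;

// Hypothetical visitor replacing the match/case: returns the input field a
// projection forwards (directly or via an AS rename), or null otherwise.
final class ForwardedInputRefVisitor extends RexVisitorImpl<RexInputRef> {

    ForwardedInputRefVisitor() {
        super(false); // non-deep: only inspect the top of each projection
    }

    @Override
    public RexInputRef visitInputRef(RexInputRef inputRef) {
        return inputRef; // output field is a directly forwarded input field
    }

    @Override
    public RexInputRef visitCall(RexCall call) {
        if (call.getKind() == SqlKind.AS) {
            // output field is a renamed input field; inspect the first operand
            return call.getOperands().get(0).accept(this);
        }
        return null; // any other expression does not simply forward a field
    }
}
```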
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249786791 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. +*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { Review comment: Good point with `CommonCalc`. Probably it's a better place (btw, doesn't Calcite have some util method for that that works on `Calc`?). 
Regarding `getInputFromOutputName`: I don't mind which method you would add; both `getInputFromOutputName` and `mapInputToOutputNames` are valuable additions. However, will `getInputFromOutputName` help you? Don't you need the mapping in the opposite direction? With `calc` following the `UpsertToRetraction` node, you want to convert key names after transposing those two nodes. So you need to perform an operation like this: ``` val keyNameAfterTransposition = calc.mapInputToOutputName(upsertToRetraction.key) ``` or more precisely: ``` val newKeys = upsertToRetraction.keyNames.map(calc.mapInputToOutputName) // plus code to checkstate that `Option` is present ``` Right? Secondly: > A field may contain multi-output. For example, `select key as key1, key as key2 from T` 1. Does it matter? I guess one possibility would be to return the first of those fields encountered. 2. `Seq[String] mapInputToOutputNames(String input)` is also an option. It might be more general. You could even implement: ``` Option[String] mapInputToOutputName(String input) { return mapInputToOutputNames.collectFirst(input); } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
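For illustration, a rough sketch of the multimap direction discussed above, built on Calcite's `RexProgram`; the helper name is made up, and only the plain-forwarding case (no `AS` renames) is handled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLocalRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.util.Pair;

// Made-up helper: maps each forwarded input field name to every output name it
// appears under, so "SELECT key AS key1, key AS key2" yields key -> [key1, key2].
final class CalcNameMappings {

    static Map<String, List<String>> inputToOutputNames(RexProgram program) {
        List<String> inputNames = program.getInputRowType().getFieldNames();
        Map<String, List<String>> mapping = new HashMap<>();
        for (Pair<RexLocalRef, String> project : program.getNamedProjects()) {
            RexNode expanded = program.expandLocalRef(project.left);
            if (expanded instanceof RexInputRef) { // plain forward; AS renames omitted here
                String inputName = inputNames.get(((RexInputRef) expanded).getIndex());
                mapping.computeIfAbsent(inputName, k -> new ArrayList<>()).add(project.right);
            }
        }
        return mapping;
    }
}
```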
[GitHub] GJL commented on a change in pull request #7524: [FLINK-11351][tests] Port JobManagerCleanupITCase to new code base
GJL commented on a change in pull request #7524: [FLINK-11351][tests] Port JobManagerCleanupITCase to new code base URL: https://github.com/apache/flink/pull/7524#discussion_r249786009 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java ## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.testtasks.FailingBlockingInvokable; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.FilenameFilter; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Small test to check that the {@link org.apache.flink.runtime.blob.BlobServer} cleanup is executed + * after job termination. 
+ */ +public class BlobsCleanupITCase extends TestLogger { + + private static final long RETRY_INTERVAL = 100L; + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static MiniClusterResource miniClusterResource; + + private static Configuration configuration; + + private static File blobBaseDir; + + @BeforeClass + public static void setup() throws Exception { + blobBaseDir = TEMPORARY_FOLDER.newFolder(); + + configuration = new Configuration(); + configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, blobBaseDir.getAbsolutePath()); + configuration.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay"); + configuration.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); + // BLOBs are deleted from BlobCache between 1s and 2s after last reference + // -> the BlobCache may still have the BLOB or not (let's test both cases randomly) + configuration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L); Review comment: I guess it would be wrong to allow modifications of `configuration` in tests. I would use `UnmodifiableConfiguration` here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
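For illustration, a minimal sketch of that `UnmodifiableConfiguration` suggestion, assuming Flink's read-only wrapper in `org.apache.flink.configuration`; the helper class and method are made up.

```java
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;

// Build the settings once, then expose only a read-only view to the tests so
// no test can mutate the configuration shared through the mini cluster.
public final class TestConfigSetup {

    static UnmodifiableConfiguration buildConfiguration(String blobDir) {
        Configuration base = new Configuration();
        base.setString(BlobServerOptions.STORAGE_DIRECTORY, blobDir);
        base.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
        return new UnmodifiableConfiguration(base); // setters on this view throw
    }
}
```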
[GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615#discussion_r249787929 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ## @@ -82,93 +84,110 @@ */ @Internal public abstract class FlinkKafkaConsumerBase extends RichParallelSourceFunction implements - CheckpointListener, - ResultTypeQueryable, - CheckpointedFunction { + CheckpointListener, Review comment: 522589e This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dawidwys commented on issue #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API
dawidwys commented on issue #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API URL: https://github.com/apache/flink/pull/7553#issuecomment-456406468 Hi @tzulitai I had a look just at the commit you linked and put some suggestions. Most of it is just nitpicking; the biggest concern I have is that maybe we should still return a fixed size for `EventIdSerializer`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
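For context on the fixed-size concern, a sketch of the `TypeSerializer#getLength()` contract; the assumed byte layout of `EventId` (an int id plus a long timestamp) is a guess for illustration, not taken from the PR.

```java
// Fragment of a hypothetical fixed-length serializer, shown only to illustrate
// the getLength() contract: a non-negative value declares a fixed serialized
// size, while -1 declares variable length.
abstract class FixedLengthEventIdSerializerSketch {

    // Assumption: EventId is always written as an int id plus a long timestamp.
    public int getLength() {
        return Integer.BYTES + Long.BYTES; // 4 + 8 = 12 bytes per record
    }
}
```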
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249770611 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. +*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { Review comment: A field may contain multi-output. For example, `select key as key1, key as key2 from T`. 
Maybe refactor it into a method on `CommonCalc`: ``` /** * Returns empty if the output field is not forwarded from the input of the calc. */ Option[String] getInputFromOutputName(Calc calc, String output) ``` Put it in `CommonCalc` so that this method can be used by both logical and physical calc nodes. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249775473 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. +*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { Review comment: ditto This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249775396 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. 
+*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { +val inNames = calc.getInput.getRowType.getFieldNames +calc.getProgram.getNamedProjects + .map(p => { +calc.getProgram.expandLocalRef(p.left) match { + // output field is forwarded input field + case r: RexInputRef => (r.getIndex, p.right) + // output field is renamed input field + case a: RexCall if a.getKind.equals(SqlKind.AS) => +a.getOperands.get(0) match { + case ref: RexInputRef => +(ref.getIndex, p.right) + case _ => +(-1, p.right) Review comment: Maybe we can use `RexVisitorImpl` in `FlinkLogicalCalc`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
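For readers less familiar with Calcite, the `RexVisitorImpl` idea floated above could look roughly like the sketch below. This is illustrative only, not code from the PR; the class name `ForwardedInputRefResolver` and the `resolve` helper are invented here.
```
import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexVisitorImpl}
import org.apache.calcite.sql.SqlKind

// Illustrative sketch: resolve the input field index that a projection
// expression forwards, or None if it is not a plain forward/rename.
class ForwardedInputRefResolver extends RexVisitorImpl[Option[Int]](true) {

  // A bare input reference is forwarded as-is.
  override def visitInputRef(inputRef: RexInputRef): Option[Int] =
    Some(inputRef.getIndex)

  // An AS call merely renames its first operand, so resolve through it;
  // any other call breaks the forwarding relationship.
  override def visitCall(call: RexCall): Option[Int] = call.getKind match {
    case SqlKind.AS => call.getOperands.get(0).accept(this)
    case _ => None
  }
}

object ForwardedInputRefResolver {
  // RexVisitorImpl's non-overridden visit methods (e.g. for literals)
  // return null by default, so treat a null result as "not forwarded".
  def resolve(expression: RexNode): Option[Int] =
    Option(expression.accept(new ForwardedInputRefResolver)).flatten
}
```
A visitor along these lines would also remove the duplicated `case r: RexInputRef` branches discussed in the review, since the `AS` case simply recurses into the same visitor.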
[GitHub] tvielgouarin edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
tvielgouarin edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer URL: https://github.com/apache/flink/pull/7405#issuecomment-456391275 Hi @pnowojski , @yanghua , I come from @cjolif 's team and I am currently working on the implementation of the migration tests in order to accelerate the merge of this PR. I came here with several questions in mind. I do understand that it needs to implement:
- migration tests of `FlinkKafkaProducer011` between Flink versions
- migration tests of the universal `FlinkKafkaProducer` between Flink versions
- a migration test from `FlinkKafkaProducer011` to the universal `FlinkKafkaProducer`
However, what should the validation conditions be (e.g. that the producerConfig stays the same)? Furthermore, I've worked on a simple implementation of a migration test for `FlinkKafkaProducer`: https://github.com/tvielgouarin/flink/commit/973756430c226a0cd5011fedc1eab1345a27cc2a Most of the time the test fails with `The producer attempted to use a producer id which is not currently assigned to its transactional id`. I'm not sure, but I guess it's because a different KafkaServer instance is used for the snapshot and for the test, so the transactional id assignments are lost (?). If so, do you have an idea of the best workaround? Anyway, if you can have a look at what I've done and criticize it, it would be much appreciated. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
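For reference, Flink's existing migration tests drive the operator through a test harness and restore it from a snapshot file produced by an older version. A rough Scala sketch of that pattern follows; the topic name, configuration, and snapshot resource name are assumptions for illustration, not the actual resources of this PR. The `producer id ... transactional id` error is plausibly what one would see on the `EXACTLY_ONCE` restore path when the snapshotted transactional ids are unknown to a freshly started embedded KafkaServer.
```
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.operators.StreamSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.{OneInputStreamOperatorTestHarness, OperatorSnapshotUtil}

object KafkaProducerMigrationSketch {

  // Restore a producer from a snapshot taken with an older Flink version.
  // The snapshot resource name below is hypothetical.
  def restoreFromOldSnapshot(): Unit = {
    val producerConfig = new Properties()
    producerConfig.setProperty("bootstrap.servers", "localhost:9092")

    val producer = new FlinkKafkaProducer011[String](
      "migration-test-topic", new SimpleStringSchema(), producerConfig)

    val harness = new OneInputStreamOperatorTestHarness[String, AnyRef](
      new StreamSink[String](producer))

    harness.setup()
    // Read the operator state written by the old version and restore from it.
    harness.initializeState(
      OperatorSnapshotUtil.readStateHandle(
        OperatorSnapshotUtil.getResourceFilename(
          "kafka-producer-011-migration-flink-1.7-snapshot")))
    harness.open()

    // Emit one element after restore to verify the producer still works.
    harness.processElement("element-after-restore", 0L)
    harness.close()
  }
}
```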
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249770611 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. +*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { Review comment: A field may contain multi-output. For example, `select key as key1, key as key2 from T`. 
Maybe refactor it into:

class FlinkLogicalCalc:
```
/**
 * Returns empty if output field is not forwarded from the input.
 */
Option[String] getInputFromOutputName(String output)
```
What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
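To make the proposal concrete, the suggested helper could be sketched on top of the `getNamedProjects`/`expandLocalRef` logic already used by the rule. This is illustrative only, not code from the PR (the wrapping object is invented here); going from the unique output name back to the input name also sidesteps the multi-output problem in the `key as key1, key as key2` example.
```
import org.apache.calcite.rex.{RexCall, RexInputRef}
import org.apache.calcite.sql.SqlKind
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCalc

import scala.collection.JavaConversions._

object CalcFieldForwarding {

  // Returns the input field name that `outputName` is forwarded from,
  // or None if the output field is computed rather than forwarded.
  def getInputFromOutputName(calc: FlinkLogicalCalc, outputName: String): Option[String] = {
    val inputNames = calc.getInput.getRowType.getFieldNames
    calc.getProgram.getNamedProjects
      .find(namedProject => namedProject.right == outputName)
      .flatMap { namedProject =>
        calc.getProgram.expandLocalRef(namedProject.left) match {
          // output field is a forwarded input field
          case ref: RexInputRef => Some(inputNames.get(ref.getIndex))
          // output field is a renamed input field
          case call: RexCall if call.getKind == SqlKind.AS =>
            call.getOperands.get(0) match {
              case innerRef: RexInputRef => Some(inputNames.get(innerRef.getIndex))
              case _ => None
            }
          case _ => None
        }
      }
  }
}
```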
[GitHub] tvielgouarin commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
tvielgouarin commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer URL: https://github.com/apache/flink/pull/7405#issuecomment-456391275 Hi @pnowojski , @yanghua , I come from @cjolif 's team and I am currently working on the implementation of the migration tests in order to accelerate the merge of this PR. I came here with several questions in mind. I do understand that it needs to implement:
- migration tests of `FlinkKafkaProducer011` between Flink versions
- migration tests of the universal `FlinkKafkaProducer` between Flink versions
- a migration test from `FlinkKafkaProducer011` to the universal `FlinkKafkaProducer`
However, what should the validation conditions be (e.g. that the producerConfig stays the same)? Furthermore, I've worked on a simple implementation of a migration test for `FlinkKafkaProducer`: https://github.com/tvielgouarin/flink/commit/973756430c226a0cd5011fedc1eab1345a27cc2a Most of the time the test fails with `The producer attempted to use a producer id which is not currently assigned to its transactional id`. I'm not sure, but I guess it's because a different KafkaServer instance is used for the snapshot and for the test, so the transactional id assignments are lost (?). If so, do you have an idea of the best workaround? Anyway, if you can have a look at what I've done and criticize it, it would be much appreciated. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249762211 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && Review comment: Good point! It is better to remove `calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount` and keep `fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)`. The first condition should be solved by cost, the second condition guards the correctness. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249760569 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) Review comment: I will remove the comments. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249759883 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.stream.sql + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo} +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Types +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, unaryNode} +import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} +import org.apache.flink.types.Row +import org.junit.Test +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import java.lang.{Boolean => JBool} + +class FromUpsertStreamTest extends TableTestBase { + + private val streamUtil: StreamTableTestUtil = streamTestUtil() + + @Test + def testRemoveUpsertToRetraction() = { +streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))]( + "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime) + +val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable" + +val expected = + unaryNode( +"DataStreamCalc", +UpsertTableNode(0), +term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime", + "CAST(rowtime) AS rowtime") + ) +streamUtil.verifySql(sql, expected) + } + + @Test + def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = { Review comment: > But testCalcTransposeUpsertToRetraction is a subset of testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose. Ok, I will remove `testCalcTransposeUpsertToRetraction()` > On the other hand you are not testing for the condition: I think both conditions have been tested as it is an `&`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11373) CliFrontend cuts off reason for error messages
[ https://issues.apache.org/jira/browse/FLINK-11373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11373: --- Labels: pull-request-available starter (was: starter) > CliFrontend cuts off reason for error messages > -- > > Key: FLINK-11373 > URL: https://issues.apache.org/jira/browse/FLINK-11373 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.5.6, 1.6.3, 1.7.1 >Reporter: Maximilian Michels >Assignee: leesf >Priority: Minor > Labels: pull-request-available, starter > > The CliFrontend seems to only print the first message in the stack trace and > not any of its causes. > {noformat} > bin/flink run /non-existing/path > Could not build the program from JAR file. > Use the help option (-h or --help) to get help on the command. > {noformat} > Notice that the underlying cause of this message is a FileNotFoundException. > Consider changing > a) the error message for this particular case > b) the way the stack trace messages are trimmed -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] leesf opened a new pull request #7555: [FLINK-11373] CliFrontend cuts off reason for error messages
leesf opened a new pull request #7555: [FLINK-11373] CliFrontend cuts off reason for error messages URL: https://github.com/apache/flink/pull/7555 ## What is the purpose of the change Show error message of CliArgsException in CliFrontend. ## Brief change log Change System.out.println(e.getMessage()) to e.printStackTrace(). ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
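To illustrate the difference, compare printing only the top-level message with walking the cause chain. This is a minimal sketch, not the actual `CliFrontend` code; Flink also ships `org.apache.flink.util.ExceptionUtils` with helpers for stringifying exceptions.
```
object ErrorReporting {

  // The reported behaviour: only the top-level message survives,
  // so the FileNotFoundException root cause is lost.
  def shortMessage(t: Throwable): String = t.getMessage

  // Walking the cause chain keeps the root cause visible.
  def fullMessage(t: Throwable): String =
    Iterator.iterate(t)(_.getCause)
      .takeWhile(_ != null)
      .map(e => s"${e.getClass.getSimpleName}: ${e.getMessage}")
      .mkString("\nCaused by: ")
}
```
For example, `ErrorReporting.fullMessage(new RuntimeException("Could not build the program from JAR file.", new java.io.FileNotFoundException("/non-existing/path")))` yields both the wrapper message and the `FileNotFoundException` cause.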
[GitHub] pnowojski commented on a change in pull request #7438: [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter
pnowojski commented on a change in pull request #7438: [FLINK-11282][network] Merge StreamRecordWriter into RecordWriter URL: https://github.com/apache/flink/pull/7438#discussion_r249741465 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java ## @@ -71,7 +71,7 @@ public void setUp(long flushTimeout) throws Exception { * * @param flushTimeout * output flushing interval of the -* {@link org.apache.flink.streaming.runtime.io.StreamRecordWriter}'s output flusher thread +* {@link org.apache.flink.runtime.io.network.api.writer}'s output flusher thread Review comment: nit: missing `RecordWriter` reference? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249726595 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) Review comment: Either rename the method or drop the `// key fields should not be changed` comment. Usually something is wrong if you need to write a comment explaining method name. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249729097 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields Review comment: ditto This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249729279 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. +*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { Review comment: `getInputToOutputNamesMapping` and drop the comment? 
This method seems more generic and could be used in places other than this rule `CalcUpsertToRetractionTransposeRule`. What do you think about placing it inside `FlinkLogicalCalc`? Maybe even refactoring it into something like this:
```
class FlinkLogicalCalc:

/**
 * Returns empty if field is not forwarded.
 */
Option[String] mapInputToOutputName(String input)
```
`Seq[Option[String]] mapInputToOutputNames(Seq[String] inputs)` would probably be more efficient, but maybe less readable? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249735236 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. +*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { Review comment: Some kind of `map` usually better expresses mappings between two things compared to list of tuples. 
Is there a reason that I am not seeing for choosing a list of tuples? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249728531 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc Review comment: drop this comment? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249735123 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. 
+*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { +val inNames = calc.getInput.getRowType.getFieldNames +calc.getProgram.getNamedProjects + .map(p => { +calc.getProgram.expandLocalRef(p.left) match { + // output field is forwarded input field + case r: RexInputRef => (r.getIndex, p.right) + // output field is renamed input field + case a: RexCall if a.getKind.equals(SqlKind.AS) => +a.getOperands.get(0) match { + case ref: RexInputRef => +(ref.getIndex, p.right) + case _ => +(-1, p.right) Review comment: Why do you create tuples `(int, string)` and then filter based on the integers, instead of, for example, conditionally inserting into an array or a map? Another remark: I think that implementing a `RexVisitorImpl` (like `InputRefVisitor`) would be a better, more re-usable approach, which would, for example, solve the problem of duplicating the code for `case r: RexInputRef`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249729068 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names Review comment: drop comment it duplicates method name This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249731317 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) + +val newUpsertToRetraction = new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + +call.transformTo(newUpsertToRetraction) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +// get input output names +val inOutNames = getInOutNames(calc) +// contains all fields +inOutNames.map(_._1).containsAll(fields) + } + + /** +* Get related output field names for input field names for Calc. 
+*/ + private def getInOutNames(calc: FlinkLogicalCalc): Seq[(String, String)] = { +val inNames = calc.getInput.getRowType.getFieldNames +calc.getProgram.getNamedProjects + .map(p => { Review comment: Please do not use single-letter/abbreviated variables (`io`, `p`, `r`, `a`, `ref`); they make the code cryptic. Also, naming what `p.right` and `p.left` mean by assigning those values to named local variables would be useful. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249728818 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// get new calc +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) +// get new upsertToRetraction Review comment: Extract to method? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r249726595 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rex.{RexCall, RexInputRef} +import org.apache.calcite.sql.SqlKind +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. + */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +// column pruning or push Filter down +calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount && +// key fields should not be changed + fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) Review comment: Either rename the method or drop the `// key fields should not be changed` comment. Something is wrong if you need to write a comment explaining method name. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249727908

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
## @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get
+ * smaller state size in upsertToRetraction.
+ */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+    operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+    // column pruning or push Filter down
+    calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount &&

Review comment:
However, I see one more issue here. Does this condition make sense? Shouldn't this be resolved/decided by the cost-based optimiser? Why do we need this heuristic? What if the calc adds column(s) while also being a very selective filter?

IMO this condition should be dropped completely, and only conditions that guard correctness should be tested here. Whether `calc` should or shouldn't be pushed down through `UpsertToRetraction` should be decided by the cost of the plan. The cost of `UpsertToRetraction` should reflect both the number of rows and the size of the rows. Is this happening?

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
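To make the cost-based alternative concrete: in Calcite, a node can report its own cost via `computeSelfCost`, and a width-aware cost on the conversion node would let the planner itself decide whether transposing the Calc pays off. The sketch below is illustrative only; whether `FlinkLogicalUpsertToRetraction` already overrides this, and how Flink weights io versus cpu, is not shown in this thread.

```scala
import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
import org.apache.calcite.rel.metadata.RelMetadataQuery

// Hypothetical cost function on FlinkLogicalUpsertToRetraction: charge for both
// the number of rows and their width, so that pruning columns before the
// conversion is rewarded by the cost-based optimiser rather than by a heuristic.
override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
  val rowCount = mq.getRowCount(this)
  val rowWidth = getRowType.getFieldCount.toDouble // crude proxy for state size per row
  planner.getCostFactory.makeCost(rowCount, rowCount, rowCount * rowWidth)
}
```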
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249725951

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala
## @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, unaryNode}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import java.lang.{Boolean => JBool}
+
+class FromUpsertStreamTest extends TableTestBase {
+
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+  @Test
+  def testRemoveUpsertToRetraction() = {
+    streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))](
+      "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+    val sql = "SELECT a, b, c, proctime, rowtime FROM MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        UpsertTableNode(0),
+        term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime",
+          "CAST(rowtime) AS rowtime")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose() = {

Review comment:
Generally speaking, the naming of those tests is not perfect (I'm really struggling to understand what they are doing), but first I wanted to understand what you are testing for. `testCalcTransposeUpsertToRetraction` is a subset of `testMaterializeTimeIndicatorAndCalcUpsertToRetractionTranspose`; they both exercise exactly the same code paths/branches, don't they? On the other hand, you are not testing the condition:
```
calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount
```
The last test checks only for:
```
// key fields should not be changed
fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
```

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
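A test for the untested field-count condition might look roughly like the sketch below, in the style of the tests quoted above. The test name and the expected plan string are guesses (the exact node shapes produced by the planner may differ), so this is an outline rather than a drop-in test.

```scala
// Hypothetical test: the Calc adds a computed column, so its output is wider than
// the UpsertToRetraction's output and the rule's field-count guard should prevent
// the transposition, keeping the Calc above the conversion.
@Test
def testCalcNotTransposedWhenItAddsFields(): Unit = {
  streamUtil.addTableFromUpsert[(Boolean, (Int, String, Long))](
    "MyTable", 'a, 'b.key, 'c, 'proctime.proctime, 'rowtime.rowtime)

  val sql = "SELECT a, b, c, proctime, rowtime, c + 1 AS d FROM MyTable"

  val expected =
    unaryNode(
      "DataStreamCalc",
      unaryNode(
        "DataStreamUpsertToRetraction",
        UpsertTableNode(0),
        term("select", "a", "b", "c", "proctime", "rowtime")), // assumed shape
      term("select", "a", "b", "c", "PROCTIME(proctime) AS proctime",
        "CAST(rowtime) AS rowtime", "+(c, 1) AS d"))
  streamUtil.verifySql(sql, expected)
}
```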
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249728417

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
## @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get
+ * smaller state size in upsertToRetraction.
+ */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+    operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+    // column pruning or push Filter down

Review comment:
Same here. Instead of writing a comment explaining `calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount`, encapsulate this logic inside a simple method whose name explains it. And ditto in other places: there are comments that do not explain anything, and comments that should be replaced by extracted method names.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
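As a sketch of the suggested encapsulation (the predicate name is illustrative, not from the PR):

```scala
// matches() now reads as a conjunction of named invariants, so the inline
// comments become unnecessary.
override def matches(call: RelOptRuleCall): Boolean = {
  val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
  val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]

  calcOnlyPrunesOrFiltersFields(calc, upsertToRetraction) &&
    fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
}

// True if the Calc does not widen the row type, i.e. it only prunes columns
// or filters rows.
private def calcOnlyPrunesOrFiltersFields(
    calc: FlinkLogicalCalc,
    upsertToRetraction: FlinkLogicalUpsertToRetraction): Boolean = {
  calc.getRowType.getFieldCount <= upsertToRetraction.getRowType.getFieldCount
}
```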
[jira] [Updated] (FLINK-11357) Check and port LeaderChangeJobRecoveryTest to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11357: --- Labels: pull-request-available (was: ) > Check and port LeaderChangeJobRecoveryTest to new code base if necessary > > > Key: FLINK-11357 > URL: https://issues.apache.org/jira/browse/FLINK-11357 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: lining >Priority: Major > Labels: pull-request-available > > Check and port {{LeaderChangeJobRecoveryTest}} to new code base if necessary. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] jinglining opened a new pull request #7554: [FLINK-11357][test]Check and port LeaderChangeJobRecoveryTest to new …
jinglining opened a new pull request #7554: [FLINK-11357][test]Check and port LeaderChangeJobRecoveryTest to new …
URL: https://github.com/apache/flink/pull/7554

## What is the purpose of the change

Port `LeaderChangeJobRecoveryTest` to the new code base. After reviewing the code, this test is still necessary.

## Brief change log

- Updated `LeaderChangeJobRecoveryTest` so that it passes CheckStyle.
- Deleted code that is no longer needed.

## Verifying this change

This change is itself a test.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] tzulitai commented on issue #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API
tzulitai commented on issue #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API URL: https://github.com/apache/flink/pull/7553#issuecomment-456357940 @dawidwys Regarding the changes to the CEP serializers, could you take a look at 898f6c54035206a97cfbb04bdad4f7568264dab2? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11328) Migrate all parameterless serializers / TypeSerializerSingleton's to use new serializer snapshot abstractions
[ https://issues.apache.org/jira/browse/FLINK-11328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-11328:
-----------------------------------
    Labels: pull-request-available  (was: )

> Migrate all parameterless serializers / TypeSerializerSingleton's to use new serializer snapshot abstractions
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11328
>                 URL: https://issues.apache.org/jira/browse/FLINK-11328
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing, Type Serialization System
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
> This subtask covers migration of:
> * All subclasses of TypeSerializerSingleton
> * All serializers that are still using the ParameterlessTypeSerializerConfigSnapshot
> to use the new serialization compatibility APIs ({{TypeSerializerSnapshot}} and {{TypeSerializerSchemaCompatibility}}).
> Serializers are only considered to have completed migration according to the defined list of things to check in FLINK-11327.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] tzulitai opened a new pull request #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API
tzulitai opened a new pull request #7553: [FLINK-11328] [core] Migrate all parameterless serializers to use new serialization compatibility API
URL: https://github.com/apache/flink/pull/7553

## What is the purpose of the change

This PR migrates all parameterless serializers (in Flink, they happen to all be subclasses of `TypeSerializerSingleton`) to use the new serialization compatibility APIs, so that the serializers themselves are no longer Java-serialized into savepoints.

## Brief change log

- edf6d59: This commit essentially does 2 things: 1) improve usability of the new `SimpleTypeSerializerSnapshot` base class, so that it may be used by serializers that have varying ways for instantiation, and 2) let all subclasses of `TypeSerializerSingleton` return their own implementation of a `SimpleTypeSerializerSnapshot`, while removing the base implementation of `snapshotConfiguration` in `TypeSerializerSingleton` (a sketch of this pattern follows after this message).
- 898f6c5: Some CEP serializers for NFA data structures use nested serializers, so they should actually use the `CompositeTypeSerializerSnapshot` snapshot class. This commit addresses that.
- 74adb84 to b4910d4: Some cleanup in tests.
- 79c85ca to 18187c0: Commits that add migration tests for the touched serializers.

## Verifying this change

All new subclasses of `TypeSerializerSnapshotMigrationTestBase` should pass.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
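As a rough sketch of the pattern described in the first commit, based on the 1.8-era API (the snapshot class name here is a stand-in, and details of the real migration may differ): a singleton serializer exposes a `SimpleTypeSerializerSnapshot` subclass that re-creates the serializer through a supplier on restore, so nothing about the serializer itself is Java-serialized into the savepoint.

```scala
import java.util.function.Supplier

import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.api.common.typeutils.base.StringSerializer

// Hypothetical snapshot for Flink's StringSerializer singleton: restore goes
// through the supplier, never through Java deserialization of the serializer.
class StringSerializerSnapshotSketch extends SimpleTypeSerializerSnapshot[String](
  new Supplier[TypeSerializer[String]] {
    override def get(): TypeSerializer[String] = StringSerializer.INSTANCE
  })
```

The serializer's `snapshotConfiguration()` would then simply return `new StringSerializerSnapshotSketch`, which is why the base implementation in `TypeSerializerSingleton` can be removed.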
[GitHub] GJL commented on issue #7486: [FLINK-11316] [tests] Drop JarFileCreator
GJL commented on issue #7486: [FLINK-11316] [tests] Drop JarFileCreator URL: https://github.com/apache/flink/pull/7486#issuecomment-456354600 @aljoscha `ClassLoaderUtilsTest#testWithURLClassLoader` passes with Java 9. If you think this PR is fine, please merge. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] GJL edited a comment on issue #7486: [FLINK-11316] [tests] Drop JarFileCreator
GJL edited a comment on issue #7486: [FLINK-11316] [tests] Drop JarFileCreator URL: https://github.com/apache/flink/pull/7486#issuecomment-456354600 @aljoscha `ClassLoaderUtilsTest#testWithURLClassLoader` passes with Java 9 now. If you think this PR is fine, please merge. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r249725255

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
## @@ -183,6 +183,17 @@ object UpdatingPlanChecker {
             lJoinKeys.zip(rJoinKeys)
           )
         }
+
+      case l: DataStreamUpsertToRetraction =>
+        val uniqueKeyNames = l.getRowType.getFieldNames.zipWithIndex
+          .filter(e => l.keyIndexes.contains(e._2))
+          .map(_._1)
+        Some(uniqueKeyNames.map(e => (e, e)))
+
+      case scan: UpsertStreamScan =>

Review comment:
Agree with you. I think we can use `MetadataHandler` to solve it later.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
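Since the thread defers this to a `MetadataHandler`, here is a very rough sketch of what that approach could look like with Calcite's metadata system. Everything here is illustrative: the class name and the import path of `DataStreamUpsertToRetraction` are assumptions, and a real implementation would also have to be registered in the planner's `RelMetadataProvider`.

```scala
import java.util.Collections
import java.util.{Set => JSet}

import org.apache.calcite.rel.metadata.{BuiltInMetadata, MetadataDef, MetadataHandler, RelMetadataQuery}
import org.apache.calcite.util.ImmutableBitSet
// assumed package path for the node from the diff above
import org.apache.flink.table.plan.nodes.datastream.DataStreamUpsertToRetraction

// Illustrative only: answer unique-key queries for the upsert conversion node
// through the metadata system instead of pattern matching in UpdatingPlanChecker.
class FlinkRelMdUpsertUniqueKeys extends MetadataHandler[BuiltInMetadata.UniqueKeys] {

  override def getDef: MetadataDef[BuiltInMetadata.UniqueKeys] = BuiltInMetadata.UniqueKeys.DEF

  // Calcite dispatches to this method by reflection based on the RelNode subtype.
  def getUniqueKeys(
      rel: DataStreamUpsertToRetraction,
      mq: RelMetadataQuery,
      ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
    // the upsert keys of the input stream are exactly the unique keys of the output
    Collections.singleton(ImmutableBitSet.of(rel.keyIndexes: _*))
  }
}
```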
[GitHub] GJL commented on a change in pull request #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base
GJL commented on a change in pull request #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base
URL: https://github.com/apache/flink/pull/7541#discussion_r249718882

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerTest.java
## @@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for the {@link BlobServer}.
+ */
+public class BlobServerTest extends TestLogger {
+
+	@ClassRule
+	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	/**
+	 * Tests that the {@link BlobServer} fails if the blob storage directory
+	 * cannot be created.
+	 */
+	@Test
+	public void testFailureIfStorageDirectoryCannotBeCreated() throws IOException {
+		assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.

Review comment:
I would prefer to move the _assumption_ closer to `createNonWritableDirectory`. This makes the comment clearer. Also consider `assumeFalse` as `!` can be overlooked.
```suggestion
		assumeFalse(OperatingSystem.isWindows()); //setWritable doesn't work on Windows.

		final File blobStorageDirectory = createNonWritableDirectory();

		final Configuration configuration = new Configuration();
		final String nonExistDirectory = new File(blobStorageDirectory, "does_not_exist_for_sure").getAbsolutePath();
		configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, nonExistDirectory);
```

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] GJL commented on a change in pull request #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base
GJL commented on a change in pull request #7541: [FLINK-11356][tests] Port JobManagerStartupTest to new code base
URL: https://github.com/apache/flink/pull/7541#discussion_r249714551

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
## @@ -360,4 +368,32 @@ public void testConcurrentActorSystemCreation() throws Exception {
 			ExecutorUtils.gracefulShutdown(1L, TimeUnit.MILLISECONDS, executorService);
 		}
 	}
+
+	/**
+	 * Tests that the {@link ActorSystem} fails with an expressive exception if it cannot be
+	 * instantiated due to an occupied port.
+	 */
+	@Test
+	public void testActorSystemInstantiationFailureWhenPortOccupied() throws Exception {
+		ServerSocket portOccupier;
+		final int port;
+
+		try {
+			port = NetUtils.getAvailablePort();
+			portOccupier = new ServerSocket(port, 10, InetAddress.getByName("0.0.0.0"));
+		}
+		catch (Throwable t) {
+			// could not find free port, or open a connection there

Review comment:
Is it acceptable to silently fail if we cannot open a socket?

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Commented] (FLINK-10819) The instability problem of CI, JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure test fail.
[ https://issues.apache.org/jira/browse/FLINK-10819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16748563#comment-16748563 ]

Gary Yao commented on FLINK-10819:
----------------------------------

It happened again: https://api.travis-ci.org/v3/job/482518482/log.txt

> The instability problem of CI, JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure test fail.
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-10819
>                 URL: https://issues.apache.org/jira/browse/FLINK-10819
>             Project: Flink
>          Issue Type: Test
>          Components: Tests
>    Affects Versions: 1.7.0
>            Reporter: sunjincheng
>            Priority: Critical
>              Labels: test-stability
>             Fix For: 1.8.0
>
> Found the following error in the process of CI:
> Results :
> Tests in error:
>   JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure:331 » IllegalArgument
> Tests run: 1463, Failures: 0, Errors: 1, Skipped: 29
> 18:40:55.828 [INFO]
> 18:40:55.829 [INFO] BUILD FAILURE
> 18:40:55.829 [INFO]
> 18:40:55.830 [INFO] Total time: 30:19 min
> 18:40:55.830 [INFO] Finished at: 2018-11-07T18:40:55+00:00
> 18:40:56.294 [INFO] Final Memory: 92M/678M
> 18:40:56.294 [INFO]
> 18:40:56.294 [WARNING] The requested profile "include-kinesis" could not be activated because it does not exist.
> 18:40:56.295 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (integration-tests) on project flink-tests_2.11: There are test failures.
> 18:40:56.295 [ERROR]
> 18:40:56.295 [ERROR] Please refer to /home/travis/build/sunjincheng121/flink/flink-tests/target/surefire-reports for the individual test results.
> 18:40:56.295 [ERROR] -> [Help 1]
> 18:40:56.295 [ERROR]
> 18:40:56.295 [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
> 18:40:56.295 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> 18:40:56.295 [ERROR]
> 18:40:56.295 [ERROR] For more information about the errors and possible solutions, please read the following articles:
> 18:40:56.295 [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> MVN exited with EXIT CODE: 1.
> Trying to KILL watchdog (11329).
> ./tools/travis_mvn_watchdog.sh: line 269: 11329 Terminated watchdog
> PRODUCED build artifacts.
> But after the rerun, the error disappeared.
> Currently, no specific reasons are found, and we will continue to pay attention.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] twalthr commented on issue #6445: [FLINK-8302] [table] Add SHIFT_LEFT and SHIFT_RIGHT
twalthr commented on issue #6445: [FLINK-8302] [table] Add SHIFT_LEFT and SHIFT_RIGHT URL: https://github.com/apache/flink/pull/6445#issuecomment-456328423 Thanks for updating the PR @xueyumusic and @pnowojski. I will take a final look today and merge it if everything is ok. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11405) rest api can see task and task attempt exception by start end time filter
[ https://issues.apache.org/jira/browse/FLINK-11405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11405: --- Labels: pull-request-available (was: ) > rest api can see task and task attempt exception by start end time filter > - > > Key: FLINK-11405 > URL: https://issues.apache.org/jira/browse/FLINK-11405 > Project: Flink > Issue Type: Sub-task > Components: REST >Reporter: lining >Assignee: lining >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11374) See more failover and can filter by time range
[ https://issues.apache.org/jira/browse/FLINK-11374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16748362#comment-16748362 ]

lining edited comment on FLINK-11374 at 1/22/19 9:03 AM:
---------------------------------------------------------

Hi, [~till.rohrmann]. I have added some details about this. What do you think about it? I have updated this task. I will update the REST API, and [~vthinkxie] will update the web UI.

was (Author: lining):
Hi, [~till.rohrmann]. I have added some details about this. What do you think about it?

> See more failover and can filter by time range
> -----------------------------------------------
>
>                 Key: FLINK-11374
>                 URL: https://issues.apache.org/jira/browse/FLINK-11374
>             Project: Flink
>          Issue Type: Improvement
>          Components: REST, Webfrontend
>            Reporter: lining
>            Assignee: lining
>            Priority: Major
>         Attachments: image-2019-01-22-11-40-53-135.png, image-2019-01-22-11-42-33-808.png
>
> Currently the failover view only shows a limited number of the most recent task failovers. If a task has failed many times, the earlier failovers cannot be seen. Can we add a filter by time so that failovers, including task attempt failure messages, can be inspected?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] jinglining opened a new pull request #7552: [FLINK-11405][rest]can see task and task attempt exception by start e…
jinglining opened a new pull request #7552: [FLINK-11405][rest]can see task and task attempt exception by start e…
URL: https://github.com/apache/flink/pull/7552

## What is the purpose of the change

- This pull request lets the REST API's JobExceptionsHandler expose more exception information.
- Adds display of task attempt exceptions.
- Allows filtering exceptions by start and end time.

## Brief change log

- JobExceptionsHandler now includes task attempt exceptions when building the exception response.
- JobExceptionsHandler gains `start` and `end` query parameters.
- JobExceptionsInfo.ExecutionExceptionInfo adds fields (vertexID, vertexName, subtaskIndex, attemptNum).
- Added FixedSortedSetTest to verify that the fixed-size exception set keeps its order (a sketch of this structure follows after this message).

## Verifying this change

- Updated JobExceptionsInfoNoRootTest and JobExceptionsInfoTest.
- Added FixedSortedSetTest.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (don't know)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
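The `FixedSortedSet` mentioned in the change log is not shown in this thread, so the following is only a guess at its semantics: a sorted set with a fixed capacity that evicts its smallest element when full, so that under an ordering by timestamp only the most recent N failures are retained, in order.

```scala
import scala.collection.mutable

// Illustrative sketch of a fixed-capacity sorted set. With an Ordering by
// exception timestamp, the evicted head is always the oldest failure, which
// matches the "keep the latest N failovers" behaviour the handler needs.
class FixedSortedSet[T](maxSize: Int)(implicit ord: Ordering[T]) {
  private val elements = mutable.TreeSet.empty[T]

  def add(e: T): Unit = {
    elements += e
    if (elements.size > maxSize) {
      elements -= elements.head // evict the smallest, i.e. oldest, entry
    }
  }

  def toSeq: Seq[T] = elements.toSeq // ascending order
}
```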