[GitHub] 123avi commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
123avi commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r253757234

## File path: docs/dev/connectors/filesystem_sink.md ##
@@ -117,11 +117,11 @@ input.addSink(sink);
 {% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+val input: DataStream[(IntWritable, Text)] = ???
-val sink = new BucketingSink[String]("/base/path")
-sink.setBucketer(new DateTimeBucketer[String]("-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
-sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
+val sink = new BucketingSink[(IntWritable, Text)]("/base/path")
+sink.setBucketer(new DateTimeBucketer("-MM-dd--HHmm"))

Review comment: Done

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on issue #7645: [FLINK-11521] Make RetryingRegistration configurable
tillrohrmann commented on issue #7645: [FLINK-11521] Make RetryingRegistration configurable
URL: https://github.com/apache/flink/pull/7645#issuecomment-460535964

Thanks for the review @GJL. Merging this PR.
[GitHub] hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-460533675

@pnowojski Hi, thank you very much for the review. I will address the new comments and update the PR ASAP. It will probably be easier to review in two steps: I will submit an update first and then rebase onto the current master (the current master is quite different from the previous one). I am also looking forward to the merge plan. Let me know if there is anything else I can do. Thank you again for the review.

Best, Hequn
[GitHub] 123avi commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
123avi commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r247153581

## File path: docs/dev/connectors/filesystem_sink.md ##
@@ -105,8 +105,8 @@ Example:
 {% highlight java %}
 DataStream<Tuple2<IntWritable,Text>> input = ...;
-BucketingSink<String> sink = new BucketingSink<String>("/base/path");
-sink.setBucketer(new DateTimeBucketer<String>("-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
+BucketingSink<Tuple2<IntWritable,Text>> sink = new BucketingSink<Tuple2<IntWritable,Text>>("/base/path");
+sink.setBucketer(new DateTimeBuckete("-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));

Review comment: very well
[GitHub] 123avi commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
123avi commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r247152403

## File path: docs/dev/connectors/filesystem_sink.md ##
@@ -117,11 +117,12 @@ input.addSink(sink);
 {% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+import org.apache.flink.api.java.tuple.Tuple2
+val input: DataStream[Tuple2[A, B]] = ??? //we need to use java Tuple2 for the SequenceFileWriter

Review comment: I changed the comment to clarify the usage. Is that OK?
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253748320

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get
+ * smaller state size in upsertToRetraction.
+ */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+    operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+    fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+    val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+    call.transformTo(getNewUpsertToRetraction(calc, upsertToRetraction))
+  }
+
+  private def getNewUpsertToRetraction(
+    calc: FlinkLogicalCalc,
+    upsertToRetraction: FlinkLogicalUpsertToRetraction): FlinkLogicalUpsertToRetraction = {
+
+    val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram)
+
+    val oldKeyNames = upsertToRetraction.keyNames
+    val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+    new FlinkLogicalUpsertToRetraction(
+      upsertToRetraction.getCluster,
+      upsertToRetraction.getTraitSet,
+      newCalc,
+      newKeyNames)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = {
+    calc.getRowType.getFieldNames
+      .flatMap(calc.getInputFromOutputName(calc, _))

Review comment: The `flatMap` unwraps `Option` returned from the `getInputFromOutputName`. If `Option` is empty, the result will be an empty list. :-)
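
The unwrapping behaviour described in the review comment can be reproduced outside of Flink. The following is only a sketch in plain Java (the PR itself is Scala); `inputFromOutputName`, `resolvedInputs`, and the field names are hypothetical stand-ins for illustration, not the PR's `getInputFromOutputName`:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapOptionDemo {

    // Hypothetical lookup: maps an output field name back to its input field name, if any.
    static Optional<String> inputFromOutputName(Map<String, String> outputToInput, String out) {
        return Optional.ofNullable(outputToInput.get(out));
    }

    // flatMap unwraps present Optionals and silently drops empty ones.
    static List<String> resolvedInputs(Map<String, String> outputToInput, List<String> outputs) {
        return outputs.stream()
            .flatMap(o -> inputFromOutputName(outputToInput, o)
                .map(Stream::of)
                .orElseGet(Stream::empty))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> mapping = new HashMap<>();
        mapping.put("k", "key");
        // "computed" has no input counterpart, so it disappears from the result.
        System.out.println(resolvedInputs(mapping, Arrays.asList("k", "computed"))); // [key]
        // If nothing resolves, the result is an empty list rather than an error.
        System.out.println(resolvedInputs(mapping, Arrays.asList("computed")));      // []
    }
}
```

A key check built on top of this would still have to compare the resolved names against the required key fields; the sketch only shows why empty lookups do not need special handling.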
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253745265

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/UpsertToRetractionProcessFunction.scala ##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+ * Function used to convert upsert to retractions.
+ *
+ * @param rowTypeInfo the output row type info.
+ * @param queryConfig the configuration for the query.
+ */
+class UpsertToRetractionProcessFunction(

Review comment: A good question! We can't use the `LAST_VALUE` aggregate function here. According to the SQL standard, an aggregate function ignores null input. Consider the following two inputs: `(key, value1, null)` and then `(key, null, value2)`. With `LAST_VALUE` the result would be `(key, value1, value2)`, while the result should be `(key, null, value2)`, i.e., the last row of the input. That's why I named the operator `LastRow` before.
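
The difference between the two semantics in the comment above can be checked with a small stand-alone example. This is a sketch in plain Java that only models the behaviour; `lastValuePerColumn` and `lastRow` are hypothetical helpers, not Flink or SQL functions, and rows are represented as plain `Object[]`:

```java
import java.util.Arrays;
import java.util.List;

public class LastRowVsLastValue {

    // Models a per-column aggregate that skips null inputs, as described for LAST_VALUE.
    static Object[] lastValuePerColumn(List<Object[]> rows) {
        Object[] result = new Object[rows.get(0).length];
        for (Object[] row : rows) {
            for (int i = 0; i < row.length; i++) {
                if (row[i] != null) {
                    result[i] = row[i];
                }
            }
        }
        return result;
    }

    // Models "last row" semantics: the most recent row wins as a whole, nulls included.
    static Object[] lastRow(List<Object[]> rows) {
        return rows.get(rows.size() - 1);
    }

    public static void main(String[] args) {
        List<Object[]> upserts = Arrays.asList(
            new Object[] {"key", "value1", null},
            new Object[] {"key", null, "value2"});
        System.out.println(Arrays.toString(lastValuePerColumn(upserts))); // [key, value1, value2]
        System.out.println(Arrays.toString(lastRow(upserts)));            // [key, null, value2]
    }
}
```

The first result mixes fields from two different upserts, which is why a null-skipping aggregate cannot stand in for an upsert-to-retraction conversion.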
[GitHub] Myasuka commented on issue #7639: [FLINK-9920] Only check part files in BucketingSinkFaultToleranceITCase
Myasuka commented on issue #7639: [FLINK-9920] Only check part files in BucketingSinkFaultToleranceITCase
URL: https://github.com/apache/flink/pull/7639#issuecomment-460522078

Sorry for the late reply, we just had the Chinese New Year. LGTM
[jira] [Commented] (FLINK-11447) Deprecate "new Table(TableEnvironment, String)"
[ https://issues.apache.org/jira/browse/FLINK-11447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760474#comment-16760474 ]

Dian Fu commented on FLINK-11447:
---
[~twalthr] Thanks a lot for offering the help. I have uploaded the PR and also updated the documentation and the Javadocs according to your suggestions. Looking forward to your feedback. I'll keep updating the PR as quickly as possible to make sure it can be committed this week. :)

> Deprecate "new Table(TableEnvironment, String)"
> ---
>
> Key: FLINK-11447
> URL: https://issues.apache.org/jira/browse/FLINK-11447
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Dian Fu
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.8.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> A more detailed description can be found in
> [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions].
> Once Table is an interface, we can easily replace the underlying
> implementation at any time. The constructor call prevents us from converting
> it into an interface.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
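
The reasoning in the issue description (a public constructor call ties callers to a concrete class, so the type cannot become an interface) can be sketched as follows. This is an illustration in plain Java only; `Table`, `CatalogTable`, `Environment`, and `scan` here are hypothetical stand-ins and do not reproduce Flink's actual classes or signatures:

```java
public class TableInterfaceSketch {

    // Callers depend only on this interface, so `new Table(...)` is impossible by construction.
    public interface Table {
        String describe();
    }

    // One possible implementation; it can be replaced without touching any caller.
    static final class CatalogTable implements Table {
        private final String name;

        CatalogTable(String name) {
            this.name = name;
        }

        @Override
        public String describe() {
            return "table:" + name;
        }
    }

    // Hypothetical environment that acts as the factory for Table instances.
    public static final class Environment {
        public Table scan(String name) {
            return new CatalogTable(name);
        }
    }

    public static void main(String[] args) {
        Table t = new Environment().scan("orders");
        System.out.println(t.describe()); // table:orders
    }
}
```

As long as user code obtains tables through a factory method like this, the implementation class behind the interface can change freely; any remaining direct constructor call in user code would block that change, which is the motivation for deprecating it first.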
[jira] [Created] (FLINK-11523) TestHarness should provide way to configure TypeSerializers for side output
Alexey Trenikhin created FLINK-11523:
---
Summary: TestHarness should provide way to configure TypeSerializers for side output
Key: FLINK-11523
URL: https://issues.apache.org/jira/browse/FLINK-11523
Project: Flink
Issue Type: Improvement
Reporter: Alexey Trenikhin

Currently AbstractStreamOperatorTestHarness allows configuring a TypeSerializer only for the main output:
{code:java}
public void setup(TypeSerializer outputSerializer)
{code}
It should be possible to specify TypeSerializers for side outputs as well (or, alternatively, to use the TypeInformation from the OutputTags).
[GitHub] flinkbot edited a comment on issue #7650: [FLINK-11522][table] Deprecate ExternalCatalogTable.builder()
flinkbot edited a comment on issue #7650: [FLINK-11522][table] Deprecate ExternalCatalogTable.builder()
URL: https://github.com/apache/flink/pull/7650#issuecomment-460234012

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress
* ❌ 1. The [description] looks good.
* ❌ 2. There is [consensus] that the contribution should go into Flink.
* ❔ 3. Needs [attention] from.
* ❌ 4. The [architecture] is sound.
* ❌ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval
[GitHub] dianfu commented on issue #7650: [FLINK-11522][table] Deprecate ExternalCatalogTable.builder()
dianfu commented on issue #7650: [FLINK-11522][table] Deprecate ExternalCatalogTable.builder()
URL: https://github.com/apache/flink/pull/7650#issuecomment-460503074

@twalthr Could you take a look at this quick PR? Thanks a lot.
[GitHub] dianfu commented on issue #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String)
dianfu commented on issue #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String)
URL: https://github.com/apache/flink/pull/7651#issuecomment-460502918

@twalthr Thanks a lot for the reminder. I have updated the documentation.
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253721880

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala ##
@@ -19,22 +19,138 @@ package org.apache.flink.table.plan.nodes.datastream
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}
 trait StreamScan extends CommonScan[CRow] with DataStreamRel {
+  protected def convertUpsertToInternalRow(

Review comment:
1) The type of an upsert stream must be a Scala or Java tuple, while append streams have no such constraint.
2 & 3) Good suggestions. I will try to refactor this to reduce the duplicated logic and split it into smaller methods.
4) The else branch for the Java Tuple2 is not covered by the tests yet. I will add one.
Thank you.
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r253714414 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala ## @@ -19,22 +19,138 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.calcite.rex.RexNode -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.functions.ProcessFunction -import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.api.{TableConfig, Types} import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction} import org.apache.flink.table.plan.nodes.CommonScan import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.types.Row -import org.apache.flink.table.runtime.CRowOutputProcessRunner import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner} trait StreamScan extends CommonScan[CRow] with DataStreamRel { + protected def convertUpsertToInternalRow( + schema: RowSchema, + input: DataStream[Any], + fieldIdxs: Array[Int], + config: TableConfig, + rowtimeExpression: Option[RexNode]): DataStream[CRow] = { + +val internalType = schema.typeInfo +val 
cRowType = CRowTypeInfo(internalType) + +val hasTimeIndicator = fieldIdxs.exists(f => + f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER || +f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER) + +val dsType = input.getType + +dsType match { +// Scala tuple + case t: CaseClassTypeInfo[_] +if t.getTypeClass == classOf[(_, _)] && t.getTypeAt(0) == Types.BOOLEAN => + +val inputType = t.getTypeAt[Any](1) +if (inputType == internalType && !hasTimeIndicator) { + // input is already of correct type. Only need to wrap it as CRow + input.asInstanceOf[DataStream[(Boolean, Row)]] +.map(new RichMapFunction[(Boolean, Row), CRow] { + @transient private var outCRow: CRow = _ + override def open(parameters: Configuration): Unit = { +outCRow = new CRow(null, change = true) + } + + override def map(v: (Boolean, Row)): CRow = { +outCRow.row = v._2 +outCRow.change = v._1 +outCRow + } +}).returns(cRowType) + +} else { + // input needs to be converted and wrapped as CRow or time indicators need to be generated + + val function = generateConversionProcessFunction( +config, +inputType.asInstanceOf[TypeInformation[Any]], +internalType, +"UpsertStreamSourceConversion", +schema.fieldNames, +fieldIdxs, +rowtimeExpression + ) + + val processFunc = new ScalaTupleToCRowProcessRunner( +function.name, +function.code, +cRowType) + + val opName = s"from: (${schema.fieldNames.mkString(", ")})" + + input +.asInstanceOf[DataStream[(Boolean, Any)]] +.process(processFunc).name(opName).returns(cRowType) +} + + // Java tuple + case t: TupleTypeInfo[_] +if t.getTypeClass == classOf[JTuple2[_, _]] && t.getTypeAt(0) == Types.BOOLEAN => + +val inputType = t.getTypeAt[Any](1) +if (inputType == internalType && !hasTimeIndicator) { + // input is already of correct type. 
Only need to wrap it as CRow + input.asInstanceOf[DataStream[JTuple2[JBool, Row]]] +.map(new RichMapFunction[JTuple2[JBool, Row], CRow] { + @transient private var outCRow: CRow = _ + override def open(parameters: Configuration): Unit = { +outCRow = new CRow(null, change = true) + } + + override def map(v: JTuple2[JBool, Row]): CRow = { +outCRow.row = v.f1 +outCRow.change = v.f0 +outCRow + } +}).returns(cRowType) + +} else { + // input needs to be converted and wrapped as CRow or time indicators n
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253710139

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ##
@@ -426,7 +426,7 @@ abstract class StreamTableEnvironment(
     converterFunction match {
       case Some(func) =>
-        new CRowMapRunner[OUT](func.name, func.code, func.returnType)
+        new CRowToExternalTypeMapRunner[OUT](func.name, func.code, func.returnType)

Review comment: Makes sense, I will revert these changes.
[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253707232

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala ##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(

Review comment: Thank you very much for looking into this. I will also keep an eye on the Calcite dev mailing list.
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760284#comment-16760284 ]

Ying Xu commented on FLINK-4582:
---
Thanks [~tinder-dthomson]. Internally we use the *new ObjectMapper()* initialization but didn't observe a similar issue. But we may be running an older Flink distribution. Yes, please report this as a separate bug and perhaps attach the full stack trace. If you already have a fix, feel free to post the PR as well.

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> ---
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
> Issue Type: New Feature
> Components: Kinesis Connector, Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Ying Xu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data
> capture) feature called DynamoDB Streams
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
> which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with
> only a slight difference in resharding behaviours, so it is possible to build
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc =
> FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and
> combining what Flink has for exactly-once semantics, out-of-core state
> backends, and queryable state with CDC can have very strong use cases. For
> this feature there should only be an extra dependency to the AWS Java SDK for
> DynamoDB, which has Apache License 2.0.
[GitHub] flinkbot edited a comment on issue #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
flinkbot edited a comment on issue #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#issuecomment-459849424

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress
* ✅ 1. The [description] looks good.
  - Approved by @twalthr [PMC]
* ✅ 2. There is [consensus] that the contribution should go into Flink.
  - Approved by @twalthr [PMC]
* ❔ 3. Needs [attention] from.
* ❌ 4. The [architecture] is sound.
* ❌ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval
[GitHub] bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#discussion_r253674313

## File path: flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/catalog/exceptions/TableAlreadyExistException.java ##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog.exceptions;
+
+import org.apache.flink.table.api.catalog.ObjectPath;
+
+/**
+ * Exception for trying to create a table that already exists.

Review comment: "... table **or view** that "
[GitHub] bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#discussion_r253673128
## File path: flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/catalog/ReadableWritableCatalog.java ##
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import org.apache.flink.table.api.TableAlreadyExistException;
+import org.apache.flink.table.api.TableNotExistException;
+import org.apache.flink.table.api.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.api.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.plan.stats.TableStats;
+
+/**
+ * An interface responsible for manipulating catalog metadata.
+ */
+public interface ReadableWritableCatalog extends ReadableCatalog {
+
+    // -- databases --
+
+    /**
+     * Adds a database to this catalog.
+     *
+     * @param dbName The name of the database to add.
+     * @param database The database to add.
+     * @param ignoreIfExists Flag to specify behavior if a database with the given name already
+     *     exists: if set to false, it throws a DatabaseAlreadyExistException,
+     *     if set to true, nothing happens.
+     * @throws DatabaseAlreadyExistException
+     *     thrown if the database does already exist in the catalog
+     *     and ignoreIfExists is false
+     */
+    void createDatabase(String dbName, CatalogDatabase database, boolean ignoreIfExists)
+        throws DatabaseAlreadyExistException;
+
+    /**
+     * Deletes a database from this catalog.
+     *
+     * @param dbName Name of the database to delete.
+     * @param ignoreIfNotExists Flag to specify behavior if the database does not exist:
+     *     if set to false, throw an exception,
+     *     if set to true, nothing happens.
+     * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+     */
+    void dropDatabase(String dbName, boolean ignoreIfNotExists) throws DatabaseNotExistException;
+
+    /**
+     * Modifies an existing database.
+     *
+     * @param dbName Name of the database to modify.
+     * @param newDatabase The new database to replace the existing database.
+     * @param ignoreIfNotExists Flag to specify behavior if the database does not exist:
+     *     if set to false, throw an exception,
+     *     if set to true, nothing happens.
+     * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+     */
+    void alterDatabase(String dbName, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+        throws DatabaseNotExistException;
+
+    /**
+     * Renames an existing database.
+     *
+     * @param dbName Name of the database to modify.
+     * @param newDbName New name of the database.
+     * @param ignoreIfNotExists Flag to specify behavior if the database does not exist:
+     *     if set to false, throw an exception,
+     *     if set to true, nothing happens.
+     * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+     */
+    void renameDatabase(String dbName, String newDbName, boolean ignoreIfNotExists)
Review comment: Does it need to throw a `DatabaseAlreadyExistException` when `newDbName` is already taken?
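One way to read the question above is that `renameDatabase` could declare and throw both exceptions. The following is a minimal sketch under stated assumptions, not the PR's implementation: `InMemoryCatalogSketch` and its map-backed storage are hypothetical, a plain `String` stands in for `CatalogDatabase`, and the two exception classes are local stand-ins that extend `RuntimeException`, as the table exceptions quoted elsewhere in this thread do.

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-ins for the exception types named in the diff (bodies are assumptions).
class DatabaseNotExistException extends RuntimeException {
    DatabaseNotExistException(String dbName) {
        super("Database " + dbName + " does not exist.");
    }
}

class DatabaseAlreadyExistException extends RuntimeException {
    DatabaseAlreadyExistException(String dbName) {
        super("Database " + dbName + " already exists.");
    }
}

// Hypothetical in-memory catalog; the String value stands in for CatalogDatabase.
class InMemoryCatalogSketch {
    private final Map<String, String> databases = new HashMap<>();

    void createDatabase(String dbName, String database) {
        databases.put(dbName, database);
    }

    boolean databaseExists(String dbName) {
        return databases.containsKey(dbName);
    }

    void renameDatabase(String dbName, String newDbName, boolean ignoreIfNotExists)
            throws DatabaseNotExistException, DatabaseAlreadyExistException {
        if (!databases.containsKey(dbName)) {
            if (ignoreIfNotExists) {
                return; // nothing to rename, and the caller asked to ignore that
            }
            throw new DatabaseNotExistException(dbName);
        }
        if (databases.containsKey(newDbName)) {
            // The case raised in the review: the target name is already taken.
            throw new DatabaseAlreadyExistException(newDbName);
        }
        databases.put(newDbName, databases.remove(dbName));
    }

    public static void main(String[] args) {
        InMemoryCatalogSketch catalog = new InMemoryCatalogSketch();
        catalog.createDatabase("sales", "sales-db");
        catalog.createDatabase("archive", "archive-db");
        try {
            catalog.renameDatabase("sales", "archive", false);
        } catch (DatabaseAlreadyExistException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch, `ignoreIfNotExists` only suppresses the missing-source case; a name collision on the target always throws. Whether the real interface should behave that way is exactly what the review comment leaves open.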
[GitHub] bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#discussion_r253673629
## File path: flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/catalog/ReadableWritableCatalog.java ##
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import org.apache.flink.table.api.TableAlreadyExistException;
+import org.apache.flink.table.api.TableNotExistException;
+import org.apache.flink.table.api.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.api.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.plan.stats.TableStats;
+
+/**
+ * An interface responsible for manipulating catalog metadata.
+ */
+public interface ReadableWritableCatalog extends ReadableCatalog {
+
+    // -- databases --
+
+    /**
+     * Adds a database to this catalog.
+     *
+     * @param dbName The name of the database to add.
+     * @param database The database to add.
+     * @param ignoreIfExists Flag to specify behavior if a database with the given name already
+     *     exists: if set to false, it throws a DatabaseAlreadyExistException,
+     *     if set to true, nothing happens.
+     * @throws DatabaseAlreadyExistException
+     *     thrown if the database does already exist in the catalog
+     *     and ignoreIfExists is false
+     */
+    void createDatabase(String dbName, CatalogDatabase database, boolean ignoreIfExists)
+        throws DatabaseAlreadyExistException;
+
+    /**
+     * Deletes a database from this catalog.
+     *
+     * @param dbName Name of the database to delete.
+     * @param ignoreIfNotExists Flag to specify behavior if the database does not exist:
+     *     if set to false, throw an exception,
+     *     if set to true, nothing happens.
+     * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+     */
+    void dropDatabase(String dbName, boolean ignoreIfNotExists) throws DatabaseNotExistException;
+
+    /**
+     * Modifies an existing database.
+     *
+     * @param dbName Name of the database to modify.
+     * @param newDatabase The new database to replace the existing database.
+     * @param ignoreIfNotExists Flag to specify behavior if the database does not exist:
+     *     if set to false, throw an exception,
+     *     if set to true, nothing happens.
+     * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+     */
+    void alterDatabase(String dbName, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+        throws DatabaseNotExistException;
+
+    /**
+     * Renames an existing database.
+     *
+     * @param dbName Name of the database to modify.
+     * @param newDbName New name of the database.
+     * @param ignoreIfNotExists Flag to specify behavior if the database does not exist:
+     *     if set to false, throw an exception,
+     *     if set to true, nothing happens.
+     * @throws DatabaseNotExistException thrown if the database does not exist in the catalog
+     */
+    void renameDatabase(String dbName, String newDbName, boolean ignoreIfNotExists)
+        throws DatabaseNotExistException;
+
+    // -- tables and views --
+
+    /**
+     * Deletes a table or view.
+     *
+     * @param objectName Path of the table or view to delete.
+     * @param ignoreIfNotExists Flag to specify behavior if the table or view does not exist:
+     *     if set to false, throw an exception,
+     *     if set to true, nothing happens.
+     * @
[GitHub] bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#discussion_r253674435
## File path: flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/catalog/exceptions/TableNotExistException.java ##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog.exceptions;
+
+import org.apache.flink.table.api.catalog.ObjectPath;
+
+/**
+ * Exception for trying to operate on a table that doesn't exist.
+ */
+public class TableNotExistException extends RuntimeException {
+
+    private static final String MSG = "Table %s in catalog %s does not exist.";
Review comment: ditto
[GitHub] bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#discussion_r253674164
## File path: flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/catalog/exceptions/TableAlreadyExistException.java ##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog.exceptions;
+
+import org.apache.flink.table.api.catalog.ObjectPath;
+
+/**
+ * Exception for trying to create a table that already exists.
+ */
+public class TableAlreadyExistException extends RuntimeException {
+
+    private static final String MSG = "Table $catalog.$table already exists.";
Review comment: Since this is shared between tables and views, shall the message be "Table **or view** ... already exists."?
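The suggested wording can be sketched as follows. This is an illustration only: `TableOrViewAlreadyExistSketch` is a hypothetical class, plain strings stand in for the catalog name and `ObjectPath`, and the use of `String.format` with `%s` placeholders is an assumption borrowed from the `TableNotExistException` message quoted elsewhere in this thread (the quoted diff does not show how `MSG` is consumed).

```java
// Hypothetical stand-in for the exception under review; not the PR's class.
class TableOrViewAlreadyExistSketch extends RuntimeException {

    // Message covers both tables and views, as the review comment suggests.
    private static final String MSG = "Table or view %s already exists in catalog %s.";

    TableOrViewAlreadyExistSketch(String catalogName, String objectPath) {
        super(String.format(MSG, objectPath, catalogName));
    }

    public static void main(String[] args) {
        RuntimeException e = new TableOrViewAlreadyExistSketch("hive", "db1.orders");
        System.out.println(e.getMessage());
        // prints "Table or view db1.orders already exists in catalog hive."
    }
}
```

Note that `$catalog.$table` in the quoted constant is not `String.format` syntax, so if the constructor formats the message that way the placeholders would be printed literally.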
[GitHub] bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#discussion_r253674392
## File path: flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/catalog/exceptions/TableNotExistException.java ##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog.exceptions;
+
+import org.apache.flink.table.api.catalog.ObjectPath;
+
+/**
+ * Exception for trying to operate on a table that doesn't exist.
Review comment: ditto
[GitHub] bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#discussion_r253671017
## File path: flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/catalog/CatalogView.java ##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+/**
+ * Represents a view in a catalog.
+ */
+public interface CatalogView extends CommonTable {
+
+    /**
+     * Original text of the view definition.
+     *
+     * @return the original string literal provided by the user.
+     */
+    String getOriginalQuery();
+
+    /**
+     * Expanded text of the original view definition
+     * This is needed because the context such as current DB is
+     * lost after the session, in which view is defined, is gone.
+     * Expanded query text takes care of the this, as an example.
Review comment: "... takes care of ~~the~~ this ..." Shall we also add an example?
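As a rough illustration of the kind of example the reviewer asks for, the sketch below pairs an original query with an expanded one. Everything here is hypothetical: `CatalogViewSketch` is a stand-in for the interface in the diff, `getExpandedQuery()` is an assumed method name (the quoted diff stops before the declaration), and the "expansion" is a hard-coded toy that only qualifies one known table name with the session's current database. A real implementation would need a SQL parser.

```java
// Hypothetical stand-in for CatalogView; getExpandedQuery() is an assumed name.
interface CatalogViewSketch {
    String getOriginalQuery();

    String getExpandedQuery();
}

class ExpandedQueryExample {

    // Builds a view as if it had been defined in a session whose current database is currentDb.
    static CatalogViewSketch viewDefinedInSession(String currentDb) {
        final String original = "SELECT id, amount FROM orders";
        // Toy expansion: the unqualified table name is qualified with the current database,
        // so the definition no longer depends on session state.
        final String expanded = "SELECT id, amount FROM " + currentDb + ".orders";
        return new CatalogViewSketch() {
            @Override
            public String getOriginalQuery() {
                return original;
            }

            @Override
            public String getExpandedQuery() {
                return expanded;
            }
        };
    }

    public static void main(String[] args) {
        CatalogViewSketch view = viewDefinedInSession("sales");
        System.out.println(view.getOriginalQuery());
        System.out.println(view.getExpandedQuery());
    }
}
```

The original text is what the user typed; the expanded text is what can still be resolved after the defining session, and its notion of a current database, is gone.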
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760263#comment-16760263 ]
Thomas Weise commented on FLINK-4582:
[~tinder-dthomson] Thanks for the report and yes, please track it as a separate issue and mark the fix version as 1.8 so that we get it resolved as part of 1.8.
> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
> Issue Type: New Feature
> Components: Kinesis Connector, Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Ying Xu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
> Time Spent: 10m
> Remaining Estimate: 0h
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data capture) feature called DynamoDB Streams (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html), which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with only a slight difference in resharding behaviours, so it is possible to build on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and combining what Flink has for exactly-once semantics, out-of-core state backends, and queryable state with CDC can have very strong use cases. For this feature there should only be an extra dependency to the AWS Java SDK for DynamoDB, which has Apache License 2.0.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760250#comment-16760250 ]
Devin Thomson commented on FLINK-4582:
[~yxu-lyft] circling back here, looks like this got merged in, congrats! I've been preemptively cutting over from my implementation to this one and noticed one (blocking) bug:
{code:java}
j.l.IllegalArgumentException: Conflicting setter definitions for property \"eventName\": org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params) vs org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params)
    at c.f.j.d.i.POJOPropertyBuilder.getSetter(POJOPropertyBuilder.java:300)
    at c.f.j.d.d.BeanDeserializerFactory.filterBeanProps(BeanDeserializerFactory.java:619)
    at c.f.j.d.d.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:515)
    at c.f.j.d.d.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:256)
    at c.f.j.d.d.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:169)
    at c.f.j.d.d.DeserializerCache._createDeserializer2(DeserializerCache.java:403)
    at c.f.j.d.d.DeserializerCache._createDeserializer(DeserializerCache.java:352)
    at c.f.j.d.d.DeserializerCache._createAndCache2(DeserializerCache.java:264)
    ... 15 common frames omitted
Wrapped by: c.f.j.d.JsonMappingException: Conflicting setter definitions for property \"eventName\": org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params...
{code}
It looks like the root cause is that DynamoDBStreamsSchema.java defines the object mapper as follows:
{code:java}
private static final ObjectMapper MAPPER = new ObjectMapper();
{code}
when it should be using the appropriate mix-ins offered by the dynamodb stream adapter library:
{code:java}
private static final ObjectMapper MAPPER = new RecordObjectMapper();
{code}
This appears to resolve the issue; I tested it by using my own deserializer implementation.
Not sure if it makes sense to track this as a separate issue or not since this is still a 1.8-SNAPSHOT feature.
Let me know if you have any questions!
- Devin
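The "Conflicting setter definitions" error in the comment above reports two one-argument `setEventName` methods on the same class. The sketch below reproduces that overload pattern with plain reflection, to show why a mapper that discovers setters by name has two candidates for one property. It is an illustration under assumptions: `RecordSketch` is a hypothetical stand-in for the shaded `Record` class, the parameter types (a `String` and an enum) are assumed, and no Jackson code is involved.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the shaded DynamoDB Record class; only the overload pattern matters.
class RecordSketch {
    enum OperationType { INSERT, MODIFY, REMOVE }

    private String eventName;

    public void setEventName(String eventName) {
        this.eventName = eventName;
    }

    public void setEventName(OperationType eventName) {
        this.eventName = eventName.name();
    }

    public String getEventName() {
        return eventName;
    }
}

class SetterConflictSketch {

    // Collects every public one-argument method with the given name, roughly what a
    // bean introspector does when it looks for the setter of a property.
    static List<Method> settersNamed(Class<?> type, String name) {
        List<Method> result = new ArrayList<>();
        for (Method m : type.getMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == 1) {
                result.add(m);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two candidates are found for the single property "eventName".
        for (Method m : settersNamed(RecordSketch.class, "setEventName")) {
            System.out.println(m);
        }
    }
}
```

Per the comment above, the mix-ins registered by `RecordObjectMapper` are what resolve this ambiguity for the real class; the sketch only shows where the ambiguity comes from.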
[jira] [Comment Edited] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760250#comment-16760250 ]
Devin Thomson edited comment on FLINK-4582 at 2/4/19 9:45 PM:
[~yxu-lyft] circling back here, looks like this got merged in, congrats! I've been preemptively cutting over from my implementation to this one and noticed one (blocking) bug:
{code:java}
j.l.IllegalArgumentException: Conflicting setter definitions for property \"eventName\": org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params) vs org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params)
    at c.f.j.d.i.POJOPropertyBuilder.getSetter(POJOPropertyBuilder.java:300)
    at c.f.j.d.d.BeanDeserializerFactory.filterBeanProps(BeanDeserializerFactory.java:619)
    at c.f.j.d.d.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:515)
    at c.f.j.d.d.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:256)
    at c.f.j.d.d.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:169)
    at c.f.j.d.d.DeserializerCache._createDeserializer2(DeserializerCache.java:403)
    at c.f.j.d.d.DeserializerCache._createDeserializer(DeserializerCache.java:352)
    at c.f.j.d.d.DeserializerCache._createAndCache2(DeserializerCache.java:264)
    ... 15 common frames omitted
Wrapped by: c.f.j.d.JsonMappingException: Conflicting setter definitions for property \"eventName\": org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params...
{code}
It looks like the root cause is that DynamoDBStreamsSchema.java defines the object mapper as follows:
{code:java}
private static final ObjectMapper MAPPER = new ObjectMapper();
{code}
when it should be using the appropriate mix-ins offered by the dynamodb stream adapter library:
{code:java}
private static final ObjectMapper MAPPER = new RecordObjectMapper();
{code}
This appears to resolve the issue; I tested it by using my own deserializer implementation.
Not sure if it makes sense to track this as a separate issue or not since this is still a 1.8-SNAPSHOT feature.
Let me know if you have any questions!
Devin
[GitHub] GJL removed a comment on issue #7645: [FLINK-11521] Make RetryingRegistration configurable
GJL removed a comment on issue #7645: [FLINK-11521] Make RetryingRegistration configurable
URL: https://github.com/apache/flink/pull/7645#issuecomment-460333822
Thanks @flinkbot 🤦‍♂️
[GitHub] flinkbot commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…
flinkbot commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…
URL: https://github.com/apache/flink/pull/7632#issuecomment-460347329
This PR should now have its status reflected properly.
[GitHub] flinkbot edited a comment on issue #7645: [FLINK-11521] Make RetryingRegistration configurable
flinkbot edited a comment on issue #7645: [FLINK-11521] Make RetryingRegistration configurable
URL: https://github.com/apache/flink/pull/7645#issuecomment-459969990
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.
## Review Progress
* ✅ 1. The [description] looks good.
    - Approved by @GJL [committer]
* ✅ 2. There is [consensus] that the contribution should go into Flink.
    - Approved by @GJL [committer]
* ❔ 3. Needs [attention] from.
* ✅ 4. The [architecture] is sound.
    - Approved by @GJL [committer]
* ✅ 5. Overall code [quality] is good.
    - Approved by @GJL [committer]
Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
Bot commands
The @flinkbot bot supports the following commands:
 - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords)
 - `@flinkbot approve all` to approve all aspects
 - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
 - `@flinkbot disapprove architecture` to remove an approval
[GitHub] flinkbot edited a comment on issue #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String)
flinkbot edited a comment on issue #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String)
URL: https://github.com/apache/flink/pull/7651#issuecomment-460316970
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.
## Review Progress
* ✅ 1. The [description] looks good.
    - Approved by @twalthr [PMC]
* ✅ 2. There is [consensus] that the contribution should go into Flink.
    - Approved by @twalthr [PMC]
* ❔ 3. Needs [attention] from.
* ❌ 4. The [architecture] is sound.
* ❌ 5. Overall code [quality] is good.
Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
Bot commands
The @flinkbot bot supports the following commands:
 - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords)
 - `@flinkbot approve all` to approve all aspects
 - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
 - `@flinkbot disapprove architecture` to remove an approval
[GitHub] flinkbot edited a comment on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…
flinkbot edited a comment on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…
URL: https://github.com/apache/flink/pull/7632#issuecomment-459738321
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.
## Review Progress
* ✅ 1. The [description] looks good.
    - Approved by @GJL [committer], @rmetzger [PMC]
* ✅ 2. There is [consensus] that the contribution should go into Flink.
    - Approved by @GJL [committer]
* ❔ 3. Needs [attention] from.
* ✅ 4. The [architecture] is sound.
    - Approved by @GJL [committer]
* ✅ 5. Overall code [quality] is good.
    - Approved by @GJL [committer]
Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
Bot commands
The @flinkbot bot supports the following commands:
 - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords)
 - `@flinkbot approve all` to approve all aspects
 - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
 - `@flinkbot disapprove architecture` to remove an approval
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r253463228 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ## @@ -426,7 +426,7 @@ abstract class StreamTableEnvironment( converterFunction match { case Some(func) => -new CRowMapRunner[OUT](func.name, func.code, func.returnType) +new CRowToExternalTypeMapRunner[OUT](func.name, func.code, func.returnType) Review comment: As a general rule please do not mix simple refactorings like class renames and moving classes around with functional changes. Embedding those two things inside one commit makes sense only if the refactoring/rename is tightly coupled with the functional change and here it doesn't seem so. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] GJL commented on issue #7645: [FLINK-11521] Make RetryingRegistration configurable
GJL commented on issue #7645: [FLINK-11521] Make RetryingRegistration configurable URL: https://github.com/apache/flink/pull/7645#issuecomment-460332966 @flinkbot approve all This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] GJL commented on issue #7645: [FLINK-11521] Make RetryingRegistration configurable
GJL commented on issue #7645: [FLINK-11521] Make RetryingRegistration configurable URL: https://github.com/apache/flink/pull/7645#issuecomment-460333822 Thanks @flinkbot 🤦♂️ This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r253559564 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.logical + +import java.util.{List => JList} + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.{RelNode, SingleRel} +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction +import org.apache.flink.table.plan.nodes.FlinkConventions + +class FlinkLogicalUpsertToRetraction( Review comment: I'm asking on Calcite dev mailing list, trying to understand those methods. Until we get a clear response from them we can leave this as it is now. `Aggregate` node (which is almost identical from the semantics perspective as this one) also doesn't implement those methods. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r253559564 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.logical + +import java.util.{List => JList} + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.{RelNode, SingleRel} +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction +import org.apache.flink.table.plan.nodes.FlinkConventions + +class FlinkLogicalUpsertToRetraction( Review comment: I'm syncing with the Calcite dev mailing list, trying to understand those methods. Until we get a clear response from them, let's leave this as it is now. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r253554512 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala ## @@ -19,18 +19,49 @@ package org.apache.flink.table.plan.nodes import org.apache.calcite.plan.{RelOptCost, RelOptPlanner} +import org.apache.calcite.rel.core.Calc import org.apache.calcite.rel.metadata.RelMdUtil import org.apache.calcite.rex._ +import org.apache.calcite.sql.SqlKind import org.apache.flink.api.common.functions.Function import org.apache.flink.table.api.TableConfig import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction} import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.types.Row import scala.collection.JavaConverters._ +import scala.collection.JavaConversions._ trait CommonCalc { + /** +* Returns empty if output field is not forwarded from the input for the calc. +*/ + private[flink] def getInputFromOutputName(calc: Calc, outputFieldName: String): Option[String] = { Review comment: Could you add a simple unit test for this? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r253551875 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/UpsertToRetractionProcessFunction.scala ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Function used to convert upsert to retractions. + * + * @param rowTypeInfo the output row type info. + * @param queryConfig the configuration for the query. 
+ */ +class UpsertToRetractionProcessFunction( Review comment: Why not use `GroupAggProcessFunction` with a `LAST_VALUE()` aggregate function? It would allow us to share the logic, and adding `LAST_VALUE()` would be a valuable addition to Flink SQL on its own. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
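To make the comparison in the comment above concrete, here is a minimal sketch of upsert-to-retraction semantics in plain Scala, with an in-memory `Map` standing in for keyed state. `Upsert`, `Change`, and `toRetractions` are hypothetical names for illustration, not Flink classes. Keeping only the latest row per key is also what a `LAST_VALUE()`-style aggregate would do, which is the overlap the review comment points at.

```scala
object UpsertToRetractionSketch {
  // Hypothetical message types. `change = true` is an accumulate message,
  // `change = false` is a retraction (mirroring the CRow change flag).
  final case class Upsert(key: String, value: Int)
  final case class Change(key: String, value: Int, change: Boolean)

  // Folds over the upserts while remembering the last value per key.
  // For a key that was seen before, the old row is retracted first and
  // the new row is emitted afterwards.
  def toRetractions(upserts: Seq[Upsert]): Seq[Change] = {
    val start = (Map.empty[String, Int], Vector.empty[Change])
    val (_, out) = upserts.foldLeft(start) {
      case ((lastByKey, emitted), Upsert(key, value)) =>
        val retraction = lastByKey.get(key).map(old => Change(key, old, change = false))
        val next = (emitted ++ retraction.toList) :+ Change(key, value, change = true)
        (lastByKey.updated(key, value), next)
    }
    out
  }

  def main(args: Array[String]): Unit =
    toRetractions(Seq(Upsert("a", 1), Upsert("b", 5), Upsert("a", 2))).foreach(println)
}
```

In this sketch the state per key is a single value, which is the same footprint a last-value accumulator would need; whether the real operator can reuse `GroupAggProcessFunction` depends on details not shown here.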
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r253546583 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala ## @@ -19,22 +19,138 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.calcite.rex.RexNode -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.functions.ProcessFunction -import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.api.{TableConfig, Types} import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction} import org.apache.flink.table.plan.nodes.CommonScan import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.types.Row -import org.apache.flink.table.runtime.CRowOutputProcessRunner import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner} trait StreamScan extends CommonScan[CRow] with DataStreamRel { + protected def convertUpsertToInternalRow( Review comment: Couple of remarks regarding this method: 1. Why does it have this `switch/case` for scala/java tuples while the `convertToInternalRow` doesn't? 2. 
I'm not sure, but it looks like it's duplicating logic from `convertToInternalRow`; maybe they should be deduplicated into one method, passing the `upsert/append` mode as a parameter for a couple of `if` statements? 3. Please split it into smaller methods; you could do the same for `convertToInternalRow`. More or less wherever there is a comment like `// Scala tuple` or `// input is already of correct type. Only need to wrap it as CRow`, extract the matching piece of code to a named method like `convertScalaTuple(...)` or `wrapAsCRow(...)`. 4. Are all of the branches covered by the tests? For example, both branches of the `if` under the Java tuple? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
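A rough sketch of the shape remarks 2 and 3 ask for, in plain Scala without any Flink dependency. `Row`, `CRow`, `Mode`, and the signature of `convertToInternalRow` below are simplified stand-ins invented for illustration; only the helper names `wrapAsCRow` and `convertScalaTuple` come from the review comment.

```scala
object ConvertRowSketch {
  // Hypothetical stand-ins for Flink's Row and CRow.
  final case class Row(fields: Seq[Any])
  final case class CRow(row: Row, change: Boolean)

  // The append/upsert distinction passed as a parameter.
  sealed trait Mode
  case object Append extends Mode
  case object Upsert extends Mode

  // Append input carries no flag: every record is an accumulate message.
  def wrapAsCRow(row: Row): CRow = CRow(row, change = true)

  // Upsert input is a (flag, row) pair, as in the Scala tuple branch.
  def convertScalaTuple(record: (Boolean, Row)): CRow = CRow(record._2, record._1)

  // Single entry point; the mode only selects which small helper runs.
  def convertToInternalRow(mode: Mode, input: Seq[Any]): Seq[CRow] =
    input.map {
      case row: Row if mode == Append => wrapAsCRow(row)
      case (flag: Boolean, row: Row) if mode == Upsert => convertScalaTuple((flag, row))
      case other =>
        throw new IllegalArgumentException(s"Unsupported input $other for mode $mode")
    }

  def main(args: Array[String]): Unit = {
    println(convertToInternalRow(Append, Seq(Row(Seq(1, "a")))))
    println(convertToInternalRow(Upsert, Seq((false, Row(Seq(1, "a"))))))
  }
}
```

Each named helper can then be unit-tested on its own, which also speaks to remark 4 about branch coverage.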
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r253465887 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala ## @@ -19,22 +19,138 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.calcite.rex.RexNode -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.functions.ProcessFunction -import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.api.{TableConfig, Types} import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction} import org.apache.flink.table.plan.nodes.CommonScan import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.types.Row -import org.apache.flink.table.runtime.CRowOutputProcessRunner import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner} trait StreamScan extends CommonScan[CRow] with DataStreamRel { + protected def convertUpsertToInternalRow( + schema: RowSchema, + input: DataStream[Any], + fieldIdxs: Array[Int], + config: TableConfig, + rowtimeExpression: Option[RexNode]): DataStream[CRow] = { + +val internalType = schema.typeInfo +val 
cRowType = CRowTypeInfo(internalType) + +val hasTimeIndicator = fieldIdxs.exists(f => + f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER || +f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER) + +val dsType = input.getType + +dsType match { +// Scala tuple + case t: CaseClassTypeInfo[_] +if t.getTypeClass == classOf[(_, _)] && t.getTypeAt(0) == Types.BOOLEAN => + +val inputType = t.getTypeAt[Any](1) +if (inputType == internalType && !hasTimeIndicator) { + // input is already of correct type. Only need to wrap it as CRow + input.asInstanceOf[DataStream[(Boolean, Row)]] +.map(new RichMapFunction[(Boolean, Row), CRow] { + @transient private var outCRow: CRow = _ + override def open(parameters: Configuration): Unit = { +outCRow = new CRow(null, change = true) + } + + override def map(v: (Boolean, Row)): CRow = { +outCRow.row = v._2 +outCRow.change = v._1 +outCRow + } +}).returns(cRowType) + +} else { + // input needs to be converted and wrapped as CRow or time indicators need to be generated + + val function = generateConversionProcessFunction( +config, +inputType.asInstanceOf[TypeInformation[Any]], +internalType, +"UpsertStreamSourceConversion", +schema.fieldNames, +fieldIdxs, +rowtimeExpression + ) + + val processFunc = new ScalaTupleToCRowProcessRunner( +function.name, +function.code, +cRowType) + + val opName = s"from: (${schema.fieldNames.mkString(", ")})" + + input +.asInstanceOf[DataStream[(Boolean, Any)]] +.process(processFunc).name(opName).returns(cRowType) +} + + // Java tuple + case t: TupleTypeInfo[_] +if t.getTypeClass == classOf[JTuple2[_, _]] && t.getTypeAt(0) == Types.BOOLEAN => + +val inputType = t.getTypeAt[Any](1) +if (inputType == internalType && !hasTimeIndicator) { + // input is already of correct type. 
Only need to wrap it as CRow + input.asInstanceOf[DataStream[JTuple2[JBool, Row]]] +.map(new RichMapFunction[JTuple2[JBool, Row], CRow] { + @transient private var outCRow: CRow = _ + override def open(parameters: Configuration): Unit = { +outCRow = new CRow(null, change = true) + } + + override def map(v: JTuple2[JBool, Row]): CRow = { +outCRow.row = v.f1 +outCRow.change = v.f0 +outCRow + } +}).returns(cRowType) + +} else { + // input needs to be converted and wrapped as CRow or time indicators n
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r253468143 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala ## @@ -19,22 +19,138 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.calcite.rex.RexNode -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.functions.ProcessFunction -import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.api.{TableConfig, Types} import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction} import org.apache.flink.table.plan.nodes.CommonScan import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.types.Row -import org.apache.flink.table.runtime.CRowOutputProcessRunner import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner} trait StreamScan extends CommonScan[CRow] with DataStreamRel { + protected def convertUpsertToInternalRow( + schema: RowSchema, + input: DataStream[Any], + fieldIdxs: Array[Int], + config: TableConfig, + rowtimeExpression: Option[RexNode]): DataStream[CRow] = { + +val internalType = schema.typeInfo +val 
cRowType = CRowTypeInfo(internalType) + +val hasTimeIndicator = fieldIdxs.exists(f => Review comment: rename `f` -> `fieldIndex` `fieldIdxs` -> `fieldIndexes` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r253463228 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ## @@ -426,7 +426,7 @@ abstract class StreamTableEnvironment( converterFunction match { case Some(func) => -new CRowMapRunner[OUT](func.name, func.code, func.returnType) +new CRowToExternalTypeMapRunner[OUT](func.name, func.code, func.returnType) Review comment: Please, as a general rule do not mix simple refactorings like class renames and moving classes around with functional changes. Embedding those two things inside one commit makes sense only if the refactoring/rename is tightly coupled with the functional change and here it doesn't seem so. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r253557485 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalUpsertToRetraction} + +import scala.collection.JavaConversions._ + +/** + * Use this rule to transpose Calc through UpsertToRetraction relnode. It is beneficial if we get + * smaller state size in upsertToRetraction. 
+ */ +class CalcUpsertToRetractionTransposeRule extends RelOptRule( + operand(classOf[FlinkLogicalCalc], +operand(classOf[FlinkLogicalUpsertToRetraction], none)), + "CalcUpsertToRetractionTransposeRule") { + + override def matches(call: RelOptRuleCall): Boolean = { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] + +fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc) + } + + override def onMatch(call: RelOptRuleCall) { +val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc] +val upsertToRetraction = call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction] +call.transformTo(getNewUpsertToRetraction(calc, upsertToRetraction)) + } + + private def getNewUpsertToRetraction( +calc: FlinkLogicalCalc, +upsertToRetraction: FlinkLogicalUpsertToRetraction): FlinkLogicalUpsertToRetraction = { + +val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, calc.getProgram) + +val oldKeyNames = upsertToRetraction.keyNames +val newKeyNames = getNamesAfterCalc(oldKeyNames, calc) +new FlinkLogicalUpsertToRetraction( + upsertToRetraction.getCluster, + upsertToRetraction.getTraitSet, + newCalc, + newKeyNames) + } + + private def fieldsRemainAfterCalc(fields: Seq[String], calc: FlinkLogicalCalc): Boolean = { +calc.getRowType.getFieldNames + .flatMap(calc.getInputFromOutputName(calc, _)) Review comment: [newbie scala question] How does it work? `flatMap` here is unwrapping `Option` returned from `getInputFromOutputName`? If `Option` is empty, the result list will have null, will it be empty or will it throw an exception? (two first options are acceptable ;) ) [/newbie scala question] This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
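On the Scala question above: a minimal, self-contained sketch of how `flatMap` behaves when the mapped function returns an `Option`. The lookup `inputNameFor` and the field names are made up for illustration and only stand in for `getInputFromOutputName`; this shows standard Scala collections behavior, not Flink code.

```scala
object OptionFlatMapSketch {
  // Hypothetical stand-in for getInputFromOutputName: maps an output field
  // name to the input field it forwards, or None if the field is computed.
  private val forwarded = Map("out_a" -> "a", "out_b" -> "b")

  def inputNameFor(outputFieldName: String): Option[String] =
    forwarded.get(outputFieldName)

  // flatMap keeps the value inside each Some and drops every None, so the
  // result contains no nulls and no exception is thrown for empty Options.
  def forwardedInputs(outputFieldNames: Seq[String]): Seq[String] =
    outputFieldNames.flatMap(name => inputNameFor(name))

  def main(args: Array[String]): Unit = {
    println(forwardedInputs(Seq("out_a", "computed", "out_b")))
    println(forwardedInputs(Seq("computed")))
  }
}
```

So of the three possibilities raised in the question, the second one applies: an empty `Option` simply contributes nothing, and if every `Option` is empty the resulting sequence is empty.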
[GitHub] twalthr commented on issue #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String)
twalthr commented on issue #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String) URL: https://github.com/apache/flink/pull/7651#issuecomment-460321719 @flinkbot approve description @flinkbot approve consensus This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dianfu opened a new pull request #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String)
dianfu opened a new pull request #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String) URL: https://github.com/apache/flink/pull/7651 ## What is the purpose of the change *This pull request deprecates the constructor new Table(TableEnvironment, String)* ## Brief change log - *Deprecates new Table(TableEnvironment, String)* - *Introduces Table.lateralJoin, Table.leftOuterLateralJoin* - *Moves the implicit conversion 'TableFunction to Table' to ExpressionDsl, as otherwise this implicit always has the highest priority* ## Verifying this change This change added tests and can be verified as follows: *(example:)* - *Added tests CorrelateStringExpressionTest.testCorrelateJoinsWithJoinLateral* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (yes) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs & JavaDocs) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11447) Deprecate "new Table(TableEnvironment, String)"
[ https://issues.apache.org/jira/browse/FLINK-11447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-11447: - Priority: Blocker (was: Major) > Deprecate "new Table(TableEnvironment, String)" > --- > > Key: FLINK-11447 > URL: https://issues.apache.org/jira/browse/FLINK-11447 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Dian Fu >Priority: Blocker > > A more detailed description can be found in > [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions]. > Once table is an interface we can easily replace the underlying > implementation at any time. The constructor call prevents us from converting > it into an interface. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flinkbot commented on issue #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String)
flinkbot commented on issue #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String) URL: https://github.com/apache/flink/pull/7651#issuecomment-460316970 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❌ 1. The [description] looks good. * ❌ 2. There is [consensus] that the contribution should go into to Flink. * ❔ 3. Needs [attention] from. * ❌ 4. The [architecture] is sound. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve the 1st aspect (similarly, it also supports the `consensus`, `architecture` and `quality` keywords) - `@flinkbot approve all` to approve all aspects - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10644) Batch Job: Speculative execution
[ https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760004#comment-16760004 ]

Greg Hogan commented on FLINK-10644:

I'm not so sure that speculative execution is a good fit for Apache Flink. In MapReduce there are two concepts conducive to speculative execution that are not present in Flink:

1) In MapReduce the map/reduce/task to mapper/reducer/container ratio is often 10:1 or higher. In Flink all tasks are immediately assigned and processed in parallel.
2) In MapReduce intermediate and output data is always persisted, whereas in Flink only state is persisted (and only in streaming). Input is assumed to be replayable, but speculative execution would presumably also work for intermediate tasks.

As noted, Spark has included speculative execution and the Spark processing model is closer to Flink's. I'm just not clear on the circumstances where it is beneficial to start a catch-up task so late. I haven't followed the work on unification of batch and streaming, but it seems more valuable to focus on transitioning a task from a straggler machine rather than starting that task over.

> Batch Job: Speculative execution
>
> Key: FLINK-10644
> URL: https://issues.apache.org/jira/browse/FLINK-10644
> Project: Flink
> Issue Type: New Feature
> Components: JobManager
> Reporter: JIN SUN
> Assignee: ryantaocer
> Priority: Major
> Fix For: 1.8.0
>
> Stragglers/outliers are tasks that run slower than most of the other tasks in a batch job. This impacts job latency, because the straggler will usually be on the critical path of the job and become the bottleneck.
> Tasks may be slow for various reasons, including hardware degradation, software misconfiguration, or noisy neighbors. It's hard for the JM to predict the runtime.
> To reduce the overhead of stragglers, other systems such as Hadoop/Tez and Spark have *_speculative execution_*. Speculative execution is a health-check procedure that checks for tasks to be speculated, i.e. tasks running slower in an ExecutionJobVertex than the median of all successfully completed tasks in that EJV. Such slow tasks will be re-submitted to another TM. It will not stop the slow tasks, but run a new copy in parallel, and will kill the others once one of them completes.
> This JIRA is an umbrella to apply this kind of idea in FLINK. Details will be appended later.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
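The straggler check described in the FLINK-10644 issue text above (compare running tasks against the median of the successfully completed ones) might be sketched as follows. This is an illustration only: `StragglerDetector`, `findStragglers` and the `slowdownFactor` parameter are hypothetical and are not part of Flink or of the proposal. The issue only says "slower than the median", so the multiplier is an added assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical helper illustrating the median-based check; not Flink code. */
final class StragglerDetector {

    /** Median duration (in milliseconds) of the successfully completed tasks. */
    static double median(long[] completedMillis) {
        if (completedMillis.length == 0) {
            throw new IllegalArgumentException("no completed tasks yet");
        }
        long[] sorted = completedMillis.clone();
        Arrays.sort(sorted);
        int mid = sorted.length / 2;
        return sorted.length % 2 == 1
                ? sorted[mid]
                : (sorted[mid - 1] + sorted[mid]) / 2.0;
    }

    /**
     * Returns the ids of running tasks whose elapsed time exceeds
     * slowdownFactor times the median of the completed tasks.
     * These would be the candidates for a speculative copy.
     */
    static List<String> findStragglers(
            long[] completedMillis,
            Map<String, Long> runningElapsedMillis,
            double slowdownFactor) {
        double threshold = median(completedMillis) * slowdownFactor;
        List<String> stragglers = new ArrayList<>();
        for (Map.Entry<String, Long> entry : runningElapsedMillis.entrySet()) {
            if (entry.getValue() > threshold) {
                stragglers.add(entry.getKey());
            }
        }
        return stragglers;
    }
}
```

The sketch only covers detection. Re-submitting the task to another TM and cancelling the losing copy, which is where the concerns raised in the comment above apply, is deliberately left out.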
[GitHub] dianfu commented on issue #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String)
dianfu commented on issue #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String) URL: https://github.com/apache/flink/pull/7651#issuecomment-460317427 @twalthr @dawidwys Could you help to take a look at this PR when it's convenient for you? Thanks in advance.
[jira] [Commented] (FLINK-11449) Uncouple the Expression class from RexNodes
[ https://issues.apache.org/jira/browse/FLINK-11449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760010#comment-16760010 ]

Timo Walther commented on FLINK-11449:

Thanks for working on this [~sunjincheng121]. I think most of the changes should be clearly defined now. Feel free to start developing. I will summarize the approach again here:

In {{flink-table-api-common}}, because we can even put those expression classes into flink-table-common for FilterableTableSources in the future:

{code}
public class FunctionDefinition {
    public String name;
}

public class ScalarFunctionDefinition extends FunctionDefinition {
    // the name is ignored for now
    public UserDefinedFunction f;
}

public class TableFunctionDefinition extends FunctionDefinition {
    // the name is ignored for now
    public UserDefinedFunction f;
}

public class AggregateFunctionDefinition extends FunctionDefinition {
    // the name is ignored for now
    public UserDefinedFunction f;
}

public abstract class Expression {
    abstract List<Expression> getChildren();
    abstract <R> R accept(ExpressionVisitor<R> visitor);
    // do we need more methods here?
}

public final class FunctionDefinitions {
    // implement just a basic definition (only name) for now
    // we can extend this functionality in the future with type inference logic etc.
    public static final FunctionDefinition TRIM = new FunctionDefinition("trim");
    public static final FunctionDefinition CAST = new FunctionDefinition("cast");

    public static List<FunctionDefinition> getDefinitions() {
        // use reflection for all definitions similar to Calcite's SqlStdOperatorTable
    }
}

CallExpression(FunctionDefinition, Expression*) extends Expression
SymbolExpression(TableSymbol ???) extends Expression
FieldReferenceExpression(String) extends Expression
TypeLiteralExpression(TypeInformation) extends Expression
ValueLiteralExpression(Object) extends Expression
{code}

In {{flink-table-api-java}}:

{code}
TableReferenceExpression(String, Table) extends Expression
{code}

In {{flink-table-planner}}:

{code}
+ visitor pattern in the planner to translate (can be in Scala)
  in Flink: to Flink's case classes
  in Blink: to Blink's case classes

visit(call Call) match {
  case TRIM =>
}
{code}

expressionDsl.scala and ExpressionParser.scala can still remain implemented in Scala.

> Uncouple the Expression class from RexNodes
>
> Key: FLINK-11449
> URL: https://issues.apache.org/jira/browse/FLINK-11449
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: sunjincheng
> Priority: Major
>
> A more detailed description can be found in [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions].
> Calcite will not be part of any API module anymore. Therefore, RexNode translation must happen in a different layer. This issue will require a new design document.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
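To make the visitor-based translation in the FLINK-11449 comment above more concrete, here is a minimal, self-contained sketch. The class names mirror the outline in the comment, but the constructors, the `visitCall`/`visitFieldReference`/`visitValueLiteral` method names and the `ToStringTranslator` are assumptions made for illustration. The string rendering merely stands in for the planner-side translation into Flink's or Blink's case classes.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Visitor over the expression tree; the method names are assumptions. */
interface ExpressionVisitor<R> {
    R visitCall(CallExpression call);
    R visitFieldReference(FieldReferenceExpression field);
    R visitValueLiteral(ValueLiteralExpression literal);
}

abstract class Expression {
    abstract List<Expression> getChildren();
    abstract <R> R accept(ExpressionVisitor<R> visitor);
}

/** Basic definition carrying only a name, as in the outline above. */
final class FunctionDefinition {
    final String name;
    FunctionDefinition(String name) { this.name = name; }
}

final class CallExpression extends Expression {
    final FunctionDefinition definition;
    final List<Expression> args;
    CallExpression(FunctionDefinition definition, Expression... args) {
        this.definition = definition;
        this.args = Arrays.asList(args);
    }
    @Override List<Expression> getChildren() { return args; }
    @Override <R> R accept(ExpressionVisitor<R> visitor) { return visitor.visitCall(this); }
}

final class FieldReferenceExpression extends Expression {
    final String name;
    FieldReferenceExpression(String name) { this.name = name; }
    @Override List<Expression> getChildren() { return Collections.emptyList(); }
    @Override <R> R accept(ExpressionVisitor<R> visitor) { return visitor.visitFieldReference(this); }
}

final class ValueLiteralExpression extends Expression {
    final Object value;
    ValueLiteralExpression(Object value) { this.value = value; }
    @Override List<Expression> getChildren() { return Collections.emptyList(); }
    @Override <R> R accept(ExpressionVisitor<R> visitor) { return visitor.visitValueLiteral(this); }
}

/** Stand-in for a planner translation: renders the tree as a string. */
final class ToStringTranslator implements ExpressionVisitor<String> {
    @Override public String visitCall(CallExpression call) {
        StringBuilder sb = new StringBuilder(call.definition.name).append('(');
        for (int i = 0; i < call.args.size(); i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(call.args.get(i).accept(this));
        }
        return sb.append(')').toString();
    }
    @Override public String visitFieldReference(FieldReferenceExpression field) { return field.name; }
    @Override public String visitValueLiteral(ValueLiteralExpression literal) { return String.valueOf(literal.value); }
}
```

The point of the design is that the API-side classes know nothing about Calcite: each planner supplies its own `ExpressionVisitor` implementation, so RexNode translation can live entirely in the planner module.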
[jira] [Updated] (FLINK-11447) Deprecate "new Table(TableEnvironment, String)"
[ https://issues.apache.org/jira/browse/FLINK-11447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11447: --- Labels: pull-request-available (was: ) > Deprecate "new Table(TableEnvironment, String)" > --- > > Key: FLINK-11447 > URL: https://issues.apache.org/jira/browse/FLINK-11447 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Dian Fu >Priority: Blocker > Labels: pull-request-available > Fix For: 1.8.0 > > > A more detailed description can be found in > [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions]. > Once table is an interface we can easily replace the underlying > implementation at any time. The constructor call prevents us from converting > it into an interface. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
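The reasoning in the FLINK-11447 description above (a public constructor call ties callers to one concrete class, whereas an interface lets the implementation be swapped) can be illustrated with a small sketch. All names here (`SketchTable`, `LegacyPlannerTable`, `NewPlannerTable`, `SketchTableEnvironment`, `scan`) are simplified, hypothetical stand-ins and are not Flink's actual Table API.

```java
/** Hypothetical stand-in for a table abstraction; not Flink's Table class. */
interface SketchTable {
    String explain();
}

/** One possible implementation; callers never name this class directly. */
final class LegacyPlannerTable implements SketchTable {
    private final String name;
    LegacyPlannerTable(String name) { this.name = name; }
    @Override public String explain() { return "legacy:" + name; }
}

/** A second implementation that can replace the first without touching callers. */
final class NewPlannerTable implements SketchTable {
    private final String name;
    NewPlannerTable(String name) { this.name = name; }
    @Override public String explain() { return "new:" + name; }
}

/** Hypothetical environment that owns the choice of implementation. */
final class SketchTableEnvironment {
    private final boolean useNewPlanner;
    SketchTableEnvironment(boolean useNewPlanner) { this.useNewPlanner = useNewPlanner; }

    /** Factory method: user code obtains tables here instead of calling a constructor. */
    SketchTable scan(String name) {
        return useNewPlanner ? new NewPlannerTable(name) : new LegacyPlannerTable(name);
    }
}
```

User code written against `scan(...)` keeps compiling when the environment switches implementations, which would not be the case if it had instantiated a concrete table class itself.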
[jira] [Commented] (FLINK-11447) Deprecate "new Table(TableEnvironment, String)"
[ https://issues.apache.org/jira/browse/FLINK-11447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16759990#comment-16759990 ] Timo Walther commented on FLINK-11447: -- Happy Chinese New Year everyone :) [~dian.fu] thanks for working on this issue. It would be great to get this issue done as quickly as possible. I will be out of office next week, so it would be great to merge it this week. Let me know if I can help. I can also implement it myself if you don't have the capacity. I will make this issue a blocker for the next Flink release. > Deprecate "new Table(TableEnvironment, String)" > --- > > Key: FLINK-11447 > URL: https://issues.apache.org/jira/browse/FLINK-11447 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Dian Fu >Priority: Major > > A more detailed description can be found in > [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions]. > Once table is an interface we can easily replace the underlying > implementation at any time. The constructor call prevents us from converting > it into an interface. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11447) Deprecate "new Table(TableEnvironment, String)"
[ https://issues.apache.org/jira/browse/FLINK-11447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-11447: - Fix Version/s: 1.8.0 > Deprecate "new Table(TableEnvironment, String)" > --- > > Key: FLINK-11447 > URL: https://issues.apache.org/jira/browse/FLINK-11447 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Dian Fu >Priority: Blocker > Fix For: 1.8.0 > > > A more detailed description can be found in > [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions]. > Once table is an interface we can easily replace the underlying > implementation at any time. The constructor call prevents us from converting > it into an interface. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] StefanRRichter commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread
StefanRRichter commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread URL: https://github.com/apache/flink/pull/7568#discussion_r253534721 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ## @@ -608,12 +569,11 @@ public void run() { Review comment: I think all wait related things can go away.
[GitHub] asfgit closed pull request #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
asfgit closed pull request #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base URL: https://github.com/apache/flink/pull/7647
[jira] [Closed] (FLINK-11352) Check and port JobManagerHACheckpointRecoveryITCase to new code base if necessary
[ https://issues.apache.org/jira/browse/FLINK-11352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-11352. - Resolution: Fixed Fix Version/s: 1.8.0 Fixed via 8d25e88ec7de962927de350b4702ec183699afb3 > Check and port JobManagerHACheckpointRecoveryITCase to new code base if > necessary > - > > Key: FLINK-11352 > URL: https://issues.apache.org/jira/browse/FLINK-11352 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Till Rohrmann >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Check and port {{JobManagerHACheckpointRecoveryITCase}} to new code base if > necessary -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] tillrohrmann commented on a change in pull request #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
tillrohrmann commented on a change in pull request #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#discussion_r253529750

## File path: flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java ##

@@ -220,6 +221,187 @@ public void flatMap(Long value, Collector out) throws Exception {
         env.execute();
     }
 
+    /**
+     * Tests that the JobManager logs failures during recovery properly.
+     *
+     * @see <a href="https://issues.apache.org/jira/browse/FLINK-3185">FLINK-3185</a>
+     */
+    @Test
+    public void testJobManagerRecoveryFailureLog() throws Exception {
+        final Time timeout = Time.seconds(30L);
+        final File zookeeperStoragePath = temporaryFolder.newFolder();
+
+        // Config
+        final int numberOfJobManagers = 2;
+        final int numberOfTaskManagers = 2;
+        final int numberOfSlotsPerTaskManager = 2;
+
+        assertEquals(PARALLELISM, numberOfTaskManagers * numberOfSlotsPerTaskManager);
+
+        // Job managers
+        final DispatcherProcess[] dispatcherProcesses = new DispatcherProcess[numberOfJobManagers];
+
+        // Task managers
+        TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numberOfTaskManagers];
+
+        HighAvailabilityServices highAvailabilityServices = null;
+
+        LeaderRetrievalService leaderRetrievalService = null;
+
+        // Coordination between the processes goes through a directory
+        File coordinateTempDir = null;
+
+        // Cluster config
+        Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
+            zooKeeper.getConnectString(), zookeeperStoragePath.getPath());
+        // Task manager configuration
+        config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+        config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+
+        final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
+
+        try {
+            final Deadline deadline = TestTimeOut.fromNow();
+
+            // Coordination directory
+            coordinateTempDir = temporaryFolder.newFolder();
+
+            // Start first process
+            dispatcherProcesses[0] = new DispatcherProcess(0, config);
+            dispatcherProcesses[0].startProcess();
+
+            highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+                config,
+                TestingUtils.defaultExecutor());
+
+            // Start the task manager process
+            for (int i = 0; i < numberOfTaskManagers; i++) {
+                taskManagerRunners[i] = new TaskManagerRunner(config, ResourceID.generate());
+                taskManagerRunners[i].start();
+            }
+
+            // Leader listener
+            TestingListener leaderListener = new TestingListener();
+            leaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
+            leaderRetrievalService.start(leaderListener);
+
+            // Initial submission
+            leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
+
+            String leaderAddress = leaderListener.getAddress();
+            UUID leaderId = leaderListener.getLeaderSessionID();
+
+            final CompletableFuture dispatcherGatewayFuture = rpcService.connect(
+                leaderAddress,
+                DispatcherId.fromUuid(leaderId),
+                DispatcherGateway.class);
+            final DispatcherGateway dispatcherGateway = dispatcherGatewayFuture.get();
+
+            // Wait for all task managers to connect to the leading job manager
+            waitForTaskManagers(numberOfTaskManagers, dispatcherGateway, deadline.timeLeft());
+
+            final File coordinateDirClosure = coordinateTempDir;
+            final Throwable[] errorRef = new Throwable[1];
+
+            // we trigger program execution in a separate thread
+            Thread programTrigger = new Thread("Program Trigger") {
+                @Override
+                public void run() {
+                    try {
+                        testJobManagerFailur
[GitHub] tillrohrmann commented on a change in pull request #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
tillrohrmann commented on a change in pull request #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#discussion_r253532313

## File path: flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java ##

@@ -220,6 +221,187 @@ public void flatMap(Long value, Collector out) throws Exception {
         env.execute();
     }
 
+    /**
+     * Tests that the JobManager logs failures during recovery properly.
+     *
+     * @see <a href="https://issues.apache.org/jira/browse/FLINK-3185">FLINK-3185</a>
+     */
+    @Test
+    public void testJobManagerRecoveryFailureLog() throws Exception {

Review comment:
I think it would be better to test the behaviour in a more fine-grained way. What we actually tested was that the recovery logs a message if there is a recovery failure due to non-existing state handles. However, the behaviour has changed slightly, such that a failing recovery will lead to a `Dispatcher` termination (see `DispatcherHATest#testFailingRecoveryIsAFatalError`). Thus, I think this test is no longer necessary and can be removed.
[GitHub] rmetzger commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…
rmetzger commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co… URL: https://github.com/apache/flink/pull/7632#issuecomment-460301104 Yes, I will. I was able to reproduce the issue. Re-sending the message will not fix the problem. Mentioning the bot again might help. I'll let you know when there's an update
[GitHub] GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…
GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co… URL: https://github.com/apache/flink/pull/7632#issuecomment-460300135 @rmetzger Let me know if you find something.
[GitHub] GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…
GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co… URL: https://github.com/apache/flink/pull/7632#issuecomment-460299894 Ugh... 🤦♂️
[GitHub] GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…
GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co… URL: https://github.com/apache/flink/pull/7632#issuecomment-460299749 @flinkbot approve all
[GitHub] flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base URL: https://github.com/apache/flink/pull/7647#issuecomment-460102649 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @tillrohrmann [PMC] * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @tillrohrmann [PMC] * ❗ 3. Needs [attention] from. - Needs attention by @flinkbot * ❌ 4. The [architecture] is sound. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
[GitHub] tillrohrmann removed a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
tillrohrmann removed a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base URL: https://github.com/apache/flink/pull/7647#issuecomment-460294817 @flinkbot approve attention @flinkbot
[GitHub] tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base URL: https://github.com/apache/flink/pull/7647#issuecomment-460295289 @flinkbot attention @flinkbot
[GitHub] tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base URL: https://github.com/apache/flink/pull/7647#issuecomment-460294817 @flinkbot approve attention @flinkbot
[jira] [Commented] (FLINK-11447) Deprecate "new Table(TableEnvironment, String)"
[ https://issues.apache.org/jira/browse/FLINK-11447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16759943#comment-16759943 ] sunjincheng commented on FLINK-11447: - I am very happy that everyone has reached an agreement. Looking forward to the PR! And Happy Chinese New Year! > Deprecate "new Table(TableEnvironment, String)" > --- > > Key: FLINK-11447 > URL: https://issues.apache.org/jira/browse/FLINK-11447 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Dian Fu >Priority: Major > > A more detailed description can be found in > [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions]. > Once table is an interface we can easily replace the underlying > implementation at any time. The constructor call prevents us from converting > it into an interface. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] rmetzger commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…
rmetzger commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co… URL: https://github.com/apache/flink/pull/7632#issuecomment-460278469 The bot has still not updated the tracking comment. I'm looking into it.
[GitHub] GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…
GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co… URL: https://github.com/apache/flink/pull/7632#issuecomment-460271888 @flinkbot approve all
[GitHub] GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…
GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co… URL: https://github.com/apache/flink/pull/7632#issuecomment-460271152 @flinkbot disapprove description
[GitHub] GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…
GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co… URL: https://github.com/apache/flink/pull/7632#issuecomment-460271199 @flinkbot approve description
[GitHub] flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base URL: https://github.com/apache/flink/pull/7647#issuecomment-460102649 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @tillrohrmann [PMC] * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @tillrohrmann [PMC] * ❔ 3. Needs [attention] from. * ❌ 4. The [architecture] is sound. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
[GitHub] GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…
GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co… URL: https://github.com/apache/flink/pull/7632#issuecomment-460271030 @flinkbot approve architecture
[GitHub] GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…
GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co… URL: https://github.com/apache/flink/pull/7632#issuecomment-460270959 @flinkbot disapprove architecture
[GitHub] tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base URL: https://github.com/apache/flink/pull/7647#issuecomment-460270690 @flinkbot approve consensus
[GitHub] flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base URL: https://github.com/apache/flink/pull/7647#issuecomment-460102649 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @tillrohrmann [PMC] * ❌ 2. There is [consensus] that the contribution should go into Flink. * ❔ 3. Needs [attention] from. * ❌ 4. The [architecture] is sound. * ❌ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
[GitHub] tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base URL: https://github.com/apache/flink/pull/7647#issuecomment-460270092 @flinkbot approve description
[GitHub] tillrohrmann removed a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
tillrohrmann removed a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base URL: https://github.com/apache/flink/pull/7647#issuecomment-460269748 @flinkbot approve description
[GitHub] tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base URL: https://github.com/apache/flink/pull/7647#issuecomment-460269748 @flinkbot approve description consensus
[GitHub] StefanRRichter commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread
StefanRRichter commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread URL: https://github.com/apache/flink/pull/7568#discussion_r253491234 ## File path: flink-end-to-end-tests/test-scripts/test_ha_datastream.sh ## @@ -51,6 +51,7 @@ function run_ha_test() { # change the pid dir to start log files always from 0, this is important for checks in the # jm killing loop set_conf "env.pid.dir" "${TEST_DATA_DIR}" +set_conf "env.java.opts" "-ea" Review comment: I think we might want to reach consensus about that on a broader scope than just this PR. IMO, `-ea` should be enabled in all tests. I enabled it here because it was relevant for my PR.
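For context, the JVM skips `assert` statements unless assertions are switched on, which is what passing `-ea` through `env.java.opts` does for the Flink processes. A minimal sketch of how code can check this at runtime; the class name `AssertionsEnabledSketch` is hypothetical and not part of Flink:

```java
final class AssertionsEnabledSketch {

    /** Returns true only when this class runs with assertions enabled (for example via -ea). */
    static boolean assertionsEnabled() {
        boolean enabled = false;
        // The assignment inside the assert statement is evaluated only when assertions are on.
        assert enabled = true;
        return enabled;
    }

    public static void main(String[] args) {
        System.out.println("assertions enabled: " + assertionsEnabled());
    }
}
```

Started with `java -ea AssertionsEnabledSketch`, the check reports assertions as enabled; without the flag it reports them as disabled, so assertion-based thread checks would silently not run.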
[GitHub] tillrohrmann edited a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
tillrohrmann edited a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base URL: https://github.com/apache/flink/pull/7647#issuecomment-460269748 @flinkbot approve description
[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread URL: https://github.com/apache/flink/pull/7568#discussion_r253485428 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestRestartStrategy.java ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.executiongraph.restart.RestartCallback; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; + +import javax.annotation.Nonnull; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * A {@link RestartStrategy} for tests that gives fine-grained control over the point in time when restart actions are + * performed. 
+ */ +public class TestRestartStrategy implements RestartStrategy { + + @Nonnull + private final Queue actionsQueue; + + private final int maxRestarts; + + private int restartAttempts; + + private boolean manuallyTriggeredExecution; + + public TestRestartStrategy() { + this(true); + } + + public TestRestartStrategy(boolean manuallyTriggeredExecution) { + this(-1, manuallyTriggeredExecution); + } + + public TestRestartStrategy(int maxRestarts, boolean manuallyTriggeredExecution) { + this(new LinkedList<>(), maxRestarts, manuallyTriggeredExecution); + } + + public TestRestartStrategy( + @Nonnull Queue actionsQueue, + int maxRestarts, + boolean manuallyTriggeredExecution) { + + this.actionsQueue = actionsQueue; + this.maxRestarts = maxRestarts; + this.manuallyTriggeredExecution = manuallyTriggeredExecution; + } + + @Override + public boolean canRestart() { + return maxRestarts < 0 || maxRestarts - restartAttempts > 0; + } + + @Override + public void restart(RestartCallback restarter, ScheduledExecutor executor) { + + ++restartAttempts; + ExecutorAction executorAction = new ExecutorAction(restarter::triggerFullRecovery, executor); + if (manuallyTriggeredExecution) { + synchronized (actionsQueue) { + actionsQueue.add(executorAction); + } + } else { + executorAction.trigger(); + } + } + + public int getNumberOfQueuedActions() { + synchronized (actionsQueue) { + return actionsQueue.size(); + } + } + + public CompletableFuture triggerNextAction() { + synchronized (actionsQueue) { + return actionsQueue.remove().trigger(); + } + } + + public CompletableFuture triggerAll() { + + synchronized (actionsQueue) { + + if (actionsQueue.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + CompletableFuture[] completableFutures = new CompletableFuture[actionsQueue.size()]; + for (int i = 0; i < completableFutures.length; ++i) { + completableFutures[i] = triggerNextAction(); + } + return FutureUtils.ConjunctFuture.allOf(completableFutures); Review comment: Better to 
use `CompletableFuture.allOf(completableFutures)`
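The JDK method named in this suggestion, `CompletableFuture.allOf`, accepts an array of futures and returns a `CompletableFuture<Void>` that completes once all of them have completed. A small sketch of draining a queue of futures into it; `AllOfSketch` and its `triggerAll` helper are hypothetical and only loosely mirror the method under review:

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

final class AllOfSketch {

    /** Drains the queue and returns a future that completes once every drained future is done. */
    static CompletableFuture<Void> triggerAll(Queue<CompletableFuture<Void>> pending) {
        CompletableFuture<?>[] futures = new CompletableFuture<?>[pending.size()];
        for (int i = 0; i < futures.length; i++) {
            futures[i] = pending.remove();
        }
        // With an empty array, allOf returns an already completed future,
        // so no separate isEmpty() branch is needed in this sketch.
        return CompletableFuture.allOf(futures);
    }
}
```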
[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread URL: https://github.com/apache/flink/pull/7568#discussion_r253429216 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -1789,4 +1835,10 @@ void notifyExecutionChange( } } } + + void assertRunningInJobMasterMainThread() { + if (!(jobMasterMainThreadExecutor instanceof ComponentMainThreadExecutor.DummyComponentMainThreadExecutor)) { + jobMasterMainThreadExecutor.assertRunningInMainThread(); + } + } } Review comment: Nice, in the end it's very few changes to the `ExecutionGraph` :-)
[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread URL: https://github.com/apache/flink/pull/7568#discussion_r253420517 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledFutureAdapter.java ## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +import org.apache.flink.annotation.VisibleForTesting; + +import javax.annotation.Nonnull; + +import java.util.Objects; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * Adapter from {@link Future} to {@link ScheduledFuture}. This enriches the basic future with scheduling information. + * @param value type of the future. + */ +public class ScheduledFutureAdapter implements ScheduledFuture { + + /** The uid sequence generator. 
*/ + private static final AtomicLong SEQUENCE_GEN = new AtomicLong(); + + /** The encapsulated basic future to which execution is delegated. */ + @Nonnull + private final Future delegate; + + /** Tie-breaker for {@link #compareTo(Delayed)}. */ + private final long tieBreakerUid; + + /** The time when this is scheduled for execution in nanoseconds.*/ + private final long scheduleTimeNanos; + + public ScheduledFutureAdapter( + @Nonnull Future delegate, + long delay, + @Nonnull TimeUnit timeUnit) { + this( + delegate, + System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, timeUnit), + SEQUENCE_GEN.incrementAndGet()); + } + + @VisibleForTesting + ScheduledFutureAdapter( + @Nonnull Future delegate, + long scheduleTimeNanos, + long tieBreakerUid) { + + this.delegate = delegate; + this.scheduleTimeNanos = scheduleTimeNanos; + this.tieBreakerUid = tieBreakerUid; + } + + @Override + public long getDelay(@Nonnull TimeUnit unit) { + return unit.convert(scheduleTimeNanos - System.nanoTime(), TimeUnit.NANOSECONDS); + } + + @Override + public int compareTo(@Nonnull Delayed o) { + + if (o == this) { + return 0; + } + + // tie breaking for ScheduledFutureAdapter objects + if (o instanceof ScheduledFutureAdapter) { + int cmp = Long.compare(scheduleTimeNanos, ((ScheduledFutureAdapter) o).scheduleTimeNanos); Review comment: Nitpick: The cast could be to `ScheduledFutureAdapter<?>` in order to avoid a raw-type warning.
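The type arguments in the quoted code appear to have been lost when this message was archived, so the following sketch assumes the nitpick refers to the wildcard form `<?>`. Casting to a wildcard-parameterized type keeps the compiler from flagging a raw type while still allowing access to fields that do not depend on the type parameter. `WildcardCastSketch` and `compareToOther` are hypothetical stand-ins, not the class under review:

```java
final class WildcardCastSketch<V> {

    private final long scheduleTimeNanos;

    WildcardCastSketch(long scheduleTimeNanos) {
        this.scheduleTimeNanos = scheduleTimeNanos;
    }

    /** Compares schedule times with another instance, whatever its type argument is. */
    int compareToOther(Object o) {
        if (o instanceof WildcardCastSketch<?>) {
            // Wildcard cast instead of the raw (WildcardCastSketch) cast: no raw-type warning.
            WildcardCastSketch<?> other = (WildcardCastSketch<?>) o;
            return Long.compare(scheduleTimeNanos, other.scheduleTimeNanos);
        }
        throw new ClassCastException("not comparable: " + o);
    }
}
```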
[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread URL: https://github.com/apache/flink/pull/7568#discussion_r253418747 ## File path: flink-end-to-end-tests/test-scripts/test_ha_datastream.sh ## @@ -51,6 +51,7 @@ function run_ha_test() { # change the pid dir to start log files always from 0, this is important for checks in the # jm killing loop set_conf "env.pid.dir" "${TEST_DATA_DIR}" +set_conf "env.java.opts" "-ea" Review comment: Do we only want to enable assertions for this specific test or for all e2e tests?
[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread URL: https://github.com/apache/flink/pull/7568#discussion_r253480625 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ## @@ -512,31 +584,38 @@ public void testSuspendWhileRestarting() throws Exception { controllableRestartStrategy, scheduler); - eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); + eg.start(testMainThread.getMainThreadExecutor()); - assertEquals(JobStatus.CREATED, eg.getState()); + testMainThread.execute(() -> { + eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); - eg.scheduleForExecution(); + assertEquals(JobStatus.CREATED, eg.getState()); - assertEquals(JobStatus.RUNNING, eg.getState()); + eg.scheduleForExecution(); + assertEquals(JobStatus.RUNNING, eg.getState()); + }); - instance.markDead(); + testMainThread.execute(instance::markDead); controllableRestartStrategy.getReachedCanRestart().await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); - assertEquals(JobStatus.RESTARTING, eg.getState()); + testMainThread.getMainThreadExecutor().execute(() -> { - eg.suspend(new Exception("Test exception")); + assertEquals(JobStatus.RESTARTING, eg.getState()); - assertEquals(JobStatus.SUSPENDED, eg.getState()); + eg.suspend(new Exception("Test exception")); + assertEquals(JobStatus.SUSPENDED, eg.getState()); + }); controllableRestartStrategy.unlockRestart(); - controllableRestartStrategy.getRestartDone().await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); - assertEquals(JobStatus.SUSPENDED, eg.getState()); + testMainThread.execute(() -> { + assertEquals(JobStatus.SUSPENDED, eg.getState()); + }); } + @Ignore @Test public void testConcurrentLocalFailAndRestart() throws Exception { Review comment: I would suggest to rename this test case since it is not concurrent anymore. 
[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread URL: https://github.com/apache/flink/pull/7568#discussion_r253448003 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ScheduledFutureAdapterTest.java ## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Unit tests for {@link ScheduledFutureAdapter}. 
+ */ +public class ScheduledFutureAdapterTest extends TestLogger { + + private ScheduledFutureAdapter objectUnderTest; + private TestFuture innerDelegate; + + @Before + public void before() throws Exception { + this.innerDelegate = new TestFuture(); + this.objectUnderTest = new ScheduledFutureAdapter<>(innerDelegate, 420321L, TimeUnit.NANOSECONDS); + } + + @Test + public void testForwardedMethods() throws Exception { + + Assert.assertEquals((Integer) 4711, objectUnderTest.get()); + Assert.assertEquals(1, innerDelegate.getGetInvocationCount()); + + Assert.assertEquals((Integer) 4711, objectUnderTest.get(42L, TimeUnit.SECONDS)); + Assert.assertEquals(1, innerDelegate.getGetTimeoutInvocationCount()); + + Assert.assertEquals(innerDelegate.isCancelExpected(), objectUnderTest.cancel(true)); + Assert.assertEquals(1, innerDelegate.getCancelInvocationCount()); + + innerDelegate.setCancelResult(!innerDelegate.isCancelExpected()); + Assert.assertEquals(innerDelegate.isCancelExpected(), objectUnderTest.cancel(true)); + Assert.assertEquals(2, innerDelegate.getCancelInvocationCount()); + + Assert.assertEquals(innerDelegate.isCancelledExpected(), objectUnderTest.isCancelled()); + Assert.assertEquals(1, innerDelegate.getIsCancelledInvocationCount()); + + innerDelegate.setIsCancelledResult(!innerDelegate.isCancelledExpected()); + Assert.assertEquals(innerDelegate.isCancelledExpected(), objectUnderTest.isCancelled()); + Assert.assertEquals(2, innerDelegate.getIsCancelledInvocationCount()); + + Assert.assertEquals(innerDelegate.isDoneExpected(), objectUnderTest.isDone()); + Assert.assertEquals(1, innerDelegate.getIsDoneInvocationCount()); + + innerDelegate.setIsDoneExpected(!innerDelegate.isDoneExpected()); + Assert.assertEquals(innerDelegate.isDoneExpected(), objectUnderTest.isDone()); + Assert.assertEquals(2, innerDelegate.getIsDoneInvocationCount()); + } + + @Test + public void testCompareToEqualsHashCode() { + + Assert.assertEquals(0, objectUnderTest.compareTo(objectUnderTest)); 
+ Assert.assertEquals(0, objectUnderTest.compareTo(objectUnderTest)); Review comment: Is it intentional that we test `equals` twice?
[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread URL: https://github.com/apache/flink/pull/7568#discussion_r253448367 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ComponentMainThreadTestExecutor.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.util.function.FunctionUtils; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingRunnable; + +import org.junit.rules.ExternalResource; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Util to run test calls with a provided main thread executor. + */ +public class ComponentMainThreadTestExecutor { + + /** The main thread executor to which execution is delegated. 
*/ + @Nonnull + private final TestComponentMainThreadExecutor mainThreadExecutor; + + public ComponentMainThreadTestExecutor(@Nonnull TestComponentMainThreadExecutor mainThreadExecutor) { + this.mainThreadExecutor = mainThreadExecutor; + } + + /** +* Executes the given supplier with the main thread executor until completion, returns the result or a exception. +* This method blocks until the execution is complete. +*/ + public U execute(@Nonnull SupplierWithException supplierWithException) { + return CompletableFuture.supplyAsync( + FunctionUtils.uncheckedSupplier(supplierWithException), + mainThreadExecutor) + .join(); + } + + /** +* Executes the given runnable with the main thread executor and blocks until completion. +*/ + public void execute(@Nonnull ThrowingRunnable throwingRunnable) { + execute(() -> { + throwingRunnable.run(); + return null; + }); + } + + @Nonnull + public TestComponentMainThreadExecutor getMainThreadExecutor() { + return mainThreadExecutor; + } + + /** +* Test resource for convenience. +*/ + public static class Resource extends ExternalResource { + + private ComponentMainThreadTestExecutor componentMainThreadTestExecutor; + private ScheduledExecutorService innerExecutorService; + + @Override + protected void before() throws Throwable { + this.innerExecutorService = Executors.newSingleThreadScheduledExecutor(); + this.componentMainThreadTestExecutor = + new ComponentMainThreadTestExecutor( + TestComponentMainThreadExecutor.forSingleThreadExecutor(innerExecutorService)); + } + + @Override + protected void after() { + this.innerExecutorService.shutdown(); + this.innerExecutorService.shutdownNow(); + try { + this.innerExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } Review comment: Here we could use the `ExecutorUtils#gracefulShutdown` to avoid code duplication.
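The sequence in `after()` is the usual shut-down-then-wait pattern for an executor, which is why a shared helper can replace it. A rough JDK-only sketch of what such a helper could look like; `GracefulShutdownSketch` and `shutdownGracefully` are hypothetical names, and this is not the implementation of Flink's `ExecutorUtils#gracefulShutdown`:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class GracefulShutdownSketch {

    /**
     * Stops accepting new tasks, waits up to the timeout for submitted ones,
     * and cancels whatever is left. Returns true if the executor terminated
     * without needing the forced shutdown.
     */
    static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            // Tasks did not finish in time: interrupt them and drop queued work.
            executor.shutdownNow();
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Preserve the interrupt for the caller instead of wrapping it.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Unlike the quoted `after()` method, this sketch waits before calling `shutdownNow()`, so tasks that are already queued get a chance to finish.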
[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread URL: https://github.com/apache/flink/pull/7568#discussion_r253486672 ## File path: flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredDirectExecutor.java ## @@ -32,8 +33,17 @@ */ public class ManuallyTriggeredDirectExecutor implements Executor { Review comment: What is the difference between this class and `ManuallyTriggeredScheduledExecutor`? Could they be unified?
[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread URL: https://github.com/apache/flink/pull/7568#discussion_r253480445 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ## @@ -608,12 +569,11 @@ public void run() { Review comment: Do we still need these wait calls?
[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread URL: https://github.com/apache/flink/pull/7568#discussion_r253484809 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestComponentMainThreadExecutor.java ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.rpc.MainThreadValidatorUtil; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; + +/** + * An implementation of {@link org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor} for tests. + */ +public class TestComponentMainThreadExecutor extends ComponentMainThreadExecutorServiceAdapter { Review comment: I have to admit that I'm a bit confused by the naming `TestComponentMainThreadExecutor` and `ComponentMainThreadTestExecutor`. 
Especially since the `ComponentMainThreadTestExecutor` takes a `TestComponentMainThreadExecutor` as a constructor argument. Maybe we could name the one `TestingComponentMainThreadExecutorServiceAdapter` and the other one `TestingComponentMainThreadExecutor`.
[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread URL: https://github.com/apache/flink/pull/7568#discussion_r253445788 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java ## @@ -44,4 +51,28 @@ public void exitMainThread() { assert(endpoint.currentMainThread.compareAndSet(Thread.currentThread(), null)) : "The RpcEndpoint has concurrent access from " + endpoint.currentMainThread.get(); } + + /** +* Returns true iff the current thread is equals to the provided expected thread and logs violations. +* +* @param expected the expected main thread. +* @return true iff the current thread is equals to the provided expected thread. +*/ + public static boolean isRunningInExpectedThread(@Nullable Thread expected) { + Thread actual = Thread.currentThread(); + if (expected != actual) { + + String violationMsg = "Violation of main thread constraint detected: expected <" + + expected + "> but running in <" + actual + ">."; + + log.warn(violationMsg, + expected, + actual, Review comment: I think the `expected` and `actual` arguments are no longer needed, since both are already concatenated into `violationMsg`.
[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread URL: https://github.com/apache/flink/pull/7568#discussion_r253422204 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java ## @@ -108,25 +92,28 @@ public void onTaskFailure(Execution taskExecution, Throwable cause) { // it helps to support better testing final CompletableFuture terminationFuture = taskExecution.getTerminalStateFuture(); - final ExecutionVertex vertexToRecover = taskExecution.getVertex(); - final long globalModVersion = taskExecution.getGlobalModVersion(); - - terminationFuture.thenAcceptAsync( - (ExecutionState value) -> { - try { - long createTimestamp = System.currentTimeMillis(); - Execution newExecution = vertexToRecover.resetForNewExecution(createTimestamp, globalModVersion); - newExecution.scheduleForExecution(); - } - catch (GlobalModVersionMismatch e) { - // this happens if a concurrent global recovery happens. simply do nothing. - } - catch (Exception e) { - executionGraph.failGlobal( - new Exception("Error during fine grained recovery - triggering full recovery", e)); - } - }, - callbackExecutor); + Consumer restartProcedure = (ExecutionState executionState) -> { + performExecutionVertexRestart( + taskExecution.getVertex(), + taskExecution.getGlobalModVersion()); + }; + + terminationFuture.thenAccept(restartProcedure); Review comment: nit: this could be `terminationFuture.thenRun(() -> performExecutionVertexRestart(taskExecution.getVertex(), taskExecution.getGlobalModVersion()))` and then we would not need to create the `restartProcedure` consumer.
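The nit above relies on `CompletableFuture.thenRun` accepting a `Runnable`, which fits when the callback ignores the future's value. A small self-contained sketch of that pattern follows. `ThenRunSketch`, `performRestart`, and the literal arguments are hypothetical stand-ins, not Flink code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class ThenRunSketch {
    // Counts restarts so the effect of the callback is observable.
    static final AtomicInteger restarts = new AtomicInteger();

    // Hypothetical stand-in for performExecutionVertexRestart(...).
    static void performRestart(String vertex, long globalModVersion) {
        restarts.incrementAndGet();
    }

    // thenRun takes a Runnable, so the terminal state value is simply not bound
    // and no intermediate Consumer variable is required.
    static CompletableFuture<Void> restartOnTermination(CompletableFuture<String> terminationFuture) {
        return terminationFuture.thenRun(() -> performRestart("vertex-0", 1L));
    }

    public static void main(String[] args) {
        CompletableFuture<String> termination = new CompletableFuture<>();
        CompletableFuture<Void> done = restartOnTermination(termination);
        termination.complete("FAILED"); // the callback runs once the future completes
        done.join();
        System.out.println("restarts: " + restarts.get());
    }
}
```

Like `thenAccept`, the non-async `thenRun` executes the callback on whichever thread completes the future (or on the calling thread if the future is already complete), so the threading behavior of the suggested change matches the version in the diff.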
[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread URL: https://github.com/apache/flink/pull/7568#discussion_r253483809 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestComponentMainThreadExecutor.java ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.rpc.MainThreadValidatorUtil; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; + +/** + * An implementation of {@link org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor} for tests. 
+ */ +public class TestComponentMainThreadExecutor extends ComponentMainThreadExecutorServiceAdapter { + + public TestComponentMainThreadExecutor( + @Nonnull ScheduledExecutorService scheduledExecutorService, + @Nonnull Thread mainThread) { + + super(scheduledExecutorService, () -> { + assert MainThreadValidatorUtil.isRunningInExpectedThread(mainThread); + }); + } + + public static TestComponentMainThreadExecutor forMainThread() { + final Thread main = Thread.currentThread(); + return new TestComponentMainThreadExecutor(new DirectScheduledExecutorService() { + @Override + public void execute(Runnable command) { + assert MainThreadValidatorUtil.isRunningInExpectedThread(main); + super.execute(command); + } + }, main); + } + + /** +* Creates a test executor that delegates to the given {@link ScheduledExecutorService}. The given executor must +* execute all submissions with the same thread. +*/ + public static TestComponentMainThreadExecutor forSingleThreadExecutor( + @Nonnull ScheduledExecutorService singleThreadExecutor) { + try { + Thread thread = CompletableFuture.supplyAsync(Thread::currentThread, singleThreadExecutor).get(); Review comment: If you use `join()` here, then you don't need the try-catch block.
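The suggestion works because `CompletableFuture.get()` declares the checked `InterruptedException` and `ExecutionException`, while `join()` reports failures through the unchecked `CompletionException`. A minimal sketch of resolving an executor's thread with `join()` follows. `JoinSketch` and `threadOf` are hypothetical names, and a plain `ExecutorService` is used in place of the `ScheduledExecutorService` from the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class JoinSketch {
    // Returns the thread on which the given executor runs a submitted task.
    // Assumes, as the Javadoc in the diff does, that the executor is single-threaded.
    static Thread threadOf(ExecutorService singleThreadExecutor) {
        // join() throws only unchecked exceptions, so no try-catch is required here.
        return CompletableFuture.supplyAsync(Thread::currentThread, singleThreadExecutor).join();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Thread first = threadOf(executor);
            Thread second = threadOf(executor);
            System.out.println("same thread: " + (first == second));
        } finally {
            executor.shutdown();
        }
    }
}
```

One trade-off worth keeping in mind: `join()` does not surface interruption as `InterruptedException`, which is usually acceptable in a test helper like this one.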