[GitHub] 123avi commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according

2019-02-04 Thread GitBox
123avi commented on a change in pull request #7418: FLINK-11053 Documentation - 
update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r253757234
 
 

 ##
 File path: docs/dev/connectors/filesystem_sink.md
 ##
 @@ -117,11 +117,11 @@ input.addSink(sink);
 
 
 {% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+val input: DataStream[(IntWritable, Text)] = ???
 
-val sink = new BucketingSink[String]("/base/path")
-sink.setBucketer(new DateTimeBucketer[String]("-MM-dd--HHmm", 
ZoneId.of("America/Los_Angeles")))
-sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
+val sink = new BucketingSink[(IntWritable, Text)]("/base/path")
+sink.setBucketer(new DateTimeBucketer("-MM-dd--HHmm"))
 
 Review comment:
   Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on issue #7645: [FLINK-11521] Make RetryingRegistration configurable

2019-02-04 Thread GitBox
tillrohrmann commented on issue #7645: [FLINK-11521] Make RetryingRegistration 
configurable
URL: https://github.com/apache/flink/pull/7645#issuecomment-460535964
 
 
   Thanks for the review @GJL. Merging this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime 
DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-460533675
 
 
   @pnowojski Hi, thank you very much for the review. I will address the new 
comments and update the PR ASAP. Probably, it's easier to be reviewed with two 
updates: submit an update first then try to rebase to the current master(as the 
current master is quite different from the previous one).
   
   Furthermore, looking forward to the merge plan. Let me know if anything else 
I can do. 
   Thank you again for the review. 
   
   Best, Hequn


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] 123avi commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according

2019-02-04 Thread GitBox
123avi commented on a change in pull request #7418: FLINK-11053 Documentation - 
update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r247153581
 
 

 ##
 File path: docs/dev/connectors/filesystem_sink.md
 ##
 @@ -105,8 +105,8 @@ Example:
 {% highlight java %}
 DataStream> input = ...;
 
-BucketingSink sink = new BucketingSink("/base/path");
-sink.setBucketer(new DateTimeBucketer("-MM-dd--HHmm", 
ZoneId.of("America/Los_Angeles")));
+BucketingSink> sink = new 
BucketingSink>("/base/path");
+sink.setBucketer(new DateTimeBuckete("-MM-dd--HHmm", 
ZoneId.of("America/Los_Angeles")));
 
 Review comment:
   very well


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] 123avi commented on a change in pull request #7418: FLINK-11053 Documentation - update scala sample code for bucketing sink according

2019-02-04 Thread GitBox
123avi commented on a change in pull request #7418: FLINK-11053 Documentation - 
update scala sample code for bucketing sink according
URL: https://github.com/apache/flink/pull/7418#discussion_r247152403
 
 

 ##
 File path: docs/dev/connectors/filesystem_sink.md
 ##
 @@ -117,11 +117,12 @@ input.addSink(sink);
 
 
 {% highlight scala %}
-val input: DataStream[Tuple2[IntWritable, Text]] = ...
+import org.apache.flink.api.java.tuple.Tuple2
+val input: DataStream[Tuple2[A, B]] = ??? //we need to use java Tuple2 for the 
SequenceFileWriter
 
 Review comment:
   I changed the comment to clarify the usage. is that ok ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253748320
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+call.transformTo(getNewUpsertToRetraction(calc, upsertToRetraction))
+  }
+
+  private def getNewUpsertToRetraction(
+calc: FlinkLogicalCalc,
+upsertToRetraction: FlinkLogicalUpsertToRetraction): 
FlinkLogicalUpsertToRetraction = {
+
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+calc.getRowType.getFieldNames
+  .flatMap(calc.getInputFromOutputName(calc, _))
 
 Review comment:
   The `flatMap` unwraps `Option` returned from the `getInputFromOutputName`. 
If `Option` is empty, the result will be an empty list. :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253745265
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/UpsertToRetractionProcessFunction.scala
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Function used to convert upsert to retractions.
+  *
+  * @param rowTypeInfo the output row type info.
+  * @param queryConfig the configuration for the query.
+  */
+class UpsertToRetractionProcessFunction(
 
 Review comment:
   A good question! We can't use `LAST_VALUE` aggregate function here. An 
aggregate function ignores null input according to the SQL standard. 
Considering we have the following two inputs:`(key, value1, null)` and then 
`(key, null, value2)`, the result will be `(key, value1, value2)` if we use 
LAST_VALUE, while the result should be `(key, null, value2)`, i.e., the last 
row of the input. That's why I named the operator `LastRow` before.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] Myasuka commented on issue #7639: [FLINK-9920] Only check part files in BucketingSinkFaultToleranceITCase

2019-02-04 Thread GitBox
Myasuka commented on issue #7639: [FLINK-9920] Only check part files in 
BucketingSinkFaultToleranceITCase
URL: https://github.com/apache/flink/pull/7639#issuecomment-460522078
 
 
   Sorry for not replying late, we just had the Chinese new year.
   
   LGTM


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11447) Deprecate "new Table(TableEnvironment, String)"

2019-02-04 Thread Dian Fu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760474#comment-16760474
 ] 

Dian Fu commented on FLINK-11447:
-

[~twalthr] Thanks a lot for offering the help. I have uploaded the PR and also 
updated the documentation and the java docs according to your suggestions. 
Looking forward to your feedback. I'll keep updating the PR  as soon as 
possible to make sure it can be committed in this week. :)

> Deprecate "new Table(TableEnvironment, String)"
> ---
>
> Key: FLINK-11447
> URL: https://issues.apache.org/jira/browse/FLINK-11447
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Dian Fu
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> A more detailed description can be found in 
> [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions].
> Once table is an interface we can easily replace the underlying 
> implementation at any time. The constructor call prevents us from converting 
> it into an interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11523) TestHarness should provide way to configure TypeSerailizers for side output

2019-02-04 Thread Alexey Trenikhin (JIRA)
Alexey Trenikhin created FLINK-11523:


 Summary: TestHarness should provide way to configure 
TypeSerailizers for side output
 Key: FLINK-11523
 URL: https://issues.apache.org/jira/browse/FLINK-11523
 Project: Flink
  Issue Type: Improvement
Reporter: Alexey Trenikhin


Currently AbstractStreamOperatorTestHarness allows to configure TypeSerializer 
for main output:

{code:java}
public void setup(TypeSerializer outputSerializer) 
{code}

It should be possible to specify TypeSerializers for side outputs as well (or 
alternatively  is to use TypeInformation from OutputTags)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flinkbot edited a comment on issue #7650: [FLINK-11522][table] Deprecate ExternalCatalogTable.builder()

2019-02-04 Thread GitBox
flinkbot edited a comment on issue #7650: [FLINK-11522][table] Deprecate 
ExternalCatalogTable.builder()
URL: https://github.com/apache/flink/pull/7650#issuecomment-460234012
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❌ 1. The [description] looks good.
   * ❌ 2. There is [consensus] that the contribution should go into to Flink.
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The [architecture] is sound.
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dianfu commented on issue #7650: [FLINK-11522][table] Deprecate ExternalCatalogTable.builder()

2019-02-04 Thread GitBox
dianfu commented on issue #7650: [FLINK-11522][table] Deprecate 
ExternalCatalogTable.builder()
URL: https://github.com/apache/flink/pull/7650#issuecomment-460503074
 
 
   @twalthr  Could you help to take a look at this quick PR? Thanks a lot.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dianfu commented on issue #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String)

2019-02-04 Thread GitBox
dianfu commented on issue #7651: [FLINK-11447][table] Deprecate new 
Table(TableEnvironment, String)
URL: https://github.com/apache/flink/pull/7651#issuecomment-460502918
 
 
   @twalthr  Thanks a lot for the remind. I have updated the documentation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253721880
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
 ##
 @@ -19,22 +19,138 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import 
org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, 
JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}
 
 trait StreamScan extends CommonScan[CRow] with DataStreamRel {
 
+  protected def convertUpsertToInternalRow(
 
 Review comment:
   1)The type of an upsert stream should be scala or java tuple, while we don't 
have this constraint on append stream.
   2&3)Good suggestions. I will try to refactor this making less duplicating 
logic and more smaller methods.
   4)The else branch for java tuple2 hasn't been converted by the tests. I will 
add one. Thank you.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253714414
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
 ##
 @@ -19,22 +19,138 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import 
org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, 
JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}
 
 trait StreamScan extends CommonScan[CRow] with DataStreamRel {
 
+  protected def convertUpsertToInternalRow(
+  schema: RowSchema,
+  input: DataStream[Any],
+  fieldIdxs: Array[Int],
+  config: TableConfig,
+  rowtimeExpression: Option[RexNode]): DataStream[CRow] = {
+
+val internalType = schema.typeInfo
+val cRowType = CRowTypeInfo(internalType)
+
+val hasTimeIndicator = fieldIdxs.exists(f =>
+  f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER ||
+f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER)
+
+val dsType = input.getType
+
+dsType match {
+// Scala tuple
+  case t: CaseClassTypeInfo[_]
+if t.getTypeClass == classOf[(_, _)] && t.getTypeAt(0) == 
Types.BOOLEAN =>
+
+val inputType = t.getTypeAt[Any](1)
+if (inputType == internalType && !hasTimeIndicator) {
+  // input is already of correct type. Only need to wrap it as CRow
+  input.asInstanceOf[DataStream[(Boolean, Row)]]
+.map(new RichMapFunction[(Boolean, Row), CRow] {
+  @transient private var outCRow: CRow = _
+  override def open(parameters: Configuration): Unit = {
+outCRow = new CRow(null, change = true)
+  }
+
+  override def map(v: (Boolean, Row)): CRow = {
+outCRow.row = v._2
+outCRow.change = v._1
+outCRow
+  }
+}).returns(cRowType)
+
+} else {
+  // input needs to be converted and wrapped as CRow or time 
indicators need to be generated
+
+  val function = generateConversionProcessFunction(
+config,
+inputType.asInstanceOf[TypeInformation[Any]],
+internalType,
+"UpsertStreamSourceConversion",
+schema.fieldNames,
+fieldIdxs,
+rowtimeExpression
+  )
+
+  val processFunc = new ScalaTupleToCRowProcessRunner(
+function.name,
+function.code,
+cRowType)
+
+  val opName = s"from: (${schema.fieldNames.mkString(", ")})"
+
+  input
+.asInstanceOf[DataStream[(Boolean, Any)]]
+.process(processFunc).name(opName).returns(cRowType)
+}
+
+  // Java tuple
+  case t: TupleTypeInfo[_]
+if t.getTypeClass == classOf[JTuple2[_, _]] && t.getTypeAt(0) == 
Types.BOOLEAN =>
+
+val inputType = t.getTypeAt[Any](1)
+if (inputType == internalType && !hasTimeIndicator) {
+  // input is already of correct type. Only need to wrap it as CRow
+  input.asInstanceOf[DataStream[JTuple2[JBool, Row]]]
+.map(new RichMapFunction[JTuple2[JBool, Row], CRow] {
+  @transient private var outCRow: CRow = _
+  override def open(parameters: Configuration): Unit = {
+outCRow = new CRow(null, change = true)
+  }
+
+  override def map(v: JTuple2[JBool, Row]): CRow = {
+outCRow.row = v.f1
+outCRow.change = v.f0
+outCRow
+  }
+}).returns(cRowType)
+
+} else {
+  // input needs to be converted and wrapped as CRow or time 
indicators n

[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253710139
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -426,7 +426,7 @@ abstract class StreamTableEnvironment(
 converterFunction match {
 
   case Some(func) =>
-new CRowMapRunner[OUT](func.name, func.code, func.returnType)
+new CRowToExternalTypeMapRunner[OUT](func.name, func.code, 
func.returnType)
 
 Review comment:
   Make sense, I will revert these changes. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253707232
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
 
 Review comment:
   Thank you very much for looking into this. I will also keep an eye on the 
Calcite dev mailing list. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2019-02-04 Thread Ying Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760284#comment-16760284
 ] 

Ying Xu commented on FLINK-4582:


Thanks [~tinder-dthomson] .   Internally we use the *new ObjectMapper()* 
initialization but didn't observe similar issue.  But we may run with an older 
Flink distribution.

Yes please report this as a separate bug and perhaps attach full stack trace.  
If you already have a fix, feel free to post the PR as well. 

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining what Flink has for exactly-once semantics, out-of-core state 
> backends, and queryable state with CDC can have very strong use cases. For 
> this feature there should only be an extra dependency to the AWS Java SDK for 
> DynamoDB, which has Apache License 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flinkbot edited a comment on issue #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-02-04 Thread GitBox
flinkbot edited a comment on issue #7643: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#issuecomment-459849424
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @twalthr [PMC]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @twalthr [PMC]
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The [architecture] is sound.
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-02-04 Thread GitBox
bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#discussion_r253674313
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/catalog/exceptions/TableAlreadyExistException.java
 ##
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog.exceptions;
+
+import org.apache.flink.table.api.catalog.ObjectPath;
+
+/**
+ * Exception for trying to create a table that already exists.
 
 Review comment:
   "... table **or view** that "


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-02-04 Thread GitBox
bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#discussion_r253673128
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/catalog/ReadableWritableCatalog.java
 ##
 @@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import org.apache.flink.table.api.TableAlreadyExistException;
+import org.apache.flink.table.api.TableNotExistException;
+import 
org.apache.flink.table.api.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.api.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.plan.stats.TableStats;
+
+/**
+ * An interface responsible for manipulating catalog metadata.
+ */
+public interface ReadableWritableCatalog extends ReadableCatalog {
+
+   // -- databases --
+
+   /**
+* Adds a database to this catalog.
+*
+* @param dbNameThe name of the database to add.
+* @param databaseThe database to add.
+* @param ignoreIfExists Flag to specify behavior if a database with 
the given name already
+*   exists: if set to false, it throws a 
DatabaseAlreadyExistException,
+*   if set to true, nothing happens.
+* @throws DatabaseAlreadyExistException
+*   thrown if the database does already exist in 
the catalog
+*   and ignoreIfExists is false
+*/
+   void createDatabase(String dbName, CatalogDatabase database, boolean 
ignoreIfExists)
+   throws DatabaseAlreadyExistException;
+
+   /**
+* Deletes a database from this catalog.
+*
+* @param dbNameName of the database to delete.
+* @param ignoreIfNotExists Flag to specify behavior if the database 
does not exist:
+*  if set to false, throw an exception,
+*  if set to true, nothing happens.
+* @throws DatabaseNotExistException thrown if the database does not 
exist in the catalog
+*/
+   void dropDatabase(String dbName, boolean ignoreIfNotExists) throws 
DatabaseNotExistException;
+
+   /**
+* Modifies an existing database.
+*
+* @param dbNameName of the database to modify.
+* @param newDatabase   The new database to replace the 
existing database.
+* @param ignoreIfNotExists Flag to specify behavior if the database 
does not exist:
+*  if set to false, throw an exception,
+*  if set to true, nothing happens.
+* @throws DatabaseNotExistException thrown if the database does not 
exist in the catalog
+*/
+   void alterDatabase(String dbName, CatalogDatabase newDatabase, boolean 
ignoreIfNotExists)
+   throws DatabaseNotExistException;
+
+   /**
+* Renames an existing database.
+*
+* @param dbNameName of the database to modify.
+* @param newDbName New name of the database.
+* @param ignoreIfNotExists Flag to specify behavior if the database 
does not exist:
+*  if set to false, throw an exception,
+*  if set to true, nothing happens.
+* @throws DatabaseNotExistException thrown if the database does not 
exist in the catalog
+*/
+   void renameDatabase(String dbName, String newDbName, boolean 
ignoreIfNotExists)
 
 Review comment:
   does it need to throw a 'DatabaseAlreadyExistsException' when the 
'newDbName' is already taken?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-02-04 Thread GitBox
bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#discussion_r253673629
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/catalog/ReadableWritableCatalog.java
 ##
 @@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import org.apache.flink.table.api.TableAlreadyExistException;
+import org.apache.flink.table.api.TableNotExistException;
+import 
org.apache.flink.table.api.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.api.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.plan.stats.TableStats;
+
+/**
+ * An interface responsible for manipulating catalog metadata.
+ */
+public interface ReadableWritableCatalog extends ReadableCatalog {
+
+   // -- databases --
+
+   /**
+* Adds a database to this catalog.
+*
+* @param dbNameThe name of the database to add.
+* @param databaseThe database to add.
+* @param ignoreIfExists Flag to specify behavior if a database with 
the given name already
+*   exists: if set to false, it throws a 
DatabaseAlreadyExistException,
+*   if set to true, nothing happens.
+* @throws DatabaseAlreadyExistException
+*   thrown if the database does already exist in 
the catalog
+*   and ignoreIfExists is false
+*/
+   void createDatabase(String dbName, CatalogDatabase database, boolean 
ignoreIfExists)
+   throws DatabaseAlreadyExistException;
+
+   /**
+* Deletes a database from this catalog.
+*
+* @param dbNameName of the database to delete.
+* @param ignoreIfNotExists Flag to specify behavior if the database 
does not exist:
+*  if set to false, throw an exception,
+*  if set to true, nothing happens.
+* @throws DatabaseNotExistException thrown if the database does not 
exist in the catalog
+*/
+   void dropDatabase(String dbName, boolean ignoreIfNotExists) throws 
DatabaseNotExistException;
+
+   /**
+* Modifies an existing database.
+*
+* @param dbNameName of the database to modify.
+* @param newDatabase   The new database to replace the 
existing database.
+* @param ignoreIfNotExists Flag to specify behavior if the database 
does not exist:
+*  if set to false, throw an exception,
+*  if set to true, nothing happens.
+* @throws DatabaseNotExistException thrown if the database does not 
exist in the catalog
+*/
+   void alterDatabase(String dbName, CatalogDatabase newDatabase, boolean 
ignoreIfNotExists)
+   throws DatabaseNotExistException;
+
+   /**
+* Renames an existing database.
+*
+* @param dbNameName of the database to modify.
+* @param newDbName New name of the database.
+* @param ignoreIfNotExists Flag to specify behavior if the database 
does not exist:
+*  if set to false, throw an exception,
+*  if set to true, nothing happens.
+* @throws DatabaseNotExistException thrown if the database does not 
exist in the catalog
+*/
+   void renameDatabase(String dbName, String newDbName, boolean 
ignoreIfNotExists)
+   throws DatabaseNotExistException;
+
+   // -- tables and views --
+
+   /**
+* Deletes a table or view.
+*
+* @param objectName Path of the table or view to delete.
+* @param ignoreIfNotExists Flag to specify behavior if the table or 
view does not exist:
+*  if set to false, throw an exception,
+*  if set to true, nothing happens.
+* @

[GitHub] bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-02-04 Thread GitBox
bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#discussion_r253674435
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/catalog/exceptions/TableNotExistException.java
 ##
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog.exceptions;
+
+import org.apache.flink.table.api.catalog.ObjectPath;
+
+/**
+ * Exception for trying to operate on a table that doesn't exist.
+ */
+public class TableNotExistException extends RuntimeException {
+
+   private static final String MSG = "Table %s in catalog %s does not 
exist.";
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-02-04 Thread GitBox
bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#discussion_r253674164
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/catalog/exceptions/TableAlreadyExistException.java
 ##
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog.exceptions;
+
+import org.apache.flink.table.api.catalog.ObjectPath;
+
+/**
+ * Exception for trying to create a table that already exists.
+ */
+public class TableAlreadyExistException extends RuntimeException {
+
+   private static final String MSG = "Table $catalog.$table already 
exists.";
 
 Review comment:
   since this is shared between tables and views, shall the message be "Table 
**or view** ... already exists."?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-02-04 Thread GitBox
bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#discussion_r253674392
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/catalog/exceptions/TableNotExistException.java
 ##
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog.exceptions;
+
+import org.apache.flink.table.api.catalog.ObjectPath;
+
+/**
+ * Exception for trying to operate on a table that doesn't exist.
 
 Review comment:
   ditto


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-02-04 Thread GitBox
bowenli86 commented on a change in pull request #7643: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#discussion_r253671017
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/catalog/CatalogView.java
 ##
 @@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+/**
+ * Represents a view in a catalog.
+ */
+public interface CatalogView extends CommonTable {
+
+   /**
+* Original text of the view definition.
+*
+* @return the original string literal provided by the user.
+*/
+   String getOriginalQuery();
+
+   /**
+* Expanded text of the original view definition
+* This is needed because the context such as current DB is
+* lost after the session, in which view is defined, is gone.
+* Expanded query text takes care of the this, as an example.
 
 Review comment:
   "... takes care of ~~the~~ this ..."
   
   shall we also add an example?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2019-02-04 Thread Thomas Weise (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760263#comment-16760263
 ] 

Thomas Weise commented on FLINK-4582:
-

[~tinder-dthomson] Thanks for the report and yes, please track it as a separate 
issue and mark the fix version as 1.8 so that we get it resolved as part of 1.8.

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining what Flink has for exactly-once semantics, out-of-core state 
> backends, and queryable state with CDC can have very strong use cases. For 
> this feature there should only be an extra dependency to the AWS Java SDK for 
> DynamoDB, which has Apache License 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2019-02-04 Thread Devin Thomson (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760250#comment-16760250
 ] 

Devin Thomson commented on FLINK-4582:
--

[~yxu-lyft] circling back here, looks like this got merged in, congrats!

 

I've been preemptively cutting over from my implementation to this one and 
noticed one (blocking) bug:

 
{code:java}
j.l.IllegalArgumentException: Conflicting setter definitions for property 
\"eventName\": 
org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1
 params) vs 
org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1
 params)
at c.f.j.d.i.POJOPropertyBuilder.getSetter(POJOPropertyBuilder.java:300)
at 
c.f.j.d.d.BeanDeserializerFactory.filterBeanProps(BeanDeserializerFactory.java:619)
at 
c.f.j.d.d.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:515)
at 
c.f.j.d.d.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:256)
at 
c.f.j.d.d.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:169)
at c.f.j.d.d.DeserializerCache._createDeserializer2(DeserializerCache.java:403)
at c.f.j.d.d.DeserializerCache._createDeserializer(DeserializerCache.java:352)
at c.f.j.d.d.DeserializerCache._createAndCache2(DeserializerCache.java:264)
... 15 common frames omitted
Wrapped by: c.f.j.d.JsonMappingException: Conflicting setter definitions for 
property \"eventName\": 
org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1
 params...
{code}
 

 

It looks like the root cause is that DynamoDBStreamsSchema.java defines the 
object mapper as follows:
{code:java}
private static final ObjectMapper MAPPER = new ObjectMapper();
{code}
When it should be using the appropriate mix-ins offered by the dynamodb stream 
adapter library:
{code:java}
private static final ObjectMapper MAPPER = new RecordObjectMapper();
{code}
This appears to resolve the issue, I tested by using my own deserializer 
implementation. Not sure if it makes sense to track this as a separate issue or 
not since this is still a 1.8-SNAPSHOT feature.

 

Let me know if you have any questions!

- Devin

 

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining what Flink has for exactly-once semantics, out-of-core state 
> backends, and queryable state with CDC can have very strong use cases. For 
> this feature there should only be an extra dependency to the AWS Java SDK for 
> DynamoDB, which has Apache License 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2019-02-04 Thread Devin Thomson (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760250#comment-16760250
 ] 

Devin Thomson edited comment on FLINK-4582 at 2/4/19 9:45 PM:
--

[~yxu-lyft] circling back here, looks like this got merged in, congrats!

 

I've been preemptively cutting over from my implementation to this one and 
noticed one (blocking) bug:

 
{code:java}
j.l.IllegalArgumentException: Conflicting setter definitions for property 
\"eventName\": 
org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1
 params) vs 
org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1
 params)
at c.f.j.d.i.POJOPropertyBuilder.getSetter(POJOPropertyBuilder.java:300)
at 
c.f.j.d.d.BeanDeserializerFactory.filterBeanProps(BeanDeserializerFactory.java:619)
at 
c.f.j.d.d.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:515)
at 
c.f.j.d.d.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:256)
at 
c.f.j.d.d.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:169)
at c.f.j.d.d.DeserializerCache._createDeserializer2(DeserializerCache.java:403)
at c.f.j.d.d.DeserializerCache._createDeserializer(DeserializerCache.java:352)
at c.f.j.d.d.DeserializerCache._createAndCache2(DeserializerCache.java:264)
... 15 common frames omitted
Wrapped by: c.f.j.d.JsonMappingException: Conflicting setter definitions for 
property \"eventName\": 
org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1
 params...
{code}
 

 

It looks like the root cause is that DynamoDBStreamsSchema.java defines the 
object mapper as follows:
{code:java}
private static final ObjectMapper MAPPER = new ObjectMapper();
{code}
When it should be using the appropriate mix-ins offered by the dynamodb stream 
adapter library:
{code:java}
private static final ObjectMapper MAPPER = new RecordObjectMapper();
{code}
This appears to resolve the issue, I tested by using my own deserializer 
implementation. Not sure if it makes sense to track this as a separate issue or 
not since this is still a 1.8-SNAPSHOT feature.

 

Let me know if you have any questions!

Devin

 


was (Author: tinder-dthomson):
[~yxu-lyft] circling back here, looks like this got merged in, congrats!

 

I've been preemptively cutting over from my implementation to this one and 
noticed one (blocking) bug:

 
{code:java}
j.l.IllegalArgumentException: Conflicting setter definitions for property 
\"eventName\": 
org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1
 params) vs 
org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1
 params)
at c.f.j.d.i.POJOPropertyBuilder.getSetter(POJOPropertyBuilder.java:300)
at 
c.f.j.d.d.BeanDeserializerFactory.filterBeanProps(BeanDeserializerFactory.java:619)
at 
c.f.j.d.d.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:515)
at 
c.f.j.d.d.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:256)
at 
c.f.j.d.d.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:169)
at c.f.j.d.d.DeserializerCache._createDeserializer2(DeserializerCache.java:403)
at c.f.j.d.d.DeserializerCache._createDeserializer(DeserializerCache.java:352)
at c.f.j.d.d.DeserializerCache._createAndCache2(DeserializerCache.java:264)
... 15 common frames omitted
Wrapped by: c.f.j.d.JsonMappingException: Conflicting setter definitions for 
property \"eventName\": 
org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1
 params...
{code}
 

 

It looks like the root cause is that DynamoDBStreamsSchema.java defines the 
object mapper as follows:
{code:java}
private static final ObjectMapper MAPPER = new ObjectMapper();
{code}
When it should be using the appropriate mix-ins offered by the dynamodb stream 
adapter library:
{code:java}
private static final ObjectMapper MAPPER = new RecordObjectMapper();
{code}
This appears to resolve the issue, I tested by using my own deserializer 
implementation. Not sure if it makes sense to track this as a separate issue or 
not since this is still a 1.8-SNAPSHOT feature.

 

Let me know if you have any questions!

- Devin

 

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Rema

[GitHub] GJL removed a comment on issue #7645: [FLINK-11521] Make RetryingRegistration configurable

2019-02-04 Thread GitBox
GJL removed a comment on issue #7645: [FLINK-11521] Make RetryingRegistration 
configurable
URL: https://github.com/apache/flink/pull/7645#issuecomment-460333822
 
 
   Thanks @flinkbot 🤦‍♂️ 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] flinkbot commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…

2019-02-04 Thread GitBox
flinkbot commented on issue #7632: [FLINK-11362] Port 
TaskManagerComponentsStartupShutdownTest to new co…
URL: https://github.com/apache/flink/pull/7632#issuecomment-460347329
 
 
   This PR should now have its status reflected properly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] flinkbot edited a comment on issue #7645: [FLINK-11521] Make RetryingRegistration configurable

2019-02-04 Thread GitBox
flinkbot edited a comment on issue #7645: [FLINK-11521] Make 
RetryingRegistration configurable
URL: https://github.com/apache/flink/pull/7645#issuecomment-459969990
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @GJL [committer]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @GJL [committer]
   * ❔ 3. Needs [attention] from.
   * ✅ 4. The [architecture] is sound.
   - Approved by @GJL [committer]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @GJL [committer]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] flinkbot edited a comment on issue #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String)

2019-02-04 Thread GitBox
flinkbot edited a comment on issue #7651: [FLINK-11447][table] Deprecate new 
Table(TableEnvironment, String)
URL: https://github.com/apache/flink/pull/7651#issuecomment-460316970
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @twalthr [PMC]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @twalthr [PMC]
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The [architecture] is sound.
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] flinkbot edited a comment on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…

2019-02-04 Thread GitBox
flinkbot edited a comment on issue #7632: [FLINK-11362] Port 
TaskManagerComponentsStartupShutdownTest to new co…
URL: https://github.com/apache/flink/pull/7632#issuecomment-459738321
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @GJL [committer], @rmetzger [PMC]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @GJL [committer]
   * ❔ 3. Needs [attention] from.
   * ✅ 4. The [architecture] is sound.
   - Approved by @GJL [committer]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @GJL [committer]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253463228
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -426,7 +426,7 @@ abstract class StreamTableEnvironment(
 converterFunction match {
 
   case Some(func) =>
-new CRowMapRunner[OUT](func.name, func.code, func.returnType)
+new CRowToExternalTypeMapRunner[OUT](func.name, func.code, 
func.returnType)
 
 Review comment:
   As a general rule please do not mix simple refactorings like class renames 
and moving classes around with functional changes. Embedding those two things 
inside one commit makes sense only if the refactoring/rename is tightly coupled 
with the functional change and here it doesn't seem so.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on issue #7645: [FLINK-11521] Make RetryingRegistration configurable

2019-02-04 Thread GitBox
GJL commented on issue #7645: [FLINK-11521] Make RetryingRegistration 
configurable
URL: https://github.com/apache/flink/pull/7645#issuecomment-460332966
 
 
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on issue #7645: [FLINK-11521] Make RetryingRegistration configurable

2019-02-04 Thread GitBox
GJL commented on issue #7645: [FLINK-11521] Make RetryingRegistration 
configurable
URL: https://github.com/apache/flink/pull/7645#issuecomment-460333822
 
 
   Thanks @flinkbot 🤦‍♂️ 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253559564
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
 
 Review comment:
   I'm asking on Calcite dev mailing list, trying to understand those methods. 
Until we get a clear response from them we can leave this as it is now. 
`Aggregate` node (which is almost identical from the semantics perspective as 
this one) also doesn't implement those methods.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253559564
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUpsertToRetraction.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalUpsertToRetraction(
 
 Review comment:
   I'm syncing with Calcite dev mailing list, trying to understand those 
methods. Until we get a clear response from them lets leave this as it is now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253554512
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
 ##
 @@ -19,18 +19,49 @@
 package org.apache.flink.table.plan.nodes
 
 import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
+import org.apache.calcite.rel.core.Calc
 import org.apache.calcite.rel.metadata.RelMdUtil
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
 import org.apache.flink.api.common.functions.Function
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 
 trait CommonCalc {
 
+  /**
+* Returns empty if output field is not forwarded from the input for the 
calc.
+*/
+  private[flink] def getInputFromOutputName(calc: Calc, outputFieldName: 
String): Option[String] = {
 
 Review comment:
   Could you add a simple unit test for this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253551875
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/UpsertToRetractionProcessFunction.scala
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Function used to convert upsert to retractions.
+  *
+  * @param rowTypeInfo the output row type info.
+  * @param queryConfig the configuration for the query.
+  */
+class UpsertToRetractionProcessFunction(
 
 Review comment:
   Why not using `GroupAggProcessFunction` with `LAST_VALUE()` aggregate 
function? It would allow us to share the logic and adding `LAST_VALUE()` is 
valuable addition to Flink SQL on it's own.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253546583
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
 ##
 @@ -19,22 +19,138 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import 
org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, 
JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}
 
 trait StreamScan extends CommonScan[CRow] with DataStreamRel {
 
+  protected def convertUpsertToInternalRow(
 
 Review comment:
   Couple of remarks regarding this method:
   1. Why does it have this `switch/case` for scala/java tuples while the 
`convertToInternalRow` doesn't?
   2. I'm not sure, but It looks like it's duplicating logic from 
`convertToInternalRow`, maybe they should be deduplicated into one method and 
passing the `upsert/append` mode as a parameter for a couple of `if` statements?
   3. Please split it into smaller methods and you could do the same for 
`convertToInternalRow`. More or less were ever there is a comment like `// 
Scala tuple` or `// input is already of correct type. Only need to wrap it as 
CRow` extract the matching piece of code to a named method like 
`convertScalaTuple(...)` or `wrapAsCRow(...)`
   4. are all of the branches covered by the tests? Like the both branches of 
`if` under the java tuple?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253465887
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
 ##
 @@ -19,22 +19,138 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import 
org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, 
JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}
 
 trait StreamScan extends CommonScan[CRow] with DataStreamRel {
 
+  protected def convertUpsertToInternalRow(
+  schema: RowSchema,
+  input: DataStream[Any],
+  fieldIdxs: Array[Int],
+  config: TableConfig,
+  rowtimeExpression: Option[RexNode]): DataStream[CRow] = {
+
+val internalType = schema.typeInfo
+val cRowType = CRowTypeInfo(internalType)
+
+val hasTimeIndicator = fieldIdxs.exists(f =>
+  f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER ||
+f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER)
+
+val dsType = input.getType
+
+dsType match {
+// Scala tuple
+  case t: CaseClassTypeInfo[_]
+if t.getTypeClass == classOf[(_, _)] && t.getTypeAt(0) == 
Types.BOOLEAN =>
+
+val inputType = t.getTypeAt[Any](1)
+if (inputType == internalType && !hasTimeIndicator) {
+  // input is already of correct type. Only need to wrap it as CRow
+  input.asInstanceOf[DataStream[(Boolean, Row)]]
+.map(new RichMapFunction[(Boolean, Row), CRow] {
+  @transient private var outCRow: CRow = _
+  override def open(parameters: Configuration): Unit = {
+outCRow = new CRow(null, change = true)
+  }
+
+  override def map(v: (Boolean, Row)): CRow = {
+outCRow.row = v._2
+outCRow.change = v._1
+outCRow
+  }
+}).returns(cRowType)
+
+} else {
+  // input needs to be converted and wrapped as CRow or time 
indicators need to be generated
+
+  val function = generateConversionProcessFunction(
+config,
+inputType.asInstanceOf[TypeInformation[Any]],
+internalType,
+"UpsertStreamSourceConversion",
+schema.fieldNames,
+fieldIdxs,
+rowtimeExpression
+  )
+
+  val processFunc = new ScalaTupleToCRowProcessRunner(
+function.name,
+function.code,
+cRowType)
+
+  val opName = s"from: (${schema.fieldNames.mkString(", ")})"
+
+  input
+.asInstanceOf[DataStream[(Boolean, Any)]]
+.process(processFunc).name(opName).returns(cRowType)
+}
+
+  // Java tuple
+  case t: TupleTypeInfo[_]
+if t.getTypeClass == classOf[JTuple2[_, _]] && t.getTypeAt(0) == 
Types.BOOLEAN =>
+
+val inputType = t.getTypeAt[Any](1)
+if (inputType == internalType && !hasTimeIndicator) {
+  // input is already of correct type. Only need to wrap it as CRow
+  input.asInstanceOf[DataStream[JTuple2[JBool, Row]]]
+.map(new RichMapFunction[JTuple2[JBool, Row], CRow] {
+  @transient private var outCRow: CRow = _
+  override def open(parameters: Configuration): Unit = {
+outCRow = new CRow(null, change = true)
+  }
+
+  override def map(v: JTuple2[JBool, Row]): CRow = {
+outCRow.row = v.f1
+outCRow.change = v.f0
+outCRow
+  }
+}).returns(cRowType)
+
+} else {
+  // input needs to be converted and wrapped as CRow or time 
indicators n

[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253468143
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
 ##
 @@ -19,22 +19,138 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import 
org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, 
JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}
 
 trait StreamScan extends CommonScan[CRow] with DataStreamRel {
 
+  protected def convertUpsertToInternalRow(
+  schema: RowSchema,
+  input: DataStream[Any],
+  fieldIdxs: Array[Int],
+  config: TableConfig,
+  rowtimeExpression: Option[RexNode]): DataStream[CRow] = {
+
+val internalType = schema.typeInfo
+val cRowType = CRowTypeInfo(internalType)
+
+val hasTimeIndicator = fieldIdxs.exists(f =>
 
 Review comment:
   rename `f` -> `fieldIndex` `fieldIdxs` -> `fieldIndexes`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253463228
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -426,7 +426,7 @@ abstract class StreamTableEnvironment(
 converterFunction match {
 
   case Some(func) =>
-new CRowMapRunner[OUT](func.name, func.code, func.returnType)
+new CRowToExternalTypeMapRunner[OUT](func.name, func.code, 
func.returnType)
 
 Review comment:
   Please, as a general rule do not mix simple refactorings like class renames 
and moving classes around with functional changes. Embedding those two things 
inside one commit makes sense only if the refactoring/rename is tightly coupled 
with the functional change and here it doesn't seem so.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2019-02-04 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r253557485
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/CalcUpsertToRetractionTransposeRule.scala
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalUpsertToRetraction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Use this rule to transpose Calc through UpsertToRetraction relnode. It is 
beneficial if we get
+  * smaller state size in upsertToRetraction.
+  */
+class CalcUpsertToRetractionTransposeRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+operand(classOf[FlinkLogicalUpsertToRetraction], none)),
+  "CalcUpsertToRetractionTransposeRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+
+fieldsRemainAfterCalc(upsertToRetraction.keyNames, calc)
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+val calc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+val upsertToRetraction = 
call.rel(1).asInstanceOf[FlinkLogicalUpsertToRetraction]
+call.transformTo(getNewUpsertToRetraction(calc, upsertToRetraction))
+  }
+
+  private def getNewUpsertToRetraction(
+calc: FlinkLogicalCalc,
+upsertToRetraction: FlinkLogicalUpsertToRetraction): 
FlinkLogicalUpsertToRetraction = {
+
+val newCalc = calc.copy(calc.getTraitSet, upsertToRetraction.getInput, 
calc.getProgram)
+
+val oldKeyNames = upsertToRetraction.keyNames
+val newKeyNames = getNamesAfterCalc(oldKeyNames, calc)
+new FlinkLogicalUpsertToRetraction(
+  upsertToRetraction.getCluster,
+  upsertToRetraction.getTraitSet,
+  newCalc,
+  newKeyNames)
+  }
+
+  private def fieldsRemainAfterCalc(fields: Seq[String], calc: 
FlinkLogicalCalc): Boolean = {
+calc.getRowType.getFieldNames
+  .flatMap(calc.getInputFromOutputName(calc, _))
 
 Review comment:
   [newbie scala question]
   How does it work? `flatMap` here is unwrapping `Option` returned from 
`getInputFromOutputName`? If `Option` is empty, the result list will have null, 
will it be empty or will it throw an exception? (two first options are 
acceptable ;) )
   [/newbie scala question]


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on issue #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String)

2019-02-04 Thread GitBox
twalthr commented on issue #7651: [FLINK-11447][table] Deprecate new 
Table(TableEnvironment, String)
URL: https://github.com/apache/flink/pull/7651#issuecomment-460321719
 
 
   @flinkbot approve description
   @flinkbot approve consensus


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dianfu opened a new pull request #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String)

2019-02-04 Thread GitBox
dianfu opened a new pull request #7651: [FLINK-11447][table] Deprecate new 
Table(TableEnvironment, String)
URL: https://github.com/apache/flink/pull/7651
 
 
   ## What is the purpose of the change
   
   *This pull request deprecates the constructor new Table(TableEnvironment, 
String)*
   
   
   ## Brief change log
   
 - *Deprecates new Table(TableEnvironment, String)*
 - *Introduces Table.lateralJoin, Table.leftOuterLateralJoin*
 - *Move the implicit of 'TableFunction to Table' to ExpressionDsl as the 
otherwise this implicit always has the highest priority*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added tests 
CorrelateStringExpressionTest.testCorrelateJoinsWithJoinLateral*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (yes)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs & JavaDocs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11447) Deprecate "new Table(TableEnvironment, String)"

2019-02-04 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-11447:
-
Priority: Blocker  (was: Major)

> Deprecate "new Table(TableEnvironment, String)"
> ---
>
> Key: FLINK-11447
> URL: https://issues.apache.org/jira/browse/FLINK-11447
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Dian Fu
>Priority: Blocker
>
> A more detailed description can be found in 
> [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions].
> Once table is an interface we can easily replace the underlying 
> implementation at any time. The constructor call prevents us from converting 
> it into an interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flinkbot commented on issue #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String)

2019-02-04 Thread GitBox
flinkbot commented on issue #7651: [FLINK-11447][table] Deprecate new 
Table(TableEnvironment, String)
URL: https://github.com/apache/flink/pull/7651#issuecomment-460316970
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❌ 1. The [description] looks good.
   * ❌ 2. There is [consensus] that the contribution should go into to Flink.
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The [architecture] is sound.
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10644) Batch Job: Speculative execution

2019-02-04 Thread Greg Hogan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760004#comment-16760004
 ] 

Greg Hogan commented on FLINK-10644:


I'm not so sure that speculative execution is a good fit for Apache Flink. In 
MapReduce there are two concepts conducive to speculative execution not present 
in Flink:

1) In MapReduce map/reduce/task to mapper/reducer/container ratio is often 10:1 
or higher. In Flink all tasks are immediately assigned and processed in 
parallel.

2) In MapReduce intermediate and output data is always persisted whereas in 
Flink only state is persisted (and only in streaming). Input is assumed to be 
replayable but speculative execution would presumably also work for 
intermediate tasks.

 

As noted, Spark has included speculative execution and the Spark processing 
model is closer to Flink's. I'm just not clear on the circumstances where it is 
beneficial to start a catch-up task so late. I haven't followed the work on 
unification of batch and streaming but it seems more valuable to focus on 
transition a task from a straggler machine rather than start that task over.

> Batch Job: Speculative execution
> 
>
> Key: FLINK-10644
> URL: https://issues.apache.org/jira/browse/FLINK-10644
> Project: Flink
>  Issue Type: New Feature
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: ryantaocer
>Priority: Major
> Fix For: 1.8.0
>
>
> Strugglers/outlier are tasks that run slower than most of the all tasks in a 
> Batch Job, this somehow impact job latency, as pretty much this straggler 
> will be in the critical path of the job and become as the bottleneck.
> Tasks may be slow for various reasons, including hardware degradation, or 
> software mis-configuration, or noise neighboring. It's hard for JM to predict 
> the runtime.
> To reduce the overhead of strugglers, other system such as Hadoop/Tez, Spark 
> has *_speculative execution_*. Speculative execution is a health-check 
> procedure that checks for tasks to be speculated, i.e. running slower in a 
> ExecutionJobVertex than the median of all successfully completed tasks in 
> that EJV, Such slow tasks will be re-submitted to another TM. It will not 
> stop the slow tasks, but run a new copy in parallel. And will kill the others 
> if one of them complete.
> This JIRA is an umbrella to apply this kind of idea in FLINK. Details will be 
> append later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dianfu commented on issue #7651: [FLINK-11447][table] Deprecate new Table(TableEnvironment, String)

2019-02-04 Thread GitBox
dianfu commented on issue #7651: [FLINK-11447][table] Deprecate new 
Table(TableEnvironment, String)
URL: https://github.com/apache/flink/pull/7651#issuecomment-460317427
 
 
   @twalthr @dawidwys Could you help to take a look at this PR when it's 
convenient for you? Thanks in advance.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11449) Uncouple the Expression class from RexNodes

2019-02-04 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760010#comment-16760010
 ] 

Timo Walther commented on FLINK-11449:
--

Thanks for working on this [~sunjincheng121]. I think most of the changes 
should be clearly defined now. Feel free to start developing. I will summarize 
the approach again here:

In {{flink-table-api-common}}, because we can even put those expression classes 
into flink-table-common for FilterableTableSources in the future:
{code}
public class FunctionDefinition {
public String name;
}

public class ScalarFunctionDefinition extends FunctionDefinition {
// the name is ignored for now

public UserDefinedFunction f;
}

public class TableFunctionDefinition extends FunctionDefinition {
// the name is ignored for now

public UserDefinedFunction f;
}

public class AggregateFunctionDefinition extends FunctionDefinition {
// the name is ignored for now

public UserDefinedFunction f;
}

public class Expression {

abstract List getChildren();

abstract  R accept(ExpressionVisitor visitor);

// do we need more methods here?
}

public final class FunctionDefinitions {
// implement just a basic definition (only name) for know
// we can extend this functionality in the future with type inference 
logic etc.
public static final FunctionDefinition TRIM = new 
FunctionDefinition("trim");
public static final FunctionDefinition CAST = new 
FunctionDefinition("cast");

public static List getDefinitions() {
// use reflection for all definitions similar to Calcite's 
SqlStdOperatorTable
}
}

CallExpression(FunctionDefinition, Expression*) extends Expression
SymbolExpression(TableSymbol ???) extends Expression
FieldReferenceExpression(String) extends Expression
TypeLiteralExpression(TypeInformation) extends Expression
ValueLiteralExpression(Object) extends Expression
{code}

In {{flink-table-api-java}}:
{code}
TableReferenceExpression(String, Table) extends Expression
{code}

In {{flink-table-planner}}:
{code}
+ visitor pattern in the planner to translate (can be in Scala)
in Flink: to Flink's case classes

in Blink: to Blink's case classes

visit(call Call) match {
case TRIM => 
}
{code}

expressionDsl.scala and ExpressionParser.scala can still remain implemented in 
Scala.

> Uncouple the Expression class from RexNodes
> ---
>
> Key: FLINK-11449
> URL: https://issues.apache.org/jira/browse/FLINK-11449
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: sunjincheng
>Priority: Major
>
> A more detailed description can be found in 
> [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions].
> Calcite will not be part of any API module anymore. Therefore, RexNode 
> translation must happen in a different layer. This issue will require a new 
> design document.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11447) Deprecate "new Table(TableEnvironment, String)"

2019-02-04 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11447:
---
Labels: pull-request-available  (was: )

> Deprecate "new Table(TableEnvironment, String)"
> ---
>
> Key: FLINK-11447
> URL: https://issues.apache.org/jira/browse/FLINK-11447
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Dian Fu
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> A more detailed description can be found in 
> [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions].
> Once table is an interface we can easily replace the underlying 
> implementation at any time. The constructor call prevents us from converting 
> it into an interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11447) Deprecate "new Table(TableEnvironment, String)"

2019-02-04 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16759990#comment-16759990
 ] 

Timo Walther commented on FLINK-11447:
--

Happy Chinese New Year everyone :)

[~dian.fu] thanks for working on this issue. It would be great to get this 
issue done as quickly as possible. I will be out of office next week, so it 
would be great to merge it even this week. Let me know if I can help. I can 
also implement it myself if you don't have capacities. I will make this issue a 
blocker for the next Flink release.

> Deprecate "new Table(TableEnvironment, String)"
> ---
>
> Key: FLINK-11447
> URL: https://issues.apache.org/jira/browse/FLINK-11447
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Dian Fu
>Priority: Major
>
> A more detailed description can be found in 
> [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions].
> Once table is an interface we can easily replace the underlying 
> implementation at any time. The constructor call prevents us from converting 
> it into an interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11447) Deprecate "new Table(TableEnvironment, String)"

2019-02-04 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-11447:
-
Fix Version/s: 1.8.0

> Deprecate "new Table(TableEnvironment, String)"
> ---
>
> Key: FLINK-11447
> URL: https://issues.apache.org/jira/browse/FLINK-11447
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Dian Fu
>Priority: Blocker
> Fix For: 1.8.0
>
>
> A more detailed description can be found in 
> [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions].
> Once table is an interface we can easily replace the underlying 
> implementation at any time. The constructor call prevents us from converting 
> it into an interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread

2019-02-04 Thread GitBox
StefanRRichter commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r253534721
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 ##
 @@ -608,12 +569,11 @@ public void run() {
 
 
 Review comment:
   I think all wait related things can go away.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] asfgit closed pull request #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base

2019-02-04 Thread GitBox
asfgit closed pull request #7647: [FLINK-11352][tests]Port 
JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11352) Check and port JobManagerHACheckpointRecoveryITCase to new code base if necessary

2019-02-04 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-11352.
-
   Resolution: Fixed
Fix Version/s: 1.8.0

Fixed via 8d25e88ec7de962927de350b4702ec183699afb3

> Check and port JobManagerHACheckpointRecoveryITCase to new code base if 
> necessary
> -
>
> Key: FLINK-11352
> URL: https://issues.apache.org/jira/browse/FLINK-11352
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Check and port {{JobManagerHACheckpointRecoveryITCase}} to new code base if 
> necessary



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on a change in pull request #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base

2019-02-04 Thread GitBox
tillrohrmann commented on a change in pull request #7647: 
[FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#discussion_r253529750
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
 ##
 @@ -220,6 +221,187 @@ public void flatMap(Long value, Collector out) 
throws Exception {
env.execute();
}
 
+   /**
+* Tests that the JobManager logs failures during recovery properly.
+*
+* @see https://issues.apache.org/jira/browse/FLINK-3185";>FLINK-3185
+*/
+   @Test
+   public void testJobManagerRecoveryFailureLog() throws Exception {
+   final Time timeout = Time.seconds(30L);
+   final File zookeeperStoragePath = temporaryFolder.newFolder();
+
+   // Config
+   final int numberOfJobManagers = 2;
+   final int numberOfTaskManagers = 2;
+   final int numberOfSlotsPerTaskManager = 2;
+
+   assertEquals(PARALLELISM, numberOfTaskManagers * 
numberOfSlotsPerTaskManager);
+
+   // Job managers
+   final DispatcherProcess[] dispatcherProcesses = new 
DispatcherProcess[numberOfJobManagers];
+
+   // Task managers
+   TaskManagerRunner[] taskManagerRunners = new 
TaskManagerRunner[numberOfTaskManagers];
+
+   HighAvailabilityServices highAvailabilityServices = null;
+
+   LeaderRetrievalService leaderRetrievalService = null;
+
+   // Coordination between the processes goes through a directory
+   File coordinateTempDir = null;
+
+   // Cluster config
+   Configuration config = 
ZooKeeperTestUtils.createZooKeeperHAConfig(
+   zooKeeper.getConnectString(), 
zookeeperStoragePath.getPath());
+   // Task manager configuration
+   config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+   config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+   config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+
+   final RpcService rpcService = 
AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
+
+   try {
+   final Deadline deadline = TestTimeOut.fromNow();
+
+   // Coordination directory
+   coordinateTempDir = temporaryFolder.newFolder();
+
+   // Start first process
+   dispatcherProcesses[0] = new DispatcherProcess(0, 
config);
+   dispatcherProcesses[0].startProcess();
+
+   highAvailabilityServices = 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+   config,
+   TestingUtils.defaultExecutor());
+
+   // Start the task manager process
+   for (int i = 0; i < numberOfTaskManagers; i++) {
+   taskManagerRunners[i] = new 
TaskManagerRunner(config, ResourceID.generate());
+   taskManagerRunners[i].start();
+   }
+
+   // Leader listener
+   TestingListener leaderListener = new TestingListener();
+   leaderRetrievalService = 
highAvailabilityServices.getDispatcherLeaderRetriever();
+   leaderRetrievalService.start(leaderListener);
+
+   // Initial submission
+   
leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
+
+   String leaderAddress = leaderListener.getAddress();
+   UUID leaderId = leaderListener.getLeaderSessionID();
+
+   final CompletableFuture 
dispatcherGatewayFuture = rpcService.connect(
+   leaderAddress,
+   DispatcherId.fromUuid(leaderId),
+   DispatcherGateway.class);
+   final DispatcherGateway dispatcherGateway = 
dispatcherGatewayFuture.get();
+
+   // Wait for all task managers to connect to the leading 
job manager
+   waitForTaskManagers(numberOfTaskManagers, 
dispatcherGateway, deadline.timeLeft());
+
+   final File coordinateDirClosure = coordinateTempDir;
+   final Throwable[] errorRef = new Throwable[1];
+
+   // we trigger program execution in a separate thread
+   Thread programTrigger = new Thread("Program Trigger") {
+   @Override
+   public void run() {
+   try {
+   
testJobManagerFailur

[GitHub] tillrohrmann commented on a change in pull request #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base

2019-02-04 Thread GitBox
tillrohrmann commented on a change in pull request #7647: 
[FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#discussion_r253532313
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
 ##
 @@ -220,6 +221,187 @@ public void flatMap(Long value, Collector out) 
throws Exception {
env.execute();
}
 
+   /**
+* Tests that the JobManager logs failures during recovery properly.
+*
+* @see https://issues.apache.org/jira/browse/FLINK-3185";>FLINK-3185
+*/
+   @Test
+   public void testJobManagerRecoveryFailureLog() throws Exception {
 
 Review comment:
   I think it would be better to test the behaviour more fine grained. What we 
actually tested was that the recovery logs a message if there is a recovery 
failure due to non-existing state handles. However, the behaviour slightly 
changed such that a failing recovery will lead to a `Dispatcher` termination 
(see `DispatcherHATest#testFailingRecoveryIsAFatalError`). Thus, I think this 
test is no longer necessary and can be removed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] rmetzger commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…

2019-02-04 Thread GitBox
rmetzger commented on issue #7632: [FLINK-11362] Port 
TaskManagerComponentsStartupShutdownTest to new co…
URL: https://github.com/apache/flink/pull/7632#issuecomment-460301104
 
 
   Yes, I will. I was able to reproduce the issue. Re-sending the message will 
not fix the problem. Mentioning the bot again might help.
   I'll let you know when there's an update


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…

2019-02-04 Thread GitBox
GJL commented on issue #7632: [FLINK-11362] Port 
TaskManagerComponentsStartupShutdownTest to new co…
URL: https://github.com/apache/flink/pull/7632#issuecomment-460300135
 
 
   @rmetzger Let me know if you find something.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…

2019-02-04 Thread GitBox
GJL commented on issue #7632: [FLINK-11362] Port 
TaskManagerComponentsStartupShutdownTest to new co…
URL: https://github.com/apache/flink/pull/7632#issuecomment-460299894
 
 
   Ugh... 🤦‍♂️ 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…

2019-02-04 Thread GitBox
GJL commented on issue #7632: [FLINK-11362] Port 
TaskManagerComponentsStartupShutdownTest to new co…
URL: https://github.com/apache/flink/pull/7632#issuecomment-460299749
 
 
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base

2019-02-04 Thread GitBox
flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port 
JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#issuecomment-460102649
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @tillrohrmann [PMC]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @tillrohrmann [PMC]
   * ❗ 3. Needs [attention] from.
   - Needs attention by @flinkbot
   * ❌ 4. The [architecture] is sound.
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base

2019-02-04 Thread GitBox
flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port 
JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#issuecomment-460102649
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @tillrohrmann [PMC]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @tillrohrmann [PMC]
   * ❗ 3. Needs [attention] from.
   - Needs attention by @flinkbot
   * ❌ 4. The [architecture] is sound.
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann removed a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base

2019-02-04 Thread GitBox
tillrohrmann removed a comment on issue #7647: [FLINK-11352][tests]Port 
JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#issuecomment-460294817
 
 
   @flinkbot approve attention @flinkbot 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base

2019-02-04 Thread GitBox
tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port 
JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#issuecomment-460295289
 
 
   @flinkbot attention @flinkbot 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base

2019-02-04 Thread GitBox
tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port 
JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#issuecomment-460294817
 
 
   @flinkbot approve attention @flinkbot 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11447) Deprecate "new Table(TableEnvironment, String)"

2019-02-04 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16759943#comment-16759943
 ] 

sunjincheng commented on FLINK-11447:
-

I am very happy that everyone has reached an agreement. Looking forward the PR!

And Happy Chinese New Year !

> Deprecate "new Table(TableEnvironment, String)"
> ---
>
> Key: FLINK-11447
> URL: https://issues.apache.org/jira/browse/FLINK-11447
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Dian Fu
>Priority: Major
>
> A more detailed description can be found in 
> [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions].
> Once table is an interface we can easily replace the underlying 
> implementation at any time. The constructor call prevents us from converting 
> it into an interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] rmetzger commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…

2019-02-04 Thread GitBox
rmetzger commented on issue #7632: [FLINK-11362] Port 
TaskManagerComponentsStartupShutdownTest to new co…
URL: https://github.com/apache/flink/pull/7632#issuecomment-460278469
 
 
   The bot has still not updated the tracking comment. I'm looking into it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…

2019-02-04 Thread GitBox
GJL commented on issue #7632: [FLINK-11362] Port 
TaskManagerComponentsStartupShutdownTest to new co…
URL: https://github.com/apache/flink/pull/7632#issuecomment-460271888
 
 
   @flinkbot approve all
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…

2019-02-04 Thread GitBox
GJL commented on issue #7632: [FLINK-11362] Port 
TaskManagerComponentsStartupShutdownTest to new co…
URL: https://github.com/apache/flink/pull/7632#issuecomment-460271152
 
 
   @flinkbot disapprove description
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…

2019-02-04 Thread GitBox
GJL commented on issue #7632: [FLINK-11362] Port 
TaskManagerComponentsStartupShutdownTest to new co…
URL: https://github.com/apache/flink/pull/7632#issuecomment-460271199
 
 
   @flinkbot approve description
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base

2019-02-04 Thread GitBox
flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port 
JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#issuecomment-460102649
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @tillrohrmann [PMC]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @tillrohrmann [PMC]
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The [architecture] is sound.
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…

2019-02-04 Thread GitBox
GJL commented on issue #7632: [FLINK-11362] Port 
TaskManagerComponentsStartupShutdownTest to new co…
URL: https://github.com/apache/flink/pull/7632#issuecomment-460271030
 
 
   @flinkbot approve architecture


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on issue #7632: [FLINK-11362] Port TaskManagerComponentsStartupShutdownTest to new co…

2019-02-04 Thread GitBox
GJL commented on issue #7632: [FLINK-11362] Port 
TaskManagerComponentsStartupShutdownTest to new co…
URL: https://github.com/apache/flink/pull/7632#issuecomment-460270959
 
 
   @flinkbot disapprove architecture


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base

2019-02-04 Thread GitBox
flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port 
JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#issuecomment-460102649
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @tillrohrmann [PMC]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @tillrohrmann [PMC]
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The [architecture] is sound.
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base

2019-02-04 Thread GitBox
tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port 
JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#issuecomment-460270690
 
 
   @flinkbot approve consensus


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base

2019-02-04 Thread GitBox
flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port 
JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#issuecomment-460102649
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @tillrohrmann [PMC]
   * ❌ 2. There is [consensus] that the contribution should go into to Flink.
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The [architecture] is sound.
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base

2019-02-04 Thread GitBox
flinkbot edited a comment on issue #7647: [FLINK-11352][tests]Port 
JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#issuecomment-460102649
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @tillrohrmann [PMC]
   * ❌ 2. There is [consensus] that the contribution should go into to Flink.
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The [architecture] is sound.
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base

2019-02-04 Thread GitBox
tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port 
JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#issuecomment-460270092
 
 
   @flinkbot approve description


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann removed a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base

2019-02-04 Thread GitBox
tillrohrmann removed a comment on issue #7647: [FLINK-11352][tests]Port 
JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#issuecomment-460269748
 
 
   @flinkbot approve description


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base

2019-02-04 Thread GitBox
tillrohrmann commented on issue #7647: [FLINK-11352][tests]Port 
JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#issuecomment-460269748
 
 
   @flinkbot approve description consensus


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread

2019-02-04 Thread GitBox
StefanRRichter commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r253491234
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_ha_datastream.sh
 ##
 @@ -51,6 +51,7 @@ function run_ha_test() {
 # change the pid dir to start log files always from 0, this is important 
for checks in the
 # jm killing loop
 set_conf "env.pid.dir" "${TEST_DATA_DIR}"
+set_conf "env.java.opts" "-ea"
 
 Review comment:
   I think we might want to reach consensus about that on a broader scope than 
just this PR. IMO, `-ea` should be enabled in all tests. I enabled it here 
because it was relevant for my PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann edited a comment on issue #7647: [FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base

2019-02-04 Thread GitBox
tillrohrmann edited a comment on issue #7647: [FLINK-11352][tests]Port 
JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#issuecomment-460269748
 
 
   @flinkbot approve description


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread

2019-02-04 Thread GitBox
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r253485428
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestRestartStrategy.java
 ##
 @@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+
+import javax.annotation.Nonnull;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * A {@link RestartStrategy} for tests that gives fine-grained control over 
the point in time when restart actions are
+ * performed.
+ */
+public class TestRestartStrategy implements RestartStrategy {
+
+   @Nonnull
+   private final Queue actionsQueue;
+
+   private final int maxRestarts;
+
+   private int restartAttempts;
+
+   private boolean manuallyTriggeredExecution;
+
+   public TestRestartStrategy() {
+   this(true);
+   }
+
+   public TestRestartStrategy(boolean manuallyTriggeredExecution) {
+   this(-1, manuallyTriggeredExecution);
+   }
+
+   public TestRestartStrategy(int maxRestarts, boolean 
manuallyTriggeredExecution) {
+   this(new LinkedList<>(), maxRestarts, 
manuallyTriggeredExecution);
+   }
+
+   public TestRestartStrategy(
+   @Nonnull Queue actionsQueue,
+   int maxRestarts,
+   boolean manuallyTriggeredExecution) {
+
+   this.actionsQueue = actionsQueue;
+   this.maxRestarts = maxRestarts;
+   this.manuallyTriggeredExecution = manuallyTriggeredExecution;
+   }
+
+   @Override
+   public boolean canRestart() {
+   return maxRestarts < 0 || maxRestarts - restartAttempts > 0;
+   }
+
+   @Override
+   public void restart(RestartCallback restarter, ScheduledExecutor 
executor) {
+
+   ++restartAttempts;
+   ExecutorAction executorAction = new 
ExecutorAction(restarter::triggerFullRecovery, executor);
+   if (manuallyTriggeredExecution) {
+   synchronized (actionsQueue) {
+   actionsQueue.add(executorAction);
+   }
+   } else {
+   executorAction.trigger();
+   }
+   }
+
+   public int getNumberOfQueuedActions() {
+   synchronized (actionsQueue) {
+   return actionsQueue.size();
+   }
+   }
+
+   public CompletableFuture triggerNextAction() {
+   synchronized (actionsQueue) {
+   return actionsQueue.remove().trigger();
+   }
+   }
+
+   public CompletableFuture triggerAll() {
+
+   synchronized (actionsQueue) {
+
+   if (actionsQueue.isEmpty()) {
+   return CompletableFuture.completedFuture(null);
+   }
+
+   CompletableFuture[] completableFutures = new 
CompletableFuture[actionsQueue.size()];
+   for (int i = 0; i < completableFutures.length; ++i) {
+   completableFutures[i] = triggerNextAction();
+   }
+   return 
FutureUtils.ConjunctFuture.allOf(completableFutures);
 
 Review comment:
   Better to use `CompletableFuture.allOf(completableFutures)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread

2019-02-04 Thread GitBox
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r253429216
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -1789,4 +1835,10 @@ void notifyExecutionChange(
}
}
}
+
+   void assertRunningInJobMasterMainThread() {
+   if (!(jobMasterMainThreadExecutor instanceof 
ComponentMainThreadExecutor.DummyComponentMainThreadExecutor)) {
+   jobMasterMainThreadExecutor.assertRunningInMainThread();
+   }
+   }
 }
 
 Review comment:
   Nice, in the end it's very few changes to the `ExecutionGraph` :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread

2019-02-04 Thread GitBox
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r253420517
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledFutureAdapter.java
 ##
 @@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import javax.annotation.Nonnull;
+
+import java.util.Objects;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * Adapter from {@link Future} to {@link ScheduledFuture}. This enriches the 
basic future with scheduling information.
+ * @param  value type of the future.
+ */
+public class ScheduledFutureAdapter implements ScheduledFuture {
+
+   /** The uid sequence generator. */
+   private static final AtomicLong SEQUENCE_GEN = new AtomicLong();
+
+   /** The encapsulated basic future to which execution is delegated. */
+   @Nonnull
+   private final Future delegate;
+
+   /** Tie-breaker for {@link #compareTo(Delayed)}. */
+   private final long tieBreakerUid;
+
+   /** The time when this is scheduled for execution in nanoseconds.*/
+   private final long scheduleTimeNanos;
+
+   public ScheduledFutureAdapter(
+   @Nonnull Future delegate,
+   long delay,
+   @Nonnull TimeUnit timeUnit) {
+   this(
+   delegate,
+   System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, 
timeUnit),
+   SEQUENCE_GEN.incrementAndGet());
+   }
+
+   @VisibleForTesting
+   ScheduledFutureAdapter(
+   @Nonnull Future delegate,
+   long scheduleTimeNanos,
+   long tieBreakerUid) {
+
+   this.delegate = delegate;
+   this.scheduleTimeNanos = scheduleTimeNanos;
+   this.tieBreakerUid = tieBreakerUid;
+   }
+
+   @Override
+   public long getDelay(@Nonnull TimeUnit unit) {
+   return unit.convert(scheduleTimeNanos - System.nanoTime(), 
TimeUnit.NANOSECONDS);
+   }
+
+   @Override
+   public int compareTo(@Nonnull Delayed o) {
+
+   if (o == this) {
+   return 0;
+   }
+
+   // tie breaking for ScheduledFutureAdapter objects
+   if (o instanceof ScheduledFutureAdapter) {
+   int cmp = Long.compare(scheduleTimeNanos, 
((ScheduledFutureAdapter) o).scheduleTimeNanos);
 
 Review comment:
   Nitpick: Cast could be `ScheduledFutureAdapter` in order to avoid a 
warning of raw class usage.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread

2019-02-04 Thread GitBox
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r253418747
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_ha_datastream.sh
 ##
 @@ -51,6 +51,7 @@ function run_ha_test() {
 # change the pid dir to start log files always from 0, this is important 
for checks in the
 # jm killing loop
 set_conf "env.pid.dir" "${TEST_DATA_DIR}"
+set_conf "env.java.opts" "-ea"
 
 Review comment:
   Do we only want to enable assertions forthis specific test or for all e2e 
tests?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread

2019-02-04 Thread GitBox
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r253480625
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 ##
 @@ -512,31 +584,38 @@ public void testSuspendWhileRestarting() throws 
Exception {
controllableRestartStrategy,
scheduler);
 
-   
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+   eg.start(testMainThread.getMainThreadExecutor());
 
-   assertEquals(JobStatus.CREATED, eg.getState());
+   testMainThread.execute(() -> {
+   
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
-   eg.scheduleForExecution();
+   assertEquals(JobStatus.CREATED, eg.getState());
 
-   assertEquals(JobStatus.RUNNING, eg.getState());
+   eg.scheduleForExecution();
+   assertEquals(JobStatus.RUNNING, eg.getState());
+   });
 
-   instance.markDead();
+   testMainThread.execute(instance::markDead);
 

controllableRestartStrategy.getReachedCanRestart().await(timeout.toMilliseconds(),
 TimeUnit.MILLISECONDS);
 
-   assertEquals(JobStatus.RESTARTING, eg.getState());
+   testMainThread.getMainThreadExecutor().execute(() -> {
 
-   eg.suspend(new Exception("Test exception"));
+   assertEquals(JobStatus.RESTARTING, eg.getState());
 
-   assertEquals(JobStatus.SUSPENDED, eg.getState());
+   eg.suspend(new Exception("Test exception"));
+   assertEquals(JobStatus.SUSPENDED, eg.getState());
+   });
 
controllableRestartStrategy.unlockRestart();
-

controllableRestartStrategy.getRestartDone().await(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
 
-   assertEquals(JobStatus.SUSPENDED, eg.getState());
+   testMainThread.execute(() -> {
+   assertEquals(JobStatus.SUSPENDED, eg.getState());
+   });
}
 
+   @Ignore
@Test
public void testConcurrentLocalFailAndRestart() throws Exception {
 
 Review comment:
   I would suggest to rename this test case since it is not concurrent anymore.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread

2019-02-04 Thread GitBox
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r253448003
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ScheduledFutureAdapterTest.java
 ##
 @@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Unit tests for {@link ScheduledFutureAdapter}.
+ */
+public class ScheduledFutureAdapterTest extends TestLogger {
+
+   private ScheduledFutureAdapter objectUnderTest;
+   private TestFuture innerDelegate;
+
+   @Before
+   public void before() throws Exception {
+   this.innerDelegate = new TestFuture();
+   this.objectUnderTest = new 
ScheduledFutureAdapter<>(innerDelegate, 420321L, TimeUnit.NANOSECONDS);
+   }
+
+   @Test
+   public void testForwardedMethods() throws Exception {
+
+   Assert.assertEquals((Integer) 4711, objectUnderTest.get());
+   Assert.assertEquals(1, innerDelegate.getGetInvocationCount());
+
+   Assert.assertEquals((Integer) 4711, objectUnderTest.get(42L, 
TimeUnit.SECONDS));
+   Assert.assertEquals(1, 
innerDelegate.getGetTimeoutInvocationCount());
+
+   Assert.assertEquals(innerDelegate.isCancelExpected(), 
objectUnderTest.cancel(true));
+   Assert.assertEquals(1, 
innerDelegate.getCancelInvocationCount());
+
+   
innerDelegate.setCancelResult(!innerDelegate.isCancelExpected());
+   Assert.assertEquals(innerDelegate.isCancelExpected(), 
objectUnderTest.cancel(true));
+   Assert.assertEquals(2, 
innerDelegate.getCancelInvocationCount());
+
+   Assert.assertEquals(innerDelegate.isCancelledExpected(), 
objectUnderTest.isCancelled());
+   Assert.assertEquals(1, 
innerDelegate.getIsCancelledInvocationCount());
+
+   
innerDelegate.setIsCancelledResult(!innerDelegate.isCancelledExpected());
+   Assert.assertEquals(innerDelegate.isCancelledExpected(), 
objectUnderTest.isCancelled());
+   Assert.assertEquals(2, 
innerDelegate.getIsCancelledInvocationCount());
+
+   Assert.assertEquals(innerDelegate.isDoneExpected(), 
objectUnderTest.isDone());
+   Assert.assertEquals(1, 
innerDelegate.getIsDoneInvocationCount());
+
+   
innerDelegate.setIsDoneExpected(!innerDelegate.isDoneExpected());
+   Assert.assertEquals(innerDelegate.isDoneExpected(), 
objectUnderTest.isDone());
+   Assert.assertEquals(2, 
innerDelegate.getIsDoneInvocationCount());
+   }
+
+   @Test
+   public void testCompareToEqualsHashCode() {
+
+   Assert.assertEquals(0, 
objectUnderTest.compareTo(objectUnderTest));
+   Assert.assertEquals(0, 
objectUnderTest.compareTo(objectUnderTest));
 
 Review comment:
   Is it intentional that we test twice `equals`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread

2019-02-04 Thread GitBox
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r253448367
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ComponentMainThreadTestExecutor.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.util.function.FunctionUtils;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.rules.ExternalResource;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Util to run test calls with a provided main thread executor.
+ */
+public class ComponentMainThreadTestExecutor {
+
+   /** The main thread executor to which execution is delegated. */
+   @Nonnull
+   private final TestComponentMainThreadExecutor mainThreadExecutor;
+
+   public ComponentMainThreadTestExecutor(@Nonnull 
TestComponentMainThreadExecutor mainThreadExecutor) {
+   this.mainThreadExecutor = mainThreadExecutor;
+   }
+
+   /**
+* Executes the given supplier with the main thread executor until 
completion, returns the result or a exception.
+* This method blocks until the execution is complete.
+*/
+   public  U execute(@Nonnull SupplierWithException 
supplierWithException) {
+   return CompletableFuture.supplyAsync(
+   FunctionUtils.uncheckedSupplier(supplierWithException),
+   mainThreadExecutor)
+   .join();
+   }
+
+   /**
+* Executes the given runnable with the main thread executor and blocks 
until completion.
+*/
+   public void execute(@Nonnull ThrowingRunnable 
throwingRunnable) {
+   execute(() -> {
+   throwingRunnable.run();
+   return null;
+   });
+   }
+
+   @Nonnull
+   public TestComponentMainThreadExecutor getMainThreadExecutor() {
+   return mainThreadExecutor;
+   }
+
+   /**
+* Test resource for convenience.
+*/
+   public static class Resource extends ExternalResource {
+
+   private ComponentMainThreadTestExecutor 
componentMainThreadTestExecutor;
+   private ScheduledExecutorService innerExecutorService;
+
+   @Override
+   protected void before() throws Throwable {
+   this.innerExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+   this.componentMainThreadTestExecutor =
+   new ComponentMainThreadTestExecutor(
+   
TestComponentMainThreadExecutor.forSingleThreadExecutor(innerExecutorService));
+   }
+
+   @Override
+   protected void after() {
+   this.innerExecutorService.shutdown();
+   this.innerExecutorService.shutdownNow();
+   try {
+   
this.innerExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
+   } catch (InterruptedException e) {
+   throw new RuntimeException(e);
+   }
 
 Review comment:
   Here we could use the `ExecutorUtils#gracefulShutdown` to avoid code 
duplication.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread

2019-02-04 Thread GitBox
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r253486672
 
 

 ##
 File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredDirectExecutor.java
 ##
 @@ -32,8 +33,17 @@
  */
 public class ManuallyTriggeredDirectExecutor implements Executor {
 
 Review comment:
   What is the difference of this class and 
`ManuallyTriggeredScheduledExecutor`? Could they be unified?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread

2019-02-04 Thread GitBox
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r253480445
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 ##
 @@ -608,12 +569,11 @@ public void run() {
 
 
 Review comment:
   Do we still need these wait calls?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread

2019-02-04 Thread GitBox
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r253484809
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestComponentMainThreadExecutor.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * An implementation of {@link 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor} for tests.
+ */
+public class TestComponentMainThreadExecutor extends 
ComponentMainThreadExecutorServiceAdapter {
 
 Review comment:
   I have to admit that I'm a bit confused by the naming 
`TestComponentMainThreadExecutor` and `ComponentMainThreadTestExecutor`. 
Especially since the `ComponentMainThreadTestExecutor` takes a 
`TestComponentMainThreadExecutor` as a constructor argument. Maybe we could 
name the one `TestingComponentMainThreadExecutorServiceAdapter` and the other 
one `TestingComponentMainThreadExecutor`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread

2019-02-04 Thread GitBox
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r253445788
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java
 ##
 @@ -44,4 +51,28 @@ public void exitMainThread() {

assert(endpoint.currentMainThread.compareAndSet(Thread.currentThread(), null)) :
"The RpcEndpoint has concurrent access from " + 
endpoint.currentMainThread.get();
}
+
+   /**
+* Returns true iff the current thread is equals to the provided 
expected thread and logs violations.
+*
+* @param expected the expected main thread.
+* @return true iff the current thread is equals to the provided 
expected thread.
+*/
+   public static boolean isRunningInExpectedThread(@Nullable Thread 
expected) {
+   Thread actual = Thread.currentThread();
+   if (expected != actual) {
+
+   String violationMsg = "Violation of main thread 
constraint detected: expected <"
+   + expected + "> but running in <" + actual + 
">.";
+
+   log.warn(violationMsg,
+   expected,
+   actual,
 
 Review comment:
   I think `expected` and `actual` is no longer needed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread

2019-02-04 Thread GitBox
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r253422204
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
 ##
 @@ -108,25 +92,28 @@ public void onTaskFailure(Execution taskExecution, 
Throwable cause) {
//   it helps to support better testing
final CompletableFuture terminationFuture = 
taskExecution.getTerminalStateFuture();
 
-   final ExecutionVertex vertexToRecover = 
taskExecution.getVertex(); 
-   final long globalModVersion = 
taskExecution.getGlobalModVersion();
-
-   terminationFuture.thenAcceptAsync(
-   (ExecutionState value) -> {
-   try {
-   long createTimestamp = 
System.currentTimeMillis();
-   Execution newExecution = 
vertexToRecover.resetForNewExecution(createTimestamp, globalModVersion);
-   newExecution.scheduleForExecution();
-   }
-   catch (GlobalModVersionMismatch e) {
-   // this happens if a concurrent global 
recovery happens. simply do nothing.
-   }
-   catch (Exception e) {
-   executionGraph.failGlobal(
-   new Exception("Error 
during fine grained recovery - triggering full recovery", e));
-   }
-   },
-   callbackExecutor);
+   Consumer restartProcedure = (ExecutionState 
executionState) -> {
+   performExecutionVertexRestart(
+   taskExecution.getVertex(),
+   taskExecution.getGlobalModVersion());
+   };
+
+   terminationFuture.thenAccept(restartProcedure);
 
 Review comment:
   nit: this could be `terminationFuture.thenRun(() -> 
performExecutionVertexRestart(taskExecution.getVertex(), 
taskExecution.getGlobalModVersion()))` and then we would not need to create the 
`restartProcedure` consumer.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make access to ExecutionGraph single threaded from JobMaster main thread

2019-02-04 Thread GitBox
tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r253483809
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestComponentMainThreadExecutor.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * An implementation of {@link 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor} for tests.
+ */
+public class TestComponentMainThreadExecutor extends 
ComponentMainThreadExecutorServiceAdapter {
+
+   public TestComponentMainThreadExecutor(
+   @Nonnull ScheduledExecutorService scheduledExecutorService,
+   @Nonnull Thread mainThread) {
+
+   super(scheduledExecutorService, () -> {
+   assert 
MainThreadValidatorUtil.isRunningInExpectedThread(mainThread);
+   });
+   }
+
+   public static TestComponentMainThreadExecutor forMainThread() {
+   final Thread main = Thread.currentThread();
+   return new TestComponentMainThreadExecutor(new 
DirectScheduledExecutorService() {
+   @Override
+   public void execute(Runnable command) {
+   assert 
MainThreadValidatorUtil.isRunningInExpectedThread(main);
+   super.execute(command);
+   }
+   }, main);
+   }
+
+   /**
+* Creates a test executor that delegates to the given {@link 
ScheduledExecutorService}. The given executor must
+* execute all submissions with the same thread.
+*/
+   public static TestComponentMainThreadExecutor forSingleThreadExecutor(
+   @Nonnull ScheduledExecutorService singleThreadExecutor) {
+   try {
+   Thread thread = 
CompletableFuture.supplyAsync(Thread::currentThread, 
singleThreadExecutor).get();
 
 Review comment:
   If you use `join()` here, then you don't need the try-catch block.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


  1   2   >