[jira] [Commented] (FLINK-10724) Refactor failure handling in checkpoint coordinator

2018-12-10 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715194#comment-16715194
 ] 

Yun Tang commented on FLINK-10724:
--

[~yanghua] I think CheckpointSubsumed should not be counted as a checkpoint 
failure; in my opinion, it is rather a part of the procedure for completing a 
new checkpoint.

> Refactor failure handling in checkpoint coordinator
> 
>
> Key: FLINK-10724
> URL: https://issues.apache.org/jira/browse/FLINK-10724
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: vinoyang
>Priority: Major
>
> At the moment, failure handling of asynchronously triggered checkpoints in 
> the checkpoint coordinator happens in different places. We could organise it 
> in a similar way to the failure handling of synchronously triggered 
> checkpoints in *CheckpointTriggerResult*, where error cases are classified. 
> This will simplify, e.g., the integration of an error counter for FLINK-4810.
> See also discussion here: [https://github.com/apache/flink/pull/6567]
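
As a purely illustrative sketch of the kind of classification the refactoring aims at (the enum values and handler below are hypothetical, not Flink's actual checkpoint coordinator API):

{code:java}
// Hypothetical names for illustration only.
enum AsyncCheckpointFailureReason {
    EXPIRED,            // checkpoint timed out before all tasks acknowledged it
    DECLINED_BY_TASK,   // a task declined the checkpoint
    FINALIZATION_ERROR, // persisting/finalizing the checkpoint metadata failed
    SUBSUMED            // superseded by a newer checkpoint; arguably not a failure at all
}

class AsyncCheckpointFailureHandler {
    private int consecutiveFailureCount;

    void handleAsyncFailure(AsyncCheckpointFailureReason reason) {
        if (reason == AsyncCheckpointFailureReason.SUBSUMED) {
            // Part of completing a newer checkpoint; do not count it as a failure.
            return;
        }
        // One central place where a failure counter (FLINK-4810) could be integrated.
        consecutiveFailureCount++;
    }
}
{code}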



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11041) ReinterpretDataStreamAsKeyedStreamITCase.testReinterpretAsKeyedStream failed on Travis

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715174#comment-16715174
 ] 

ASF GitHub Bot commented on FLINK-11041:


StefanRRichter opened a new pull request #7268: [FLINK-11041][test] 
ReinterpretDataStreamAsKeyedStreamITCase source s…
URL: https://github.com/apache/flink/pull/7268
 
 
   ## What is the purpose of the change
   
   This change fixes the flaky behavior of 
`ReinterpretDataStreamAsKeyedStreamITCase`. The problem was that the test was 
changed to also test failover, but the source did not use the checkpointing 
lock.
   
   
   ## Brief change log
   
   Hold checkpointing lock in the test's source function.
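
   For context, a minimal sketch of what holding the checkpointing lock in a test source looks like; this is the standard `SourceFunction` pattern, not the PR's actual source code:

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Illustrative test source: records are emitted under the checkpoint lock so that
// emission cannot interleave with checkpointing or failover-related state handling.
public class LockingTestSource implements SourceFunction<Integer> {

    private final int numElements;          // arbitrary bound, e.g. 300 as in the ITCase
    private volatile boolean running = true;

    public LockingTestSource(int numElements) {
        this.numElements = numElements;
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        for (int i = 0; i < numElements && running; i++) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(i);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```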
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ReinterpretDataStreamAsKeyedStreamITCase.testReinterpretAsKeyedStream failed 
> on Travis
> --
>
> Key: FLINK-11041
> URL: https://issues.apache.org/jira/browse/FLINK-11041
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.8.0
>
>
> {{ReinterpretDataStreamAsKeyedStreamITCase.testReinterpretAsKeyedStream}} 
> failed on Travis: 
> https://api.travis-ci.org/v3/job/461707550/log.txt
> https://travis-ci.org/apache/flink/jobs/461707550
> It seems that the test job is producing wrong results:
> {code}
> testReinterpretAsKeyedStream(org.apache.flink.streaming.api.datastream.ReinterpretDataStreamAsKeyedStreamITCase)
>   Time elapsed: 8.357 sec  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
>   at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
>   at 
> org.apache.flink.streaming.api.datastream.ReinterpretDataStreamAsKeyedStreamITCase.testReinterpretAsKeyedStream(ReinterpretDataStreamAsKeyedStreamITCase.java:107)
> Caused by: java.lang.AssertionError: expected:<300> but was:<301>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at org.junit.Assert.assertEquals(Assert.java:631)
>   at 
> org.apache.flink.streaming.api.datastream.ReinterpretDataStreamAsKeyedStreamITCase$ValidatingSink.close(ReinterpretDataStreamAsKeyedStreamITCase.java:295)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:442)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11041) ReinterpretDataStreamAsKeyedStreamITCase.testReinterpretAsKeyedStream failed on Travis

2018-12-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11041:
---
Labels: pull-request-available test-stability  (was: test-stability)

> ReinterpretDataStreamAsKeyedStreamITCase.testReinterpretAsKeyedStream failed 
> on Travis
> --
>
> Key: FLINK-11041
> URL: https://issues.apache.org/jira/browse/FLINK-11041
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.8.0
>
>
> {{ReinterpretDataStreamAsKeyedStreamITCase.testReinterpretAsKeyedStream}} 
> failed on Travis: 
> https://api.travis-ci.org/v3/job/461707550/log.txt
> https://travis-ci.org/apache/flink/jobs/461707550
> It seems that the test job is producing wrong results:
> {code}
> testReinterpretAsKeyedStream(org.apache.flink.streaming.api.datastream.ReinterpretDataStreamAsKeyedStreamITCase)
>   Time elapsed: 8.357 sec  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
>   at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
>   at 
> org.apache.flink.streaming.api.datastream.ReinterpretDataStreamAsKeyedStreamITCase.testReinterpretAsKeyedStream(ReinterpretDataStreamAsKeyedStreamITCase.java:107)
> Caused by: java.lang.AssertionError: expected:<300> but was:<301>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at org.junit.Assert.assertEquals(Assert.java:631)
>   at 
> org.apache.flink.streaming.api.datastream.ReinterpretDataStreamAsKeyedStreamITCase$ValidatingSink.close(ReinterpretDataStreamAsKeyedStreamITCase.java:295)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:109)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:442)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter opened a new pull request #7268: [FLINK-11041][test] ReinterpretDataStreamAsKeyedStreamITCase source s…

2018-12-10 Thread GitBox
StefanRRichter opened a new pull request #7268: [FLINK-11041][test] 
ReinterpretDataStreamAsKeyedStreamITCase source s…
URL: https://github.com/apache/flink/pull/7268
 
 
   ## What is the purpose of the change
   
   This change fixes the flaky behavior of 
`ReinterpretDataStreamAsKeyedStreamITCase`. The problem was that the test was 
changed to also test failover, but the source did not use the checkpointing 
lock.
   
   
   ## Brief change log
   
   Hold checkpointing lock in the test's source function.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction

2018-12-10 Thread GitBox
walterddr commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240322027
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Integer => JInt}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.dataview.{DataView, MapView}
+import org.apache.flink.table.dataview.StateMapView
+import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, GroupAggProcessFunction}
+import org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class AggFunctionHarnessTest extends HarnessTestBase {
+  private val queryConfig = new TestStreamQueryConfig(Time.seconds(0), Time.seconds(0))
+
+  @Test
+  def testCollectAggregate(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = new mutable.MutableList[(JInt, String)]
+    val t = env.fromCollection(data).toTable(tEnv, 'a, 'b)
+    tEnv.registerTable("T", t)
+    val sqlQuery = tEnv.sqlQuery(
+      s"""
+         |SELECT
+         |  b, collect(a)
+         |FROM (
+         |  SELECT a, b
+         |  FROM T
+         |  GROUP BY a, b
+         |) GROUP BY b
+         |""".stripMargin)
+
+    val testHarness = createHarnessTester[String, CRow, CRow](
+      sqlQuery.toRetractStream[Row](queryConfig), "groupBy")
+
+    testHarness.setStateBackend(getStateBackend)
+    testHarness.open()
+
+    val operator = getOperator(testHarness)
+    val state = getState(operator, "acc0_map_dataview").asInstanceOf[MapView[JInt, JInt]]
+    assertTrue(state.isInstanceOf[StateMapView[_, _]])
+    assertTrue(operator.getKeyedStateBackend.isInstanceOf[RocksDBKeyedStateBackend[_]])
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1))
+    expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 1).asJava), 1))
+
+    testHarness.processElement(new StreamRecord(CRow(1: JInt, "bbb"), 1))
+    expectedOutput.add(new StreamRecord(CRow("bbb", Map(1 -> 1).asJava), 1))
+
+    testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1))
+    expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 1).asJava), 1))
+    expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2).asJava), 1))
+
+    testHarness.processElement(new StreamRecord(CRow(2: JInt, "aaa"), 1))
+    expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 2).asJava), 1))
+    expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2, 2 -> 1).asJava), 1))
+
+    // remove some state: state may be cleaned up by the state backend if not accessed more than ttl
+    operator.setCurrentKey(Row.of("aaa"))
+    state.remove(2)
+
+    // retract after state has been cleaned up
+    testHarness.processElement(new StreamRecord(CRow(false, 2: JInt, "aaa"), 1))
+
+    val result = testHarness.getOutput
+
+    verify(expectedOutput, result)
+
+    testHarness.close()
+  }
+
+  private def getState(
 
 Review comment:
   This can probably be put into `HarnessTestBase` as well. As of now I can 
only imagine the Operator to 

[GitHub] walterddr commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction

2018-12-10 Thread GitBox
walterddr commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240320208
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 ##
 @@ -87,7 +85,7 @@ class HarnessTestBase {
     new RowTypeInfo(distinctCountAggregates.map(getAccumulatorTypeOfAggregateFunction(_)): _*)
 
   protected val distinctCountDescriptor: String = EncodingUtils.encodeObjectToString(
-    new MapStateDescriptor("distinctAgg0", distinctCountAggregationStateType, Types.LONG))
 
 Review comment:
   This change doesn't seem necessary


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11074) Improve the harness test to make it possible test with state backend

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715297#comment-16715297
 ] 

ASF GitHub Bot commented on FLINK-11074:


walterddr commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240320208
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 ##
 @@ -87,7 +85,7 @@ class HarnessTestBase {
     new RowTypeInfo(distinctCountAggregates.map(getAccumulatorTypeOfAggregateFunction(_)): _*)
 
   protected val distinctCountDescriptor: String = EncodingUtils.encodeObjectToString(
-    new MapStateDescriptor("distinctAgg0", distinctCountAggregationStateType, Types.LONG))
 
 Review comment:
   This change doesn't seem necessary


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve the harness test to make it possible test with state backend
> 
>
> Key: FLINK-11074
> URL: https://issues.apache.org/jira/browse/FLINK-11074
> Project: Flink
>  Issue Type: Test
>  Components: Table API  SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the harness tests can only run without a state backend. If you use a 
> DataView in the accumulator of an aggregate function, the DataView is a plain 
> Java object held on the heap, not replaced with a StateMapView/StateListView 
> whose values are actually held in the state backend. We should improve the 
> harness tests to make it possible to test with a state backend. Otherwise, 
> issues such as FLINK-10674 could never have been found. With such harness tests 
> available, we could test the built-in aggregate functions that use DataViews in 
> a more fine-grained way.
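
To make this concrete, here is a hedged sketch (not one of Flink's built-in functions) of an aggregate function whose accumulator holds a {{MapView}}; without a state backend the view stays a plain heap object, while the improved harness can have it backed by a {{StateMapView}} in e.g. RocksDB:

{code:java}
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.functions.AggregateFunction;

// Illustrative only; class, field and method names are made up for this example.
public class CountDistinct extends AggregateFunction<Long, CountDistinct.Accumulator> {

    public static class Accumulator {
        // With a state backend wired into the harness, this field is replaced by a
        // StateMapView whose entries live in the backend instead of on the heap.
        public MapView<String, Integer> seen = new MapView<>();
        public long distinctCount = 0L;
    }

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator();
    }

    // Called by generated code for each input row.
    public void accumulate(Accumulator acc, String value) throws Exception {
        if (!acc.seen.contains(value)) {
            acc.seen.put(value, 1);
            acc.distinctCount++;
        }
    }

    @Override
    public Long getValue(Accumulator acc) {
        return acc.distinctCount;
    }
}
{code}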



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11074) Improve the harness test to make it possible test with state backend

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715298#comment-16715298
 ] 

ASF GitHub Bot commented on FLINK-11074:


walterddr commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240322027
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Integer => JInt}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.dataview.{DataView, MapView}
+import org.apache.flink.table.dataview.StateMapView
+import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, GroupAggProcessFunction}
+import org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class AggFunctionHarnessTest extends HarnessTestBase {
+  private val queryConfig = new TestStreamQueryConfig(Time.seconds(0), Time.seconds(0))
+
+  @Test
+  def testCollectAggregate(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = new mutable.MutableList[(JInt, String)]
+    val t = env.fromCollection(data).toTable(tEnv, 'a, 'b)
+    tEnv.registerTable("T", t)
+    val sqlQuery = tEnv.sqlQuery(
+      s"""
+         |SELECT
+         |  b, collect(a)
+         |FROM (
+         |  SELECT a, b
+         |  FROM T
+         |  GROUP BY a, b
+         |) GROUP BY b
+         |""".stripMargin)
+
+    val testHarness = createHarnessTester[String, CRow, CRow](
+      sqlQuery.toRetractStream[Row](queryConfig), "groupBy")
+
+    testHarness.setStateBackend(getStateBackend)
+    testHarness.open()
+
+    val operator = getOperator(testHarness)
+    val state = getState(operator, "acc0_map_dataview").asInstanceOf[MapView[JInt, JInt]]
+    assertTrue(state.isInstanceOf[StateMapView[_, _]])
+    assertTrue(operator.getKeyedStateBackend.isInstanceOf[RocksDBKeyedStateBackend[_]])
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1))
+    expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 1).asJava), 1))
+
+    testHarness.processElement(new StreamRecord(CRow(1: JInt, "bbb"), 1))
+    expectedOutput.add(new StreamRecord(CRow("bbb", Map(1 -> 1).asJava), 1))
+
+    testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1))
+    expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 1).asJava), 1))
+    expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2).asJava), 1))
+
+    testHarness.processElement(new StreamRecord(CRow(2: JInt, "aaa"), 1))
+    expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 2).asJava), 1))
+    expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2, 2 -> 1).asJava), 1))
+
+    // remove some state: state may be cleaned up by the state backend if not accessed more than ttl
+    operator.setCurrentKey(Row.of("aaa"))
+    state.remove(2)
+
+    // retract after state has been cleaned up
+    testHarness.processElement(new StreamRecord(CRow(false, 2: JInt, "aaa"), 1))
+
+    val result = 

[GitHub] walterddr commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction

2018-12-10 Thread GitBox
walterddr commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240321103
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 ##
 @@ -491,13 +489,75 @@ class HarnessTestBase {
 distinctCountFuncName,
 distinctCountAggCode)
 
+  def createHarnessTester[KEY, IN, OUT](
+      dataStream: DataStream[_],
+      prefixOperatorName: String)
+    : KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT] = {
+
+    val transformation = extractExpectedTransformation(
+      dataStream.javaStream.getTransformation,
+      prefixOperatorName).asInstanceOf[OneInputTransformation[_, _]]
+    if (transformation == null) {
+      throw new Exception("Can not find the expected transformation")
+    }
+
+    val processOperator = transformation.getOperator.asInstanceOf[OneInputStreamOperator[IN, OUT]]
+    val keySelector = transformation.getStateKeySelector.asInstanceOf[KeySelector[IN, KEY]]
+    val keyType = transformation.getStateKeyType.asInstanceOf[TypeInformation[KEY]]
+
+    createHarnessTester(processOperator, keySelector, keyType)
+      .asInstanceOf[KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT]]
+  }
+
+  private def extractExpectedTransformation(
+      transformation: StreamTransformation[_],
+      prefixOperatorName: String): StreamTransformation[_] = {
+    def extractFromInputs(inputs: StreamTransformation[_]*): StreamTransformation[_] = {
+      for (input <- inputs) {
+        val t = extractExpectedTransformation(input, prefixOperatorName)
+        if (t != null) {
+          return t
+        }
+      }
+      null
+    }
+
+    transformation match {
+      case one: OneInputTransformation[_, _] =>
+        if (one.getName.startsWith(prefixOperatorName)) {
+          one
+        } else {
+          extractExpectedTransformation(one.getInput, prefixOperatorName)
+        }
+      case two: TwoInputTransformation[_, _, _] =>
 
 Review comment:
   Let's throw an UnsupportedOperationException for now, since there's no code 
path that exercises a two-input transformation yet. We can always add this 
logic later when necessary. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…

2018-12-10 Thread GitBox
azagrebin commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r240313527
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -726,6 +728,56 @@ void sendPartitionInfos() {
}
}
 
+   /**
+* Check whether the InputDependencyConstraint is satisfied for this 
vertex.
+*
+* @return whether the input constraint is satisfied
+*/
+   public boolean checkInputDependencyConstraints() {
 
 Review comment:
   This might be a bit more concise:
   ```
   boolean checkInputDependencyConstraints() {
       if (getExecutionGraph().getInputDependencyConstraint() == InputDependencyConstraint.ANY) {
           // InputDependencyConstraint == ANY
           return jobVertex.getInputs().stream().anyMatch(this::isInputConsumable);
       } else {
           // InputDependencyConstraint == ALL
           return jobVertex.getInputs().stream().allMatch(this::isInputConsumable);
       }
   }

   boolean isInputConsumable(IntermediateResult result) {
       if (result.getResultType().isPipelined()) {
           // For a PIPELINED result, the input is consumable if any result partition has produced records or is finished
           return Arrays.stream(result.getPartitions()).anyMatch(IntermediateResultPartition::hasDataProduced);
       } else {
           // For a BLOCKING result, the input is consumable if all the partitions in the result are finished
           return result.areAllPartitionsFinished();
       }
   }
   ```
   I am not sure we need to check the `ANY` case at all; just checking it 
theoretically changes the current behaviour. On the other hand, at the moment 
I think it always holds for `ANY` in the place where we check it, i.e. with 
`ScheduleMode.LAZY_FROM_SOURCES`. 
   
   I am also not sure that `ALL` config makes sense together with 
`ScheduleMode.EAGER`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…

2018-12-10 Thread GitBox
azagrebin commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r240310923
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -747,31 +747,34 @@ else if (numConsumers == 0) {

 			consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(
 				partition, partitionExecution));
 
-			// When deploying a consuming task, its task deployment descriptor will contain all
-			// deployment information available at the respective time. It is possible that some
-			// of the partitions to be consumed have not been created yet. These are updated
-			// runtime via the update messages.
-			//
-			// TODO The current approach may send many update messages even though the consuming
-			// task has already been deployed with all necessary information. We have to check
-			// whether this is a problem and fix it, if it is.
-			CompletableFuture.supplyAsync(
-				() -> {
-					try {
-						final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
-						consumerVertex.scheduleForExecution(
-							executionGraph.getSlotProvider(),
-							executionGraph.isQueuedSchedulingAllowed(),
-							LocationPreferenceConstraint.ANY, // there must be at least one known location
-							Collections.emptySet());
-					} catch (Throwable t) {
-						consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
+			// Schedule the consumer vertex if its inputs constraint is satisfied, otherwise postpone the scheduling
+			if (consumerVertex.checkInputDependencyConstraints()) {
+				// When deploying a consuming task, its task deployment descriptor will contain all
+				// deployment information available at the respective time. It is possible that some
+				// of the partitions to be consumed have not been created yet. These are updated
+				// runtime via the update messages.
+				//
+				// TODO The current approach may send many update messages even though the consuming
+				// task has already been deployed with all necessary information. We have to check
+				// whether this is a problem and fix it, if it is.
+				CompletableFuture.supplyAsync(
 
 Review comment:
   The body of the introduced if is quite big, could we move it into a separate 
method, like `scheduleConsumer`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…

2018-12-10 Thread GitBox
azagrebin commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r240312767
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
 ##
 @@ -36,6 +36,11 @@
 
 	private List<List<ExecutionEdge>> consumers;
 
+	/**
+	 * Whether this partition has data produced. For pipelined result only.
 
 Review comment:
   Whether this partition has produced some data. For pipelined result only.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…

2018-12-10 Thread GitBox
azagrebin commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r240321678
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
 ##
 @@ -192,6 +193,7 @@ private static ExecutionVertex 
mockExecutionVertex(ExecutionState state, Resourc
 
 	private static IntermediateResultPartition mockPartition(ExecutionVertex producer) {
 		IntermediateResultPartition partition = mock(IntermediateResultPartition.class);
+		when(partition.getResultType()).thenReturn(ResultPartitionType.PIPELINED);
 
 Review comment:
   Is this change needed? `isConsumable` should be still enough.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…

2018-12-10 Thread GitBox
azagrebin commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r240311956
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
 ##
 @@ -36,6 +36,11 @@
 
 	private List<List<ExecutionEdge>> consumers;
 
+	/**
+	 * Whether this partition has data produced. For pipelined result only.
+	 */
+	private boolean dataProduced = false;
 
 Review comment:
   I would call it `isSomePipelinedDataProduced`.
   Also the method: `markSomePipelinedDataProduced`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715277#comment-16715277
 ] 

ASF GitHub Bot commented on FLINK-10945:


azagrebin commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r240310923
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -747,31 +747,34 @@ else if (numConsumers == 0) {

 			consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(
 				partition, partitionExecution));
 
-			// When deploying a consuming task, its task deployment descriptor will contain all
-			// deployment information available at the respective time. It is possible that some
-			// of the partitions to be consumed have not been created yet. These are updated
-			// runtime via the update messages.
-			//
-			// TODO The current approach may send many update messages even though the consuming
-			// task has already been deployed with all necessary information. We have to check
-			// whether this is a problem and fix it, if it is.
-			CompletableFuture.supplyAsync(
-				() -> {
-					try {
-						final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
-						consumerVertex.scheduleForExecution(
-							executionGraph.getSlotProvider(),
-							executionGraph.isQueuedSchedulingAllowed(),
-							LocationPreferenceConstraint.ANY, // there must be at least one known location
-							Collections.emptySet());
-					} catch (Throwable t) {
-						consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
+			// Schedule the consumer vertex if its inputs constraint is satisfied, otherwise postpone the scheduling
+			if (consumerVertex.checkInputDependencyConstraints()) {
+				// When deploying a consuming task, its task deployment descriptor will contain all
+				// deployment information available at the respective time. It is possible that some
+				// of the partitions to be consumed have not been created yet. These are updated
+				// runtime via the update messages.
+				//
+				// TODO The current approach may send many update messages even though the consuming
+				// task has already been deployed with all necessary information. We have to check
+				// whether this is a problem and fix it, if it is.
+				CompletableFuture.supplyAsync(
 
 Review comment:
   The body of the introduced if is quite big, could we move it into a separate 
method, like `scheduleConsumer`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Avoid resource deadlocks for finite stream jobs when resources are limited
> --
>
> Key: FLINK-10945
> URL: https://issues.apache.org/jira/browse/FLINK-10945
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.1
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
>
> Currently *resource deadlocks* can happen to finite stream jobs(or batch 
> jobs) when resources are limited. In 2 cases as below:
>  # Task Y is a pipelined downstream task of task X. When X takes all 
> resources(slots), Y cannot acquire slots 

[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715278#comment-16715278
 ] 

ASF GitHub Bot commented on FLINK-10945:


azagrebin commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r240312767
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
 ##
 @@ -36,6 +36,11 @@
 
 	private List<List<ExecutionEdge>> consumers;
 
+	/**
+	 * Whether this partition has data produced. For pipelined result only.
 
 Review comment:
   Whether this partition has produced some data. For pipelined result only.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Avoid resource deadlocks for finite stream jobs when resources are limited
> --
>
> Key: FLINK-10945
> URL: https://issues.apache.org/jira/browse/FLINK-10945
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.1
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, *resource deadlocks* can happen to finite stream jobs (or batch 
> jobs) when resources are limited, in the following two cases:
>  # Task Y is a pipelined downstream task of task X. When X takes all 
> resources (slots), Y cannot acquire slots to start, so back pressure 
> prevents X from finishing.
>  # Task Y is an upstream task of task X. When X takes all resources (slots) and 
> Y cannot start, X cannot finish because some of its inputs are not finished.
>  
> We can avoid case 1 by setting all edges to BLOCKING to avoid pipelined 
> back pressure. However, case 2 cannot be avoided, as X (the downstream task) will 
> be launched when any of its input results is ready.
> In more detail, say task X has BLOCKING upstream tasks Y and Z; X can be 
> launched when Z finishes, even though task Y is not launched yet. This pre-launch 
> behaviour can be beneficial when there are plenty of resources, since X can 
> process data from Z before Y finishes its data processing. However, 
> resource deadlocks may happen when resources are limited, e.g. in small 
> sessions.
>  
> I’d propose introducing a constraint named *InputDependencyConstraint* to 
> control the scheduling of vertices. It has 2 values:
>  # *ANY*. The vertex can be scheduled when any of its inputs is consumable.
>  # *ALL*. The vertex can be scheduled when all of its inputs are consumable.
>  
> The design doc is here. 
> [https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing]
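
A minimal sketch of the proposed ANY/ALL semantics, using simplified stand-ins rather than the actual {{ExecutionVertex}}/{{IntermediateResult}} classes:

{code:java}
import java.util.List;

// Simplified stand-ins for illustration; names do not match the real runtime classes.
enum InputDependencyConstraint { ANY, ALL }

interface Input {
    boolean isPipelined();
    boolean hasDataProduced();        // some partition already produced records (pipelined case)
    boolean allPartitionsFinished();  // every partition is finished (blocking case)
}

final class SchedulingCheck {

    /** An input is consumable if it has pipelined data or, for blocking results, is fully finished. */
    static boolean isConsumable(Input input) {
        return input.isPipelined() ? input.hasDataProduced() : input.allPartitionsFinished();
    }

    /** ANY: schedule once one input is consumable; ALL: wait until every input is consumable. */
    static boolean canSchedule(InputDependencyConstraint constraint, List<Input> inputs) {
        return constraint == InputDependencyConstraint.ANY
            ? inputs.stream().anyMatch(SchedulingCheck::isConsumable)
            : inputs.stream().allMatch(SchedulingCheck::isConsumable);
    }
}
{code}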



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715276#comment-16715276
 ] 

ASF GitHub Bot commented on FLINK-10945:


azagrebin commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r240311956
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
 ##
 @@ -36,6 +36,11 @@
 
 	private List<List<ExecutionEdge>> consumers;
 
+	/**
+	 * Whether this partition has data produced. For pipelined result only.
+	 */
+	private boolean dataProduced = false;
 
 Review comment:
   I would call it `isSomePipelinedDataProduced`.
   Also the method: `markSomePipelinedDataProduced`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Avoid resource deadlocks for finite stream jobs when resources are limited
> --
>
> Key: FLINK-10945
> URL: https://issues.apache.org/jira/browse/FLINK-10945
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.1
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
>
> Currently *resource deadlocks* can happen to finite stream jobs(or batch 
> jobs) when resources are limited. In 2 cases as below:
>  # Task Y is a pipelined downstream task of task X. When X takes all 
> resources(slots), Y cannot acquire slots to start, thus the back pressure 
> will block X to finish
>  # Task Y is a upstream task of task X. When X takes all resources(slots) and 
> Y cannot start, X cannot finish as some of its inputs are not finished
>  
> We can avoid case 1 by setting all edges to be BLOCKING to avoid pipeline 
> back pressure. However, case 2 cannot be avoided as X(downstream task) will 
> be launched when any of its input result is ready.
> To be detailed, say task X has BLOCKING upstream task Y and Z, X can be 
> launched when Z finishes, though task Y is not launched yet. This pre-launch 
> behaviour can be beneficial when there are plenty of resources, thus X can 
> process data from Z earlier before Y finishes its data processing. However, 
> resource deadlocks may happen when the resources are limited, e.g. in small 
> sessions.
>  
> I’d propose introducing a constraint named as *InputDependencyConstraint* to 
> control the scheduling of vertices. It has 2 values:
>  # *ANY*. The vertex can be scheduled when any of its inputs is consumable.
>  # *ALL*. The vertex can be scheduled when all of its inputs are consumable.
>  
> The design doc is here. 
> [https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715275#comment-16715275
 ] 

ASF GitHub Bot commented on FLINK-10945:


azagrebin commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r240321678
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
 ##
 @@ -192,6 +193,7 @@ private static ExecutionVertex 
mockExecutionVertex(ExecutionState state, Resourc
 
 	private static IntermediateResultPartition mockPartition(ExecutionVertex producer) {
 		IntermediateResultPartition partition = mock(IntermediateResultPartition.class);
+		when(partition.getResultType()).thenReturn(ResultPartitionType.PIPELINED);
 
 Review comment:
   Is this change needed? `isConsumable` should be still enough.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Avoid resource deadlocks for finite stream jobs when resources are limited
> --
>
> Key: FLINK-10945
> URL: https://issues.apache.org/jira/browse/FLINK-10945
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.1
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
>
> Currently *resource deadlocks* can happen to finite stream jobs(or batch 
> jobs) when resources are limited. In 2 cases as below:
>  # Task Y is a pipelined downstream task of task X. When X takes all 
> resources(slots), Y cannot acquire slots to start, thus the back pressure 
> will block X to finish
>  # Task Y is a upstream task of task X. When X takes all resources(slots) and 
> Y cannot start, X cannot finish as some of its inputs are not finished
>  
> We can avoid case 1 by setting all edges to be BLOCKING to avoid pipeline 
> back pressure. However, case 2 cannot be avoided as X(downstream task) will 
> be launched when any of its input result is ready.
> To be detailed, say task X has BLOCKING upstream task Y and Z, X can be 
> launched when Z finishes, though task Y is not launched yet. This pre-launch 
> behaviour can be beneficial when there are plenty of resources, thus X can 
> process data from Z earlier before Y finishes its data processing. However, 
> resource deadlocks may happen when the resources are limited, e.g. in small 
> sessions.
>  
> I’d propose introducing a constraint named as *InputDependencyConstraint* to 
> control the scheduling of vertices. It has 2 values:
>  # *ANY*. The vertex can be scheduled when any of its inputs is consumable.
>  # *ALL*. The vertex can be scheduled when all of its inputs are consumable.
>  
> The design doc is here. 
> [https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715279#comment-16715279
 ] 

ASF GitHub Bot commented on FLINK-10945:


azagrebin commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r240313527
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -726,6 +728,56 @@ void sendPartitionInfos() {
}
}
 
+   /**
+* Check whether the InputDependencyConstraint is satisfied for this 
vertex.
+*
+* @return whether the input constraint is satisfied
+*/
+   public boolean checkInputDependencyConstraints() {
 
 Review comment:
   This might be a bit more concise:
   ```
   boolean checkInputDependencyConstraints() {
       if (getExecutionGraph().getInputDependencyConstraint() == InputDependencyConstraint.ANY) {
           // InputDependencyConstraint == ANY
           return jobVertex.getInputs().stream().anyMatch(this::isInputConsumable);
       } else {
           // InputDependencyConstraint == ALL
           return jobVertex.getInputs().stream().allMatch(this::isInputConsumable);
       }
   }

   boolean isInputConsumable(IntermediateResult result) {
       if (result.getResultType().isPipelined()) {
           // For a PIPELINED result, the input is consumable if any result partition has produced records or is finished
           return Arrays.stream(result.getPartitions()).anyMatch(IntermediateResultPartition::hasDataProduced);
       } else {
           // For a BLOCKING result, the input is consumable if all the partitions in the result are finished
           return result.areAllPartitionsFinished();
       }
   }
   ```
   I am not sure we need to check the `ANY` case at all; just checking it 
theoretically changes the current behaviour. On the other hand, at the moment 
I think it always holds for `ANY` in the place where we check it, i.e. with 
`ScheduleMode.LAZY_FROM_SOURCES`. 
   
   I am also not sure that `ALL` config makes sense together with 
`ScheduleMode.EAGER`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Avoid resource deadlocks for finite stream jobs when resources are limited
> --
>
> Key: FLINK-10945
> URL: https://issues.apache.org/jira/browse/FLINK-10945
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.1
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
>
> Currently *resource deadlocks* can happen to finite stream jobs(or batch 
> jobs) when resources are limited. In 2 cases as below:
>  # Task Y is a pipelined downstream task of task X. When X takes all 
> resources(slots), Y cannot acquire slots to start, thus the back pressure 
> will block X to finish
>  # Task Y is a upstream task of task X. When X takes all resources(slots) and 
> Y cannot start, X cannot finish as some of its inputs are not finished
>  
> We can avoid case 1 by setting all edges to be BLOCKING to avoid pipeline 
> back pressure. However, case 2 cannot be avoided as X(downstream task) will 
> be launched when any of its input result is ready.
> To be detailed, say task X has BLOCKING upstream task Y and Z, X can be 
> launched when Z finishes, though task Y is not launched yet. This pre-launch 
> behaviour can be beneficial when there are plenty of resources, thus X can 
> process data from Z earlier before Y finishes its data processing. However, 
> resource deadlocks may happen when the resources are limited, e.g. in small 
> sessions.
>  
> I’d propose introducing a constraint named as *InputDependencyConstraint* to 
> control the scheduling of vertices. It has 2 values:
>  # *ANY*. The vertex can be scheduled when any of its inputs is consumable.
>  # *ALL*. The vertex can be scheduled when all of its inputs are consumable.
>  
> The design doc is here. 
> [https://docs.google.com/document/d/1jpqC7OW_nLOSVOg06_QCWelicVtV6Au0Krg5m_S4kjY/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11074) Improve the harness test to make it possible test with state backend

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715299#comment-16715299
 ] 

ASF GitHub Bot commented on FLINK-11074:


walterddr commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240321103
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 ##
 @@ -491,13 +489,75 @@ class HarnessTestBase {
 distinctCountFuncName,
 distinctCountAggCode)
 
+  def createHarnessTester[KEY, IN, OUT](
+      dataStream: DataStream[_],
+      prefixOperatorName: String)
+    : KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT] = {
+
+    val transformation = extractExpectedTransformation(
+      dataStream.javaStream.getTransformation,
+      prefixOperatorName).asInstanceOf[OneInputTransformation[_, _]]
+    if (transformation == null) {
+      throw new Exception("Can not find the expected transformation")
+    }
+
+    val processOperator = transformation.getOperator.asInstanceOf[OneInputStreamOperator[IN, OUT]]
+    val keySelector = transformation.getStateKeySelector.asInstanceOf[KeySelector[IN, KEY]]
+    val keyType = transformation.getStateKeyType.asInstanceOf[TypeInformation[KEY]]
+
+    createHarnessTester(processOperator, keySelector, keyType)
+      .asInstanceOf[KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT]]
+  }
+
+  private def extractExpectedTransformation(
+      transformation: StreamTransformation[_],
+      prefixOperatorName: String): StreamTransformation[_] = {
+    def extractFromInputs(inputs: StreamTransformation[_]*): StreamTransformation[_] = {
+      for (input <- inputs) {
+        val t = extractExpectedTransformation(input, prefixOperatorName)
+        if (t != null) {
+          return t
+        }
+      }
+      null
+    }
+
+    transformation match {
+      case one: OneInputTransformation[_, _] =>
+        if (one.getName.startsWith(prefixOperatorName)) {
+          one
+        } else {
+          extractExpectedTransformation(one.getInput, prefixOperatorName)
+        }
+      case two: TwoInputTransformation[_, _, _] =>
 
 Review comment:
   Let's throw an UnsupportedOperationException for now, since there's no code 
path that exercises a two-input transformation yet. We can always add this 
logic later when necessary. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve the harness test to make it possible test with state backend
> 
>
> Key: FLINK-11074
> URL: https://issues.apache.org/jira/browse/FLINK-11074
> Project: Flink
>  Issue Type: Test
>  Components: Table API  SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the harness tests can only run without a state backend. If you use a 
> DataView in the accumulator of an aggregate function, the DataView is a plain 
> Java object held on the heap, not replaced with a StateMapView/StateListView 
> whose values are actually held in the state backend. We should improve the 
> harness tests to make it possible to test with a state backend. Otherwise, 
> issues such as FLINK-10674 could never have been found. With such harness tests 
> available, we could test the built-in aggregate functions that use DataViews in 
> a more fine-grained way.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11070) Add stream-stream non-window cross join

2018-12-10 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715884#comment-16715884
 ] 

Hequn Cheng edited comment on FLINK-11070 at 12/11/18 1:06 AM:
---

[~fhueske] Hi, thanks for the feedback and for sharing your thoughts.

As for the example, I think I should add more details.
 * Table2 and table3 are very small tables, e.g. tables with only one row, 
while table1 is a very big table, e.g. with 1M rows.
 * All these rows contain the same join keys.

So the cost for the cross join vs. the non-cross join would be 1*1+1*1M (cross 
join) vs. 1*1M+1M*1 (non-cross join). This means the query may be executed with 
better performance as a cross join.

Adding a switch is a good idea. It forces our users to pay more attention to 
the performance of cross joins. But it may also bring some inconvenience: we 
can't remove the switch even once Flink supports join reordering, because there 
is a chance that cardinality estimates have not been provided by the user. So 
whenever we can't get cardinality estimates, the user has to configure the 
switch to enable a cross join if they really do want a cross join. From this 
point of view, I think we should not have the switch. 
 I would propose that:
 * We don't enable join reordering in general, because reordering without 
cardinality estimates is gambling.
 * We trust the query written by the user, as we don't have cardinality 
estimates, and we don't add a switch that brings inconvenience to the user. 

What do you think?

To get better performance, I think making the join parallel is a good idea. I 
will investigate it. Thanks a lot for your suggestions. 

Best, Hequn
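
A quick worked check of the figures above, under the stated assumptions (table1 has 1M rows, table2 and table3 have one row each, all with the same join key):

{code:java}
// Purely illustrative row-count arithmetic, not an actual optimizer cost model.
long t1 = 1_000_000L, t2 = 1L, t3 = 1L;

// Cross join first: table2 x table3 yields 1*1 rows, then a keyed join with table1.
long crossJoinFirst = t2 * t3 + (t2 * t3) * t1;   // 1*1 + 1*1M = 1,000,001

// Keyed joins only: table1 join table2 yields ~1M rows, then joined with table3.
long keyedJoinsOnly = t1 * t2 + (t1 * t2) * t3;   // 1*1M + 1M*1 = 2,000,000
{code}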


was (Author: hequn8128):
[~fhueske] Hi, thanks for the feedback and sharing the thoughts.

As for the example, I think I should add more details.
 * Table2 and table3 are very small tables, e.g., tables with only one row, while 
table1 is a very big table, e.g., one with 1M rows.
 * All of these rows share the same join key.

So the cost would be 1*1*1M (cross join) vs. 1*1M*1M (non-cross join). This means 
the query may perform much better when executed as a cross join.

Adding a switch is a good idea. It forces our users to pay more attention to the 
performance of a cross join. But it may also bring some inconvenience: we can't 
remove the switch even after Flink supports join reordering, because there is a 
chance the user has not provided cardinality estimates. So whenever cardinality 
estimates are unavailable, the user still has to configure the switch to enable a 
cross join if he really wants one. From this point of view, I think we should not 
have the switch.
I would propose that:
 * Don't enable join reordering in general, because reordering without 
cardinality estimates is gambling.
 * Trust the query written by the user, since we don't have cardinality estimates, 
and don't add a switch that inconveniences the user.

What do you think?

To get better performance, I think making the join parallel is a good idea. I 
will investigate it. Thanks a lot for your suggestions.

Best, Hequn

> Add stream-stream non-window cross join
> ---
>
> Key: FLINK-11070
> URL: https://issues.apache.org/jira/browse/FLINK-11070
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Currently, we don't reorder joins but rely on the order provided by the user. 
> This is fine for most cases; however, it limits the set of supported 
> SQL queries.
> Example:
> {code:java}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code:java}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}

[jira] [Commented] (FLINK-11039) LogicalExchange and HashPartitioner realization

2018-12-10 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715635#comment-16715635
 ] 

Fabian Hueske commented on FLINK-11039:
---

Hi [~ambition],

I still don't understand the purpose of this issue. 
Can you share a query that fails due to missing support of {{LogicalExchange}}?

Thank you,
Fabian

> LogicalExchange and HashPartitioner realization
> ---
>
> Key: FLINK-11039
> URL: https://issues.apache.org/jira/browse/FLINK-11039
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Affects Versions: 1.6.2, 1.7.0
>Reporter: ambition
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Implement the RelTimeIndicatorConverter.visit(exchange: LogicalExchange) 
> function.
> Implement FlinkLogicalExchange based on 
> org.apache.calcite.rel.logical.LogicalExchange.
> HashPartitioner is a Partitioner that implements hash-based partitioning using 
> Java's `Object.hashCode`, supporting org.apache.calcite.rel.RelDistribution.Type 
> HASH_DISTRIBUTED.
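For reference, a generic sketch of hash-based partitioning on {{Object.hashCode}} (illustrative only; this is not the proposed HashPartitioner implementation):

{code:java}
// Maps a key to one of numPartitions partitions using its hashCode.
static int partition(Object key, int numPartitions) {
    // mask the sign bit so negative hash codes still yield a valid partition index
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
}
{code}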



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715703#comment-16715703
 ] 

ASF GitHub Bot commented on FLINK-10974:


sunjincheng121 commented on issue #7196: [FLINK-10974] [table] Add support for 
flatMap to table API
URL: https://github.com/apache/flink/pull/7196#issuecomment-446002606
 
 
   Thanks for the update! @dianfu 
   LGTM. +1 to merged


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add FlatMap to TableAPI
> ---
>
> Key: FLINK-10974
> URL: https://issues.apache.org/jira/browse/FLINK-10974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add FlatMap operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>.flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
>.select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] sunjincheng121 commented on issue #7196: [FLINK-10974] [table] Add support for flatMap to table API

2018-12-10 Thread GitBox
sunjincheng121 commented on issue #7196: [FLINK-10974] [table] Add support for 
flatMap to table API
URL: https://github.com/apache/flink/pull/7196#issuecomment-446002606
 
 
   Thanks for the update! @dianfu 
   LGTM. +1 to merged


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 opened a new pull request #7269: [FLINK-11123][docs] fix the import of the class is missing in ml quic…

2018-12-10 Thread GitBox
sunjincheng121 opened a new pull request #7269: [FLINK-11123][docs] fix the 
import of the class is missing in ml quic…
URL: https://github.com/apache/flink/pull/7269
 
 
   ## What is the purpose of the change
   
   *This PR fixes the missing class import in the ML quick start document.*
   
   
   ## Brief change log
 - *Add `import org.apache.flink.ml.math.Vector` in ml quick start doc.*
   
   ## Verifying this change
   This change is a documentation improvement without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no )
 - The S3 file system connector: (no)
   
   ## Documentation
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11123) Improve ml quick start doc

2018-12-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11123:
---
Labels: pull-request-available  (was: )

> Improve ml quick start doc
> --
>
> Key: FLINK-11123
> URL: https://issues.apache.org/jira/browse/FLINK-11123
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Machine Learning Library
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.7.1
>
>
> The user cannot run the example from the ML quick start document because 
> the import statement for the class is missing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11123) Improve ml quick start doc

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715871#comment-16715871
 ] 

ASF GitHub Bot commented on FLINK-11123:


sunjincheng121 opened a new pull request #7269: [FLINK-11123][docs] fix the 
import of the class is missing in ml quic…
URL: https://github.com/apache/flink/pull/7269
 
 
   ## What is the purpose of the change
   
   *This PR fixes the missing class import in the ML quick start document.*
   
   
   ## Brief change log
 - *Add `import org.apache.flink.ml.math.Vector` in ml quick start doc.*
   
   ## Verifying this change
   This change is a documentation improvement without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no )
 - The S3 file system connector: (no)
   
   ## Documentation
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve ml quick start doc
> --
>
> Key: FLINK-11123
> URL: https://issues.apache.org/jira/browse/FLINK-11123
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Machine Learning Library
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.7.1
>
>
> The user cannot run the example from the ML quick start document because 
> the import statement for the class is missing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11123) Improve ml quick start doc

2018-12-10 Thread sunjincheng (JIRA)
sunjincheng created FLINK-11123:
---

 Summary: Improve ml quick start doc
 Key: FLINK-11123
 URL: https://issues.apache.org/jira/browse/FLINK-11123
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Machine Learning Library
Affects Versions: 1.7.0
Reporter: sunjincheng
Assignee: sunjincheng
 Fix For: 1.7.1, 1.7.0


The user cannot run the example from the ML quick start document because the 
import statement for the class is missing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11117) Migrate connector classes to flink-table-spi

2018-12-10 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716330#comment-16716330
 ] 

Timo Walther commented on FLINK-7:
--

[~yanghua] This issue is a duplicate of FLINK-10688.

> Migrate connector classes to flink-table-spi
> 
>
> Key: FLINK-7
> URL: https://issues.apache.org/jira/browse/FLINK-7
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> This issue covers the fifth step of the migration plan mentioned in 
> [FLIP-28|https://cwiki.apache.org/confluence/display/FLINK/FLIP-28%3A+Long-term+goal+of+making+flink-table+Scala-free].
> Once we implemented improvements to the [unified connector 
> interface|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf],
>  we can also migrate the classes. Among others, it requires a refactoring of 
> the timestamp extractors which are the biggest blockers because they 
> transitively depending on expressions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10774) connection leak when partition discovery is disabled and open throws exception

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716328#comment-16716328
 ] 

ASF GitHub Bot commented on FLINK-10774:


stevenzwu commented on issue #7020: [FLINK-10774] [Kafka] connection leak when 
partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#issuecomment-446086480
 
 
   @tillrohrmann Thanks a lot for the feedback. I made the changes according 
to your comments. Please take a look and see if I missed anything.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> connection leak when partition discovery is disabled and open throws exception
> --
>
> Key: FLINK-10774
> URL: https://issues.apache.org/jira/browse/FLINK-10774
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.2, 1.5.5, 1.6.2
>Reporter: Steven Zhen Wu
>Assignee: Steven Zhen Wu
>Priority: Major
>  Labels: pull-request-available
>
> Here is the scenario to reproduce the issue:
>  * partition discovery is disabled
>  * the open method throws an exception (e.g. when broker SSL authorization denies 
> the request)
> In this scenario, the run method won't be executed. As a result, 
> _partitionDiscoverer.close()_ won't be called. That causes the connection 
> leak, because the KafkaConsumer is initialized but never closed. It caused an 
> outage that brought down our Kafka cluster when a high-parallelism job got 
> into a restart/failure loop.
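A hedged sketch of the cleanup pattern this implies (illustrative only, not the actual FlinkKafkaConsumerBase code; the method and parameter names are assumptions):

{code:java}
// If open() fails before run() ever starts, close the discoverer here so the
// KafkaConsumer it created is released instead of leaking.
static void openOrCleanUp(AutoCloseable partitionDiscoverer, Runnable openLogic) throws Exception {
    try {
        openLogic.run();              // may throw, e.g. broker SSL authorization denied
    } catch (Exception e) {
        partitionDiscoverer.close();  // run()'s cleanup will never execute, so close now
        throw e;
    }
}
{code}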



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-10 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716331#comment-16716331
 ] 

vinoyang commented on FLINK-11081:
--

[~walterddr] Thank you for recognizing the value of this issue. In fact, the 
implementation follows the way Flink already supports port ranges for options such 
as "taskmanager.rpc.port". It allows specifying:
 * a single port (e.g. 8081; 0 means the port is picked randomly by the JobManager)
 * a port list (e.g. 12345, 1234), which is tried in order
 * a port range (e.g. 12345-12346), which is also tried in order

The implementation verifies the validity of the ports and reports a conflict 
exception when binding fails.
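A hedged configuration example of what this could look like (the key name "rest.bind-port" is an assumption derived from {{RestOptions#BIND_PORT}}, not a confirmed option key):

{code}
# single port (0 would mean: pick a random free port)
rest.bind-port: 8081

# explicit list, tried in order
rest.bind-port: 50100,50101

# range, tried in order until a free port is found
rest.bind-port: 50100-50200
{code}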

> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes it would be 
> useful to be able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add, similar to 
> {{RestOptions#BIND_ADDRESS}}, another option {{RestOptions#BIND_PORT}} which 
> allows specifying a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] samsai commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread GitBox
samsai commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is 
inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-446041717
 
 
   I got the same problem too:
   
   val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssZ")
   val origin: DataStream[TransactionEvent] = env.fromCollection(List(
     TransactionEvent("u1", format.parse("2018-01-02 01:13:30+0800"), 10)
   ))
   val source2 = origin
     .assignTimestampsAndWatermarks(
       new BoundedOutOfOrdernessTimestampExtractor[TransactionEvent](Time.minutes(1)) {
         override def extractTimestamp(element: TransactionEvent): Long = {
           val timestamp = element.time.getTime
           println(s"extractTimestamp: $timestamp")
           timestamp
         }
       })
   tEnv.fromDataStream(source2, 'user, 'eventTime.rowtime)
     .toAppendStream[Row].print()
   
   I got eventTime as 2018-01-01 17:13:30.0, which is 8 hours delayed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715949#comment-16715949
 ] 

ASF GitHub Bot commented on FLINK-11010:


samsai commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is 
inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-446041717
 
 
   I got the same problem too:
   
   val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssZ")
   val origin: DataStream[TransactionEvent] = env.fromCollection(List(
     TransactionEvent("u1", format.parse("2018-01-02 01:13:30+0800"), 10)
   ))
   val source2 = origin
     .assignTimestampsAndWatermarks(
       new BoundedOutOfOrdernessTimestampExtractor[TransactionEvent](Time.minutes(1)) {
         override def extractTimestamp(element: TransactionEvent): Long = {
           val timestamp = element.time.getTime
           println(s"extractTimestamp: $timestamp")
           timestamp
         }
       })
   tEnv.fromDataStream(source2, 'user, 'eventTime.rowtime)
     .toAppendStream[Row].print()
   
   I got eventTime as 2018-01-01 17:13:30.0, which is 8 hours delayed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.2
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> the ProcessingTime is just implemented by invoking System.currentTimeMillis() 
> but the long value will be automatically wrapped to a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
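For illustration, a small self-contained sketch of the offset arithmetic quoted above (illustrative only; this is not the Flink code itself):

{code:java}
import java.util.TimeZone;

public class OffsetSketch {
    public static void main(String[] args) {
        long time = System.currentTimeMillis();
        // the wrapping quoted in the description above
        java.sql.Timestamp shifted =
            new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));
        // In a UTC+8 zone this prints a value 8 hours behind local wall-clock time,
        // matching the "8 hours delayed" observation in the comment above.
        System.out.println(shifted);
    }
}
{code}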



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11070) Add stream-stream non-window cross join

2018-12-10 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716015#comment-16716015
 ] 

Hequn Cheng commented on FLINK-11070:
-

[~fhueske] Thanks for your quick reply! 
Ok, I got your point. It also makes sense to notify users about the potential 
performance risk. Once we support join reordering, we can remove the switches.

Best, Hequn

> Add stream-stream non-window cross join
> ---
>
> Key: FLINK-11070
> URL: https://issues.apache.org/jira/browse/FLINK-11070
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Currently, we don't reorder joins but rely on the order provided by the user. 
> This is fine for most cases; however, it limits the set of supported 
> SQL queries.
> Example:
> {code:java}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code:java}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> In order to support more queries, it would be nice to have cross join on 
> streaming. We can start from a simple version, for example, call 
> forceNonParallel() for connectOperator in `DataStreamJoin` when it is a cross 
> join. The performance may be bad. But it works fine if the two tables of 
> cross join are small ones. 
> We can do some optimizations later, such as broadcasting the smaller side, 
> etc.
> Any suggestions are greatly appreciated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11124) Add private[flink] to TemporalTableFunction.create()

2018-12-10 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-11124:
---

 Summary: Add private[flink] to TemporalTableFunction.create()
 Key: FLINK-11124
 URL: https://issues.apache.org/jira/browse/FLINK-11124
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL
Reporter: Hequn Cheng
Assignee: Hequn Cheng


{{TemporalTableFunction}} is a user-oriented class. I think it would be better 
to add {{private[flink]}} to the {{TemporalTableFunction.create()}} method in 
order to make it invisible to users.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators

2018-12-10 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716101#comment-16716101
 ] 

sunjincheng commented on FLINK-10543:
-

Hi [~fhueske], from a compatibility point of view I agree this change should not be 
cherry-picked to release-1.7. I'll revert the change from release-1.7.

Thank you for the reminder that we only back-port bug fixes that do not break 
compatibility to release branches.

I think the improvement is still necessary on master to reduce the storage and 
access pressure caused by useless state.

If you find any incorrect semantic changes, we can file a new JIRA to fix them.

Thanks, Jincheng

 

> Leverage efficient timer deletion in relational operators
> -
>
> Key: FLINK-10543
> URL: https://issues.apache.org/jira/browse/FLINK-10543
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0, 1.7.1
>
>
> FLINK-9423 added support for efficient timer deletions. This feature is 
> available since Flink 1.6 and should be used by the relational operator of 
> SQL and Table API.
> Currently, we use a few workarounds to handle situations when deleting timers 
> would be the better solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators

2018-12-10 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716116#comment-16716116
 ] 

Hequn Cheng commented on FLINK-10543:
-

[~fhueske] Hi, it would be nice if you could take a look at the PR.
The PR mainly includes the following changes:
 * Delete expired timers in {{ProcessFunctionWithCleanupState}}.
 * Add a {{CoProcessFunctionWithCleanupState}} for joins. This aligns the cleanup 
logic of the join with the other operators.
 * Use one {{ValueState}} to control cleanup instead of two, i.e., treat the left 
and right state of the join as a whole.
 * Leverage the min and max retention time to clean up the join state. Previously, 
the join registered a cleanup timer and cleaned up the state at a fixed interval; 
in the new version we can remove all records at once, because new records refresh 
the timer (a rough sketch of this timer pattern follows below).
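For illustration, a hedged sketch of the register/refresh/delete pattern (the class, method, and parameter names are illustrative and not the actual ProcessFunctionWithCleanupState code):

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.streaming.api.TimerService;

class CleanupTimerSketch {
    void registerOrRefresh(TimerService timerService, ValueState<Long> cleanupTimeState,
                           long minRetentionTime, long maxRetentionTime) throws Exception {
        long now = timerService.currentProcessingTime();
        Long registered = cleanupTimeState.value();
        // only refresh when the registered timer would fire before now + minRetentionTime
        if (registered == null || now + minRetentionTime > registered) {
            if (registered != null) {
                // efficient timer deletion (FLINK-9423) instead of letting stale timers fire
                timerService.deleteProcessingTimeTimer(registered);
            }
            long cleanupTime = now + maxRetentionTime;
            timerService.registerProcessingTimeTimer(cleanupTime);
            cleanupTimeState.update(cleanupTime);
        }
    }
}
{code}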

Best, Hequn

> Leverage efficient timer deletion in relational operators
> -
>
> Key: FLINK-10543
> URL: https://issues.apache.org/jira/browse/FLINK-10543
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0, 1.7.1
>
>
> FLINK-9423 added support for efficient timer deletions. This feature is 
> available since Flink 1.6 and should be used by the relational operator of 
> SQL and Table API.
> Currently, we use a few workarounds to handle situations when deleting timers 
> would be the better solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11125) Remove useless import

2018-12-10 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-11125:
---

 Summary: Remove useless import 
 Key: FLINK-11125
 URL: https://issues.apache.org/jira/browse/FLINK-11125
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL, Tests
Reporter: Hequn Cheng
Assignee: Hequn Cheng






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10943) Flink runtime test failed caused by curator dependency conflicts

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716150#comment-16716150
 ] 

ASF GitHub Bot commented on FLINK-10943:


link3280 closed pull request #7148: [FLINK-10943][tests] Fix test failures in 
flink runtime caused by curator dependency conflicts
URL: https://github.com/apache/flink/pull/7148
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index d216851c2d1..0cab4feabda 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -72,6 +72,12 @@ under the License.
flink-shaded-hadoop2
${project.version}
true
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.curator</groupId>
+					<artifactId>curator-client</artifactId>
+				</exclusion>
+			</exclusions>

 



 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink runtime test failed caused by curator dependency conflicts
> 
>
> Key: FLINK-10943
> URL: https://issues.apache.org/jira/browse/FLINK-10943
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Tests
>Affects Versions: 1.5.5, 1.6.2
>Reporter: Paul Lin
>Assignee: Paul Lin
>Priority: Minor
>  Labels: pull-request-available
> Attachments: 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt
>
>
> hadoop-common 2.6+ includes Curator dependencies, which conflict 
> with the Curator version used by the Flink runtime and cause test failures 
> (the attachment is the Surefire report). 
> The Curator dependency tree of flink-runtime is as below:
> ```
> flink-shaded-hadoop2 -> hadoop-common -> curator-client & curator-recipes
> flink-shaded-curator -> curator-recipes -> curator-framework -> curator-client
> ```
> Following Maven's dependency mediation, Maven picks the curator-client from 
> flink-shaded-hadoop2, and curator-framework and curator-recipes from 
> flink-shaded-curator.
> To fix the problem I think we can simply exclude curator-client from the 
> flink-shaded-hadoop2 dependency in flink-runtime.
> I'd like to fix this, please let me know what you think. Thanks!
> [^org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11090) Unused parameter in WindowedStream.aggregate()

2018-12-10 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716146#comment-16716146
 ] 

Hequn Cheng commented on FLINK-11090:
-

Hi [~aljoscha], thanks for explaining the difference between {{PublicEvolving}} 
and {{Public}}. It would be nice to remove the useless parameter directly. I 
will open a PR soon. 

Thanks, Hequn

> Unused parameter in WindowedStream.aggregate()
> --
>
> Key: FLINK-11090
> URL: https://issues.apache.org/jira/browse/FLINK-11090
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> The {{aggregateResultType}} parameter in {{WindowedStream.aggregate()}} seems 
> useless. Or what have I missed?
> If it is useless, I prefer to remove the parameter by adding a new API and 
> deprecating the current one. We can't remove it directly as it is 
> PublicEvolving.
> {code:java}
>   @PublicEvolving
>   public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
>           AggregateFunction<T, ACC, V> aggregateFunction,
>           ProcessWindowFunction<V, R, K, W> windowFunction,
>           TypeInformation<ACC> accumulatorType,
>           TypeInformation<V> aggregateResultType,
>           TypeInformation<R> resultType) {
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716154#comment-16716154
 ] 

ASF GitHub Bot commented on FLINK-11048:


tweise commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r240470550
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   Agreed on the public method contract. I rearranged the code to keep the 
existing method as is.
   
   The setter is an alternative option for users that already have the remote 
environment instance (instead of having to deal with an 8 parameter static 
method). I would like to keep this as is.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> ---
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tweise commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-10 Thread GitBox
tweise commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r240470550
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
 
 Review comment:
   Agreed on the public method contract. I rearranged the code to keep the 
existing method as is.
   
   The setter is an alternative option for users that already have the remote 
environment instance (instead of having to deal with an 8 parameter static 
method). I would like to keep this as is.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716215#comment-16716215
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r240476864
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -43,70 +48,82 @@
  */
 public abstract class CassandraSinkBase extends RichSinkFunction 
implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
-   protected transient Cluster cluster;
-   protected transient Session session;
 
-   protected transient volatile Throwable exception;
-   protected transient FutureCallback callback;
+   //  Default Configurations 

+
+   /**
+* The default maximum number of concurrent requests. By default, 
{@code Integer.MAX_VALUE}.
+*/
+   public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 
Integer.MAX_VALUE;
+
+   /**
+* The default timeout duration when acquiring a permit to execute. By 
default, {@code Long.MAX_VALUE}.
+*/
+   public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = 
Long.MAX_VALUE;
+
+   /**
+* The default timeout unit when acquiring a permit to execute. By 
default, milliseconds.
+*/
+   public static final TimeUnit 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+   // - Configuration Fields 
-
+
+   private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+   private long maxConcurrentRequestsTimeout = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+   private TimeUnit maxConcurrentRequestsTimeoutUnit = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;
+
+   // --- Cassandra Fields 
---
 
private final ClusterBuilder builder;
 
-   private final AtomicInteger updatesPending = new AtomicInteger();
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   //  Synchronization Fields 

+
+   private AtomicReference throwable;
+   private Semaphore semaphore;
+   private Phaser phaser;
 
CassandraSinkBase(ClusterBuilder builder) {
this.builder = builder;
ClosureCleaner.clean(builder, true);
}
 
-   @Override
-   public void open(Configuration configuration) {
-   this.callback = new FutureCallback() {
-   @Override
-   public void onSuccess(V ignored) {
-   int pending = updatesPending.decrementAndGet();
-   if (pending == 0) {
-   synchronized (updatesPending) {
-   updatesPending.notifyAll();
-   }
-   }
-   }
+   // - Sink Methods 
-
 
+   @Override
+   public void open(Configuration parameters) {
+   cluster = createCluster();
+   session = createSession();
+
+   throwable = new AtomicReference<>();
+   semaphore = new Semaphore(maxConcurrentRequests);
+   /*
+* A Phaser is a flexible and reusable synchronization barrier 
similar to CyclicBarrier and CountDownLatch.
+*
+* This Phaser is configured to support "1 + N" parties.
+*   - "1" for the CassandraSinkBase to arrive and to await at 
the Phaser during a flush() call.
+*   - "N" for the varying number of invoke() calls that 
register and de-register with the Phaser.
+*
+* The Phaser awaits the completion of the advancement of a 
phase prior to returning from a register() call.
+* This behavior ensures that no backlogged invoke() calls 
register to execute while the Semaphore's permits
+* are being released during a flush() call.
+*/
+   phaser = new Phaser(1) {
 
 Review comment:
   @azagrebin @zentol Okay. Yeah. Then we don't need a barrier. The semaphore 
would be sufficient.
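For illustration, a hedged sketch of the semaphore-only approach agreed on above (this is not the CassandraSinkBase code, just the shape of the idea; names are assumptions):

{code:java}
import java.util.concurrent.Semaphore;

class SemaphoreBackpressureSketch {
    private final int maxConcurrentRequests;
    private final Semaphore permits;

    SemaphoreBackpressureSketch(int maxConcurrentRequests) {
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    void beforeSend() throws InterruptedException {
        permits.acquire();           // blocks when too many requests are in flight
    }

    void onRequestCompleted() {
        permits.release();           // called from the async callback, success or failure
    }

    void flush() throws InterruptedException {
        // holding every permit means all in-flight requests have completed;
        // no extra barrier (Phaser/CountDownLatch) is needed
        permits.acquire(maxConcurrentRequests);
        permits.release(maxConcurrentRequests);
    }
}
{code}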


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-12-10 Thread GitBox
jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r240476864
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -43,70 +48,82 @@
  */
 public abstract class CassandraSinkBase extends RichSinkFunction 
implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
-   protected transient Cluster cluster;
-   protected transient Session session;
 
-   protected transient volatile Throwable exception;
-   protected transient FutureCallback callback;
+   //  Default Configurations 

+
+   /**
+* The default maximum number of concurrent requests. By default, 
{@code Integer.MAX_VALUE}.
+*/
+   public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 
Integer.MAX_VALUE;
+
+   /**
+* The default timeout duration when acquiring a permit to execute. By 
default, {@code Long.MAX_VALUE}.
+*/
+   public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = 
Long.MAX_VALUE;
+
+   /**
+* The default timeout unit when acquiring a permit to execute. By 
default, milliseconds.
+*/
+   public static final TimeUnit 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+   // - Configuration Fields 
-
+
+   private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+   private long maxConcurrentRequestsTimeout = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+   private TimeUnit maxConcurrentRequestsTimeoutUnit = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;
+
+   // --- Cassandra Fields 
---
 
private final ClusterBuilder builder;
 
-   private final AtomicInteger updatesPending = new AtomicInteger();
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   //  Synchronization Fields 

+
+   private AtomicReference throwable;
+   private Semaphore semaphore;
+   private Phaser phaser;
 
CassandraSinkBase(ClusterBuilder builder) {
this.builder = builder;
ClosureCleaner.clean(builder, true);
}
 
-   @Override
-   public void open(Configuration configuration) {
-   this.callback = new FutureCallback() {
-   @Override
-   public void onSuccess(V ignored) {
-   int pending = updatesPending.decrementAndGet();
-   if (pending == 0) {
-   synchronized (updatesPending) {
-   updatesPending.notifyAll();
-   }
-   }
-   }
+   // - Sink Methods 
-
 
+   @Override
+   public void open(Configuration parameters) {
+   cluster = createCluster();
+   session = createSession();
+
+   throwable = new AtomicReference<>();
+   semaphore = new Semaphore(maxConcurrentRequests);
+   /*
+* A Phaser is a flexible and reusable synchronization barrier 
similar to CyclicBarrier and CountDownLatch.
+*
+* This Phaser is configured to support "1 + N" parties.
+*   - "1" for the CassandraSinkBase to arrive and to await at 
the Phaser during a flush() call.
+*   - "N" for the varying number of invoke() calls that 
register and de-register with the Phaser.
+*
+* The Phaser awaits the completion of the advancement of a 
phase prior to returning from a register() call.
+* This behavior ensures that no backlogged invoke() calls 
register to execute while the Semaphore's permits
+* are being released during a flush() call.
+*/
+   phaser = new Phaser(1) {
 
 Review comment:
   @azagrebin @zentol Okay. Yeah. Then we don't need a barrier. The semaphore 
would be sufficient.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11060) Unable to set number of task manager and slot per task manager in scala shell local mode

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716285#comment-16716285
 ] 

ASF GitHub Bot commented on FLINK-11060:


zjffdu commented on issue #7229: [FLINK-11060][scala-shell] Unable to set 
number of task manager and slot per task manager in scala shell local mode
URL: https://github.com/apache/flink/pull/7229#issuecomment-446082679
 
 
   @yanghua @tillrohrmann Could you help review it ? Thanks


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Unable to set number of task manager and slot per task manager in scala shell 
> local mode
> 
>
> Key: FLINK-11060
> URL: https://issues.apache.org/jira/browse/FLINK-11060
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.7.0
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>
> In the Scala shell, I cannot change the number of task managers or the slots per 
> task manager; they are hard-coded to 1. I cannot specify them in flink-conf.yaml.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zjffdu commented on issue #6236: [FLINK-9699] [table] Add api to replace registered table

2018-12-10 Thread GitBox
zjffdu commented on issue #6236: [FLINK-9699] [table] Add api to replace 
registered table
URL: https://github.com/apache/flink/pull/6236#issuecomment-446082836
 
 
   @hequn8128 @yanghua PR is updated, could you help review it ? Thanks


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] stevenzwu commented on issue #7020: [FLINK-10774] [Kafka] connection leak when partition discovery is disabled an…

2018-12-10 Thread GitBox
stevenzwu commented on issue #7020: [FLINK-10774] [Kafka] connection leak when 
partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#issuecomment-446086480
 
 
   @tillrohrmann Thanks a lot for the feedback. I made the changes according 
to your comments. Please take a look and see if I missed anything.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on issue #7270: [hotfix][docs] Fix typo in Joining documentation

2018-12-10 Thread GitBox
tzulitai commented on issue #7270: [hotfix][docs] Fix typo in Joining 
documentation
URL: https://github.com/apache/flink/pull/7270#issuecomment-446090988
 
 
   LGTM, merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-11010:
---
Affects Version/s: 1.8.0

> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> the ProcessingTime is just implemented by invoking System.currentTimeMillis() 
> but the long value will be automatically wrapped to a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread lamber-ken (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated FLINK-11010:
---
Affects Version/s: 1.7.1
   1.7.0

> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> the ProcessingTime is just implemented by invoking System.currentTimeMillis() 
> but the long value will be automatically wrapped to a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zzchun opened a new pull request #7270: [hotfix][docs] Fix typo in Joining documentation

2018-12-10 Thread GitBox
zzchun opened a new pull request #7270: [hotfix][docs] Fix typo in Joining 
documentation
URL: https://github.com/apache/flink/pull/7270
 
 
   ## What is the purpose of the change
   
   Fix typo in Joining documentation
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 opened a new pull request #7271: [FLINK-11124][table] Add private[flink] to TemporalTableFunction.create() method

2018-12-10 Thread GitBox
hequn8128 opened a new pull request #7271: [FLINK-11124][table] Add 
private[flink] to TemporalTableFunction.create() method
URL: https://github.com/apache/flink/pull/7271
 
 
   
   ## What is the purpose of the change
   
   This pull request adds `private[flink]` to the `TemporalTableFunction.create()` 
method. Since `TemporalTableFunction` is a user-oriented class, it would 
be better to add `private[flink]` to the `TemporalTableFunction.create()` 
method in order to make it invisible to users.
   
   
   ## Brief change log
   
 - Add private[flink] to TemporalTableFunction.create()
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11124) Add private[flink] to TemporalTableFunction.create()

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716152#comment-16716152
 ] 

ASF GitHub Bot commented on FLINK-11124:


hequn8128 opened a new pull request #7271: [FLINK-11124][table] Add 
private[flink] to TemporalTableFunction.create() method
URL: https://github.com/apache/flink/pull/7271
 
 
   
   ## What is the purpose of the change
   
   This pull request adds `private[flink]` to the `TemporalTableFunction.create()` 
method. Since `TemporalTableFunction` is a user-oriented class, it would 
be better to add `private[flink]` to the `TemporalTableFunction.create()` 
method in order to make it invisible to users.
   
   
   ## Brief change log
   
 - Add private[flink] to TemporalTableFunction.create()
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add private[flink] to TemporalTableFunction.create()
> 
>
> Key: FLINK-11124
> URL: https://issues.apache.org/jira/browse/FLINK-11124
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> {{TemporalTableFunction}} is a user-oriented class. I think it would be 
> better to add {{private[flink]}} to the {{TemporalTableFunction.create()}} 
> method in order to make it invisible to users.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11124) Add private[flink] to TemporalTableFunction.create()

2018-12-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11124:
---
Labels: pull-request-available  (was: )

> Add private[flink] to TemporalTableFunction.create()
> 
>
> Key: FLINK-11124
> URL: https://issues.apache.org/jira/browse/FLINK-11124
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> {{TemporalTableFunction}} is a user-oriented class. I think it would be 
> better to add {{private[flink]}} to the {{TemporalTableFunction.create()}} 
> method in order to make it invisible to users.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11081) Support binding port range for REST server

2018-12-10 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716173#comment-16716173
 ] 

Rong Rong commented on FLINK-11081:
---

+1 to this feature. It is actually very helpful for lots of our use cases. Just two 
quick higher-level questions: 1) what is the strategy for picking the actual port 
from the port range (smallest? random?); and 2) should we introduce a limit on the 
maximum range size?

> Support binding port range for REST server
> --
>
> Key: FLINK-11081
> URL: https://issues.apache.org/jira/browse/FLINK-11081
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently the {{RestServerEndpoint}} binds to the port specified by 
> {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes, it would be 
> useful to being able to specify not only a single port but a port range to 
> pick a port from. Therefore, I propose to add similar to 
> {{RestOptions#BIND_ADDRESS}} another option {{RestOptions#BIND_PORT}} which 
> allows to specify a port range for the {{RestServerEndpoint}} to pick a port 
> from. {{RestOptions#PORT}} would then only be used by the client to connect 
> to the started {{RestServerEndpoint}}.
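
To make the binding-strategy question above concrete, here is a minimal, hypothetical sketch 
(not the actual Flink implementation; all names are made up) of an ascending-order strategy 
that takes a spec like "8081" or "8081-8090" and picks the smallest free port:

{code:java}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

/** Hypothetical helper: tries the ports of an inclusive range in ascending order. */
public final class PortRangeBinder {

    /** Parses a spec like "8081" or "8081-8090" and returns the first bindable port. */
    public static int findFirstFreePort(String portSpec) throws IOException {
        final int dash = portSpec.indexOf('-');
        final int start = dash < 0
            ? Integer.parseInt(portSpec.trim())
            : Integer.parseInt(portSpec.substring(0, dash).trim());
        final int end = dash < 0
            ? start
            : Integer.parseInt(portSpec.substring(dash + 1).trim());

        for (int port = start; port <= end; port++) {
            try (ServerSocket probe = new ServerSocket()) {
                // Probe only; a real server would keep the bound channel open.
                probe.bind(new InetSocketAddress(port));
                return port;
            } catch (IOException ignored) {
                // Port is taken, try the next one in the range.
            }
        }
        throw new IOException("No free port found in range '" + portSpec + "'");
    }
}
{code}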



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11039) LogicalExchange and HashPartitioner realization

2018-12-10 Thread ambition (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ambition closed FLINK-11039.

Resolution: Fixed

> LogicalExchange and HashPartitioner realization
> ---
>
> Key: FLINK-11039
> URL: https://issues.apache.org/jira/browse/FLINK-11039
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.2, 1.7.0
>Reporter: ambition
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Implement the RelTimeIndicatorConverter.visit(exchange: LogicalExchange) 
> function.
> FlinkLogicalExchange implements org.apache.calcite.rel.logical.LogicalExchange.
> HashPartitioner is a Partitioner that implements hash-based partitioning using 
> Java's `Object.hashCode` and supports org.apache.calcite.rel.RelDistribution.Type 
> HASH_DISTRIBUTED.
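
As a rough illustration of the hash-based partitioning described above (a sketch only, 
not the code from the closed PR #7202), a Partitioner backed by Java's Object.hashCode 
could look like this:

{code:java}
import org.apache.flink.api.common.functions.Partitioner;

/** Illustrative only: routes a key to a partition using Java's Object.hashCode. */
public class HashPartitioner<K> implements Partitioner<K> {

    private static final long serialVersionUID = 1L;

    @Override
    public int partition(K key, int numPartitions) {
        // Mask the sign bit so negative hash codes still map to a valid partition.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
{code}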



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] ambition119 closed pull request #7202: [FLINK-11039] LogicalExchange and HashPartitioner realization

2018-12-10 Thread GitBox
ambition119 closed pull request #7202: [FLINK-11039]  LogicalExchange and 
HashPartitioner realization
URL: https://github.com/apache/flink/pull/7202
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11039) LogicalExchange and HashPartitioner realization

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716238#comment-16716238
 ] 

ASF GitHub Bot commented on FLINK-11039:


ambition119 closed pull request #7202: [FLINK-11039]  LogicalExchange and 
HashPartitioner realization
URL: https://github.com/apache/flink/pull/7202
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> LogicalExchange and HashPartitioner realization
> ---
>
> Key: FLINK-11039
> URL: https://issues.apache.org/jira/browse/FLINK-11039
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.2, 1.7.0
>Reporter: ambition
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Implement the RelTimeIndicatorConverter.visit(exchange: LogicalExchange) 
> function.
> FlinkLogicalExchange implements org.apache.calcite.rel.logical.LogicalExchange.
> HashPartitioner is a Partitioner that implements hash-based partitioning using 
> Java's `Object.hashCode` and supports org.apache.calcite.rel.RelDistribution.Type 
> HASH_DISTRIBUTED.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11125) Remove useless import

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716317#comment-16716317
 ] 

ASF GitHub Bot commented on FLINK-11125:


hequn8128 opened a new pull request #7272: [FLINK-11125] remove unused import
URL: https://github.com/apache/flink/pull/7272
 
 
   
   ## What is the purpose of the change
   
   This pull request removes some unused imports in tests and flink-table.
   
   
   ## Brief change log
   
 - Remove unused import
   
   
   ## Verifying this change
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove useless import 
> --
>
> Key: FLINK-11125
> URL: https://issues.apache.org/jira/browse/FLINK-11125
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL, Tests
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11117) Migrate connector classes to flink-table-spi

2018-12-10 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716333#comment-16716333
 ] 

vinoyang commented on FLINK-11117:
--

[~twalthr] oh, sorry, I will close it.

> Migrate connector classes to flink-table-spi
> 
>
> Key: FLINK-11117
> URL: https://issues.apache.org/jira/browse/FLINK-11117
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> This issue covers the fifth step of the migration plan mentioned in 
> [FLIP-28|https://cwiki.apache.org/confluence/display/FLINK/FLIP-28%3A+Long-term+goal+of+making+flink-table+Scala-free].
> Once we implemented improvements to the [unified connector 
> interface|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf],
>  we can also migrate the classes. Among others, it requires a refactoring of 
> the timestamp extractors which are the biggest blockers because they 
> transitively depend on expressions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11117) Migrate connector classes to flink-table-spi

2018-12-10 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang closed FLINK-11117.

Resolution: Duplicate

> Migrate connector classes to flink-table-spi
> 
>
> Key: FLINK-11117
> URL: https://issues.apache.org/jira/browse/FLINK-11117
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> This issue covers the fifth step of the migration plan mentioned in 
> [FLIP-28|https://cwiki.apache.org/confluence/display/FLINK/FLIP-28%3A+Long-term+goal+of+making+flink-table+Scala-free].
> Once we implemented improvements to the [unified connector 
> interface|https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf],
>  we can also migrate the classes. Among others, it requires a refactoring of 
> the timestamp extractors which are the biggest blockers because they 
> transitively depend on expressions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11118) Refactor and unify rowtime timestamp extractor interface

2018-12-10 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang updated FLINK-11118:
-
Issue Type: Task  (was: Sub-task)
    Parent: (was: FLINK-11117)

> Refactor and unify rowtime timestamp extractor interface
> 
>
> Key: FLINK-11118
> URL: https://issues.apache.org/jira/browse/FLINK-11118
> Project: Flink
>  Issue Type: Task
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dianfu commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction

2018-12-10 Thread GitBox
dianfu commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240445274
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Integer => JInt}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.dataview.{DataView, MapView}
+import org.apache.flink.table.dataview.StateMapView
+import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, 
GroupAggProcessFunction}
+import 
org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class AggFunctionHarnessTest extends HarnessTestBase {
+  private val queryConfig = new TestStreamQueryConfig(Time.seconds(0), 
Time.seconds(0))
+
+  @Test
+  def testCollectAggregate(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val data = new mutable.MutableList[(JInt, String)]
+val t = env.fromCollection(data).toTable(tEnv, 'a, 'b)
+tEnv.registerTable("T", t)
+val sqlQuery = tEnv.sqlQuery(
+  s"""
+ |SELECT
+ |  b, collect(a)
+ |FROM (
+ |  SELECT a, b
+ |  FROM T
+ |  GROUP BY a, b
+ |) GROUP BY b
+ |""".stripMargin)
+
+val testHarness = createHarnessTester[String, CRow, CRow](
+  sqlQuery.toRetractStream[Row](queryConfig), "groupBy")
+
+testHarness.setStateBackend(getStateBackend)
+testHarness.open()
+
+val operator = getOperator(testHarness)
+val state = getState(operator, 
"acc0_map_dataview").asInstanceOf[MapView[JInt, JInt]]
+assertTrue(state.isInstanceOf[StateMapView[_, _]])
+
assertTrue(operator.getKeyedStateBackend.isInstanceOf[RocksDBKeyedStateBackend[_]])
+
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1))
+expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 1).asJava), 1))
+
+testHarness.processElement(new StreamRecord(CRow(1: JInt, "bbb"), 1))
+expectedOutput.add(new StreamRecord(CRow("bbb", Map(1 -> 1).asJava), 1))
+
+testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1))
+expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 
1).asJava), 1))
+expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2).asJava), 1))
+
+testHarness.processElement(new StreamRecord(CRow(2: JInt, "aaa"), 1))
+expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 
2).asJava), 1))
+expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2, 2 -> 
1).asJava), 1))
+
+// remove some state: state may be cleaned up by the state backend if not 
accessed more than ttl
+operator.setCurrentKey(Row.of("aaa"))
+state.remove(2)
+
+// retract after state has been cleaned up
+testHarness.processElement(new StreamRecord(CRow(false, 2: JInt, "aaa"), 
1))
+
+val result = testHarness.getOutput
+
+verify(expectedOutput, result)
+
+testHarness.close()
+  }
+
+  private def getState(
 
 Review comment:
   Make sense. Done.


This is an automated 

[GitHub] dianfu commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction

2018-12-10 Thread GitBox
dianfu commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240445604
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 ##
 @@ -491,13 +489,75 @@ class HarnessTestBase {
 distinctCountFuncName,
 distinctCountAggCode)
 
+  def createHarnessTester[KEY, IN, OUT](
+  dataStream: DataStream[_],
+  prefixOperatorName: String)
+  : KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT] = {
+
+val transformation = extractExpectedTransformation(
+  dataStream.javaStream.getTransformation,
+  prefixOperatorName).asInstanceOf[OneInputTransformation[_, _]]
+if (transformation == null) {
+  throw new Exception("Can not find the expected transformation")
+}
+
+val processOperator = 
transformation.getOperator.asInstanceOf[OneInputStreamOperator[IN, OUT]]
+val keySelector = 
transformation.getStateKeySelector.asInstanceOf[KeySelector[IN, KEY]]
+val keyType = 
transformation.getStateKeyType.asInstanceOf[TypeInformation[KEY]]
+
+createHarnessTester(processOperator, keySelector, keyType)
+  .asInstanceOf[KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT]]
+  }
+
+  private def extractExpectedTransformation(
+  transformation: StreamTransformation[_],
+  prefixOperatorName: String): StreamTransformation[_] = {
+def extractFromInputs(inputs: StreamTransformation[_]*): 
StreamTransformation[_] = {
+  for (input <- inputs) {
+val t = extractExpectedTransformation(input, prefixOperatorName)
+if (t != null) {
+  return t
+}
+  }
+  null
+}
+
+transformation match {
+  case one: OneInputTransformation[_, _] =>
+if (one.getName.startsWith(prefixOperatorName)) {
+  one
+} else {
+  extractExpectedTransformation(one.getInput, prefixOperatorName)
+}
+  case two: TwoInputTransformation[_, _, _] =>
 
 Review comment:
   Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11074) Improve the harness test to make it possible test with state backend

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715934#comment-16715934
 ] 

ASF GitHub Bot commented on FLINK-11074:


dianfu commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240445604
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 ##
 @@ -491,13 +489,75 @@ class HarnessTestBase {
 distinctCountFuncName,
 distinctCountAggCode)
 
+  def createHarnessTester[KEY, IN, OUT](
+  dataStream: DataStream[_],
+  prefixOperatorName: String)
+  : KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT] = {
+
+val transformation = extractExpectedTransformation(
+  dataStream.javaStream.getTransformation,
+  prefixOperatorName).asInstanceOf[OneInputTransformation[_, _]]
+if (transformation == null) {
+  throw new Exception("Can not find the expected transformation")
+}
+
+val processOperator = 
transformation.getOperator.asInstanceOf[OneInputStreamOperator[IN, OUT]]
+val keySelector = 
transformation.getStateKeySelector.asInstanceOf[KeySelector[IN, KEY]]
+val keyType = 
transformation.getStateKeyType.asInstanceOf[TypeInformation[KEY]]
+
+createHarnessTester(processOperator, keySelector, keyType)
+  .asInstanceOf[KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT]]
+  }
+
+  private def extractExpectedTransformation(
+  transformation: StreamTransformation[_],
+  prefixOperatorName: String): StreamTransformation[_] = {
+def extractFromInputs(inputs: StreamTransformation[_]*): 
StreamTransformation[_] = {
+  for (input <- inputs) {
+val t = extractExpectedTransformation(input, prefixOperatorName)
+if (t != null) {
+  return t
+}
+  }
+  null
+}
+
+transformation match {
+  case one: OneInputTransformation[_, _] =>
+if (one.getName.startsWith(prefixOperatorName)) {
+  one
+} else {
+  extractExpectedTransformation(one.getInput, prefixOperatorName)
+}
+  case two: TwoInputTransformation[_, _, _] =>
 
 Review comment:
   Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve the harness test to make it possible test with state backend
> 
>
> Key: FLINK-11074
> URL: https://issues.apache.org/jira/browse/FLINK-11074
> Project: Flink
>  Issue Type: Test
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the harness test can only test without a state backend. If you use a 
> DataView in the accumulator of the aggregate function, the DataView is a Java 
> object held on the heap, not replaced with a StateMapView/StateListView whose 
> values are actually held in the state backend. We should improve the harness 
> test to make it possible to test with a state backend. Otherwise, issues such 
> as FLINK-10674 could never have been found. With this harness test available, 
> we could test the built-in aggregate functions which use the DataView in a more 
> fine-grained way.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11074) Improve the harness test to make it possible test with state backend

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715933#comment-16715933
 ] 

ASF GitHub Bot commented on FLINK-11074:


dianfu commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240445308
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 ##
 @@ -87,7 +85,7 @@ class HarnessTestBase {
 new 
RowTypeInfo(distinctCountAggregates.map(getAccumulatorTypeOfAggregateFunction(_)):
 _*)
 
   protected val distinctCountDescriptor: String = 
EncodingUtils.encodeObjectToString(
-new MapStateDescriptor("distinctAgg0", distinctCountAggregationStateType, 
Types.LONG))
 
 Review comment:
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve the harness test to make it possible test with state backend
> 
>
> Key: FLINK-11074
> URL: https://issues.apache.org/jira/browse/FLINK-11074
> Project: Flink
>  Issue Type: Test
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the harness test can only test without a state backend. If you use a 
> DataView in the accumulator of the aggregate function, the DataView is a Java 
> object held on the heap, not replaced with a StateMapView/StateListView whose 
> values are actually held in the state backend. We should improve the harness 
> test to make it possible to test with a state backend. Otherwise, issues such 
> as FLINK-10674 could never have been found. With this harness test available, 
> we could test the built-in aggregate functions which use the DataView in a more 
> fine-grained way.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dianfu commented on a change in pull request #7253: [FLINK-11074] [table][tests] Enable harness tests with RocksdbStateBackend and add harness tests for CollectAggFunction

2018-12-10 Thread GitBox
dianfu commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240445308
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 ##
 @@ -87,7 +85,7 @@ class HarnessTestBase {
 new 
RowTypeInfo(distinctCountAggregates.map(getAccumulatorTypeOfAggregateFunction(_)):
 _*)
 
   protected val distinctCountDescriptor: String = 
EncodingUtils.encodeObjectToString(
-new MapStateDescriptor("distinctAgg0", distinctCountAggregationStateType, 
Types.LONG))
 
 Review comment:
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11123) Improve ml quick start doc

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716125#comment-16716125
 ] 

ASF GitHub Bot commented on FLINK-11123:


asfgit closed pull request #7269: [FLINK-11123][docs] fix the import of the 
class is missing in ml quic…
URL: https://github.com/apache/flink/pull/7269
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/libs/ml/quickstart.md b/docs/dev/libs/ml/quickstart.md
index e056b28b505..2e9a7b9505c 100644
--- a/docs/dev/libs/ml/quickstart.md
+++ b/docs/dev/libs/ml/quickstart.md
@@ -153,6 +153,8 @@ A conversion can be done using a simple normalizer mapping 
function:
  
 {% highlight scala %}
 
+import org.apache.flink.ml.math.Vector
+
 def normalizer : LabeledVector => LabeledVector = { 
 lv => LabeledVector(if (lv.label > 0.0) 1.0 else -1.0, lv.vector)
 }


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve ml quick start doc
> --
>
> Key: FLINK-11123
> URL: https://issues.apache.org/jira/browse/FLINK-11123
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Machine Learning Library
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.7.1
>
>
> The user cannot run the sample from the ML quick start document because 
> the import statement for the class is missing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #7269: [FLINK-11123][docs] fix the import of the class is missing in ml quic…

2018-12-10 Thread GitBox
asfgit closed pull request #7269: [FLINK-11123][docs] fix the import of the 
class is missing in ml quic…
URL: https://github.com/apache/flink/pull/7269
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/libs/ml/quickstart.md b/docs/dev/libs/ml/quickstart.md
index e056b28b505..2e9a7b9505c 100644
--- a/docs/dev/libs/ml/quickstart.md
+++ b/docs/dev/libs/ml/quickstart.md
@@ -153,6 +153,8 @@ A conversion can be done using a simple normalizer mapping 
function:
  
 {% highlight scala %}
 
+import org.apache.flink.ml.math.Vector
+
 def normalizer : LabeledVector => LabeledVector = { 
 lv => LabeledVector(if (lv.label > 0.0) 1.0 else -1.0, lv.vector)
 }


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tweise commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-10 Thread GitBox
tweise commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r240470721
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
+   
List jarFiles,
 
 Review comment:
   It seems to leave room for arbitrary review demands also!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11048) Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716158#comment-16716158
 ] 

ASF GitHub Bot commented on FLINK-11048:


tweise commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r240470721
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,39 +170,46 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   protected List getJarFiles() throws ProgramInvocationException {
+   return jarFiles;
}
 
/**
 * Executes the remote job.
 *
-* @param streamGraph
-*Stream Graph to execute
+* @param streamExecutionEnvironment
+*Execution Environment with Stream Graph to execute
 * @param jarFiles
 *List of jar file URLs to ship to the cluster
 * @return The result of the job execution, containing elapsed time and 
accumulators.
 */
-   protected JobExecutionResult executeRemotely(StreamGraph streamGraph, 
List jarFiles) throws ProgramInvocationException {
+   public static JobExecutionResult 
executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
+   
List jarFiles,
 
 Review comment:
   It seems to leave room for arbitrary review demands also!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ability to programmatically execute streaming pipeline with savepoint restore 
>  
> ---
>
> Key: FLINK-11048
> URL: https://issues.apache.org/jira/browse/FLINK-11048
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.7.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>  Labels: pull-request-available
>
> RemoteStreamEnvironment.execute doesn't support restore from savepoint, 
> though the underlying ClusterClient does. Add an explicit "execute remotely" 
> that can be used by downstream projects.
> [https://lists.apache.org/thread.html/6fff05d4a8444d1c6fa139d63605d51f610caff46605a4cdbb35cd50@%3Cdev.flink.apache.org%3E]
>  
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11123) Improve ml quick start doc

2018-12-10 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-11123.
---
   Resolution: Fixed
Fix Version/s: 1.6.3

Fixed in master: 156c09ced9a61d20e2e5d4ce5cfedab8ac4d4ee4
Fixed in release-1.7: f045dfd501343bc9c72c665a6c599e42a221bd67
Fixed in release-1.6: d1a489f9eda733d914a885cd593b26f336a4d380

> Improve ml quick start doc
> --
>
> Key: FLINK-11123
> URL: https://issues.apache.org/jira/browse/FLINK-11123
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Machine Learning Library
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.1, 1.7.0
>
>
> The user cannot run the sample from the ML quick start document because 
> the import statement for the class is missing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] ramkrish86 commented on issue #3334: FLINK-4810 Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints

2018-12-10 Thread GitBox
ramkrish86 commented on issue #3334: FLINK-4810 Checkpoint Coordinator should 
fail ExecutionGraph after "n" unsuccessful checkpoints
URL: https://github.com/apache/flink/pull/3334#issuecomment-446070175
 
 
   @azagrebin - Thanks for the ping. Currently I am not working on this. Please 
feel free to work on this or the related JIRA FLINK-10074. I will add myself as 
a watcher to understand more about it. Thanks once again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-4810) Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful checkpoints

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716171#comment-16716171
 ] 

ASF GitHub Bot commented on FLINK-4810:
---

ramkrish86 commented on issue #3334: FLINK-4810 Checkpoint Coordinator should 
fail ExecutionGraph after "n" unsuccessful checkpoints
URL: https://github.com/apache/flink/pull/3334#issuecomment-446070175
 
 
   @azagrebin - Thanks for the ping. Currently I am not working on this. Please 
feel free to work on this or the related JIRA FLINK-10074. I will add myself as 
a watcher to understand more about it. Thanks once again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Checkpoint Coordinator should fail ExecutionGraph after "n" unsuccessful 
> checkpoints
> 
>
> Key: FLINK-4810
> URL: https://issues.apache.org/jira/browse/FLINK-4810
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
>
> The Checkpoint coordinator should track the number of consecutive 
> unsuccessful checkpoints.
> If more than {{n}} (configured value) checkpoints fail in a row, it should 
> call {{fail()}} on the execution graph to trigger a recovery.
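
A minimal sketch of the proposed policy (illustrative names only, not the coordinator's 
actual code): count consecutive failed checkpoints and signal a job failure once the 
configured threshold is reached.

{code:java}
/** Illustrative names only; not the CheckpointCoordinator's actual code. */
public class ConsecutiveCheckpointFailurePolicy {

    private final int maxConsecutiveFailures;
    private int consecutiveFailures;

    public ConsecutiveCheckpointFailurePolicy(int maxConsecutiveFailures) {
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /** Called for every failed checkpoint; returns true if the job should be failed. */
    public boolean onCheckpointFailed() {
        consecutiveFailures++;
        return consecutiveFailures >= maxConsecutiveFailures;
    }

    /** A successful checkpoint resets the counter. */
    public void onCheckpointCompleted() {
        consecutiveFailures = 0;
    }
}
{code}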



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11125) Remove useless import

2018-12-10 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716274#comment-16716274
 ] 

TisonKun commented on FLINK-11125:
--

@[~hequn8128] If the change is not big, it might be handled as a hotfix. 
Otherwise it would be better to make the title more specific, e.g. "Remove 
useless import under XXXClass" or something similar.

> Remove useless import 
> --
>
> Key: FLINK-11125
> URL: https://issues.apache.org/jira/browse/FLINK-11125
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL, Tests
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zjffdu commented on issue #7229: [FLINK-11060][scala-shell] Unable to set number of task manager and slot per task manager in scala shell local mode

2018-12-10 Thread GitBox
zjffdu commented on issue #7229: [FLINK-11060][scala-shell] Unable to set 
number of task manager and slot per task manager in scala shell local mode
URL: https://github.com/apache/flink/pull/7229#issuecomment-446082679
 
 
   @yanghua @tillrohrmann Could you help review it? Thanks


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716320#comment-16716320
 ] 

ASF GitHub Bot commented on FLINK-11010:


lamber-ken commented on a change in pull request #7180: [FLINK-11010] [TABLE] 
Flink SQL timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#discussion_r240484482
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
 ##
 @@ -164,4 +167,35 @@ public void testUnion() throws Exception {
 
StreamITCase.compareWithList(expected);
}
+
+   @Test
+   public void testProctime() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+
+   DataStream> ds = 
JavaStreamTestData.getSmall3TupleDataSet(env);
+   tableEnv.registerDataStream("MyTable", ds, "a, b, c, 
proctime.proctime");
+
+   String sqlQuery = "select proctime from MyTable";
+
+   Table result = tableEnv.sqlQuery(sqlQuery);
+
+   tableEnv
+   .toAppendStream(result, TypeInformation.of(Row.class))
+   .addSink(new SinkFunction<Row>() {
+   @Override
+   public void invoke(Row value, Context context) 
throws Exception {
+
+   Timestamp procTimestamp = (Timestamp) 
value.getField(0);
+
+   // validate the second here
+   long procSecondTime = 
procTimestamp.getTime() / 1000;
 
 Review comment:
   yeah, it's better to validate hour. I'll update.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> the ProcessingTime is just implemented by invoking System.currentTimeMillis() 
> but the long value will be automatically wrapped to a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
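
A small self-contained example (not taken from the PR) showing how the time-zone 
wrapping described above makes the SQL timestamp diverge from the raw processing 
time on machines outside UTC:

{code:java}
import java.sql.Timestamp;
import java.util.TimeZone;

public class ProctimeOffsetExample {
    public static void main(String[] args) {
        long time = System.currentTimeMillis();   // what currentProcessingTime() returns

        // The wrapping described above: the epoch millis are shifted by the local
        // time-zone offset before being turned into a java.sql.Timestamp.
        Timestamp shifted = new Timestamp(time - TimeZone.getDefault().getOffset(time));
        Timestamp raw = new Timestamp(time);

        // On a machine running in UTC+8 the two values differ by eight hours.
        System.out.println("raw:     " + raw);
        System.out.println("shifted: " + shifted);
    }
}
{code}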



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] lamber-ken commented on a change in pull request #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread GitBox
lamber-ken commented on a change in pull request #7180: [FLINK-11010] [TABLE] 
Flink SQL timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#discussion_r240484482
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
 ##
 @@ -164,4 +167,35 @@ public void testUnion() throws Exception {
 
StreamITCase.compareWithList(expected);
}
+
+   @Test
+   public void testProctime() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+
+   DataStream> ds = 
JavaStreamTestData.getSmall3TupleDataSet(env);
+   tableEnv.registerDataStream("MyTable", ds, "a, b, c, 
proctime.proctime");
+
+   String sqlQuery = "select proctime from MyTable";
+
+   Table result = tableEnv.sqlQuery(sqlQuery);
+
+   tableEnv
+   .toAppendStream(result, TypeInformation.of(Row.class))
+   .addSink(new SinkFunction<Row>() {
+   @Override
+   public void invoke(Row value, Context context) 
throws Exception {
+
+   Timestamp procTimestamp = (Timestamp) 
value.getField(0);
+
+   // validate the second here
+   long procSecondTime = 
procTimestamp.getTime() / 1000;
 
 Review comment:
   yeah, it's better to validate hour. I'll update.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9699) Add api to replace registered table

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716286#comment-16716286
 ] 

ASF GitHub Bot commented on FLINK-9699:
---

zjffdu commented on issue #6236: [FLINK-9699] [table] Add api to replace 
registered table
URL: https://github.com/apache/flink/pull/6236#issuecomment-446082836
 
 
   @hequn8128 @yanghua PR is updated, could you help review it? Thanks


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add api to replace registered table
> ---
>
> Key: FLINK-9699
> URL: https://issues.apache.org/jira/browse/FLINK-9699
> Project: Flink
>  Issue Type: Improvement
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10964) sql-client throws exception when paging through finished batch query

2018-12-10 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716340#comment-16716340
 ] 

vinoyang commented on FLINK-10964:
--

[~twalthr] when you have time, can you help to review the PR of this issue? By 
analyzing the source code, I found that the ResultRestore has been emptied at 
the time of cancelQuery. The main problem with this exception is that when 
stopping the retrieval thread, it still tries to call cancelQuery regardless of 
the running state of the job.
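
A minimal sketch of the guard described above, with made-up names rather than the 
actual SQL Client classes: only cancel the query if the backing job is still running.

{code:java}
/** All names below are hypothetical, not the actual SQL Client API. */
public class RetrievalShutdownSketch {

    enum JobStatus { RUNNING, FINISHED, CANCELED, FAILED }

    interface Executor {
        JobStatus getJobStatus(String resultId);
        void cancelQuery(String resultId);
    }

    /** Only cancel while the job is still running; a finished job's result was already cleaned up. */
    static void stopRetrieval(Executor executor, String resultId) {
        if (executor.getJobStatus(resultId) == JobStatus.RUNNING) {
            executor.cancelQuery(resultId);
        }
        // Otherwise skip the cancel: the ResultStore entry is gone, and calling
        // cancelQuery again would only raise "Could not find a result with result identifier ...".
    }
}
{code}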

> sql-client throws exception when paging through finished batch query 
> -
>
> Key: FLINK-10964
> URL: https://issues.apache.org/jira/browse/FLINK-10964
> Project: Flink
>  Issue Type: Bug
>  Components: SQL Client
>Reporter: Seth Wiesman
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> When paging through a batch query in state 'Finished' the sql client throws 
> the following exception: 
> {code:java}
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not find a 
> result with result identifier '0c7dce30d287fdd13b934fbefe5a38d1'.{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10964) sql-client throws exception when paging through finished batch query

2018-12-10 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716340#comment-16716340
 ] 

vinoyang edited comment on FLINK-10964 at 12/11/18 6:36 AM:


[~twalthr] when you have time, can you help to review the PR of this issue? By 
analyzing the source code, I found that the ResultStore has been emptied at the 
time of cancelQuery. The main problem with this exception is that when stopping 
the retrieval thread, it still tries to call cancelQuery regardless of the 
running state of the job.


was (Author: yanghua):
[~twalthr] when you have time, can you help to review the PR of this issue? By 
analyzing the source code, I found that the ResultRestore has been emptied at 
the time of cancelQuery. The main problem with this exception is that when 
stopping the retrieval thread, it still tries to call cancelQuery regardless of 
the running state of the job.

> sql-client throws exception when paging through finished batch query 
> -
>
> Key: FLINK-10964
> URL: https://issues.apache.org/jira/browse/FLINK-10964
> Project: Flink
>  Issue Type: Bug
>  Components: SQL Client
>Reporter: Seth Wiesman
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> When paging through a batch query in state 'Finished' the sql client throws 
> the following exception: 
> {code:java}
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not find a 
> result with result identifier '0c7dce30d287fdd13b934fbefe5a38d1'.{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10724) Refactor failure handling in check point coordinator

2018-12-10 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716352#comment-16716352
 ] 

vinoyang commented on FLINK-10724:
--

[~yunta] From the source code point of view, {{abortSubsumed}} treats the 
subsumed checkpoint in a "failure"-like way (especially when it can be 
subsumed), but whether it should be regarded as a failure when counting is, I 
think, something that needs to be discussed.
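
One way to picture the distinction under discussion (purely a sketch with hypothetical 
names, not the CheckpointCoordinator's code): treat subsumption as normal progress and 
only count the other abort reasons toward the consecutive-failure counter of FLINK-4810.

{code:java}
/** Hypothetical classification sketch, not the coordinator's actual failure handling. */
public class CheckpointAbortClassifier {

    enum AbortReason { EXPIRED, DECLINED, EXCEPTION, SUBSUMED }

    private int consecutiveFailures;

    void onCheckpointAborted(AbortReason reason) {
        // Subsumption is part of normal progress (a newer checkpoint completed),
        // so it does not count toward the failure counter.
        if (reason == AbortReason.SUBSUMED) {
            return;
        }
        consecutiveFailures++;
    }

    void onCheckpointCompleted() {
        consecutiveFailures = 0;
    }

    int getConsecutiveFailures() {
        return consecutiveFailures;
    }
}
{code}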

> Refactor failure handling in check point coordinator
> 
>
> Key: FLINK-10724
> URL: https://issues.apache.org/jira/browse/FLINK-10724
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: vinoyang
>Priority: Major
>
> At the moment failure handling of asynchronously triggered checkpoint in 
> check point coordinator happens in different places. We could organise it 
> similar way as failure handling of synchronous triggering of checkpoint in 
> *CheckpointTriggerResult* where we classify error cases. This will simplify 
> e.g. integration of error counter for FLINK-4810.
> See also discussion here: [https://github.com/apache/flink/pull/6567]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] liu-zhaokun commented on issue #7257: [FLINK-11089]Log filecache directory removed messages

2018-12-10 Thread GitBox
liu-zhaokun commented on issue #7257: [FLINK-11089]Log filecache directory 
removed messages
URL: https://github.com/apache/flink/pull/7257#issuecomment-446037623
 
 
   @tillrohrmann Hi, are you available to help review this PR? Thanks! 
Looking forward to your reply.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11089) Log filecache directory removed messages

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715910#comment-16715910
 ] 

ASF GitHub Bot commented on FLINK-11089:


liu-zhaokun commented on issue #7257: [FLINK-11089]Log filecache directory 
removed messages
URL: https://github.com/apache/flink/pull/7257#issuecomment-446037623
 
 
   @tillrohrmann Hi, are you available to help review this PR? Thanks! 
Looking forward to your reply.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Log filecache directory removed messages 
> -
>
> Key: FLINK-11089
> URL: https://issues.apache.org/jira/browse/FLINK-11089
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 1.7.0
>Reporter: liuzhaokun
>Priority: Minor
>  Labels: pull-request-available
>
> When the TaskManager exits or shuts down, the filecache directory named 
> "flink-dist-cache*" is removed, but there is no log message about this 
> action. So I think we should log it, so that users can check it easily when 
> there are bugs.
> You can see that IOManager.java logs a removal message when the TaskManager 
> shuts down; the filecache can do the same thing.
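
A minimal sketch of the requested behavior (assuming Flink's FileUtils and SLF4J; the 
class and method names are illustrative): log the directory path when the file cache 
removes it on shutdown.

{code:java}
import java.io.File;
import java.io.IOException;

import org.apache.flink.util.FileUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Sketch only: log the storage directory when it is removed during shutdown. */
class FileCacheShutdownSketch {

    private static final Logger LOG = LoggerFactory.getLogger(FileCacheShutdownSketch.class);

    static void deleteStorageDirectory(File storageDir) {
        try {
            FileUtils.deleteDirectory(storageDir);
            LOG.info("FileCache removed storage directory {}.", storageDir.getAbsolutePath());
        } catch (IOException e) {
            LOG.warn("FileCache failed to remove storage directory {}.", storageDir, e);
        }
    }
}
{code}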



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11074) Improve the harness test to make it possible test with state backend

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715932#comment-16715932
 ] 

ASF GitHub Bot commented on FLINK-11074:


dianfu commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240445274
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Integer => JInt}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.dataview.{DataView, MapView}
+import org.apache.flink.table.dataview.StateMapView
+import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, 
GroupAggProcessFunction}
+import 
org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+class AggFunctionHarnessTest extends HarnessTestBase {
+  private val queryConfig = new TestStreamQueryConfig(Time.seconds(0), 
Time.seconds(0))
+
+  @Test
+  def testCollectAggregate(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val data = new mutable.MutableList[(JInt, String)]
+val t = env.fromCollection(data).toTable(tEnv, 'a, 'b)
+tEnv.registerTable("T", t)
+val sqlQuery = tEnv.sqlQuery(
+  s"""
+ |SELECT
+ |  b, collect(a)
+ |FROM (
+ |  SELECT a, b
+ |  FROM T
+ |  GROUP BY a, b
+ |) GROUP BY b
+ |""".stripMargin)
+
+val testHarness = createHarnessTester[String, CRow, CRow](
+  sqlQuery.toRetractStream[Row](queryConfig), "groupBy")
+
+testHarness.setStateBackend(getStateBackend)
+testHarness.open()
+
+val operator = getOperator(testHarness)
+val state = getState(operator, 
"acc0_map_dataview").asInstanceOf[MapView[JInt, JInt]]
+assertTrue(state.isInstanceOf[StateMapView[_, _]])
+
assertTrue(operator.getKeyedStateBackend.isInstanceOf[RocksDBKeyedStateBackend[_]])
+
+val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1))
+expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 1).asJava), 1))
+
+testHarness.processElement(new StreamRecord(CRow(1: JInt, "bbb"), 1))
+expectedOutput.add(new StreamRecord(CRow("bbb", Map(1 -> 1).asJava), 1))
+
+testHarness.processElement(new StreamRecord(CRow(1: JInt, "aaa"), 1))
+expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 
1).asJava), 1))
+expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2).asJava), 1))
+
+testHarness.processElement(new StreamRecord(CRow(2: JInt, "aaa"), 1))
+expectedOutput.add(new StreamRecord(CRow(false, "aaa", Map(1 -> 
2).asJava), 1))
+expectedOutput.add(new StreamRecord(CRow("aaa", Map(1 -> 2, 2 -> 
1).asJava), 1))
+
+// remove some state: state may be cleaned up by the state backend if not 
accessed more than ttl
+operator.setCurrentKey(Row.of("aaa"))
+state.remove(2)
+
+// retract after state has been cleaned up
+testHarness.processElement(new StreamRecord(CRow(false, 2: JInt, "aaa"), 
1))
+
+val result = 

[jira] [Commented] (FLINK-10566) Flink Planning is exponential in the number of stages

2018-12-10 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715937#comment-16715937
 ] 

Fabian Hueske commented on FLINK-10566:
---

Hmmm, true. 
The stack trace was taken during a recursive sink-to-source plan traversal to 
register the serializers.
The plan resulting from the given program branches a lot.
I guess the problem is that the traversal does not check if a subplan was 
already traversed. 
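
A generic illustration of the fix hinted at above (PlanNode is a stand-in interface, 
not a Flink class): memoize visited nodes so a heavily branching plan DAG is traversed 
once per node instead of once per path.

{code:java}
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

/** PlanNode is a stand-in interface; the point is the visited-set check. */
class PlanTraversalSketch {

    interface PlanNode {
        List<PlanNode> getInputs();
    }

    static void traverseFromSinks(List<PlanNode> sinks) {
        Set<PlanNode> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        for (PlanNode sink : sinks) {
            traverse(sink, visited);
        }
    }

    static void traverse(PlanNode node, Set<PlanNode> visited) {
        if (!visited.add(node)) {
            return; // already handled via another branch of the DAG
        }
        // ... register serializers for this node ...
        for (PlanNode input : node.getInputs()) {
            traverse(input, visited);
        }
    }
}
{code}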

> Flink Planning is exponential in the number of stages
> -
>
> Key: FLINK-10566
> URL: https://issues.apache.org/jira/browse/FLINK-10566
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 1.5.4, 1.6.1, 1.7.0
>Reporter: Robert Bradshaw
>Priority: Major
> Attachments: chart.png
>
>
> This makes it nearly impossible to run graphs with 100 or more stages. (The 
> execution itself is still sub-second, but the job submission takes 
> increasingly long.)
> I can reproduce this with the following pipeline, which resembles my 
> real-world workloads (with depth up to 10 and width up, and past, 50). On 
> Flink it seems getting width beyond width 10 is problematic (times out after 
> hours). Note the log scale on the chart for time. 
>  
> {code:java}
>   public static void runPipeline(int depth, int width) throws Exception {
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     DataSet<String> input = env.fromElements("a", "b", "c");
>     DataSet<String> stats = null;
>     for (int i = 0; i < depth; i++) {
>       stats = analyze(input, stats, width / (i + 1) + 1);
>     }
>     stats.writeAsText("out.txt");
>     env.execute("depth " + depth + " width " + width);
>   }
>   public static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) {
>     System.out.println("analyze " + branches);
>     for (int i = 0; i < branches; i++) {
>       final int ii = i;
>       if (stats != null) {
>         input = input.map(new RichMapFunction<String, String>() {
>           @Override
>           public void open(Configuration parameters) throws Exception {
>             Collection<String> broadcastSet = getRuntimeContext().getBroadcastVariable("stats");
>           }
>           @Override
>           public String map(String value) throws Exception {
>             return value;
>           }
>         }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
>       }
>       DataSet<String> branch = input
>           .map(s -> new Tuple2<Integer, String>(0, s + ii))
>           .groupBy(0)
>           .minBy(1)
>           .map(kv -> kv.f1);
>       if (stats == null) {
>         stats = branch;
>       } else {
>         stats = stats.union(branch);
>       }
>     }
>     return stats.map(s -> "(" + s + ").stats");
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11070) Add stream-stream non-window cross join

2018-12-10 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715954#comment-16715954
 ] 

Fabian Hueske commented on FLINK-11070:
---

Hmmm, I'm hesitant to enable cross joins by default.
It is very easy to write a query that performs unnecessary cross joins. I would 
not trust users with that.
If we run cross joins by default, users will experience very bad performance 
even if the query could be rewritten without cross joins.
With a switch in the table config we can notify the user about a (potentially) 
inefficient query and users can manually enable these queries.

I don't really see a problem removing the switch in the future. We can change 
the default behavior to enable cross joins by default once we have support for 
join reordering. Even if the cardinality estimates are not perfect, equi joins 
should be much cheaper than cross joins.

> Add stream-stream non-window cross join
> ---
>
> Key: FLINK-11070
> URL: https://issues.apache.org/jira/browse/FLINK-11070
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Currently, we don't reorder joins and rely on the order provided by the user. 
> This is fine for most of the cases, however, it limits the set of supported 
> SQL queries.
> Example:
> {code:java}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code:java}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> In order to support more queries, it would be nice to have cross join on 
> streaming. We can start from a simple version, for example, call 
> forceNonParallel() for connectOperator in `DataStreamJoin` when it is a cross 
> join. The performance may be bad. But it works fine if the two tables of 
> cross join are small ones. 
> We can do some optimizations later, such as broadcasting the smaller side, 
> etc.
> Any suggestions are greatly appreciated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10543) Leverage efficient timer deletion in relational operators

2018-12-10 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716027#comment-16716027
 ] 

Fabian Hueske commented on FLINK-10543:
---

Hi [~sunjincheng121], [~hequn8128], I had a quick look at the PR and it seems 
that the clean up behavior of some operators was changed.

1. Some {{ValueState}} were removed from join operators, which breaks savepoint 
compatibility.
2. The clean-up behavior of joins was changed. It seems that all records are 
removed when a clean-up timer fires, whereas before, non-expired records 
were kept.

We only backport bug fixes to release branches that do not break compatibility 
(unless it is absolutely necessary).
Hence, the commit on the release-1.7 branch must be reverted.

I think we should also discuss the changes on the master branch. 
This issue was about leveraging the new timer deletion feature to improve 
performance, not about changing the clean-up semantics of the operators. 
I'll have a closer look at the changes in the next few days to figure out what 
exactly changed.

Thanks, Fabian
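
Purely for illustration, the pattern FLINK-9423 enables looks roughly like the sketch below: a generic KeyedProcessFunction with an invented retention interval, not the Table/SQL operator code touched by this PR. On each element the previously registered clean-up timer is deleted and a fresh one is registered, instead of letting stale timers fire and be ignored.

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative only: shows the timer-deletion pattern, not the actual operators.
public class CleanupTimerExample extends KeyedProcessFunction<String, String, String> {

    private static final long RETENTION_MS = 60_000L; // assumed retention interval

    private transient ValueState<Long> cleanupTimer;

    @Override
    public void open(Configuration parameters) {
        cleanupTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("cleanup-timer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long registered = cleanupTimer.value();
        if (registered != null) {
            // Since Flink 1.6 the old timer can be removed directly.
            ctx.timerService().deleteProcessingTimeTimer(registered);
        }
        long newTimer = ctx.timerService().currentProcessingTime() + RETENTION_MS;
        ctx.timerService().registerProcessingTimeTimer(newTimer);
        cleanupTimer.update(newTimer);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // clean up per-key state once the retention interval has expired
        cleanupTimer.clear();
    }
}
{code}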

> Leverage efficient timer deletion in relational operators
> -
>
> Key: FLINK-10543
> URL: https://issues.apache.org/jira/browse/FLINK-10543
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0, 1.7.1
>
>
> FLINK-9423 added support for efficient timer deletions. This feature is 
> available since Flink 1.6 and should be used by the relational operator of 
> SQL and Table API.
> Currently, we use a few workarounds to handle situations when deleting timers 
> would be the better solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] walterddr commented on a change in pull request #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread GitBox
walterddr commented on a change in pull request #7180: [FLINK-11010] [TABLE] 
Flink SQL timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#discussion_r240470904
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
 ##
 @@ -164,4 +167,35 @@ public void testUnion() throws Exception {
 
StreamITCase.compareWithList(expected);
}
+
+   @Test
+   public void testProctime() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+
+   DataStream<Tuple3<Integer, Long, String>> ds = 
JavaStreamTestData.getSmall3TupleDataSet(env);
+   tableEnv.registerDataStream("MyTable", ds, "a, b, c, 
proctime.proctime");
+
+   String sqlQuery = "select proctime from MyTable";
+
+   Table result = tableEnv.sqlQuery(sqlQuery);
+
+   tableEnv
+   .toAppendStream(result, TypeInformation.of(Row.class))
+   .addSink(new SinkFunction<Row>() {
+   @Override
+   public void invoke(Row value, Context context) 
throws Exception {
+
+   Timestamp procTimestamp = (Timestamp) 
value.getField(0);
+
+   // validate the second here
+   long procSecondTime = 
procTimestamp.getTime() / 1000;
 
 Review comment:
   This causes me some trouble because the check can fail when the timestamps 
cross a second boundary. This is not a stable ITCase in my opinion.
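
   One way to make the check stable would be to bound the emitted proctime by wall-clock timestamps captured before and after execution, instead of comparing truncated seconds. A rough sketch, where the helper name and `collectedTimestamps` are placeholders rather than code from this PR, and the bounds are taken around `env.execute()`:

```java
import static org.junit.Assert.assertTrue;

import java.sql.Timestamp;
import java.util.List;

class ProctimeAssertions {
    // lowerBound: wall clock before the job runs; upperBound: wall clock after it finishes.
    static void assertWithinExecutionWindow(
            List<Timestamp> collectedTimestamps, long lowerBound, long upperBound) {
        for (Timestamp proctime : collectedTimestamps) {
            long t = proctime.getTime();
            assertTrue(
                "proctime " + t + " not in [" + lowerBound + ", " + upperBound + "]",
                t >= lowerBound && t <= upperBound);
        }
    }
}
```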


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716164#comment-16716164
 ] 

ASF GitHub Bot commented on FLINK-11010:


walterddr commented on a change in pull request #7180: [FLINK-11010] [TABLE] 
Flink SQL timestamp is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#discussion_r240470904
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
 ##
 @@ -164,4 +167,35 @@ public void testUnion() throws Exception {
 
StreamITCase.compareWithList(expected);
}
+
+   @Test
+   public void testProctime() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+
+   DataStream<Tuple3<Integer, Long, String>> ds = 
JavaStreamTestData.getSmall3TupleDataSet(env);
+   tableEnv.registerDataStream("MyTable", ds, "a, b, c, 
proctime.proctime");
+
+   String sqlQuery = "select proctime from MyTable";
+
+   Table result = tableEnv.sqlQuery(sqlQuery);
+
+   tableEnv
+   .toAppendStream(result, TypeInformation.of(Row.class))
+   .addSink(new SinkFunction<Row>() {
+   @Override
+   public void invoke(Row value, Context context) 
throws Exception {
+
+   Timestamp procTimestamp = (Timestamp) 
value.getField(0);
+
+   // validate the second here
+   long procSecondTime = 
procTimestamp.getTime() / 1000;
 
 Review comment:
   This causes me some trouble because the check can fail when the timestamps 
cross a second boundary. This is not a stable ITCase in my opinion.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.2, 1.7.0, 1.8.0, 1.7.1
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> the ProcessingTime is just implemented by invoking System.currentTimeMillis() 
> but the long value will be automatically wrapped to a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
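
The shift described above can be reproduced with the plain JDK; a minimal sketch, independent of any Flink API:

{code:java}
import java.sql.Timestamp;
import java.util.TimeZone;

public class OffsetDemo {
    public static void main(String[] args) {
        long time = System.currentTimeMillis();
        // the wrapping quoted in the issue description:
        Timestamp shifted = new Timestamp(time - TimeZone.getDefault().getOffset(time));
        // In any zone other than UTC, shifted.getTime() differs from the raw
        // processing time by the zone offset (e.g. 28800000 ms in UTC+8).
        System.out.println(time - shifted.getTime());
    }
}
{code}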



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11070) Add stream-stream non-window cross join

2018-12-10 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715884#comment-16715884
 ] 

Hequn Cheng commented on FLINK-11070:
-

[~fhueske] Hi, thanks for the feedback and sharing the thoughts.

As for the example, I think I should add more details.
 * Table2 and table3 are very small tables, for instance with only one row 
each, while table1 is a very big table, for instance with 1M rows.
 * All these rows contain the same join keys.

So the cost for cross join versus non-cross join would be 1*1*1M (cross join) 
vs. 1*1M*1M (non-cross join). This means the query may be executed as a cross 
join with much better performance.

Adding a switch is a good idea. It forces our users to pay more attention to 
the performance of cross joins. But it may also bring some inconvenience: we 
can't remove the switch even once Flink supports join reordering, because the 
user may not have provided cardinality estimates. So whenever cardinality 
estimates are missing, a user who really does want a cross join still has to 
configure the switch to enable it. From this point of view, 
I think we should not have the switch. 
I would propose that:
 * We don't enable join reordering in general, because reordering without 
cardinality estimates is gambling.
 * We trust the query written by the user, as we don't have cardinality 
estimates, and we don't need to add a switch that inconveniences the user. 

What do you think?

To get better performance, I think making the join parallel is a good idea. I 
will investigate it. Thanks a lot for your suggestions. 

Best, Hequn

> Add stream-stream non-window cross join
> ---
>
> Key: FLINK-11070
> URL: https://issues.apache.org/jira/browse/FLINK-11070
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Currently, we don't reorder joins and rely on the order provided by the user. 
> This is fine for most of the cases, however, it limits the set of supported 
> SQL queries.
> Example:
> {code:java}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>   """
> |SELECT t1.a, t3.b
> |FROM MyTable3 t3, MyTable2 t2, MyTable t1
> |WHERE t1.a = t3.a AND t1.a = t2.a
> |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code:java}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
> LogicalJoin(condition=[true], joinType=[inner])
>   LogicalJoin(condition=[true], joinType=[inner])
> LogicalTableScan(table=[[_DataStreamTable_2]])
> LogicalTableScan(table=[[_DataStreamTable_1]])
>   LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> In order to support more queries, it would be nice to have cross join on 
> streaming. We can start from a simple version, for example, call 
> forceNonParallel() for connectOperator in `DataStreamJoin` when it is a cross 
> join. The performance may be bad. But it works fine if the two tables of 
> cross join are small ones. 
> We can do some optimizations later, such as broadcasting the smaller side, 
> etc.
> Any suggestions are greatly appreciated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716020#comment-16716020
 ] 

ASF GitHub Bot commented on FLINK-11010:


lamber-ken commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp 
is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-446050898
 
 
   hi, @zentol cc


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink SQL timestamp is inconsistent with currentProcessingTime()
> 
>
> Key: FLINK-11010
> URL: https://issues.apache.org/jira/browse/FLINK-11010
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.2
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> Flink SQL timestamp is inconsistent with currentProcessingTime().
>  
> the ProcessingTime is just implemented by invoking System.currentTimeMillis() 
> but the long value will be automatically wrapped to a Timestamp with the 
> following statement: 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] lamber-ken commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime()

2018-12-10 Thread GitBox
lamber-ken commented on issue #7180: [FLINK-11010] [TABLE] Flink SQL timestamp 
is inconsistent with currentProcessingTime()
URL: https://github.com/apache/flink/pull/7180#issuecomment-446050898
 
 
   hi, @zentol cc


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] link3280 closed pull request #7148: [FLINK-10943][tests] Fix test failures in flink runtime caused by curator dependency conflicts

2018-12-10 Thread GitBox
link3280 closed pull request #7148: [FLINK-10943][tests] Fix test failures in 
flink runtime caused by curator dependency conflicts
URL: https://github.com/apache/flink/pull/7148
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index d216851c2d1..0cab4feabda 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -72,6 +72,12 @@ under the License.
 	<artifactId>flink-shaded-hadoop2</artifactId>
 	<version>${project.version}</version>
 	<optional>true</optional>
+	<exclusions>
+		<exclusion>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-client</artifactId>
+		</exclusion>
+	</exclusions>

 



 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10943) Flink runtime test failed caused by curator dependency conflicts

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716149#comment-16716149
 ] 

ASF GitHub Bot commented on FLINK-10943:


link3280 commented on issue #7148: [FLINK-10943][tests] Fix test failures in 
flink runtime caused by curator dependency conflicts
URL: https://github.com/apache/flink/pull/7148#issuecomment-446068042
 
 
   @tillrohrmann Thanks for your review. You're right, it's caused by the 
relocation problem of IntelliJ. I'll close the PR and the Jira ticket.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink runtime test failed caused by curator dependency conflicts
> 
>
> Key: FLINK-10943
> URL: https://issues.apache.org/jira/browse/FLINK-10943
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Tests
>Affects Versions: 1.5.5, 1.6.2
>Reporter: Paul Lin
>Assignee: Paul Lin
>Priority: Minor
>  Labels: pull-request-available
> Attachments: 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt
>
>
> Hadoop-common 2.6+ includes curator dependencies, which conflict 
> with the curator version used by the Flink runtime and cause test failures 
> (the attachment is the surefire report). 
> The curator dependency tree of flink-runtime is as below:
> ```
> flink-shaded-hadoop2 -> hadoop-common -> curator-client & curator-recipes
> flink-shaded-curator -> curator-recipes -> curator-framework -> curator-client
> ```
> According to Maven's dependency mediation, Maven would pick the curator-client 
> from flink-shaded-hadoop2, and curator-framework and curator-recipes from 
> flink-shaded-curator.
> To fix the problem, I think we can simply exclude curator-client from the 
> flink-shaded-hadoop2 dependency in flink-runtime.
> I'd like to fix this, please let me know what you think. Thanks!
> [^org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] link3280 commented on issue #7148: [FLINK-10943][tests] Fix test failures in flink runtime caused by curator dependency conflicts

2018-12-10 Thread GitBox
link3280 commented on issue #7148: [FLINK-10943][tests] Fix test failures in 
flink runtime caused by curator dependency conflicts
URL: https://github.com/apache/flink/pull/7148#issuecomment-446068042
 
 
   @tillrohrmann Thanks for your review. You're right, it's caused by the 
relocation problem of IntelliJ. I'll close the PR and the Jira ticket.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-10943) Flink runtime test failed caused by curator dependency conflicts

2018-12-10 Thread Paul Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paul Lin closed FLINK-10943.

Resolution: Not A Bug

> Flink runtime test failed caused by curator dependency conflicts
> 
>
> Key: FLINK-10943
> URL: https://issues.apache.org/jira/browse/FLINK-10943
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Tests
>Affects Versions: 1.5.5, 1.6.2
>Reporter: Paul Lin
>Assignee: Paul Lin
>Priority: Minor
>  Labels: pull-request-available
> Attachments: 
> org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt
>
>
> Hadoop-common 2.6+ includes curator dependencies, which conflict 
> with the curator version used by the Flink runtime and cause test failures 
> (the attachment is the surefire report). 
> The curator dependency tree of flink-runtime is as below:
> ```
> flink-shaded-hadoop2 -> hadoop-common -> curator-client & curator-recipes
> flink-shaded-curator -> curator-recipes -> curator-framework -> curator-client
> ```
> According to Maven's dependency mediation, Maven would pick the curator-client 
> from flink-shaded-hadoop2, and curator-framework and curator-recipes from 
> flink-shaded-curator.
> To fix the problem, I think we can simply exclude curator-client from the 
> flink-shaded-hadoop2 dependency in flink-runtime.
> I'd like to fix this, please let me know what you think. Thanks!
> [^org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStoreTest.txt]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11126) Filter out AMRMToken in the TaskManager credentials

2018-12-10 Thread Paul Lin (JIRA)
Paul Lin created FLINK-11126:


 Summary: Filter out AMRMToken in the TaskManager credentials
 Key: FLINK-11126
 URL: https://issues.apache.org/jira/browse/FLINK-11126
 Project: Flink
  Issue Type: Improvement
  Components: Security, YARN
Affects Versions: 1.7.0, 1.6.2
Reporter: Paul Lin
Assignee: Paul Lin


Currently, Flink JobManager propagates its storage tokens to TaskManager to 
meet the requirement of YARN log aggregation (see FLINK-6376). But in this way 
the AMRMToken is also included in the TaskManager credentials, which could be 
potentially insecure. We should filter out AMRMToken before setting the tokens 
to TaskManager's container launch context.
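
For illustration only, such filtering could look roughly like the sketch below against Hadoop's Credentials API; TokenFilter and withoutAmRmToken are invented names, not the actual change:

{code:java}
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;

// Illustrative helper: copy every token except the AMRMToken into a new
// Credentials object before it is handed to the TaskManager container.
public final class TokenFilter {

    private TokenFilter() {}

    public static Credentials withoutAmRmToken(Credentials source) {
        Credentials filtered = new Credentials();
        for (Token<? extends TokenIdentifier> token : source.getAllTokens()) {
            if (!AMRMTokenIdentifier.KIND_NAME.equals(token.getKind())) {
                filtered.addToken(token.getService(), token);
            }
        }
        // secret keys are not copied here; a real change would decide explicitly
        // what else must be propagated for log aggregation to keep working
        return filtered;
    }
}
{code}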



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] hequn8128 opened a new pull request #7272: [FLINK-11125] remove unused import

2018-12-10 Thread GitBox
hequn8128 opened a new pull request #7272: [FLINK-11125] remove unused import
URL: https://github.com/apache/flink/pull/7272
 
 
   
   ## What is the purpose of the change
   
   This pull request removes some unused imports in tests and flink-table.
   
   
   ## Brief change log
   
 - Remove unused import
   
   
   ## Verifying this change
   
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11125) Remove useless import

2018-12-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11125:
---
Labels: pull-request-available  (was: )

> Remove useless import 
> --
>
> Key: FLINK-11125
> URL: https://issues.apache.org/jira/browse/FLINK-11125
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL, Tests
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


  1   2   3   >