[jira] [Commented] (FLINK-6055) Supported setting timers on a Non-Keyed Stream

2017-03-14 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925575#comment-15925575
 ] 

Xiaogang Shi commented on FLINK-6055:
-

I think it's very challenging because the storing and restoring of timers in 
non-keyed streams is very difficult. Do you have any idea?

> Supported setting timers on a Non-Keyed Stream
> --
>
> Key: FLINK-6055
> URL: https://issues.apache.org/jira/browse/FLINK-6055
> Project: Flink
>  Issue Type: New Feature
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> After [FLINK-4460] Allow ProcessFunction on non-keyed streams, I want to 
> support setting timers on a Non-Keyed Stream. What do you think? 
> [~aljoscha] 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6055) Supported setting timers on a Non-Keyed Stream

2017-03-14 Thread sunjincheng (JIRA)
sunjincheng created FLINK-6055:
--

 Summary: Supported setting timers on a Non-Keyed Stream
 Key: FLINK-6055
 URL: https://issues.apache.org/jira/browse/FLINK-6055
 Project: Flink
  Issue Type: New Feature
Reporter: sunjincheng
Assignee: sunjincheng


After [FLINK-4460] Allow ProcessFunction on non-keyed streams, I want to support 
setting timers on a Non-Keyed Stream. What do you think? [~aljoscha] 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-14 Thread godfreyhe
Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106073030
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, 
ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+object RexProgramExtractor {
+
+  /**
+* Extracts the indices of input fields which accessed by the 
RexProgram.
+*
+* @param rexProgram The RexProgram to analyze
+* @return The indices of accessed input fields
+*/
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+val visitor = new InputRefVisitor
+
+// extract referenced input fields from projections
+rexProgram.getProjectList.foreach(
+  exp => rexProgram.expandLocalRef(exp).accept(visitor))
+
+// extract referenced input fields from condition
+val condition = rexProgram.getCondition
+if (condition != null) {
+  rexProgram.expandLocalRef(condition).accept(visitor)
+}
+
+visitor.getFields
+  }
+
+  /**
+* Extract condition from RexProgram and convert it into independent 
CNF expressions.
+*
+* @param rexProgram The RexProgram to analyze
+* @return converted expressions as well as RexNodes which cannot be 
translated
+*/
+  def extractConjunctiveConditions(
+  rexProgram: RexProgram,
+  rexBuilder: RexBuilder,
+  catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
+
+rexProgram.getCondition match {
+  case condition: RexLocalRef =>
+val expanded = rexProgram.expandLocalRef(condition)
+// converts the expanded expression to conjunctive normal form,
+// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR 
c)"
+val cnf = RexUtil.toCnf(rexBuilder, expanded)
+// converts the cnf condition to a list of AND conditions
+val conjunctions = RelOptUtil.conjunctions(cnf)
+
+val convertedExpressions = new mutable.ArrayBuffer[Expression]
+val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+val inputNames = 
rexProgram.getInputRowType.getFieldNames.asScala.toArray
+val converter = new ConvertToExpression(inputNames, catalog)
+
+conjunctions.asScala.foreach(rex => {
+  rex.accept(converter) match {
+case Some(expression) => convertedExpressions += expression
+case None => unconvertedRexNodes += rex
+  }
+})
+(convertedExpressions.toArray, unconvertedRexNodes.toArray)
+
+  case _ => (Array.empty, Array.empty)
+}
+  }
+}
+
+/**
+  * An RexVisitor to extract all referenced input fields
+  */
+class InputRefVisitor extends RexVisitorImpl[Unit](true) {
+
+  private var fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): Unit =
+fields += inputRef.getIndex
+
+  override def visitCall(call: RexCall): Unit =
+call.operands.foreach(operand => operand.accept(this))
+}
+
+/**
+  * An RexVisitor to convert RexNode to 

[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...

2017-03-14 Thread godfreyhe
Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106072764
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, 
ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+object RexProgramExtractor {
+
+  /**
+* Extracts the indices of input fields which accessed by the 
RexProgram.
+*
+* @param rexProgram The RexProgram to analyze
+* @return The indices of accessed input fields
+*/
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+val visitor = new InputRefVisitor
+
+// extract referenced input fields from projections
+rexProgram.getProjectList.foreach(
+  exp => rexProgram.expandLocalRef(exp).accept(visitor))
+
+// extract referenced input fields from condition
+val condition = rexProgram.getCondition
+if (condition != null) {
+  rexProgram.expandLocalRef(condition).accept(visitor)
+}
+
+visitor.getFields
+  }
+
+  /**
+* Extract condition from RexProgram and convert it into independent 
CNF expressions.
+*
+* @param rexProgram The RexProgram to analyze
+* @return converted expressions as well as RexNodes which cannot be 
translated
+*/
+  def extractConjunctiveConditions(
+  rexProgram: RexProgram,
+  rexBuilder: RexBuilder,
+  catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
+
+rexProgram.getCondition match {
+  case condition: RexLocalRef =>
+val expanded = rexProgram.expandLocalRef(condition)
+// converts the expanded expression to conjunctive normal form,
+// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR 
c)"
+val cnf = RexUtil.toCnf(rexBuilder, expanded)
+// converts the cnf condition to a list of AND conditions
+val conjunctions = RelOptUtil.conjunctions(cnf)
+
+val convertedExpressions = new mutable.ArrayBuffer[Expression]
+val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+val inputNames = 
rexProgram.getInputRowType.getFieldNames.asScala.toArray
+val converter = new ConvertToExpression(inputNames, catalog)
+
+conjunctions.asScala.foreach(rex => {
+  rex.accept(converter) match {
+case Some(expression) => convertedExpressions += expression
+case None => unconvertedRexNodes += rex
+  }
+})
+(convertedExpressions.toArray, unconvertedRexNodes.toArray)
+
+  case _ => (Array.empty, Array.empty)
+}
+  }
+}
+
+/**
+  * An RexVisitor to extract all referenced input fields
+  */
+class InputRefVisitor extends RexVisitorImpl[Unit](true) {
+
+  private var fields = mutable.LinkedHashSet[Int]()
--- End diff --

`var` → `val`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with 

[GitHub] flink issue #3486: [FLINK-5981][SECURITY]make ssl version and cipher suites ...

2017-03-14 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3486
  
@vijikarthi I've checked the [JDK 
doc](http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites)
 and not found any notes about the combination of SSL version and cipher suites. 
About cipher suites, it says `The following list contains the standard JSSE 
cipher suite names. Over time, various groups have added additional cipher 
suites to the SSL/TLS namespace. `, so I think we had better not add an additional 
description about that and let users follow the JRE/JDK rules.

@StephanEwen The two comments you mentioned have been fixed :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925442#comment-15925442
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106072764
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, 
ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+object RexProgramExtractor {
+
+  /**
+* Extracts the indices of input fields which accessed by the 
RexProgram.
+*
+* @param rexProgram The RexProgram to analyze
+* @return The indices of accessed input fields
+*/
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+val visitor = new InputRefVisitor
+
+// extract referenced input fields from projections
+rexProgram.getProjectList.foreach(
+  exp => rexProgram.expandLocalRef(exp).accept(visitor))
+
+// extract referenced input fields from condition
+val condition = rexProgram.getCondition
+if (condition != null) {
+  rexProgram.expandLocalRef(condition).accept(visitor)
+}
+
+visitor.getFields
+  }
+
+  /**
+* Extract condition from RexProgram and convert it into independent 
CNF expressions.
+*
+* @param rexProgram The RexProgram to analyze
+* @return converted expressions as well as RexNodes which cannot be 
translated
+*/
+  def extractConjunctiveConditions(
+  rexProgram: RexProgram,
+  rexBuilder: RexBuilder,
+  catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
+
+rexProgram.getCondition match {
+  case condition: RexLocalRef =>
+val expanded = rexProgram.expandLocalRef(condition)
+// converts the expanded expression to conjunctive normal form,
+// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR 
c)"
+val cnf = RexUtil.toCnf(rexBuilder, expanded)
+// converts the cnf condition to a list of AND conditions
+val conjunctions = RelOptUtil.conjunctions(cnf)
+
+val convertedExpressions = new mutable.ArrayBuffer[Expression]
+val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+val inputNames = 
rexProgram.getInputRowType.getFieldNames.asScala.toArray
+val converter = new ConvertToExpression(inputNames, catalog)
+
+conjunctions.asScala.foreach(rex => {
+  rex.accept(converter) match {
+case Some(expression) => convertedExpressions += expression
+case None => unconvertedRexNodes += rex
+  }
+})
+(convertedExpressions.toArray, unconvertedRexNodes.toArray)
+
+  case _ => (Array.empty, Array.empty)
+}
+  }
+}
+
+/**
+  * An RexVisitor to extract all referenced input fields
+  */
+class InputRefVisitor extends RexVisitorImpl[Unit](true) {
+
+  private var fields = mutable.LinkedHashSet[Int]()
--- End diff --

`var` → `val`


> Add FilterableTableSource interface and translation rule
> 

[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925441#comment-15925441
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user godfreyhe commented on a diff in the pull request:

https://github.com/apache/flink/pull/3520#discussion_r106073030
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, 
ResolvedFieldReference}
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+object RexProgramExtractor {
+
+  /**
+* Extracts the indices of input fields which accessed by the 
RexProgram.
+*
+* @param rexProgram The RexProgram to analyze
+* @return The indices of accessed input fields
+*/
+  def extractRefInputFields(rexProgram: RexProgram): Array[Int] = {
+val visitor = new InputRefVisitor
+
+// extract referenced input fields from projections
+rexProgram.getProjectList.foreach(
+  exp => rexProgram.expandLocalRef(exp).accept(visitor))
+
+// extract referenced input fields from condition
+val condition = rexProgram.getCondition
+if (condition != null) {
+  rexProgram.expandLocalRef(condition).accept(visitor)
+}
+
+visitor.getFields
+  }
+
+  /**
+* Extract condition from RexProgram and convert it into independent 
CNF expressions.
+*
+* @param rexProgram The RexProgram to analyze
+* @return converted expressions as well as RexNodes which cannot be 
translated
+*/
+  def extractConjunctiveConditions(
+  rexProgram: RexProgram,
+  rexBuilder: RexBuilder,
+  catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
+
+rexProgram.getCondition match {
+  case condition: RexLocalRef =>
+val expanded = rexProgram.expandLocalRef(condition)
+// converts the expanded expression to conjunctive normal form,
+// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR 
c)"
+val cnf = RexUtil.toCnf(rexBuilder, expanded)
+// converts the cnf condition to a list of AND conditions
+val conjunctions = RelOptUtil.conjunctions(cnf)
+
+val convertedExpressions = new mutable.ArrayBuffer[Expression]
+val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+val inputNames = 
rexProgram.getInputRowType.getFieldNames.asScala.toArray
+val converter = new ConvertToExpression(inputNames, catalog)
+
+conjunctions.asScala.foreach(rex => {
+  rex.accept(converter) match {
+case Some(expression) => convertedExpressions += expression
+case None => unconvertedRexNodes += rex
+  }
+})
+(convertedExpressions.toArray, unconvertedRexNodes.toArray)
+
+  case _ => (Array.empty, Array.empty)
+}
+  }
+}
+
+/**
+  * An RexVisitor to extract all referenced input fields
+  */
+class InputRefVisitor extends RexVisitorImpl[Unit](true) {
+
+  private var fields = mutable.LinkedHashSet[Int]()
+
+  def getFields: Array[Int] = fields.toArray
+
+  override def visitInputRef(inputRef: RexInputRef): 

[jira] [Updated] (FLINK-6039) Row of TableFunction should support flexible number of fields

2017-03-14 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-6039:
--
Component/s: Table API & SQL

> Row of TableFunction should support flexible number of fields
> -
>
> Key: FLINK-6039
> URL: https://issues.apache.org/jira/browse/FLINK-6039
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>
> In the real world, especially when processing logs with a TableFunction, the 
> formats of the logs are flexible. Thus, the number of fields 
> should not be fixed. 
> For example, we should make the following three types of TableFunction 
> work.
> {code}
> // Test for incomplete row
> class TableFunc4 extends TableFunction[Row] {
>   def eval(str: String): Unit = {
> if (str.contains("#")) {
>   str.split("#").foreach({ s =>
> val row = new Row(3)
> row.setField(0, s)  // And we only set values for one column
> collect(row)
>   })
> }
>   }
>   override def getResultType: TypeInformation[Row] = {
> new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
> BasicTypeInfo.INT_TYPE_INFO,
> BasicTypeInfo.INT_TYPE_INFO)
>   }
> }
> // Test for incomplete row
> class TableFunc5 extends TableFunction[Row] {
>   def eval(str: String): Unit = {
> if (str.contains("#")) {
>   str.split("#").foreach({ s =>
> val row = new Row(1)  // ResultType is three columns, we have only 
> one here
> row.setField(0, s)
> collect(row)
>   })
> }
>   }
>   override def getResultType: TypeInformation[Row] = {
> new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
>   BasicTypeInfo.INT_TYPE_INFO,
>   BasicTypeInfo.INT_TYPE_INFO)
>   }
> }
> // Test for overflow row
> class TableFunc6 extends TableFunction[Row] {
>   def eval(str: String): Unit = {
> if (str.contains("#")) {
>   str.split("#").foreach({ s =>
> val row = new Row(5)  // ResultType is two columns, we have five 
> columns here
> row.setField(0, s)
> row.setField(1, s.length)
> row.setField(2, s.length)
> row.setField(3, s.length)
> row.setField(4, s.length)
> collect(row)
>   })
> }
>   }
>   override def getResultType: TypeInformation[Row] = {
> new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
> BasicTypeInfo.INT_TYPE_INFO)
>   }
> }
> {code}
> Actually, TableFunc4 and TableFunc6 already work correctly with the 
> current version. This issue will make TableFunc5 work.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6036) Let catalog support partition

2017-03-14 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-6036:
-
Description: 
Now catalog only support CRUD at database and table level. But in some kind of 
catalog, for example for hive, we also need do CRUD operations at partition 
level. 
This issue aims to let catalog support partition.

  was:
Now catalog only support CRUD at database and table level. But in some kind of 
catalog, for example for hive, we also need do CRUD operations on partition 
level. 
This issue aims to let catalog support partition.


> Let catalog support partition
> -
>
> Key: FLINK-6036
> URL: https://issues.apache.org/jira/browse/FLINK-6036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> Now catalog only support CRUD at database and table level. But in some kind 
> of catalog, for example for hive, we also need do CRUD operations at 
> partition level. 
> This issue aims to let catalog support partition.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6036) Let catalog support partition

2017-03-14 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang updated FLINK-6036:
-
Description: 
Now catalog only support CRUD at database and table level. But in some kind of 
catalog, for example for hive, we also need do CRUD operations on partition 
level. 
This issue aims to let catalog support partition.

> Let catalog support partition
> -
>
> Key: FLINK-6036
> URL: https://issues.apache.org/jira/browse/FLINK-6036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: jingzhang
>
> Now catalog only support CRUD at database and table level. But in some kind 
> of catalog, for example for hive, we also need do CRUD operations on 
> partition level. 
> This issue aims to let catalog support partition.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-6036) Let catalog support partition

2017-03-14 Thread jingzhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jingzhang reassigned FLINK-6036:


Assignee: jingzhang

> Let catalog support partition
> -
>
> Key: FLINK-6036
> URL: https://issues.apache.org/jira/browse/FLINK-6036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> Now catalog only support CRUD at database and table level. But in some kind 
> of catalog, for example for hive, we also need do CRUD operations on 
> partition level. 
> This issue aims to let catalog support partition.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5981) SSL version and ciper suites cannot be constrained as configured

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925423#comment-15925423
 ] 

ASF GitHub Bot commented on FLINK-5981:
---

Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3486
  
@vijikarthi I've checked the [JDK 
doc](http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites)
 and not found any notes about the combination of SSL version and cipher suites. 
About cipher suites, it says `The following list contains the standard JSSE 
cipher suite names. Over time, various groups have added additional cipher 
suites to the SSL/TLS namespace. `, so I think we had better not add an additional 
description about that and let users follow the JRE/JDK rules.

@StephanEwen The two comments you mentioned have been fixed :)


> SSL version and ciper suites cannot be constrained as configured
> 
>
> Key: FLINK-5981
> URL: https://issues.apache.org/jira/browse/FLINK-5981
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Reporter: Tao Wang
>Assignee: Tao Wang
>
> I configured SSL and started a Flink job, but found that the configured properties are not
> applied properly:
> akka port: only the cipher suites are applied correctly, the SSL version is not
> blob server/netty server: neither the SSL version nor the cipher suites match what
> I configured
> I've found out the reason why:
> http://stackoverflow.com/questions/11504173/sslcontext-initialization (for 
> blob server and netty server)
> https://groups.google.com/forum/#!topic/akka-user/JH6bGnWE8kY(for akka ssl 
> version, it's fixed in akka 2.4:https://github.com/akka/akka/pull/21078)
> I'll fix the issue on blob server and netty server, and it seems like only 
> upgrade for akka can solve issue in akka side(we'll consider later as upgrade 
> is not a small action).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor

2017-03-14 Thread Syinchwun Leo (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925393#comment-15925393
 ] 

Syinchwun Leo edited comment on FLINK-5756 at 3/15/17 1:43 AM:
---

Is it possible to avoid using the merge() operation? I notice that the result
of RocksDB's get() is a byte array. My point is that when calling the add() method
of RocksDBListState, we call get() first to obtain the byte array, then append the new
value's serialized byte[] to that byte array, and then write it back to RocksDB. This approach
makes it possible for there to be only one byte[] under the key. I haven't
 tested the idea; the performance may not be ideal and the approach may be awkward.


was (Author: syinchwunleo):
Is it possible that avoiding using merge() operation. I notice that the result 
of RocksDB's get() is a byte array. My point is that when calling add() method 
of RocksDBListState, call get() first and get byte array, then append new 
value's serialized byte[] to byte array, then set to Rocks. I haven't
 test the idea, maybe the performance is not perfect and  awkward.

> When there are many values under the same key in ListState, 
> RocksDBStateBackend performances poor
> -
>
> Key: FLINK-5756
> URL: https://issues.apache.org/jira/browse/FLINK-5756
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>
> When using RocksDB as the StateBackend, if there are many values under the
> same key in ListState, the windowState.get() operation performs very poorly.
> I also tested RocksDB version 4.11.2, and the performance is also very
> poor. The problem is likely related to RocksDB's own get() operation
> after using merge(). The problem may affect the window operation's
> performance when the size is very large using ListState. I tried to merge 5
> values under the same key in RocksDB; it cost 120 seconds to execute the get()
> operation.
> ///
> The flink's code is as follows:
> {code}
> class SEventSource extends RichSourceFunction [SEvent] {
>   private var count = 0L
>   private val alphabet = 
> "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
>   override def run(sourceContext: SourceContext[SEvent]): Unit = {
> while (true) {
>   for (i <- 0 until 5000) {
> sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
> count += 1L
>   }
>   Thread.sleep(1000)
> }
>   }
> }
> env.addSource(new SEventSource)
>   .assignTimestampsAndWatermarks(new 
> AssignerWithPeriodicWatermarks[SEvent] {
> override def getCurrentWatermark: Watermark = {
>   new Watermark(System.currentTimeMillis())
> }
> override def extractTimestamp(t: SEvent, l: Long): Long = {
>   System.currentTimeMillis()
> }
>   })
>   .keyBy(0)
>   .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
>   .apply(new WindowStatistic)
>   .map(x => (System.currentTimeMillis(), x))
>   .print()
> {code}
> 
> The RocksDB Test code:
> {code}
> val stringAppendOperator = new StringAppendOperator
> val options = new Options()
> options.setCompactionStyle(CompactionStyle.LEVEL)
>   .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>   .setLevelCompactionDynamicLevelBytes(true)
>   .setIncreaseParallelism(4)
>   .setUseFsync(true)
>   .setMaxOpenFiles(-1)
>   .setCreateIfMissing(true)
>   .setMergeOperator(stringAppendOperator)
> val write_options = new WriteOptions
> write_options.setSync(false)
> val rocksDB = RocksDB.open(options, "/**/Data/")
> val key = "key"
> val value = 
> "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
> val beginmerge = System.currentTimeMillis()
> for(i <- 0 to 5) {
>   rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes())
>   //rocksDB.put(key.getBytes, value.getBytes)
> }
> println("finish")
> val begin = System.currentTimeMillis()
> rocksDB.get(key.getBytes)
> val end = System.currentTimeMillis()
> println("merge cost:" + (begin - beginmerge))
> println("Time consuming:" + (end - begin))
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor

2017-03-14 Thread Syinchwun Leo (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925393#comment-15925393
 ] 

Syinchwun Leo commented on FLINK-5756:
--

Is it possible to avoid using the merge() operation? I notice that the result
of RocksDB's get() is a byte array. My point is that when calling the add() method
of RocksDBListState, we call get() first to obtain the byte array, then append the new
value's serialized byte[] to that byte array, and then write it back to RocksDB. I haven't
 tested the idea; the performance may not be ideal and the approach may be awkward.

> When there are many values under the same key in ListState, 
> RocksDBStateBackend performances poor
> -
>
> Key: FLINK-5756
> URL: https://issues.apache.org/jira/browse/FLINK-5756
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>
> When using RocksDB as the StateBackend, if there are many values under the
> same key in ListState, the windowState.get() operation performs very poorly.
> I also tested RocksDB version 4.11.2, and the performance is also very
> poor. The problem is likely related to RocksDB's own get() operation
> after using merge(). The problem may affect the window operation's
> performance when the size is very large using ListState. I tried to merge 5
> values under the same key in RocksDB; it cost 120 seconds to execute the get()
> operation.
> ///
> The flink's code is as follows:
> {code}
> class SEventSource extends RichSourceFunction [SEvent] {
>   private var count = 0L
>   private val alphabet = 
> "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
>   override def run(sourceContext: SourceContext[SEvent]): Unit = {
> while (true) {
>   for (i <- 0 until 5000) {
> sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
> count += 1L
>   }
>   Thread.sleep(1000)
> }
>   }
> }
> env.addSource(new SEventSource)
>   .assignTimestampsAndWatermarks(new 
> AssignerWithPeriodicWatermarks[SEvent] {
> override def getCurrentWatermark: Watermark = {
>   new Watermark(System.currentTimeMillis())
> }
> override def extractTimestamp(t: SEvent, l: Long): Long = {
>   System.currentTimeMillis()
> }
>   })
>   .keyBy(0)
>   .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
>   .apply(new WindowStatistic)
>   .map(x => (System.currentTimeMillis(), x))
>   .print()
> {code}
> 
> The RocksDB Test code:
> {code}
> val stringAppendOperator = new StringAppendOperator
> val options = new Options()
> options.setCompactionStyle(CompactionStyle.LEVEL)
>   .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>   .setLevelCompactionDynamicLevelBytes(true)
>   .setIncreaseParallelism(4)
>   .setUseFsync(true)
>   .setMaxOpenFiles(-1)
>   .setCreateIfMissing(true)
>   .setMergeOperator(stringAppendOperator)
> val write_options = new WriteOptions
> write_options.setSync(false)
> val rocksDB = RocksDB.open(options, "/**/Data/")
> val key = "key"
> val value = 
> "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
> val beginmerge = System.currentTimeMillis()
> for(i <- 0 to 5) {
>   rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes())
>   //rocksDB.put(key.getBytes, value.getBytes)
> }
> println("finish")
> val begin = System.currentTimeMillis()
> rocksDB.get(key.getBytes)
> val end = System.currentTimeMillis()
> println("merge cost:" + (begin - beginmerge))
> println("Time consuming:" + (end - begin))
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6040) DataStreamUserDefinedFunctionITCase occasionally fails

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925373#comment-15925373
 ] 

ASF GitHub Bot commented on FLINK-6040:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3530


> DataStreamUserDefinedFunctionITCase occasionally fails
> --
>
> Key: FLINK-6040
> URL: https://issues.apache.org/jira/browse/FLINK-6040
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.3.0
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>Priority: Trivial
>  Labels: test
> Fix For: 1.3.0
>
>
> Three test cases in DataStreamUserDefinedFunctionITCase forgot to call the
> StreamITCase.clear method. This causes occasional failures, because the
> result of one test case may sometimes affect another.
> {code}
> java.lang.AssertionError: 
> Expected :MutableList(Anna#44,1, Anna#44,2, Anna#44,Anna#44, Jack#22,1, 
> Jack#22,2, Jack#22,Jack#22, John#19,1, John#19,2, John#19,John#19, nosharp,1, 
> nosharp,2, nosharp,nosharp)
> Actual   :MutableList(Anna#44,1, Anna#44,2, Anna#44,Anna#44, Jack#22,1, 
> Jack#22,2, Jack#22,Jack#22, John#19,1, John#19,2, John#19,John#19, 
> Miller,13.56, Smith,180.2, Williams,4.68, Williams,69.0, nosharp,1, 
> nosharp,2, nosharp,nosharp)
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.flink.table.runtime.datastream.DataStreamUserDefinedFunctionITCase.testTableFunctionWithVariableArguments(DataStreamUserDefinedFunctionITCase.scala:226)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3530: [FLINK-6040] [table] DataStreamUserDefinedFunction...

2017-03-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3530


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-6040) DataStreamUserDefinedFunctionITCase occasionally fails

2017-03-14 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-6040.
-
Resolution: Fixed

> DataStreamUserDefinedFunctionITCase occasionally fails
> --
>
> Key: FLINK-6040
> URL: https://issues.apache.org/jira/browse/FLINK-6040
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.3.0
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>Priority: Trivial
>  Labels: test
> Fix For: 1.3.0
>
>
> Three test cases in DataStreamUserDefinedFunctionITCase forgot to call the
> StreamITCase.clear method. This causes occasional failures, because the
> result of one test case may sometimes affect another.
> {code}
> java.lang.AssertionError: 
> Expected :MutableList(Anna#44,1, Anna#44,2, Anna#44,Anna#44, Jack#22,1, 
> Jack#22,2, Jack#22,Jack#22, John#19,1, John#19,2, John#19,John#19, nosharp,1, 
> nosharp,2, nosharp,nosharp)
> Actual   :MutableList(Anna#44,1, Anna#44,2, Anna#44,Anna#44, Jack#22,1, 
> Jack#22,2, Jack#22,Jack#22, John#19,1, John#19,2, John#19,John#19, 
> Miller,13.56, Smith,180.2, Williams,4.68, Williams,69.0, nosharp,1, 
> nosharp,2, nosharp,nosharp)
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.flink.table.runtime.datastream.DataStreamUserDefinedFunctionITCase.testTableFunctionWithVariableArguments(DataStreamUserDefinedFunctionITCase.scala:226)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3535: [FLINK-5713] Protect against NPE in WindowOperator...

2017-03-14 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3535#discussion_r106065855
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -354,22 +354,27 @@ public void merge(W mergeResult,
}
});
 
-   // drop if the window is already late
-   if (isLate(actualWindow)) {
-   
mergingWindows.retireWindow(actualWindow);
-   continue;
-   }
+   context.key = key;
+   context.window = actualWindow;
 
W stateWindow = 
mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
throw new IllegalStateException("Window 
" + window + " is not in in-flight window set.");
}
 
windowState.setCurrentNamespace(stateWindow);
-   windowState.add(element.getValue());
 
-   context.key = key;
-   context.window = actualWindow;
+   // Drop if the window is already late. In rare 
cases (with a misbehaving
+   // WindowAssigner) it can happen that a window 
becomes late that already has
+   // state (contents, state and timers). That's 
why we first get the window state
+   // above and then drop everything.
+   if (isLate(actualWindow)) {
+   clearAllState(actualWindow, 
windowState, mergingWindows);
+   mergingWindows.persist();
--- End diff --

Yes, in fact, I had seen that, but I did not realize that it could be
deleted. Haha, you are very quick-witted. :)
Anyway, I must say thanks for your explanation.
Best,
SunJincheng


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5713) Protect against NPE in WindowOperator window cleanup

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925371#comment-15925371
 ] 

ASF GitHub Bot commented on FLINK-5713:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3535#discussion_r106065855
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -354,22 +354,27 @@ public void merge(W mergeResult,
}
});
 
-   // drop if the window is already late
-   if (isLate(actualWindow)) {
-   
mergingWindows.retireWindow(actualWindow);
-   continue;
-   }
+   context.key = key;
+   context.window = actualWindow;
 
W stateWindow = 
mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
throw new IllegalStateException("Window 
" + window + " is not in in-flight window set.");
}
 
windowState.setCurrentNamespace(stateWindow);
-   windowState.add(element.getValue());
 
-   context.key = key;
-   context.window = actualWindow;
+   // Drop if the window is already late. In rare 
cases (with a misbehaving
+   // WindowAssigner) it can happen that a window 
becomes late that already has
+   // state (contents, state and timers). That's 
why we first get the window state
+   // above and then drop everything.
+   if (isLate(actualWindow)) {
+   clearAllState(actualWindow, 
windowState, mergingWindows);
+   mergingWindows.persist();
--- End diff --

Yes, in fact, I had seen that, but I did not realize that it could be
deleted. Haha, you are very quick-witted. :)
Anyway, I must say thanks for your explanation.
Best,
SunJincheng


> Protect against NPE in WindowOperator window cleanup
> 
>
> Key: FLINK-5713
> URL: https://issues.apache.org/jira/browse/FLINK-5713
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.2.1
>
>
> Some (misbehaved) WindowAssigners can cause windows to be dropped from the 
> merging window set while a cleanup timer is still active. This will trigger a 
> NullPointerException when that timer fires.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor

2017-03-14 Thread Syinchwun Leo (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925364#comment-15925364
 ] 

Syinchwun Leo commented on FLINK-5756:
--

OK, this problem not only affects the performance of UDF windows but also
checkpointing. Poor window performance leads to many tuples waiting to be
processed in the IO buffer, so the barrier cannot be processed in a timely manner. This may
result in checkpoint failures.

> When there are many values under the same key in ListState, 
> RocksDBStateBackend performances poor
> -
>
> Key: FLINK-5756
> URL: https://issues.apache.org/jira/browse/FLINK-5756
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>
> When using RocksDB as the StateBackend, if there are many values under the
> same key in ListState, the windowState.get() operation performs very poorly.
> I also tested RocksDB version 4.11.2, and the performance is also very
> poor. The problem is likely related to RocksDB's own get() operation
> after using merge(). The problem may affect the window operation's
> performance when the size is very large using ListState. I tried to merge 5
> values under the same key in RocksDB; it cost 120 seconds to execute the get()
> operation.
> ///
> The flink's code is as follows:
> {code}
> class SEventSource extends RichSourceFunction [SEvent] {
>   private var count = 0L
>   private val alphabet = 
> "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
>   override def run(sourceContext: SourceContext[SEvent]): Unit = {
> while (true) {
>   for (i <- 0 until 5000) {
> sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
> count += 1L
>   }
>   Thread.sleep(1000)
> }
>   }
> }
> env.addSource(new SEventSource)
>   .assignTimestampsAndWatermarks(new 
> AssignerWithPeriodicWatermarks[SEvent] {
> override def getCurrentWatermark: Watermark = {
>   new Watermark(System.currentTimeMillis())
> }
> override def extractTimestamp(t: SEvent, l: Long): Long = {
>   System.currentTimeMillis()
> }
>   })
>   .keyBy(0)
>   .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
>   .apply(new WindowStatistic)
>   .map(x => (System.currentTimeMillis(), x))
>   .print()
> {code}
> 
> The RocksDB Test code:
> {code}
> val stringAppendOperator = new StringAppendOperator
> val options = new Options()
> options.setCompactionStyle(CompactionStyle.LEVEL)
>   .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>   .setLevelCompactionDynamicLevelBytes(true)
>   .setIncreaseParallelism(4)
>   .setUseFsync(true)
>   .setMaxOpenFiles(-1)
>   .setCreateIfMissing(true)
>   .setMergeOperator(stringAppendOperator)
> val write_options = new WriteOptions
> write_options.setSync(false)
> val rocksDB = RocksDB.open(options, "/**/Data/")
> val key = "key"
> val value = 
> "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
> val beginmerge = System.currentTimeMillis()
> for(i <- 0 to 5) {
>   rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes())
>   //rocksDB.put(key.getBytes, value.getBytes)
> }
> println("finish")
> val begin = System.currentTimeMillis()
> rocksDB.get(key.getBytes)
> val end = System.currentTimeMillis()
> println("merge cost:" + (begin - beginmerge))
> println("Time consuming:" + (end - begin))
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6040) DataStreamUserDefinedFunctionITCase occasionally fails

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925359#comment-15925359
 ] 

ASF GitHub Bot commented on FLINK-6040:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3530
  
looks good, merging


> DataStreamUserDefinedFunctionITCase occasionally fails
> --
>
> Key: FLINK-6040
> URL: https://issues.apache.org/jira/browse/FLINK-6040
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.3.0
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>Priority: Trivial
>  Labels: test
> Fix For: 1.3.0
>
>
> Three test cases in DataStreamUserDefinedFunctionITCase forgot to call the
> StreamITCase.clear method. This causes occasional failures, because the
> result of one test case may sometimes affect another.
> {code}
> java.lang.AssertionError: 
> Expected :MutableList(Anna#44,1, Anna#44,2, Anna#44,Anna#44, Jack#22,1, 
> Jack#22,2, Jack#22,Jack#22, John#19,1, John#19,2, John#19,John#19, nosharp,1, 
> nosharp,2, nosharp,nosharp)
> Actual   :MutableList(Anna#44,1, Anna#44,2, Anna#44,Anna#44, Jack#22,1, 
> Jack#22,2, Jack#22,Jack#22, John#19,1, John#19,2, John#19,John#19, 
> Miller,13.56, Smith,180.2, Williams,4.68, Williams,69.0, nosharp,1, 
> nosharp,2, nosharp,nosharp)
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.flink.table.runtime.datastream.DataStreamUserDefinedFunctionITCase.testTableFunctionWithVariableArguments(DataStreamUserDefinedFunctionITCase.scala:226)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3530: [FLINK-6040] [table] DataStreamUserDefinedFunctionITCase ...

2017-03-14 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3530
  
looks good, merging


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6053) Gauge should only take subclasses of Number, rather than everything

2017-03-14 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925257#comment-15925257
 ] 

Bowen Li commented on FLINK-6053:
-

Hm, I found that Flink's Gauge class is used like this in
`org.apache.flink.runtime.checkpoint.CheckpointStatsTracker`, which doesn't 
make much sense to me

```
private class LatestCompletedCheckpointExternalPathGauge implements 
Gauge<String> {
@Override
public String getValue() {
CompletedCheckpointStats completed = 
latestSnapshot.getHistory().getLatestCompletedCheckpoint();
if (completed != null) {
return completed.getExternalPath();
} else {
return "n/a";
}
}
}
```

> Gauge should only take subclasses of Number, rather than everything
> --
>
> Key: FLINK-6053
> URL: https://issues.apache.org/jira/browse/FLINK-6053
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0
>Reporter: Bowen Li
> Fix For: 1.3.0
>
>
> Currently, Flink's Gauge is defined as 
> ```java
> public interface Gauge<T> extends Metric {
>   T getValue();
> }
> ```
> But it doesn't make sense to have Gauge take generic types other than Number.
> And it blocks me from finishing FLINK-6013, because I cannot assume Gauge is
> only about Number. So the class should be like
> ```java
> public interface Gauge<T extends Number> extends Metric {
>   T getValue();
> }
> ```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6054) Add new state backend that dynamically stores data in memory and external storage

2017-03-14 Thread Sergio Esteves (JIRA)
Sergio Esteves created FLINK-6054:
-

 Summary: Add new state backend that dynamically stores data in 
memory and external storage
 Key: FLINK-6054
 URL: https://issues.apache.org/jira/browse/FLINK-6054
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Reporter: Sergio Esteves
Priority: Minor


This feature would be useful for memory-intensive applications that need to 
maintain state for long periods of time; e.g., event-time streaming application 
with long-lived windows that tolerate large amounts of lateness.

This feature would allow to scale the state and, in the example above, tolerate 
a very large (possibly unbounded) amount of lateness, which can be useful in a 
set of scenarios, like the one of Photon in the Google Advertising System 
(white paper: "Photon: Fault-tolerant and Scalable Joining of Continuous Data 
Streams").

In a nutshell, the idea would be to have a quota for the maximum memory that a 
state cell (different keys and namespaces) can occupy. When that quota gets 
fully occupied, new state data would be written out to disk. Then, when state 
needs to be retrieved, data is read entirely from memory - persisted data is 
loaded into memory in the background at the same time that data pertaining to 
the quota is being fetched (this reduces I/O overhead).

Different policies, defining when to offload/load data from/to memory, can be 
implemented to govern the overall memory utilization. We already have a 
preliminary implementation with promising results in terms of memory savings 
(in the context of streaming applications with windows that tolerate lateness).

More details are to be given soon through a design document.




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6053) Gauge should only take subclasses of Number, rather than everything

2017-03-14 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-6053:

Description: 
Currently, Flink's Gauge is defined as 

```java
public interface Gauge<T> extends Metric {
T getValue();
}
```

But it doesn't make sense to have Gauge take generic types other than Number.
And it blocks me from finishing FLINK-6013, because I cannot assume Gauge is
only about Number. So the class should be like

```java
public interface Gauge<T extends Number> extends Metric {
T getValue();
}
```

  was:
Currently, Flink's Gauge is defined as 

```java
/**
 * A Gauge is a {@link Metric} that calculates a specific value at a point in 
time.
 */
public interface Gauge<T> extends Metric {
T getValue();
}
```

But it doesn't make sense to have Gauge take generic types other than Number.
And it blocks me from finishing FLINK-6013, because I cannot assume Gauge is
only about Number. So the class should be like

```
/**
 * A Gauge is a {@link Metric} that calculates a specific value at a point in 
time.
 */
public interface Gauge<T extends Number> extends Metric {
T getValue();
}
```


> Gauge should only take subclasses of Number, rather than everything
> --
>
> Key: FLINK-6053
> URL: https://issues.apache.org/jira/browse/FLINK-6053
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0
>Reporter: Bowen Li
> Fix For: 1.3.0
>
>
> Currently, Flink's Gauge is defined as 
> ```java
> public interface Gauge<T> extends Metric {
>   T getValue();
> }
> ```
> But it doesn't make sense to have Gauge take generic types other than Number.
> And it blocks me from finishing FLINK-6013, because I cannot assume Gauge is
> only about Number. So the class should be like
> ```java
> public interface Gauge<T extends Number> extends Metric {
>   T getValue();
> }
> ```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6053) Gauge should only take subclasses of Number, rather than everything

2017-03-14 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-6053:

Description: 
Currently, Flink's Gauge is defined as 

```java
/**
 * A Gauge is a {@link Metric} that calculates a specific value at a point in 
time.
 */
public interface Gauge<T> extends Metric {
T getValue();
}
```

But it doesn't make sense to have Gauge take generic types other than Number.
And it blocks me from finishing FLINK-6013, because I cannot assume Gauge is
only about Number. So the class should be like

```
/**
 * A Gauge is a {@link Metric} that calculates a specific value at a point in 
time.
 */
public interface Gauge<T extends Number> extends Metric {
T getValue();
}
```

  was:
Currently, Flink's Gauge is defined as 

```
/**
 * A Gauge is a {@link Metric} that calculates a specific value at a point in 
time.
 */
public interface Gauge<T> extends Metric {
T getValue();
}
```

But it doesn't make sense to have Gauge take generic types other than Number. 
So the class should be like

```
/**
 * A Gauge is a {@link Metric} that calculates a specific value at a point in 
time.
 */
public interface Gauge extends Metric {
T getValue();
}
```


> Gauge should only take subclasses of Number, rather than everything
> --
>
> Key: FLINK-6053
> URL: https://issues.apache.org/jira/browse/FLINK-6053
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0
>Reporter: Bowen Li
> Fix For: 1.3.0
>
>
> Currently, Flink's Gauge is defined as 
> ```java
> /**
>  * A Gauge is a {@link Metric} that calculates a specific value at a point in time.
>  */
> public interface Gauge<T> extends Metric {
>   T getValue();
> }
> ```
> But it doesn't make sense to have Gauge take generic types other than Number. 
> And it blocks me from finishing FLINK-6013, because I cannot assume Gauge is 
> only about Number. So the class should be like
> ```
> /**
>  * A Gauge is a {@link Metric} that calculates a specific value at a point in time.
>  */
> public interface Gauge<T extends Number> extends Metric {
>   T getValue();
> }
> ```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6053) Gauge should only take subclasses of Number, rather than everything

2017-03-14 Thread Bowen Li (JIRA)
Bowen Li created FLINK-6053:
---

 Summary: Gauge should only take subclasses of Number, rather 
than everything
 Key: FLINK-6053
 URL: https://issues.apache.org/jira/browse/FLINK-6053
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.2.0
Reporter: Bowen Li
 Fix For: 1.3.0


Currently, Flink's Gauge is defined as 

```
/**
 * A Gauge is a {@link Metric} that calculates a specific value at a point in time.
 */
public interface Gauge<T> extends Metric {
    T getValue();
}
```

But it doesn't make sense to have Gauge take generic types other than Number. 
So the class should be like

```
/**
 * A Gauge is a {@link Metric} that calculates a specific value at a point in time.
 */
public interface Gauge<T extends Number> extends Metric {
    T getValue();
}
```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3509: [FLINK-5808] Fix Missing verification for setParal...

2017-03-14 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3509#discussion_r106040063
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
 ---
@@ -38,14 +38,10 @@
private final ContextEnvironment ctx;
 
protected StreamContextEnvironment(ContextEnvironment ctx) {
+   super(GlobalConfiguration.loadConfiguration().getInteger(
--- End diff --

@StephanEwen ping 😉 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5808) Missing verification for setParallelism and setMaxParallelism

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925127#comment-15925127
 ] 

ASF GitHub Bot commented on FLINK-5808:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3509#discussion_r106040063
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
 ---
@@ -38,14 +38,10 @@
private final ContextEnvironment ctx;
 
protected StreamContextEnvironment(ContextEnvironment ctx) {
+   super(GlobalConfiguration.loadConfiguration().getInteger(
--- End diff --

@StephanEwen ping  


> Missing verification for setParallelism and setMaxParallelism
> -
>
> Key: FLINK-5808
> URL: https://issues.apache.org/jira/browse/FLINK-5808
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.0, 1.2.1
>
>
> When {{setParallelism()}} is called we don't verify that it is <= the max 
> parallelism. Likewise, when {{setMaxParallelism()}} is called we don't check 
> that the new value doesn't clash with a previously set parallelism.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924934#comment-15924934
 ] 

ASF GitHub Bot commented on FLINK-5985:
---

Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3523
  
It also doesn't seem to work starting from a clean state and then savepoint 
redeploy with changed topology, so maybe I am really screwing up something


> Flink treats every task as stateful (making topology changes impossible)
> 
>
> Key: FLINK-5985
> URL: https://issues.apache.org/jira/browse/FLINK-5985
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Critical
>
> It seems  that Flink treats every Task as stateful so changing the topology 
> is not possible without setting uid on every single operator.
> If the topology has an iteration this is virtually impossible (or at least 
> gets super hacky)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3523: [FLINK-5985] Report no task states for stateless tasks in...

2017-03-14 Thread gyfora
Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3523
  
It also doesn't seem to work starting from a clean state and then savepoint 
redeploy with changed topology, so maybe I am really screwing up something


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3523: [FLINK-5985] Report no task states for stateless tasks in...

2017-03-14 Thread gyfora
Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3523
  
Hm, doesn't seem to work on the first try. What I did is I updated the 
client with the new jar based on your backport branch. Redeployed the job with 
a savepoint (to get the new Flink version), took a savepoint and tried to 
redeploy with the changed topology. 

I still seem to get the same error.

Is it possible that the previous checkpoints have an effect on this? In any 
case I will double check tomorrow morning and try to do the test again.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924869#comment-15924869
 ] 

ASF GitHub Bot commented on FLINK-5985:
---

Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/3523
  
Hm, doesn't seem to work on the first try. What I did is I updated the 
client with the new jar based on your backport branch. Redeployed the job with 
a savepoint (to get the new Flink version), took a savepoint and tried to 
redeploy with the changed topology. 

I still seem to get the same error.

Is it possible that the previous checkpoints have an effect on this? In any 
case I will double check tomorrow morning and try to do the test again.


> Flink treats every task as stateful (making topology changes impossible)
> 
>
> Key: FLINK-5985
> URL: https://issues.apache.org/jira/browse/FLINK-5985
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Critical
>
> It seems  that Flink treats every Task as stateful so changing the topology 
> is not possible without setting uid on every single operator.
> If the topology has an iteration this is virtually impossible (or at least 
> gets super hacky)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6052) Potential null access in ZooKeeperCompletedCheckpointStore#getLatestCheckpoint()

2017-03-14 Thread Ted Yu (JIRA)
Ted Yu created FLINK-6052:
-

 Summary: Potential null access in 
ZooKeeperCompletedCheckpointStore#getLatestCheckpoint()
 Key: FLINK-6052
 URL: https://issues.apache.org/jira/browse/FLINK-6052
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> 
checkpointStateHandle = checkpointStateHandles.peekLast();

try {
  return retrieveCompletedCheckpoint(checkpointStateHandle);
} catch (Exception e) {
  LOG.warn("Could not retrieve latest checkpoint. Removing it from " +
"the completed checkpoint store.", e);

  try {
// remove the checkpoint with broken state handle
removeBrokenStateHandle(checkpointStateHandles.pollLast());
  } catch (Exception removeException) {
{code}
The code should handle the case where peekLast() / pollLast() returns null.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924834#comment-15924834
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/2425
  
@StephanEwen keep in mind that Flink's current SSL support doesn't 
achieve _mutual authentication_ - there's no client certificate there. With 
SSL enabled, an untrusted client can launch jobs in your Flink cluster and thus 
gain access to the Kerberos credential associated with the cluster.

SSL mutual authentication is a good alternative to a shared secret, but at 
the time we were limited to built-in Akka functionality (which doesn't include 
mutual auth).   Given the "flakka" fork that's now in place, a pure SSL 
solution might now be possible (I haven't thought it through completely).

The fact remains that, today, _all the secrets known to a Flink job are 
exposed to everyone who can connect to the cluster's endpoint_.  

It would be nice to construct a holistic plan that worked out how the Web 
UI would support authentication and also incorporated FLIP-6.  Both YARN 
and Mesos interpose a web proxy for the UI with its own limitations, notably no 
support for SSL mutual auth.



> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...

2017-03-14 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/2425
  
@StephanEwen keep in mind that Flink's current SSL support doesn't 
achieve _mutual authentication_ - there's no client certificate there. With 
SSL enabled, an untrusted client can launch jobs in your Flink cluster and thus 
gain access to the Kerberos credential associated with the cluster.

SSL mutual authentication is a good alternative to a shared secret, but at 
the time we were limited to built-in Akka functionality (which doesn't include 
mutual auth).   Given the "flakka" fork that's now in place, a pure SSL 
solution might now be possible (I haven't thought it through completely).

The fact remains that, today, _all the secrets known to a Flink job are 
exposed to everyone who can connect to the cluster's endpoint_.  

It would be nice to construct a holistic plan that worked out how the Web 
UI would support authentication and also incorporated FLIP-6.  Both YARN 
and Mesos interpose a web proxy for the UI with its own limitations, notably no 
support for SSL mutual auth.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6000) Can not start HA cluster with start-cluster.sh

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924751#comment-15924751
 ] 

ASF GitHub Bot commented on FLINK-6000:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/3506
  
@uce You replied to my message on the mailing list, then maybe you could have a 
look at this PR ;)
Recently another user reported it at SO: 
http://stackoverflow.com/questions/42793598/flink-1-2-does-not-start-in-ha-cluster-mode


> Can not start HA cluster with start-cluster.sh
> --
>
> Key: FLINK-6000
> URL: https://issues.apache.org/jira/browse/FLINK-6000
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.2.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> Right now it is impossible to start a cluster in zookeeper HA mode as 
> described in the documentation by setting:
> in conf/flink-conf.yaml:
> {code}
> high-availability: zookeeper
> ...
> {code}
> in conf/masters:
> {code}
> localhost:8081
> localhost:8082
> {code}
> The problem is with the {{bin/config.sh}} file. If value "zookeeper" is read 
> from config file the variable {{HIGH_AVAILABILITY}} will be reset to "none" 
> with the else branch. See the below code:
> {code}
> if [ -z "${HIGH_AVAILABILITY}" ]; then
>  HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" 
> "${YAML_CONF}")
>  if [ -z "${HIGH_AVAILABILITY}" ]; then
> # Try deprecated value
> DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
> if [ -z "${DEPRECATED_HA}" ]; then
> HIGH_AVAILABILITY="none"
> elif [ ${DEPRECATED_HA} == "standalone" ]; then
> # Standalone is now 'none'
> HIGH_AVAILABILITY="none"
> else
> HIGH_AVAILABILITY=${DEPRECATED_HA}
> fi
>  else
>  HIGH_AVAILABILITY="none" <-- it exits here
>  fi
> fi
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3506: [FLINK-6000] Fix starting HA cluster with start-cluster.s...

2017-03-14 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/3506
  
@uce You replied to my message on the mailing list, then maybe you could have a 
look at this PR ;)
Recently another user reported it at SO: 
http://stackoverflow.com/questions/42793598/flink-1-2-does-not-start-in-ha-cluster-mode


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-3398) Flink Kafka consumer should support auto-commit opt-outs

2017-03-14 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-3398.

Resolution: Fixed

Fixed for 1.3.0 with 
http://git-wip-us.apache.org/repos/asf/flink/commit/90c7415.

> Flink Kafka consumer should support auto-commit opt-outs
> 
>
> Key: FLINK-3398
> URL: https://issues.apache.org/jira/browse/FLINK-3398
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Shikhar Bhushan
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Currently the Kafka source will commit consumer offsets to Zookeeper, either 
> upon a checkpoint if checkpointing is enabled, otherwise periodically based 
> on {{auto.commit.interval.ms}}
> It should be possible to opt-out of committing consumer offsets to Zookeeper. 
> Kafka has this config as {{auto.commit.enable}} (0.8) and 
> {{enable.auto.commit}} (0.9).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6001) NPE on TumblingEventTimeWindows with ContinuousEventTimeTrigger and allowedLateness

2017-03-14 Thread Vladislav Pernin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924615#comment-15924615
 ] 

Vladislav Pernin commented on FLINK-6001:
-

Maybe related to FLINK-5713 ? I have to test.

> NPE on TumblingEventTimeWindows with ContinuousEventTimeTrigger and 
> allowedLateness
> ---
>
> Key: FLINK-6001
> URL: https://issues.apache.org/jira/browse/FLINK-6001
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Streaming
>Affects Versions: 1.2.0
>Reporter: Vladislav Pernin
>Priority: Critical
>
> I tried to isolate the problem in a small and simple reproducer by extracting 
> the data from my real setup.
> It fails with an NPE at:
> {noformat}
> java.lang.NullPointerException: null
>   at 
> org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger.onEventTime(ContinuousEventTimeTrigger.java:81)
>  ~[flink-streaming-java_2.11-1.2.0.jar:1.2.0]
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onEventTime(WindowOperator.java:721)
>  ~[flink-streaming-java_2.11-1.2.0.jar:1.2.0]
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:425)
>  ~[flink-streaming-java_2.11-1.2.0.jar:1.2.0]
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:276)
>  ~[flink-streaming-java_2.11-1.2.0.jar:1.2.0]
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:858)
>  ~[flink-streaming-java_2.11-1.2.0.jar:1.2.0]
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
>  ~[flink-streaming-java_2.11-1.2.0.jar:1.2.0]
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>  ~[flink-streaming-java_2.11-1.2.0.jar:1.2.0]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
>  ~[flink-streaming-java_2.11-1.2.0.jar:1.2.0]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) 
> ~[flink-runtime_2.11-1.2.0.jar:1.2.0]
>   at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_121]
> {noformat}
> It fails only with the Thread.sleep. If you comment it out, it won't fail.
> So, you may have to increase the sleep time depending on your environment.
> I know this is not a very rigorous test, but this is the only way I've found 
> to reproduce it.
> You can find the reproducer here :
> https://github.com/vpernin/flink-window-npe



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (FLINK-5949) Flink on YARN checks for Kerberos credentials for non-Kerberos authentication methods

2017-03-14 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-5949.

Resolution: Fixed

Fixed for 1.3.0 with 
http://git-wip-us.apache.org/repos/asf/flink/commit/87779ad.

Fixed for 1.2.1 with 
http://git-wip-us.apache.org/repos/asf/flink/commit/0c532ed.

> Flink on YARN checks for Kerberos credentials for non-Kerberos authentication 
> methods
> -
>
> Key: FLINK-5949
> URL: https://issues.apache.org/jira/browse/FLINK-5949
> Project: Flink
>  Issue Type: Bug
>  Components: Security, YARN
>Affects Versions: 1.2.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.0, 1.2.1
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Yarn-and-MapR-Kerberos-issue-td11996.html
> The problem is that the Flink on YARN client incorrectly assumes 
> {{UserGroupInformation.isSecurityEnabled()}} returns {{true}} only for 
> Kerberos authentication modes, whereas it actually returns {{true}} for other 
> kinds of authentications too.
> We could make use of {{UserGroupInformation.getAuthenticationMethod()}} to 
> check for {{KERBEROS}} only.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924608#comment-15924608
 ] 

ASF GitHub Bot commented on FLINK-6006:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3507
  
Doing another Travis run locally before merging just to be safe:
https://travis-ci.org/tzulitai/flink/builds/211031758


> Kafka Consumer can lose state if queried partition list is incomplete on 
> restore
> 
>
> Key: FLINK-6006
> URL: https://issues.apache.org/jira/browse/FLINK-6006
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.1.5, 1.2.1
>
>
> In 1.1.x and 1.2.x, the FlinkKafkaConsumer performs partition list querying 
> on restore. Then, only restored state of partitions that exist in the 
> queried list is used to initialize the fetcher's state holders.
> If in any case the returned partition list is incomplete (i.e. missing 
> partitions that existed before, perhaps due to temporary ZK / broker 
> downtime), then the state of the missing partitions is dropped and cannot be 
> recovered anymore.
> In 1.3-SNAPSHOT, this is fixed by changes in FLINK-4280, so only 1.1 and 1.2 
> are affected.
> We can backport some of the behavioural changes there to 1.1 and 1.2. 
> Generally, we should not depend on the current partition list in Kafka when 
> restoring, but just restore all previous state into the fetcher's state 
> holders. 
> This would therefore also require some checking on how the consumer threads / 
> Kafka clients behave when their assigned partitions cannot be reached.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3507: [backport-1.1] [FLINK-6006] [kafka] Always use complete r...

2017-03-14 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3507
  
Doing another Travis run locally before merging just to be safe:
https://travis-ci.org/tzulitai/flink/builds/211031758


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5481) Simplify Row creation

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924605#comment-15924605
 ] 

ASF GitHub Bot commented on FLINK-5481:
---

Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r105972317
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -17,29 +17,34 @@
  */
 package org.apache.flink.table.api
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
SqlTimeTypeInfo}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.api.java.typeutils.{Types => JTypes}
 
 /**
   * This class enumerates all supported types of the Table API.
   */
-object Types {
+object Types extends JTypes {
 
-  val STRING = BasicTypeInfo.STRING_TYPE_INFO
-  val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
+  val STRING = JTypes.STRING
+  val BOOLEAN = JTypes.BOOLEAN
 
-  val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
-  val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
-  val INT = BasicTypeInfo.INT_TYPE_INFO
-  val LONG = BasicTypeInfo.LONG_TYPE_INFO
-  val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
-  val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
-  val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO
+  val BYTE = JTypes.BYTE
+  val SHORT = JTypes.SHORT
+  val INT = JTypes.INT
+  val LONG = JTypes.LONG
+  val FLOAT = JTypes.FLOAT
+  val DOUBLE = JTypes.DOUBLE
+  val DECIMAL = JTypes.DECIMAL
 
-  val DATE = SqlTimeTypeInfo.DATE
-  val TIME = SqlTimeTypeInfo.TIME
-  val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
+  val DATE = JTypes.DATE
+  val TIME = JTypes.TIME
+  val TIMESTAMP = JTypes.TIMESTAMP
   val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
   val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
 
+  def ROW(types: TypeInformation[_]*) = JTypes.ROW(types: _*)
+
+  def ROW(fieldNames: Array[String], types: TypeInformation[_]*) =
--- End diff --

Done


> Simplify Row creation
> -
>
> Key: FLINK-5481
> URL: https://issues.apache.org/jira/browse/FLINK-5481
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Anton Solovev
>Assignee: Anton Solovev
>Priority: Trivial
>
> When we use {{ExecutionEnvironment#fromElements(X... data)}}, it takes the first 
> element of {{data}} to define the type. If the first Row in the collection has the 
> wrong number of fields (there are nulls), TypeExtractor returns a GenericType 
> instead of a RowTypeInfo.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5481) Simplify Row creation

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924603#comment-15924603
 ] 

ASF GitHub Bot commented on FLINK-5481:
---

Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r105972293
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/Types.java ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+public class Types {
+
+   public static final BasicTypeInfo<String> STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+   public static final BasicTypeInfo<Boolean> BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+   public static final BasicTypeInfo<Byte> BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+   public static final BasicTypeInfo<Short> SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
+   public static final BasicTypeInfo<Integer> INT = 
BasicTypeInfo.INT_TYPE_INFO;
+   public static final BasicTypeInfo<Long> LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
+   public static final BasicTypeInfo<Float> FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
+   public static final BasicTypeInfo<Double> DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
+   public static final BasicTypeInfo<BigDecimal> DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+
+   public static final SqlTimeTypeInfo<Date> DATE = SqlTimeTypeInfo.DATE;
--- End diff --

Done


> Simplify Row creation
> -
>
> Key: FLINK-5481
> URL: https://issues.apache.org/jira/browse/FLINK-5481
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Anton Solovev
>Assignee: Anton Solovev
>Priority: Trivial
>
> When we use {{ExecutionEnvironment#fromElements(X... data)}}, it takes the first 
> element of {{data}} to define the type. If the first Row in the collection has the 
> wrong number of fields (there are nulls), TypeExtractor returns a GenericType 
> instead of a RowTypeInfo.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3127: [FLINK-5481] Simplify Row creation

2017-03-14 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r105972293
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/Types.java ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+public class Types {
+
+   public static final BasicTypeInfo STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+   public static final BasicTypeInfo BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+   public static final BasicTypeInfo BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+   public static final BasicTypeInfo SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
+   public static final BasicTypeInfo INT = 
BasicTypeInfo.INT_TYPE_INFO;
+   public static final BasicTypeInfo LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
+   public static final BasicTypeInfo FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
+   public static final BasicTypeInfo DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
+   public static final BasicTypeInfo DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+
+   public static final SqlTimeTypeInfo DATE = SqlTimeTypeInfo.DATE;
--- End diff --

Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3127: [FLINK-5481] Simplify Row creation

2017-03-14 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r105972317
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -17,29 +17,34 @@
  */
 package org.apache.flink.table.api
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
SqlTimeTypeInfo}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.api.java.typeutils.{Types => JTypes}
 
 /**
   * This class enumerates all supported types of the Table API.
   */
-object Types {
+object Types extends JTypes {
 
-  val STRING = BasicTypeInfo.STRING_TYPE_INFO
-  val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
+  val STRING = JTypes.STRING
+  val BOOLEAN = JTypes.BOOLEAN
 
-  val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
-  val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
-  val INT = BasicTypeInfo.INT_TYPE_INFO
-  val LONG = BasicTypeInfo.LONG_TYPE_INFO
-  val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
-  val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
-  val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO
+  val BYTE = JTypes.BYTE
+  val SHORT = JTypes.SHORT
+  val INT = JTypes.INT
+  val LONG = JTypes.LONG
+  val FLOAT = JTypes.FLOAT
+  val DOUBLE = JTypes.DOUBLE
+  val DECIMAL = JTypes.DECIMAL
 
-  val DATE = SqlTimeTypeInfo.DATE
-  val TIME = SqlTimeTypeInfo.TIME
-  val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
+  val DATE = JTypes.DATE
+  val TIME = JTypes.TIME
+  val TIMESTAMP = JTypes.TIMESTAMP
   val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
   val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
 
+  def ROW(types: TypeInformation[_]*) = JTypes.ROW(types: _*)
+
+  def ROW(fieldNames: Array[String], types: TypeInformation[_]*) =
--- End diff --

Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1579) Create a Flink History Server

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924598#comment-15924598
 ] 

ASF GitHub Bot commented on FLINK-1579:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3460
  
@uce I've addressed most of your comments. Still missing are some javadocs, 
the renaming of the web-ui, and bounding the size of the map in the ArchiveFetcher.


> Create a Flink History Server
> -
>
> Key: FLINK-1579
> URL: https://issues.apache.org/jira/browse/FLINK-1579
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> Right now it's not possible to analyze the job results for jobs that ran on 
> YARN, because we'll lose the information once the JobManager has stopped.
> Therefore, I propose to implement a "Flink History Server" which serves the 
> results from these jobs.
> I haven't started thinking about the implementation, but I suspect it 
> involves some JSON files stored in HDFS :)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3460: [FLINK-1579] Implement History Server

2017-03-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3460
  
@uce I've addressed most of your comments. Still missing are some javadocs, 
the renaming of the web-ui, and bounding the size of the map in the ArchiveFetcher.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3538: FLINK-6051: Correct metrics scope names

2017-03-14 Thread hadronzoo
GitHub user hadronzoo opened a pull request:

https://github.com/apache/flink/pull/3538

FLINK-6051: Correct metrics scope names

Closes FLINK-6051.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/orgsync/flink FLINK-6051

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3538.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3538


commit ab4332801428e14b8077a2a7669a51f8a1607ca3
Author: Joshua Griffith 
Date:   2017-03-14T17:10:12Z

Correct metrics scope names

Closes FLINK-6051.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924586#comment-15924586
 ] 

ASF GitHub Bot commented on FLINK-6006:
---

Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/3505


> Kafka Consumer can lose state if queried partition list is incomplete on 
> restore
> 
>
> Key: FLINK-6006
> URL: https://issues.apache.org/jira/browse/FLINK-6006
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.1.5, 1.2.1
>
>
> In 1.1.x and 1.2.x, the FlinkKafkaConsumer performs partition list querying 
> on restore. Then, only the restored state of partitions that exist in the 
> queried list is used to initialize the fetcher's state holders.
> If in any case the returned partition list is incomplete (i.e. missing 
> partitions that existed before, perhaps due to temporary ZK / broker 
> downtime), then the state of the missing partitions is dropped and cannot be 
> recovered anymore.
> In 1.3-SNAPSHOT, this is fixed by changes in FLINK-4280, so only 1.1 and 1.2 
> are affected.
> We can backport some of the behavioural changes there to 1.1 and 1.2. 
> Generally, we should not depend on the current partition list in Kafka when 
> restoring, but just restore all previous state into the fetcher's state 
> holders. 
> This would therefore also require some checking on how the consumer threads / 
> Kafka clients behave when their assigned partitions cannot be reached.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6051) Wrong metric scope names in documentation

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924587#comment-15924587
 ] 

ASF GitHub Bot commented on FLINK-6051:
---

GitHub user hadronzoo opened a pull request:

https://github.com/apache/flink/pull/3538

FLINK-6051: Correct metrics scope names

Closes FLINK-6051.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/orgsync/flink FLINK-6051

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3538.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3538


commit ab4332801428e14b8077a2a7669a51f8a1607ca3
Author: Joshua Griffith 
Date:   2017-03-14T17:10:12Z

Correct metrics scope names

Closes FLINK-6051.




> Wrong metric scope names in documentation
> -
>
> Key: FLINK-6051
> URL: https://issues.apache.org/jira/browse/FLINK-6051
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Joshua Griffith
>Priority: Minor
>
> FLINK-4402 fixed the following metrics names:
> metrics.scope.tm.task → metrics.scope.task
> metrics.scope.tm.operator → metrics.scope.operator
> However, the [1.2.0 
> documentation|https://github.com/apache/flink/blob/dabeb74c10e755c655a06cdc8846dc7227d63cb9/docs/setup/config.md#metrics]
>  lists the incorrect metric names again.
> As a side note, it would be convenient to have all of the configuration 
> options documented in a machine readable format. To build docker images I 
> [process the config file with 
> sed|https://github.com/orgsync/docker-flink/blob/a08ec4d102b623bfba74a61d3012931e28ef92e7/Dockerfile#L36]
>  to extract the options and turn them into env vars, which is how I noticed 
> this issue.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3505: [backport-1.2] [FLINK-6006] [kafka] Always use com...

2017-03-14 Thread tzulitai
Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/3505


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3398) Flink Kafka consumer should support auto-commit opt-outs

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924570#comment-15924570
 ] 

ASF GitHub Bot commented on FLINK-3398:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3527


> Flink Kafka consumer should support auto-commit opt-outs
> 
>
> Key: FLINK-3398
> URL: https://issues.apache.org/jira/browse/FLINK-3398
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Shikhar Bhushan
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Currently the Kafka source will commit consumer offsets to Zookeeper, either 
> upon a checkpoint if checkpointing is enabled, otherwise periodically based 
> on {{auto.commit.interval.ms}}
> It should be possible to opt-out of committing consumer offsets to Zookeeper. 
> Kafka has this config as {{auto.commit.enable}} (0.8) and 
> {{enable.auto.commit}} (0.9).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5949) Flink on YARN checks for Kerberos credentials for non-Kerberos authentication methods

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924569#comment-15924569
 ] 

ASF GitHub Bot commented on FLINK-5949:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3528


> Flink on YARN checks for Kerberos credentials for non-Kerberos authentication 
> methods
> -
>
> Key: FLINK-5949
> URL: https://issues.apache.org/jira/browse/FLINK-5949
> Project: Flink
>  Issue Type: Bug
>  Components: Security, YARN
>Affects Versions: 1.2.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.0, 1.2.1
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Yarn-and-MapR-Kerberos-issue-td11996.html
> The problem is that the Flink on YARN client incorrectly assumes 
> {{UserGroupInformation.isSecurityEnabled()}} returns {{true}} only for 
> Kerberos authentication modes, whereas it actually returns {{true}} for other 
> kinds of authentications too.
> We could make use of {{UserGroupInformation.getAuthenticationMethod()}} to 
> check for {{KERBEROS}} only.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6051) Wrong metric scope names in documentation

2017-03-14 Thread Joshua Griffith (JIRA)
Joshua Griffith created FLINK-6051:
--

 Summary: Wrong metric scope names in documentation
 Key: FLINK-6051
 URL: https://issues.apache.org/jira/browse/FLINK-6051
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.2.0
Reporter: Joshua Griffith
Priority: Minor


FLINK-4402 fixed the following metrics names:
metrics.scope.tm.task → metrics.scope.task
metrics.scope.tm.operator → metrics.scope.operator

However, the [1.2.0 
documentation|https://github.com/apache/flink/blob/dabeb74c10e755c655a06cdc8846dc7227d63cb9/docs/setup/config.md#metrics]
 lists the incorrect metric names again.

As a side note, it would be convenient to have all of the configuration options 
documented in a machine readable format. To build docker images I [process the 
config file with 
sed|https://github.com/orgsync/docker-flink/blob/a08ec4d102b623bfba74a61d3012931e28ef92e7/Dockerfile#L36]
 to extract the options and turn them into env vars, which is how I noticed 
this issue.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3528: [FLINK-5949] [yarn] Don't check Kerberos credentia...

2017-03-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3528


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3527: [FLINK-3398] [kafka] Allow disabling offset commit...

2017-03-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3527


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3527: [FLINK-3398] [kafka] Allow disabling offset committing fo...

2017-03-14 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3527
  
Did a final test run on a Kafka installation, and things worked as expected.
One minor improvement would be to log exactly which commit mode 
is used when it is determined in `open()`.

I think it's a safe call to add the log and then merge this :-) Will 
proceed to merge for `master`.
Thanks for all the recent reviews @rmetzger :-D


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor

2017-03-14 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924537#comment-15924537
 ] 

Stephan Ewen commented on FLINK-5756:
-

I would suggest to see if we get a response from the RocksDB community. If we 
cannot expect a fix soon, we will have to build around that using the "range 
iterator" workaround described above.

> When there are many values under the same key in ListState, 
> RocksDBStateBackend performances poor
> -
>
> Key: FLINK-5756
> URL: https://issues.apache.org/jira/browse/FLINK-5756
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>
> When using RocksDB as the StateBackend, if there are many values under the 
> same key in ListState, the windowState.get() operation performs very poorly. 
> I also tested RocksDB using version 4.11.2, and the performance is also very 
> poor. The problem is likely related to RocksDB's own get() operation 
> after using merge(). The problem may influence the window operation's 
> performance when the size is very large using ListState. I tried to merge 5 
> values under the same key in RocksDB, and it cost 120 seconds to execute the get() 
> operation.
> ///
> The flink's code is as follows:
> {code}
> class SEventSource extends RichSourceFunction [SEvent] {
>   private var count = 0L
>   private val alphabet = 
> "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
>   override def run(sourceContext: SourceContext[SEvent]): Unit = {
> while (true) {
>   for (i <- 0 until 5000) {
> sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
> count += 1L
>   }
>   Thread.sleep(1000)
> }
>   }
> }
> env.addSource(new SEventSource)
>   .assignTimestampsAndWatermarks(new 
> AssignerWithPeriodicWatermarks[SEvent] {
> override def getCurrentWatermark: Watermark = {
>   new Watermark(System.currentTimeMillis())
> }
> override def extractTimestamp(t: SEvent, l: Long): Long = {
>   System.currentTimeMillis()
> }
>   })
>   .keyBy(0)
>   .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
>   .apply(new WindowStatistic)
>   .map(x => (System.currentTimeMillis(), x))
>   .print()
> {code}
> 
> The RocksDB Test code:
> {code}
> val stringAppendOperator = new StringAppendOperator
> val options = new Options()
> options.setCompactionStyle(CompactionStyle.LEVEL)
>   .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>   .setLevelCompactionDynamicLevelBytes(true)
>   .setIncreaseParallelism(4)
>   .setUseFsync(true)
>   .setMaxOpenFiles(-1)
>   .setCreateIfMissing(true)
>   .setMergeOperator(stringAppendOperator)
> val write_options = new WriteOptions
> write_options.setSync(false)
> val rocksDB = RocksDB.open(options, "/**/Data/")
> val key = "key"
> val value = 
> "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
> val beginmerge = System.currentTimeMillis()
> for(i <- 0 to 5) {
>   rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes())
>   //rocksDB.put(key.getBytes, value.getBytes)
> }
> println("finish")
> val begin = System.currentTimeMillis()
> rocksDB.get(key.getBytes)
> val end = System.currentTimeMillis()
> println("merge cost:" + (begin - beginmerge))
> println("Time consuming:" + (end - begin))
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer

2017-03-14 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924535#comment-15924535
 ] 

Nico Kruber commented on FLINK-4545:


We did some thinking and would probably add the following three new 
configuration parameters (with the given defaults) to finally replace the 
{{taskmanager.network.numberOfBuffers}} parameter:

- {{taskmanager.network.memory.fraction}} (default: 0.1): fraction of JVM 
memory to use for network buffers (by reducing {{taskmanager.memory.fraction}} 
from 0.7 to 0.6)
- {{taskmanager.network.memory.min}} (default: 64MB): minimum memory size for 
network buffers
- {{taskmanager.network.memory.max}} (default: 1GB): maximum memory size for 
network buffers

A fixed size may be achieved by setting the latter two to the same value, 
{{taskmanager.network.numberOfBuffers}} will be marked deprecated and used only 
if the other three are not given, e.g. due to old config files being used.

> Flink automatically manages TM network buffer
> -
>
> Key: FLINK-4545
> URL: https://issues.apache.org/jira/browse/FLINK-4545
> Project: Flink
>  Issue Type: Wish
>  Components: Network
>Reporter: Zhenzhong Xu
>
> Currently, the number of network buffers per task manager is preconfigured and 
> the memory is pre-allocated through the taskmanager.network.numberOfBuffers 
> config. In a Job DAG with a shuffle phase, this number can become very high 
> depending on the TM cluster size. The formula for calculating the buffer count 
> is documented here 
> (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers).
>   
> #slots-per-TM^2 * #TMs * 4
> In a standalone deployment, we may need to control the task manager cluster 
> size dynamically and then leverage the up-coming Flink feature to support 
> scaling job parallelism/rescaling at runtime. 
> If the buffer count config is static at runtime and cannot be changed without 
> restarting task manager process, this may add latency and complexity for 
> scaling process. I am wondering if there is already any discussion around 
> whether the network buffer should be automatically managed by Flink or at 
> least expose some API to allow it to be reconfigured. Let me know if there is 
> any existing JIRA that I should follow.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3528: [FLINK-5949] [yarn] Don't check Kerberos credentials for ...

2017-03-14 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3528
  
Thanks for the review :-)
The failing tests seem to be caused by some instability with Maven.
Merging this to `master` and `release-1.2` ..


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6050) Improve failure reporting when using Future.thenAccept

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924518#comment-15924518
 ] 

ASF GitHub Bot commented on FLINK-6050:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/3537

[FLINK-6050] [robustness] Register exception handler on thenAccept futures

When applying an AcceptFunction on a Future x, then we should register the 
exception handler
on the returned thenAccept future instead of on x. This has the advantage 
that we also catch
exceptions which are thrown inside of the AcceptFunction and not only those 
which originate
from x. The PR adapts the code accordingly.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink hardenAcceptFutureCalls

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3537.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3537






> Improve failure reporting when using Future.thenAccept
> --
>
> Key: FLINK-6050
> URL: https://issues.apache.org/jira/browse/FLINK-6050
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> When applying {{Future.thenAccept(Async)}} onto a {{Future}}, then we should 
> register the exception handler on the returned {{Future}} and not on 
> the original future. This has the advantage that we also catch exceptions 
> which are thrown in the {{AcceptFunction}} and not only those originating 
> from the original {{Future}}. This improves Flink's behaviour, because 
> exceptions are not swallowed in the returned {{Future}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3537: [FLINK-6050] [robustness] Register exception handl...

2017-03-14 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/3537

[FLINK-6050] [robustness] Register exception handler on thenAccept futures

When applying an AcceptFunction on a Future x, then we should register the 
exception handler
on the returned thenAccept future instead of on x. This has the advantage 
that we also catch
exceptions which are thrown inside of the AcceptFunction and not only those 
which originate
from x. The PR adapts the code accordingly.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink hardenAcceptFutureCalls

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3537.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3537






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3398) Flink Kafka consumer should support auto-commit opt-outs

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924511#comment-15924511
 ] 

ASF GitHub Bot commented on FLINK-3398:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3527
  
Did a final test run on a Kafka installation, and things worked as expected.
One minor improvement would be to log exactly which commit mode 
is used when it is determined in `open()`.

I think it's a safe call to add the log and then merge this :-) Will 
proceed to merge for `master`.
Thanks for all the recent reviews @rmetzger :-D


> Flink Kafka consumer should support auto-commit opt-outs
> 
>
> Key: FLINK-3398
> URL: https://issues.apache.org/jira/browse/FLINK-3398
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Shikhar Bhushan
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Currently the Kafka source will commit consumer offsets to Zookeeper, either 
> upon a checkpoint if checkpointing is enabled, otherwise periodically based 
> on {{auto.commit.interval.ms}}
> It should be possible to opt-out of committing consumer offsets to Zookeeper. 
> Kafka has this config as {{auto.commit.enable}} (0.8) and 
> {{enable.auto.commit}} (0.9).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6050) Improve failure reporting when using Future.thenAccept

2017-03-14 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-6050:


 Summary: Improve failure reporting when using Future.thenAccept
 Key: FLINK-6050
 URL: https://issues.apache.org/jira/browse/FLINK-6050
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.3.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor


When applying {{Future.thenAccept(Async)}} onto a {{Future}}, then we should 
register the exception handler on the returned {{Future}} and not on the 
original future. This has the advantage that we also catch exceptions which are 
thrown in the {{AcceptFunction}} and not only those originating from the 
original {{Future}}. This improves Flink's behaviour, because exceptions are not 
swallowed in the returned {{Future}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3505: [backport-1.2] [FLINK-6006] [kafka] Always use complete r...

2017-03-14 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3505
  
Merging ..


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924512#comment-15924512
 ] 

ASF GitHub Bot commented on FLINK-6006:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3505
  
Merging ..


> Kafka Consumer can lose state if queried partition list is incomplete on 
> restore
> 
>
> Key: FLINK-6006
> URL: https://issues.apache.org/jira/browse/FLINK-6006
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.1.5, 1.2.1
>
>
> In 1.1.x and 1.2.x, the FlinkKafkaConsumer performs partition list querying 
> on restore. Then, only the restored state of partitions that exist in the 
> queried list is used to initialize the fetcher's state holders.
> If in any case the returned partition list is incomplete (i.e. missing 
> partitions that existed before, perhaps due to temporary ZK / broker 
> downtime), then the state of the missing partitions is dropped and cannot be 
> recovered anymore.
> In 1.3-SNAPSHOT, this is fixed by changes in FLINK-4280, so only 1.1 and 1.2 
> are affected.
> We can backport some of the behavioural changes there to 1.1 and 1.2. 
> Generally, we should not depend on the current partition list in Kafka when 
> restoring, but just restore all previous state into the fetcher's state 
> holders. 
> This would therefore also require some checking on how the consumer threads / 
> Kafka clients behave when their assigned partitions cannot be reached.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5713) Protect against NPE in WindowOperator window cleanup

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924507#comment-15924507
 ] 

ASF GitHub Bot commented on FLINK-5713:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3535#discussion_r105956185
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -354,22 +354,27 @@ public void merge(W mergeResult,
}
});
 
-   // drop if the window is already late
-   if (isLate(actualWindow)) {
-   
mergingWindows.retireWindow(actualWindow);
-   continue;
-   }
+   context.key = key;
+   context.window = actualWindow;
 
W stateWindow = 
mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
throw new IllegalStateException("Window 
" + window + " is not in in-flight window set.");
}
 
windowState.setCurrentNamespace(stateWindow);
-   windowState.add(element.getValue());
 
-   context.key = key;
-   context.window = actualWindow;
+   // Drop if the window is already late. In rare 
cases (with a misbehaving
+   // WindowAssigner) it can happen that a window 
becomes late that already has
+   // state (contents, state and timers). That's 
why we first get the window state
+   // above and then drop everything.
+   if (isLate(actualWindow)) {
+   clearAllState(actualWindow, 
windowState, mergingWindows);
+   mergingWindows.persist();
--- End diff --

The null check is not needed here since we know from the if block 
we're in (`if (windowAssigner instanceof MergingWindowAssigner)`) that we do in 
fact have merging windows.

Now that I look at it, though, I realise that the 
`mergingWindows.persist()` call is not necessary because we already call it at 
the end of the if block. So thanks for making me notice! 😃 

I moved the call out of `clearAllState()` in the first place because all 
places where `clearAllState()` is called already persist afterwards. See, for 
example, `onEventTime()` and `onProcessingTime()`.



> Protect against NPE in WindowOperator window cleanup
> 
>
> Key: FLINK-5713
> URL: https://issues.apache.org/jira/browse/FLINK-5713
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.2.1
>
>
> Some (misbehaved) WindowAssigners can cause windows to be dropped from the 
> merging window set while a cleanup timer is still active. This will trigger a 
> NullPointerException when that timer fires.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3535: [FLINK-5713] Protect against NPE in WindowOperator...

2017-03-14 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3535#discussion_r105956185
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -354,22 +354,27 @@ public void merge(W mergeResult,
}
});
 
-   // drop if the window is already late
-   if (isLate(actualWindow)) {
-   
mergingWindows.retireWindow(actualWindow);
-   continue;
-   }
+   context.key = key;
+   context.window = actualWindow;
 
W stateWindow = 
mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
throw new IllegalStateException("Window 
" + window + " is not in in-flight window set.");
}
 
windowState.setCurrentNamespace(stateWindow);
-   windowState.add(element.getValue());
 
-   context.key = key;
-   context.window = actualWindow;
+   // Drop if the window is already late. In rare 
cases (with a misbehaving
+   // WindowAssigner) it can happen that a window 
becomes late that already has
+   // state (contents, state and timers). That's 
why we first get the window state
+   // above and then drop everything.
+   if (isLate(actualWindow)) {
+   clearAllState(actualWindow, 
windowState, mergingWindows);
+   mergingWindows.persist();
--- End diff --

The null check is not needed here since we know from the if block 
we're in (`if (windowAssigner instanceof MergingWindowAssigner)`) that we do in 
fact have merging windows.

Now that I look at it, though, I realise that the 
`mergingWindows.persist()` call is not necessary because we already call it at 
the end of the if block. So thanks for making me notice! 😃 

I moved the call out of `clearAllState()` in the first place because all 
places where `clearAllState()` is called already persist afterwards. See, for 
example, `onEventTime()` and `onProcessingTime()`.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5949) Flink on YARN checks for Kerberos credentials for non-Kerberos authentication methods

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924505#comment-15924505
 ] 

ASF GitHub Bot commented on FLINK-5949:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3528
  
Thanks for the review :-)
Failing tests seem to be something unstable with Maven.
Merging this to `master` and `release-1.2` ..


> Flink on YARN checks for Kerberos credentials for non-Kerberos authentication 
> methods
> -
>
> Key: FLINK-5949
> URL: https://issues.apache.org/jira/browse/FLINK-5949
> Project: Flink
>  Issue Type: Bug
>  Components: Security, YARN
>Affects Versions: 1.2.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.0, 1.2.1
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Yarn-and-MapR-Kerberos-issue-td11996.html
> The problem is that the Flink on YARN client incorrectly assumes 
> {{UserGroupInformation.isSecurityEnabled()}} returns {{true}} only for 
> Kerberos authentication modes, whereas it actually returns {{true}} for other 
> kinds of authentications too.
> We could make use of {{UserGroupInformation.getAuthenticationMethod()}} to 
> check for {{KERBEROS}} only.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5846) CEP: make the operators backwards compatible.

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924489#comment-15924489
 ] 

ASF GitHub Bot commented on FLINK-5846:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3445#discussion_r105936184
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTestBase.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for the migration tests between different versions.
+ * See {@link CEPMigration11to13Test} for an example of migration test
+ * between Flink-1.1 and Flink-1.3.
+ * */
--- End diff --

Should have a newline, or at least not two stars on one line 😉 


> CEP: make the operators backwards compatible.
> -
>
> Key: FLINK-5846
> URL: https://issues.apache.org/jira/browse/FLINK-5846
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> This targets making the new CEP operators compatible with their previous 
> versions from Flink 1.1 and Flink 1.2.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3445: [FLINK-5846] [cep] Make the CEP operators backward...

2017-03-14 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3445#discussion_r105936184
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTestBase.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for the migration tests between different versions.
+ * See {@link CEPMigration11to13Test} for an example of migration test
+ * between Flink-1.1 and Flink-1.3.
+ * */
--- End diff --

Should have a newline, or at least not two stars on one line 😉 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5846) CEP: make the operators backwards compatible.

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924490#comment-15924490
 ] 

ASF GitHub Bot commented on FLINK-5846:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3445#discussion_r105937053
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.junit.Test;
+
+public class CEPMigration11to13Test extends CEPMigrationTestBase {
+
+   @Test
+   public void testKeyedCEPFunctionMigration11() throws Exception {
+   testKeyedCEPFunctionMigration("cep-keyed-savepoint-1.1");
+   }
+
+   @Test
+   public void testNonKeyedCEPFunctionMigration11() throws Exception {
+   testNonKeyedCEPFunctionMigration("cep-non-keyed-savepoint-1.1");
+   }
+}
+
+/*
+FLINK 1.1 CODE TO PRODUCE THE SAVEPOINTS. (INCLUDE ALSO THE PATTERN CODE 
AT THE BOTTOM FOR BOTH CASES)
+
+@Test
+public void keyedCEPOperatorSavepointGen() throws Exception {
+
+   KeySelector keySelector = new KeySelector() {
+  private static final long serialVersionUID = -4873366487571254798L;
+
+  @Override
+  public Integer getKey(Event value) throws Exception {
+ return value.getId();
+  }
+   };
+
+   OneInputStreamOperatorTestHarness> harness = 
new OneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+   Event.createTypeSerializer(),
+   false,
+   keySelector,
+   IntSerializer.INSTANCE,
+   new NFAFactory()));
+   harness.configureForKeyedStream(keySelector, 
BasicTypeInfo.INT_TYPE_INFO);
+   harness.open();
+
+   Event startEvent = new Event(42, "start", 1.0);
+   SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+   Event endEvent=  new Event(42, "end", 1.0);
+
+   harness.processElement(new StreamRecord(startEvent, 1));
+   harness.processElement(new StreamRecord(new Event(42, "foobar", 
1.0), 2));
+   harness.processElement(new StreamRecord(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3));
+
+   harness.processWatermark(new Watermark(2));
+
+   // simulate snapshot/restore with empty element queue but NFA state
+   StreamTaskState snapshot = harness.snapshot(1, 1);
+
+   FileOutputStream out = new 
FileOutputStream("/Users/kkloudas/Desktop/cep-keyed-savepoint-1.1");
--- End diff --

This should be something like 
`"src/test/resources/cep-keyed-savepoint-1.1"`, same for the other path down 
below. With this, it would generate directly into the directory where the test 
currently is and where the test expects to find it.


> CEP: make the operators backwards compatible.
> -
>
> Key: FLINK-5846
> URL: https://issues.apache.org/jira/browse/FLINK-5846
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> This targets making the new CEP operators compatible with their previous 
> versions from Flink 1.1 and Flink 1.2.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3445: [FLINK-5846] [cep] Make the CEP operators backward...

2017-03-14 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3445#discussion_r105937053
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.junit.Test;
+
+public class CEPMigration11to13Test extends CEPMigrationTestBase {
+
+   @Test
+   public void testKeyedCEPFunctionMigration11() throws Exception {
+   testKeyedCEPFunctionMigration("cep-keyed-savepoint-1.1");
+   }
+
+   @Test
+   public void testNonKeyedCEPFunctionMigration11() throws Exception {
+   testNonKeyedCEPFunctionMigration("cep-non-keyed-savepoint-1.1");
+   }
+}
+
+/*
+FLINK 1.1 CODE TO PRODUCE THE SAVEPOINTS. (INCLUDE ALSO THE PATTERN CODE 
AT THE BOTTOM FOR BOTH CASES)
+
+@Test
+public void keyedCEPOperatorSavepointGen() throws Exception {
+
+   KeySelector keySelector = new KeySelector() {
+  private static final long serialVersionUID = -4873366487571254798L;
+
+  @Override
+  public Integer getKey(Event value) throws Exception {
+ return value.getId();
+  }
+   };
+
+   OneInputStreamOperatorTestHarness> harness = 
new OneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+   Event.createTypeSerializer(),
+   false,
+   keySelector,
+   IntSerializer.INSTANCE,
+   new NFAFactory()));
+   harness.configureForKeyedStream(keySelector, 
BasicTypeInfo.INT_TYPE_INFO);
+   harness.open();
+
+   Event startEvent = new Event(42, "start", 1.0);
+   SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+   Event endEvent=  new Event(42, "end", 1.0);
+
+   harness.processElement(new StreamRecord(startEvent, 1));
+   harness.processElement(new StreamRecord(new Event(42, "foobar", 
1.0), 2));
+   harness.processElement(new StreamRecord(new SubEvent(42, 
"barfoo", 1.0, 5.0), 3));
+
+   harness.processWatermark(new Watermark(2));
+
+   // simulate snapshot/restore with empty element queue but NFA state
+   StreamTaskState snapshot = harness.snapshot(1, 1);
+
+   FileOutputStream out = new 
FileOutputStream("/Users/kkloudas/Desktop/cep-keyed-savepoint-1.1");
--- End diff --

This should be something like 
`"src/test/resources/cep-keyed-savepoint-1.1"`, same for the other path down 
below. With this, it would generate directly into the directory where the test 
currently is and where the test expects to find it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6049) Parallelize execution of (async) snapshots in AsyncCheckpointRunnable

2017-03-14 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-6049:
-

 Summary: Parallelize execution of (async) snapshots in 
AsyncCheckpointRunnable
 Key: FLINK-6049
 URL: https://issues.apache.org/jira/browse/FLINK-6049
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.3.0
Reporter: Stefan Richter
 Fix For: 1.3.0


After the changes from [FLINK-6048], it would make sense to parallelize the 
execution of the `RunnableFuture`s of (async) snapshots in 
`AsyncCheckpointRunnable`. Currently, `AsyncCheckpointRunnable` is a thread 
that runs in parallel, but executes each snapshot sequentially. The reason is 
that previously only keyed state backends had support for async snapshots.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924477#comment-15924477
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2425
  
@StephanEwen The shared secret can be considered as an additional 
security extension on top of TLS integration, thus it designates only an 
authorized identity to execute actions on a running cluster.


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...

2017-03-14 Thread vijikarthi
Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2425
  
@StephanEwen The shared secret can be considered as an additional 
security extension on top of TLS integration, thus it designates only an 
authorized identity to execute actions on a running cluster.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6048) Asynchronous snapshots for heap-based operator state backends

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924467#comment-15924467
 ] 

ASF GitHub Bot commented on FLINK-6048:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3536
  
CC @aljoscha @StephanEwen 


> Asynchronous snapshots for heap-based operator state backends
> -
>
> Key: FLINK-6048
> URL: https://issues.apache.org/jira/browse/FLINK-6048
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> The synchronous checkpointing mechanism of heap-based operator state backends 
> blocks element processing for the duration of the checkpoint.
> We could implement a heap-based operator state backend that allows for 
> asynchronous checkpoints.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6048) Asynchronous snapshots for heap-based operator state backends

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924466#comment-15924466
 ] 

ASF GitHub Bot commented on FLINK-6048:
---

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/3536

[FLINK-6048] Asynchronous snapshots for heap-based OperatorStateBackend

This PR introduces asynchronous snapshots for the heap-based 
`DefaultOperatorStateBackend`. Compared to the asynchronous snapshots for the 
heap-based keyed state backend, this implementation is rather simple and 
eagerly generates a deep in-memory copy of the state before running the 
asynchronous part of the snapshot that writes to the filesystem.

Note that this PR should later sit on top of PR #3483 and piggyback on the 
async-flag that was introduced.

Furthermore, we could have a followup that actually parallelizes 
checkpointing the different async backends in `AsyncCheckpointRunnable`. 
Previously, this was not needed because there have only been keyed state 
backends, so those have been the only async backends.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink async-opstatebackend

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3536.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3536


commit ff0930066fd6f9a5d54c548eff73fc4f34141b6a
Author: Stefan Richter 
Date:   2017-03-14T13:25:57Z

[FLINK-6048] Implement async snapshots for DefaultOperatorStateBackend

commit f38fdf7524039f2d87a4594d275959d874c4a198
Author: Stefan Richter 
Date:   2017-03-14T15:07:06Z

Unit tests for [FLINK-6048]




> Asynchronous snapshots for heap-based operator state backends
> -
>
> Key: FLINK-6048
> URL: https://issues.apache.org/jira/browse/FLINK-6048
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> The synchronous checkpointing mechanism of heap-based operator state backends 
> blocks element processing for the duration of the checkpoint.
> We could implement a heap-based operator state backend that allows for 
> asynchronous checkpoints.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3536: [FLINK-6048] Asynchronous snapshots for heap-based Operat...

2017-03-14 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3536
  
CC @aljoscha @StephanEwen 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3536: [FLINK-6048] Asynchronous snapshots for heap-based...

2017-03-14 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/3536

[FLINK-6048] Asynchronous snapshots for heap-based OperatorStateBackend

This PR introduces asynchronous snapshots for the heap-based 
`DefaultOperatorStateBackend`. Compared to the asynchronous snapshots for the 
heap-based keyed state backend, this implementation is rather simple and 
eagerly generates a deep in-memory copy of the state before running the 
asynchronous part of the snapshot that writes to the filesystem.

Note that this PR should later sit on top of PR #3483 and piggyback on the 
async-flag that was introduced.

Furthermore, we could have a followup that actually parallelizes 
checkpointing the different async backends in `AsyncCheckpointRunnable`. 
Previously, this was not needed because there have only been keyed state 
backends, so those have been the only async backends.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink async-opstatebackend

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3536.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3536


commit ff0930066fd6f9a5d54c548eff73fc4f34141b6a
Author: Stefan Richter 
Date:   2017-03-14T13:25:57Z

[FLINK-6048] Implement async snapshots for DefaultOperatorStateBackend

commit f38fdf7524039f2d87a4594d275959d874c4a198
Author: Stefan Richter 
Date:   2017-03-14T15:07:06Z

Unit tests for [FLINK-6048]




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6048) Asynchronous snapshots for heap-based operator state backends

2017-03-14 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-6048:
-

 Summary: Asynchronous snapshots for heap-based operator state 
backends
 Key: FLINK-6048
 URL: https://issues.apache.org/jira/browse/FLINK-6048
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Affects Versions: 1.3.0
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.3.0


The synchronous checkpointing mechanism of heap-based operator state backends 
blocks element processing for the duration of the checkpoint.
We could implement a heap-based operator state backend that allows for 
asynchronous checkpoints.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6047) Master Jira for "Retraction for Flink Streaming"

2017-03-14 Thread Shaoxuan Wang (JIRA)
Shaoxuan Wang created FLINK-6047:


 Summary: Master Jira for "Retraction for Flink Streaming"
 Key: FLINK-6047
 URL: https://issues.apache.org/jira/browse/FLINK-6047
 Project: Flink
  Issue Type: New Feature
Reporter: Shaoxuan Wang
Assignee: Shaoxuan Wang


[Design doc]:
https://docs.google.com/document/d/18XlGPcfsGbnPSApRipJDLPg5IFNGTQjnz7emkVpZlkw

[Introduction]:
"Retraction" is an important building block for data streaming to refine the 
early fired results in streaming. “Early firing” is very common and widely 
used in many streaming scenarios, for instance “window-less” or unbounded 
aggregate and stream-stream inner join, windowed (with early firing) aggregate 
and stream-stream inner join. There are mainly two cases that require 
retractions: 1) update on the keyed table (the key is either a primaryKey (PK) 
on source table, or a groupKey/partitionKey in an aggregate); 2) When dynamic 
windows (e.g., session window) are in use, the new value may be replacing more 
than one previous window due to window merging. 

To the best of our knowledge, the retraction for the early fired streaming 
results has never been practically solved before. In this proposal, we develop 
a retraction solution and explain how it works for the problem of “update on 
the keyed table”. The same solution can be easily extended to dynamic 
window merging, as the key component of retraction - how to refine early 
fired results - is the same across different problems.  

[Proposed Jiras]:
Implement decoration phase for predicated logical plan rewriting after volcano 
optimization phase
Add source with table primary key and replace table property
Add sink tableInsert and NeedRetract property
Implement the retraction for partitioned unbounded over window aggregate
Implement the retraction for stream-stream inner join
Implement the retraction for the early firing window
Implement the retraction for the dynamic window with early firing





--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105943415
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import 
org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class HistoryServerArchiveFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+   private static final JsonFactory jacksonFactory = new JsonFactory();
+   private static final ObjectMapper mapper = new ObjectMapper();
+
+   private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(
+   new 
ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+   private final JobArchiveFetcherTask fetcherTask;
+   private final long refreshIntervalMillis;
+
+   HistoryServerArchiveFetcher(long refreshIntervalMillis, 
List refreshDirs, File webDir) {
+   this.refreshIntervalMillis = refreshIntervalMillis;
+   this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir);
+   if (LOG.isInfoEnabled()) {
+   for (HistoryServer.RefreshLocation refreshDir : 
refreshDirs) {
+   LOG.info("Monitoring directory {} for archived 
jobs.", refreshDir.getPath());
+   }
+   }
+   }
+
+   void start() {
+   executor.scheduleWithFixedDelay(fetcherTask, 0, 
refreshIntervalMillis, TimeUnit.MILLISECONDS);
+   }
+
+   void stop() {
+   executor.shutdown();
+
+   try {
+   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+   executor.shutdownNow();
+   }
+   } catch (InterruptedException ignored) {
+   executor.shutdownNow();
+   }
+   }
+
+   /**
+* {@link TimerTask} that polls the directories configured as {@link 
HistoryServerOptions#HISTORY_SERVER_DIRS} for
+* new job archives.
+*/
+   static class JobArchiveFetcherTask extends TimerTask {
+   private final List refreshDirs;
+   /** Map containing the JobID of all fetched jobs and the 
refreshDir from with they originate. */
+   private final Map cachedArchives;
+   private final File webDir;
+   private final File webTmpDir;
+   private final File webJobDir;
+   private final File webOverviewDir;
+
+   private static final String JSON_FILE_ENDING = ".json";
+
+   JobArchiveFetcherTask(List 
refreshDirs, File webDir) {
  

[jira] [Commented] (FLINK-1579) Create a Flink History Server

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924427#comment-15924427
 ] 

ASF GitHub Bot commented on FLINK-1579:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105943415
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import 
org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class HistoryServerArchiveFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+   private static final JsonFactory jacksonFactory = new JsonFactory();
+   private static final ObjectMapper mapper = new ObjectMapper();
+
+   private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(
+   new 
ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+   private final JobArchiveFetcherTask fetcherTask;
+   private final long refreshIntervalMillis;
+
+   HistoryServerArchiveFetcher(long refreshIntervalMillis, 
List refreshDirs, File webDir) {
+   this.refreshIntervalMillis = refreshIntervalMillis;
+   this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir);
+   if (LOG.isInfoEnabled()) {
+   for (HistoryServer.RefreshLocation refreshDir : 
refreshDirs) {
+   LOG.info("Monitoring directory {} for archived 
jobs.", refreshDir.getPath());
+   }
+   }
+   }
+
+   void start() {
+   executor.scheduleWithFixedDelay(fetcherTask, 0, 
refreshIntervalMillis, TimeUnit.MILLISECONDS);
+   }
+
+   void stop() {
+   executor.shutdown();
+
+   try {
+   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+   executor.shutdownNow();
+   }
+   } catch (InterruptedException ignored) {
+   executor.shutdownNow();
+   }
+   }
+
+   /**
+* {@link TimerTask} that polls the directories configured as {@link 
HistoryServerOptions#HISTORY_SERVER_DIRS} for
+* new job archives.
+*/
+   static class JobArchiveFetcherTask extends TimerTask {
+   private final List refreshDirs;
+   /** Map containing the JobID of all fetched jobs and the 
refreshDir from with they originate. */
+   private final Map cachedArchives;
+   private final File webDir;
+   private final File webTmpDir;
+   

[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924428#comment-15924428
 ] 

ASF GitHub Bot commented on FLINK-5985:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3523
  
Sure, I just quickly prepared a backport here: 

https://github.com/StefanRRichter/flink/tree/FLINK-5985-backport-to-1.2


> Flink treats every task as stateful (making topology changes impossible)
> 
>
> Key: FLINK-5985
> URL: https://issues.apache.org/jira/browse/FLINK-5985
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Critical
>
> It seems  that Flink treats every Task as stateful so changing the topology 
> is not possible without setting uid on every single operator.
> If the topology has an iteration this is virtually impossible (or at least 
> gets super hacky)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3523: [FLINK-5985] Report no task states for stateless tasks in...

2017-03-14 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3523
  
Sure, I just quickly prepared a backport here: 

https://github.com/StefanRRichter/flink/tree/FLINK-5985-backport-to-1.2


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105937756
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import io.netty.channel.ChannelHandler;
+import 
org.apache.flink.runtime.webmonitor.files.AbstractStaticFileServerHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+@ChannelHandler.Sharable
+public class HistoryServerStaticFileServerHandler extends 
AbstractStaticFileServerHandler {
--- End diff --

all right.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3535: [FLINK-5713] Protect against NPE in WindowOperator...

2017-03-14 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3535#discussion_r105931848
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -354,22 +354,27 @@ public void merge(W mergeResult,
}
});
 
-   // drop if the window is already late
-   if (isLate(actualWindow)) {
-   
mergingWindows.retireWindow(actualWindow);
-   continue;
-   }
+   context.key = key;
+   context.window = actualWindow;
 
W stateWindow = 
mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
throw new IllegalStateException("Window 
" + window + " is not in in-flight window set.");
}
 
windowState.setCurrentNamespace(stateWindow);
-   windowState.add(element.getValue());
 
-   context.key = key;
-   context.window = actualWindow;
+   // Drop if the window is already late. In rare 
cases (with a misbehaving
+   // WindowAssigner) it can happen that a window 
becomes late that already has
+   // state (contents, state and timers). That's 
why we first get the window state
+   // above and then drop everything.
+   if (isLate(actualWindow)) {
+   clearAllState(actualWindow, 
windowState, mergingWindows);
+   mergingWindows.persist();
--- End diff --

Why move `mergingWindows.persist()` from `clearAllState` to here? And do we 
not need the null check? How about 
```
if (mergingWindows != null) {
mergingWindows.persist();
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1579) Create a Flink History Server

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924397#comment-15924397
 ] 

ASF GitHub Bot commented on FLINK-1579:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105937756
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import io.netty.channel.ChannelHandler;
+import 
org.apache.flink.runtime.webmonitor.files.AbstractStaticFileServerHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+@ChannelHandler.Sharable
+public class HistoryServerStaticFileServerHandler extends 
AbstractStaticFileServerHandler {
--- End diff --

all right.


> Create a Flink History Server
> -
>
> Key: FLINK-1579
> URL: https://issues.apache.org/jira/browse/FLINK-1579
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> Right now it's not possible to analyze the job results for jobs that ran on 
> YARN, because we'll lose the information once the JobManager has stopped.
> Therefore, I propose to implement a "Flink History Server" which serves  the 
> results from these jobs.
> I haven't started thinking about the implementation, but I suspect it 
> involves some JSON files stored in HDFS :)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5713) Protect against NPE in WindowOperator window cleanup

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924386#comment-15924386
 ] 

ASF GitHub Bot commented on FLINK-5713:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3535#discussion_r105931848
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -354,22 +354,27 @@ public void merge(W mergeResult,
}
});
 
-   // drop if the window is already late
-   if (isLate(actualWindow)) {
-   
mergingWindows.retireWindow(actualWindow);
-   continue;
-   }
+   context.key = key;
+   context.window = actualWindow;
 
W stateWindow = 
mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
throw new IllegalStateException("Window 
" + window + " is not in in-flight window set.");
}
 
windowState.setCurrentNamespace(stateWindow);
-   windowState.add(element.getValue());
 
-   context.key = key;
-   context.window = actualWindow;
+   // Drop if the window is already late. In rare 
cases (with a misbehaving
+   // WindowAssigner) it can happen that a window 
becomes late that already has
+   // state (contents, state and timers). That's 
why we first get the window state
+   // above and then drop everything.
+   if (isLate(actualWindow)) {
+   clearAllState(actualWindow, 
windowState, mergingWindows);
+   mergingWindows.persist();
--- End diff --

Why move `mergingWindows.persist()` from `clearAllState` to here? And do we 
not need the null check? How about 
```
if (mergingWindows != null) {
mergingWindows.persist();
}
```


> Protect against NPE in WindowOperator window cleanup
> 
>
> Key: FLINK-5713
> URL: https://issues.apache.org/jira/browse/FLINK-5713
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.2.1
>
>
> Some (misbehaved) WindowAssigners can cause windows to be dropped from the 
> merging window set while a cleanup timer is still active. This will trigger a 
> NullPointerException when that timer fires.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-1579) Create a Flink History Server

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924377#comment-15924377
 ] 

ASF GitHub Bot commented on FLINK-1579:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105934771
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/AbstractStaticFileServerHandler.java
 ---
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.files;
+

+/*
+ * This code is based on the "HttpStaticFileServerHandler" from the
+ * Netty project's HTTP server example.
+ *
+ * See http://netty.io and
+ * 
https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
+ 
*/
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpChunkedInput;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.router.Routed;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedFile;
+import io.netty.util.CharsetUtil;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.net.URL;
+import java.nio.file.Files;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.Locale;
+import java.util.TimeZone;
+
+import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpHeaders.Names.DATE;
+import static io.netty.handler.codec.http.HttpHeaders.Names.EXPIRES;
+import static 
io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE;
+import static io.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Simple file server handler that serves requests to web frontend's 
static files, such as
+ * HTML, CSS, or JS files.
+ *
+ * This code is based on the "HttpStaticFileServerHandler" from the Netty 
project's HTTP server
+ * example.
+ *
+ * For every incoming requests the {@link Routed#path()} is pre-processed 
in
+ * {@link AbstractStaticFileServerHandler#preProcessRequestPath(String)}.
+ *
+ * This path is then interpreted as a 

[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105934771
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/AbstractStaticFileServerHandler.java
 ---
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.files;
+

+/*
+ * This code is based on the "HttpStaticFileServerHandler" from the
+ * Netty project's HTTP server example.
+ *
+ * See http://netty.io and
+ * 
https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
+ 
*/
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpChunkedInput;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.router.Routed;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedFile;
+import io.netty.util.CharsetUtil;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.net.URL;
+import java.nio.file.Files;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.Locale;
+import java.util.TimeZone;
+
+import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpHeaders.Names.DATE;
+import static io.netty.handler.codec.http.HttpHeaders.Names.EXPIRES;
+import static 
io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE;
+import static io.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Simple file server handler that serves requests to web frontend's 
static files, such as
+ * HTML, CSS, or JS files.
+ *
+ * This code is based on the "HttpStaticFileServerHandler" from the Netty 
project's HTTP server
+ * example.
+ *
+ * For every incoming requests the {@link Routed#path()} is pre-processed 
in
+ * {@link AbstractStaticFileServerHandler#preProcessRequestPath(String)}.
+ *
+ * This path is then interpreted as a relative file path, with the 
configured rootDir being the parent.
+ *
+ * If no file exists for this path, another (optional) pre-processing step 
is executed in
+ * {@link 

[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...

2017-03-14 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3406
  
Thanks for the update @beyond1920.
The PR looks good to me. 
@twalthr do you also want to have a look at this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-6046) Add support for oversized messages during deployment

2017-03-14 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-6046:
---
Description: 
This is the non-FLIP6 version of FLINK-4346, restricted to deployment messages:

Currently, messages larger than the maximum Akka Framesize cause an error when 
being transported. We should add a way to pass messages that are larger than 
{{akka.framesize}} as may happen for task deployments via the 
{{TaskDeploymentDescriptor}}.

We should use the {{BlobServer}} to offload big data items (if possible) and 
make use of any potential distributed file system behind. This way, not only do 
we avoid the akka framesize restriction, but may also be able to speed up 
deployment.

I suggest the following changes:
  - the sender, i.e. the {{Execution}} class, tries to store the serialized job 
information and serialized task information (if oversized) from the 
{{TaskDeploymentDescriptor}} (tdd) on the {{BlobServer}} as a single 
{{NAME_ADDRESSABLE}} blob under its job ID (if this does not work, we send the 
whole tdd as usual via akka)
  - if stored in a blob, these data items are removed from the tdd
  - the receiver, i.e. the {{TaskManager}} class, tries to retrieve any 
offloaded data after receiving the {{TaskDeploymentDescriptor}} from akka; it 
re-assembles the original tdd
  - the stored blob may be deleted after re-assembly of the tdd

Further (future) changes may include:
  - separating the serialized job information and serialized task information 
into two files and re-use the first one for all tasks
  - not re-deploying these two during job recovery (if possible)
  - then, as all other {{NAME_ADDRESSABLE}} blobs, these offloaded blobs may be 
removed when the job enters a final state instead

  was:
This is the non-FLIP6 version of FLINK-4346, restricted to deployment messages:

Currently, messages larger than the maximum Akka Framesize cause an error when 
being transported. We should add a way to pass messages that are larger than 
{{akka.framesize}} as may happen for task deployments via the 
{{TaskDeploymentDescriptor}}.

We should use the {{BlobServer}} to offload big data items (if possible) and 
make use of any potential distributed file system behind. This way, not only do 
we avoid the akka framesize restriction, but may also be able to speed up 
deployment.

I suggest the following changes:
  - the sender, i.e. the {{Execution}} class, tries to store the serialized job 
information and serialized task information (if oversized) from the 
{{TaskDeploymentDescriptor}} (tdd) on the {{BlobServer}} as a single 
{{NAME_ADDRESSABLE}} blob under its job ID (if this does not work, we send the 
whole tdd as usual via akka)
  - if stored in a blob, these data items are removed from the tdd
  - the receiver, i.e. the {{TaskManager}} class, tries to retrieve any 
offloaded data after receiving the {{TaskDeploymentDescriptor}} from akka; it 
re-assembles the original tdd
  - as all {{NAME_ADDRESSABLE}} blobs, these offloaded blobs are removed when 
the job enters a final state

Further (future) changes may include:
  - separating the serialized job information and serialized task information 
into two files and re-use the first one for all tasks
  - not re-deploying these two during job recovery (if possible)


> Add support for oversized messages during deployment
> 
>
> Key: FLINK-6046
> URL: https://issues.apache.org/jira/browse/FLINK-6046
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> This is the non-FLIP6 version of FLINK-4346, restricted to deployment 
> messages:
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than {{akka.framesize}} as may happen for task deployments via the 
> {{TaskDeploymentDescriptor}}.
> We should use the {{BlobServer}} to offload big data items (if possible) and 
> make use of any potential distributed file system behind. This way, not only 
> do we avoid the akka framesize restriction, but may also be able to speed up 
> deployment.
> I suggest the following changes:
>   - the sender, i.e. the {{Execution}} class, tries to store the serialized 
> job information and serialized task information (if oversized) from the 
> {{TaskDeploymentDescriptor}} (tdd) on the {{BlobServer}} as a single 
> {{NAME_ADDRESSABLE}} blob under its job ID (if this does not work, we send 
> the whole tdd as usual via akka)
>   - if stored in a blob, these data items are removed from the tdd
>   - the receiver, i.e. the {{TaskManager}} class, tries to retrieve any 
> offloaded data after receiving the {{TaskDeploymentDescriptor}} from akka; it 
> re-assembles the 

[jira] [Commented] (FLINK-1579) Create a Flink History Server

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924359#comment-15924359
 ] 

ASF GitHub Bot commented on FLINK-1579:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105931282
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import 
org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class HistoryServerArchiveFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+   private static final JsonFactory jacksonFactory = new JsonFactory();
+   private static final ObjectMapper mapper = new ObjectMapper();
+
+   private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(
+   new 
ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+   private final JobArchiveFetcherTask fetcherTask;
+   private final long refreshIntervalMillis;
+
+   HistoryServerArchiveFetcher(long refreshIntervalMillis, 
List refreshDirs, File webDir) {
+   this.refreshIntervalMillis = refreshIntervalMillis;
+   this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir);
+   if (LOG.isInfoEnabled()) {
+   for (HistoryServer.RefreshLocation refreshDir : 
refreshDirs) {
+   LOG.info("Monitoring directory {} for archived 
jobs.", refreshDir.getPath());
+   }
+   }
+   }
+
+   void start() {
+   executor.scheduleWithFixedDelay(fetcherTask, 0, 
refreshIntervalMillis, TimeUnit.MILLISECONDS);
+   }
+
+   void stop() {
+   executor.shutdown();
+
+   try {
+   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+   executor.shutdownNow();
+   }
+   } catch (InterruptedException ignored) {
+   executor.shutdownNow();
+   }
+   }
+
+   /**
+* {@link TimerTask} that polls the directories configured as {@link 
HistoryServerOptions#HISTORY_SERVER_DIRS} for
+* new job archives.
+*/
+   static class JobArchiveFetcherTask extends TimerTask {
+   private final List refreshDirs;
+   /** Map containing the JobID of all fetched jobs and the 
refreshDir from which they originate. */
+   private final Map cachedArchives;
+   private final File webDir;
+   private final File webTmpDir;
+   

[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105931282
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import 
org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class HistoryServerArchiveFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+   private static final JsonFactory jacksonFactory = new JsonFactory();
+   private static final ObjectMapper mapper = new ObjectMapper();
+
+   private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(
+   new 
ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+   private final JobArchiveFetcherTask fetcherTask;
+   private final long refreshIntervalMillis;
+
+   HistoryServerArchiveFetcher(long refreshIntervalMillis, 
List refreshDirs, File webDir) {
+   this.refreshIntervalMillis = refreshIntervalMillis;
+   this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir);
+   if (LOG.isInfoEnabled()) {
+   for (HistoryServer.RefreshLocation refreshDir : 
refreshDirs) {
+   LOG.info("Monitoring directory {} for archived 
jobs.", refreshDir.getPath());
+   }
+   }
+   }
+
+   void start() {
+   executor.scheduleWithFixedDelay(fetcherTask, 0, 
refreshIntervalMillis, TimeUnit.MILLISECONDS);
+   }
+
+   void stop() {
+   executor.shutdown();
+
+   try {
+   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+   executor.shutdownNow();
+   }
+   } catch (InterruptedException ignored) {
+   executor.shutdownNow();
+   }
+   }
+
+   /**
+* {@link TimerTask} that polls the directories configured as {@link 
HistoryServerOptions#HISTORY_SERVER_DIRS} for
+* new job archives.
+*/
+   static class JobArchiveFetcherTask extends TimerTask {
+   private final List refreshDirs;
+   /** Map containing the JobID of all fetched jobs and the 
refreshDir from which they originate. */
+   private final Map cachedArchives;
+   private final File webDir;
+   private final File webTmpDir;
+   private final File webJobDir;
+   private final File webOverviewDir;
+
+   private static final String JSON_FILE_ENDING = ".json";
+
+   JobArchiveFetcherTask(List 
refreshDirs, File webDir) {
  

[jira] [Commented] (FLINK-1579) Create a Flink History Server

2017-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924354#comment-15924354
 ] 

ASF GitHub Bot commented on FLINK-1579:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105929511
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import 
org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class HistoryServerArchiveFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+   private static final JsonFactory jacksonFactory = new JsonFactory();
+   private static final ObjectMapper mapper = new ObjectMapper();
+
+   private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(
+   new 
ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+   private final JobArchiveFetcherTask fetcherTask;
+   private final long refreshIntervalMillis;
+
+   HistoryServerArchiveFetcher(long refreshIntervalMillis, 
List refreshDirs, File webDir) {
+   this.refreshIntervalMillis = refreshIntervalMillis;
+   this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir);
+   if (LOG.isInfoEnabled()) {
+   for (HistoryServer.RefreshLocation refreshDir : 
refreshDirs) {
+   LOG.info("Monitoring directory {} for archived 
jobs.", refreshDir.getPath());
+   }
+   }
+   }
+
+   void start() {
+   executor.scheduleWithFixedDelay(fetcherTask, 0, 
refreshIntervalMillis, TimeUnit.MILLISECONDS);
+   }
+
+   void stop() {
+   executor.shutdown();
+
+   try {
+   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+   executor.shutdownNow();
+   }
+   } catch (InterruptedException ignored) {
+   executor.shutdownNow();
+   }
+   }
+
+   /**
+* {@link TimerTask} that polls the directories configured as {@link 
HistoryServerOptions#HISTORY_SERVER_DIRS} for
+* new job archives.
+*/
+   static class JobArchiveFetcherTask extends TimerTask {
+   private final List refreshDirs;
+   /** Map containing the JobID of all fetched jobs and the 
refreshDir from which they originate. */
+   private final Map cachedArchives;
+   private final File webDir;
+   private final File webTmpDir;
+   

[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server

2017-03-14 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3460#discussion_r105929511
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import 
org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class HistoryServerArchiveFetcher {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
+
+   private static final JsonFactory jacksonFactory = new JsonFactory();
+   private static final ObjectMapper mapper = new ObjectMapper();
+
+   private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(
+   new 
ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
+   private final JobArchiveFetcherTask fetcherTask;
+   private final long refreshIntervalMillis;
+
+   HistoryServerArchiveFetcher(long refreshIntervalMillis, 
List refreshDirs, File webDir) {
+   this.refreshIntervalMillis = refreshIntervalMillis;
+   this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, 
webDir);
+   if (LOG.isInfoEnabled()) {
+   for (HistoryServer.RefreshLocation refreshDir : 
refreshDirs) {
+   LOG.info("Monitoring directory {} for archived 
jobs.", refreshDir.getPath());
+   }
+   }
+   }
+
+   void start() {
+   executor.scheduleWithFixedDelay(fetcherTask, 0, 
refreshIntervalMillis, TimeUnit.MILLISECONDS);
+   }
+
+   void stop() {
+   executor.shutdown();
+
+   try {
+   if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+   executor.shutdownNow();
+   }
+   } catch (InterruptedException ignored) {
+   executor.shutdownNow();
+   }
+   }
+
+   /**
+* {@link TimerTask} that polls the directories configured as {@link 
HistoryServerOptions#HISTORY_SERVER_DIRS} for
+* new job archives.
+*/
+   static class JobArchiveFetcherTask extends TimerTask {
+   private final List refreshDirs;
+   /** Map containing the JobID of all fetched jobs and the 
refreshDir from which they originate. */
+   private final Map cachedArchives;
+   private final File webDir;
+   private final File webTmpDir;
+   private final File webJobDir;
+   private final File webOverviewDir;
+
+   private static final String JSON_FILE_ENDING = ".json";
+
+   JobArchiveFetcherTask(List 
refreshDirs, File webDir) {

  1   2   3   4   >