[jira] [Commented] (FLINK-5776) Improve XXMapRunner support create instance by carrying constructor parameters

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887474#comment-15887474
 ] 

ASF GitHub Bot commented on FLINK-5776:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3418
  
@fhueske thanks for your review. I have updated the PR.
IMO. The main purpose of doing this PR is to enhance the function of 
Flat/MapRunner. In addition, my next plan is:
Use CODE-GEN to generate the class which below the 
`org.apache.flink.table.runtime.aggregate` package, perhaps this PR will help 
me in the next work. What do you think?



> Improve XXMapRunner support create instance by carrying constructor parameters
> --
>
> Key: FLINK-5776
> URL: https://issues.apache.org/jira/browse/FLINK-5776
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> At present, MapRunner FlatMapRunner only supports create non-parameter 
> instance, but sometimes we need to carry constructor parameters to 
> instantiate, so I would like to improve XXMapRunner support create instance 
> by carrying constructor parameters.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner support ...

2017-02-27 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3418
  
@fhueske thanks for your review. I have updated the PR.
IMO. The main purpose of doing this PR is to enhance the function of 
Flat/MapRunner. In addition, my next plan is:
Use CODE-GEN to generate the class which below the 
`org.apache.flink.table.runtime.aggregate` package, perhaps this PR will help 
me in the next work. What do you think?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-5818) change checkpoint dir permission to 700 for security reason

2017-02-27 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui reassigned FLINK-5818:


Assignee: Tao Wang

> change checkpoint dir permission to 700 for security reason
> ---
>
> Key: FLINK-5818
> URL: https://issues.apache.org/jira/browse/FLINK-5818
> Project: Flink
>  Issue Type: Sub-task
>  Components: Security, State Backends, Checkpointing
>Reporter: Tao Wang
>Assignee: Tao Wang
>
> Now checkpoint directory is made w/o specified permission, so it is easy for 
> another user to delete or read files under it, which will cause restore 
> failure or information leak.
> It's better to lower it down to 700.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5921) Adapt time mode indicator functions return custom data types

2017-02-27 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887417#comment-15887417
 ] 

Haohui Mai commented on FLINK-5921:
---

My understanding is that there should be no need for code generators as the 
indicators will be removed in the hep planner?

> Adapt time mode indicator functions return custom data types
> 
>
> Key: FLINK-5921
> URL: https://issues.apache.org/jira/browse/FLINK-5921
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> The functions that indicate event time ({{rowtime()}}) and processing time 
> ({{proctime()}}) are defined to return {{TIMESTAMP}}.
> These functions should be updated to return custom types in order to ease the 
> identification of the time semantics during optimization.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3397: [FLINK-5803][TableAPI] Add [partitioned] processing t...

2017-02-27 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3397
  
@fhueske thanks for the PR(#3425). I have rebased the code, and add 
timeMode check. I'm appreciated if you can look at this PR. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5803) Add [partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887374#comment-15887374
 ] 

ASF GitHub Bot commented on FLINK-5803:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3397
  
@fhueske thanks for the PR(#3425). I have rebased the code, and add 
timeMode check. I'm appreciated if you can look at this PR. 


> Add [partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING 
> aggregation to SQL
> ---
>
> Key: FLINK-5803
> URL: https://issues.apache.org/jira/browse/FLINK-5803
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not supported (see FLINK-5654)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5921) Adapt time mode indicator functions return custom data types

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887370#comment-15887370
 ] 

ASF GitHub Bot commented on FLINK-5921:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3425#discussion_r103384285
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
 ---
@@ -327,6 +329,15 @@ object FunctionGenerator {
 )
   )
 
+// generate a constant for time indicator functions.
+//   this is a temporary solution and will be removed when FLINK-5884 
is implemented.
+case ProcTimeExtractor | EventTimeExtractor =>
+  Some(new CallGenerator {
+override def generate(codeGenerator: CodeGenerator, operands: 
Seq[GeneratedExpression]) = {
+  GeneratedExpression("0L", "false", "", LONG_TYPE_INFO)
--- End diff --

Can we change `LONG_TYPE_INFO` to `SqlTimeTypeInfo.TIMESTAMP`?


> Adapt time mode indicator functions return custom data types
> 
>
> Key: FLINK-5921
> URL: https://issues.apache.org/jira/browse/FLINK-5921
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> The functions that indicate event time ({{rowtime()}}) and processing time 
> ({{proctime()}}) are defined to return {{TIMESTAMP}}.
> These functions should be updated to return custom types in order to ease the 
> identification of the time semantics during optimization.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3425: [FLINK-5921] [table] Add custom data types for row...

2017-02-27 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3425#discussion_r103384285
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
 ---
@@ -327,6 +329,15 @@ object FunctionGenerator {
 )
   )
 
+// generate a constant for time indicator functions.
+//   this is a temporary solution and will be removed when FLINK-5884 
is implemented.
+case ProcTimeExtractor | EventTimeExtractor =>
+  Some(new CallGenerator {
+override def generate(codeGenerator: CodeGenerator, operands: 
Seq[GeneratedExpression]) = {
+  GeneratedExpression("0L", "false", "", LONG_TYPE_INFO)
--- End diff --

Can we change `LONG_TYPE_INFO` to `SqlTimeTypeInfo.TIMESTAMP`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5752) Support push down projections for HBaseTableSource

2017-02-27 Thread ramkrishna.s.vasudevan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ramkrishna.s.vasudevan updated FLINK-5752:
--
Description: 
This is after the discussion to create NestedProjectableTableSource. 
Currently we support nested schema for the non-relational type of DBs like 
HBase. 
But this does not allow push down projection. This JIRA is to implement that. 
Once FLINK-5698 is implemented then we should be making use of the feature to 
push down the projections for a nested table. So in case of HBase if we have 
{f1:{a, b}, f2:{c, d}} as the nested structure then if we have a scan query 
that needs to select f2.c - then we should be specifically able to project only 
that column 'c' under 'f2'. FLINK-5698 plans to add a new API for such 
projections and HBaseTableSource should make use of that API to do the 
projection.
[~fhueske], [~tonycox], [~jark]


  was:
This is after the discussion to create NestedProjectableTableSource. 
Currently we support nested schema for the non-relational type of DBs like 
HBase. 
But this does not allow push down projection. This JIRA is to implement that. 
I just did a POC on the existing PR https://github.com/apache/flink/pull/3149.
The idea was to allow the family#quallifier to be specified in the query 
directly and parse the family and qualifier info and directly push down for 
projections.
I tried using fam$qual (with $) as separator but that had SQL parse error. 
Hence went with '_' as the separator and it worked. 
So now in order to maintain the order in which the projection has to be done we 
need to have a LinkedHashMap which has the qualifier to typeinfo mapping. This 
is because with f1_q1, f1_q2 as the schema, we could always project f1_q2 and 
the selection could be f1_q1. In that case the order of how the projection is 
done is very important so that there is no type mismatch. 
[~fhueske], [~tonycox], [~jark]


> Support push down projections for HBaseTableSource
> --
>
> Key: FLINK-5752
> URL: https://issues.apache.org/jira/browse/FLINK-5752
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: ramkrishna.s.vasudevan
>Assignee: ramkrishna.s.vasudevan
>
> This is after the discussion to create NestedProjectableTableSource. 
> Currently we support nested schema for the non-relational type of DBs like 
> HBase. 
> But this does not allow push down projection. This JIRA is to implement that. 
> Once FLINK-5698 is implemented then we should be making use of the feature to 
> push down the projections for a nested table. So in case of HBase if we have 
> {f1:{a, b}, f2:{c, d}} as the nested structure then if we have a scan query 
> that needs to select f2.c - then we should be specifically able to project 
> only that column 'c' under 'f2'. FLINK-5698 plans to add a new API for such 
> projections and HBaseTableSource should make use of that API to do the 
> projection.
> [~fhueske], [~tonycox], [~jark]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5881) ScalarFunction(UDF) should support variable types and variable arguments

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887167#comment-15887167
 ] 

ASF GitHub Bot commented on FLINK-5881:
---

Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r103369181
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +138,24 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  var trailingSeq = false
+  var noVargs = true
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty) {
+  if (method.isVarArgs) {
+noVargs = false
+  } else if 
(signatures.last.getName.equals("scala.collection.Seq")) {
+trailingSeq = true
+  }
+}
+  })
+  if (trailingSeq && noVargs) {
+// We found trailing "scala.collection.Seq", but no trailing 
"Type[]", "Type..."
+throw new ValidationException("The 'eval' method do not support 
Scala type of " +
--- End diff --

I think one of the important thing here is to check type by type.


> ScalarFunction(UDF) should support variable types and variable arguments  
> -
>
> Key: FLINK-5881
> URL: https://issues.apache.org/jira/browse/FLINK-5881
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>
> As a sub-task of FLINK-5826. We would like to support the ScalarFunction 
> first and make the review a little bit easier.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-27 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r103369181
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +138,24 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  var trailingSeq = false
+  var noVargs = true
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty) {
+  if (method.isVarArgs) {
+noVargs = false
+  } else if 
(signatures.last.getName.equals("scala.collection.Seq")) {
+trailingSeq = true
+  }
+}
+  })
+  if (trailingSeq && noVargs) {
+// We found trailing "scala.collection.Seq", but no trailing 
"Type[]", "Type..."
+throw new ValidationException("The 'eval' method do not support 
Scala type of " +
--- End diff --

I think one of the important thing here is to check type by type.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4422) Convert all time interval measurements to System.nanoTime()

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887161#comment-15887161
 ] 

ASF GitHub Bot commented on FLINK-4422:
---

Github user jinmingjian commented on a diff in the pull request:

https://github.com/apache/flink/pull/3420#discussion_r103196536
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
 ---
@@ -355,13 +355,13 @@ public E getElementBlocking(long timeoutMillis) 
throws InterruptedException {
throw new IllegalArgumentException("invalid timeout");
}

-   final long deadline = System.currentTimeMillis() + 
timeoutMillis;
+   final long deadline = System.nanoTime() + timeoutMillis * 
1_000_000L;
--- End diff --

We can. But as an API, a ton of invocations in tests will come to change. 
Are you ready for review? :smiley_cat: And from practices, it is uncommon to 
use nano sec time unit in API except for performance/benchmark in that it is 
usual too small.


> Convert all time interval measurements to System.nanoTime()
> ---
>
> Key: FLINK-4422
> URL: https://issues.apache.org/jira/browse/FLINK-4422
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Jin Mingjian
>Priority: Minor
>
> In contrast to {{System.currentTimeMillis()}}, {{System.nanoTime()}} is 
> monotonous. To measure delays and time intervals, {{System.nanoTime()}} is 
> hence reliable, while {{System.currentTimeMillis()}} is not.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4422) Convert all time interval measurements to System.nanoTime()

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887162#comment-15887162
 ] 

ASF GitHub Bot commented on FLINK-4422:
---

Github user jinmingjian commented on a diff in the pull request:

https://github.com/apache/flink/pull/3420#discussion_r103195631
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
 ---
@@ -42,12 +42,12 @@
@SuppressWarnings("deprecation")
@Override
public void run() {
-   final long deadline = System.currentTimeMillis() + timeout;
+   final long deadline = System.nanoTime() + timeout * 1_000_000L;
long now;
 
-   while (toKill.isAlive() && (now = System.currentTimeMillis()) < 
deadline) {
+   while (toKill.isAlive() && (now = System.nanoTime()) < 
deadline) {
try {
-   toKill.join(deadline - now);
+   toKill.join( ( deadline - now ) / 1_000_000L );
--- End diff --

good catch:)


> Convert all time interval measurements to System.nanoTime()
> ---
>
> Key: FLINK-4422
> URL: https://issues.apache.org/jira/browse/FLINK-4422
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Jin Mingjian
>Priority: Minor
>
> In contrast to {{System.currentTimeMillis()}}, {{System.nanoTime()}} is 
> monotonous. To measure delays and time intervals, {{System.nanoTime()}} is 
> hence reliable, while {{System.currentTimeMillis()}} is not.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3420: [FLINK-4422] Convert all time interval measurement...

2017-02-27 Thread jinmingjian
Github user jinmingjian commented on a diff in the pull request:

https://github.com/apache/flink/pull/3420#discussion_r103195631
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KillerWatchDog.java
 ---
@@ -42,12 +42,12 @@
@SuppressWarnings("deprecation")
@Override
public void run() {
-   final long deadline = System.currentTimeMillis() + timeout;
+   final long deadline = System.nanoTime() + timeout * 1_000_000L;
long now;
 
-   while (toKill.isAlive() && (now = System.currentTimeMillis()) < 
deadline) {
+   while (toKill.isAlive() && (now = System.nanoTime()) < 
deadline) {
try {
-   toKill.join(deadline - now);
+   toKill.join( ( deadline - now ) / 1_000_000L );
--- End diff --

good catch:)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3420: [FLINK-4422] Convert all time interval measurement...

2017-02-27 Thread jinmingjian
Github user jinmingjian commented on a diff in the pull request:

https://github.com/apache/flink/pull/3420#discussion_r103196536
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
 ---
@@ -355,13 +355,13 @@ public E getElementBlocking(long timeoutMillis) 
throws InterruptedException {
throw new IllegalArgumentException("invalid timeout");
}

-   final long deadline = System.currentTimeMillis() + 
timeoutMillis;
+   final long deadline = System.nanoTime() + timeoutMillis * 
1_000_000L;
--- End diff --

We can. But as an API, a ton of invocations in tests will come to change. 
Are you ready for review? :smiley_cat: And from practices, it is uncommon to 
use nano sec time unit in API except for performance/benchmark in that it is 
usual too small.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5881) ScalarFunction(UDF) should support variable types and variable arguments

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887126#comment-15887126
 ] 

ASF GitHub Bot commented on FLINK-5881:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r103366163
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
 ---
@@ -114,7 +114,9 @@ object ScalarSqlFunction {
 
 inferredTypes.zipWithIndex.foreach {
   case (inferredType, i) =>
-operandTypes(i) = inferredType
+if (operandTypes.length > 0) {
+  operandTypes(i) = inferredType
--- End diff --

If this is a varargs method, the inferredType is an array type. The operand 
type should be the component type of the array type, not the array type. 

And the `operandTypes.length > 0` condition is still not safe, say the 
method is `eval(String a, int... b)`  and calling `eval("hello")`, the 
`operandTypes`'s length is 1, but `inferredTypes`'s length is 2. An 
IndexOutOfBoundsException would be thrown as before.


> ScalarFunction(UDF) should support variable types and variable arguments  
> -
>
> Key: FLINK-5881
> URL: https://issues.apache.org/jira/browse/FLINK-5881
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>
> As a sub-task of FLINK-5826. We would like to support the ScalarFunction 
> first and make the review a little bit easier.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5881) ScalarFunction(UDF) should support variable types and variable arguments

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887127#comment-15887127
 ] 

ASF GitHub Bot commented on FLINK-5881:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r103361639
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +138,24 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  var trailingSeq = false
+  var noVargs = true
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty) {
+  if (method.isVarArgs) {
+noVargs = false
+  } else if 
(signatures.last.getName.equals("scala.collection.Seq")) {
+trailingSeq = true
+  }
+}
+  })
+  if (trailingSeq && noVargs) {
+// We found trailing "scala.collection.Seq", but no trailing 
"Type[]", "Type..."
+throw new ValidationException("The 'eval' method do not support 
Scala type of " +
--- End diff --

If there is multiple eval methods found (not ambiguous), one is varargs, 
the other is not. It seems that no exception is thrown to tell users that the 
non-varargs eval method is not work.

```
@varargs
def eval(args: String*): String = {...}
// no varargs annotation
def eval(args: Int*): Int = {...}
```


> ScalarFunction(UDF) should support variable types and variable arguments  
> -
>
> Key: FLINK-5881
> URL: https://issues.apache.org/jira/browse/FLINK-5881
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>
> As a sub-task of FLINK-5826. We would like to support the ScalarFunction 
> first and make the review a little bit easier.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-27 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r103361639
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +138,24 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  var trailingSeq = false
+  var noVargs = true
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty) {
+  if (method.isVarArgs) {
+noVargs = false
+  } else if 
(signatures.last.getName.equals("scala.collection.Seq")) {
+trailingSeq = true
+  }
+}
+  })
+  if (trailingSeq && noVargs) {
+// We found trailing "scala.collection.Seq", but no trailing 
"Type[]", "Type..."
+throw new ValidationException("The 'eval' method do not support 
Scala type of " +
--- End diff --

If there is multiple eval methods found (not ambiguous), one is varargs, 
the other is not. It seems that no exception is thrown to tell users that the 
non-varargs eval method is not work.

```
@varargs
def eval(args: String*): String = {...}
// no varargs annotation
def eval(args: Int*): Int = {...}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-27 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r103366163
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
 ---
@@ -114,7 +114,9 @@ object ScalarSqlFunction {
 
 inferredTypes.zipWithIndex.foreach {
   case (inferredType, i) =>
-operandTypes(i) = inferredType
+if (operandTypes.length > 0) {
+  operandTypes(i) = inferredType
--- End diff --

If this is a varargs method, the inferredType is an array type. The operand 
type should be the component type of the array type, not the array type. 

And the `operandTypes.length > 0` condition is still not safe, say the 
method is `eval(String a, int... b)`  and calling `eval("hello")`, the 
`operandTypes`'s length is 1, but `inferredTypes`'s length is 2. An 
IndexOutOfBoundsException would be thrown as before.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3426: [FLINK-5414] [table] Bump up Calcite version to 1.11

2017-02-27 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3426
  
Hi @haohui , the code looks very good to me. Thanks for investigating this, 
waiting for the CI pass.

@twalthr , I think you would like to have a look.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5414) Bump up Calcite version to 1.11

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887104#comment-15887104
 ] 

ASF GitHub Bot commented on FLINK-5414:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/3426
  
Hi @haohui , the code looks very good to me. Thanks for investigating this, 
waiting for the CI pass.

@twalthr , I think you would like to have a look.


> Bump up Calcite version to 1.11
> ---
>
> Key: FLINK-5414
> URL: https://issues.apache.org/jira/browse/FLINK-5414
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>
> The upcoming Calcite release 1.11 has a lot of stability fixes and new 
> features. We should update it for the Table API.
> E.g. we can hopefully merge FLINK-4864



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5917) Remove MapState.size()

2017-02-27 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15887094#comment-15887094
 ] 

Xiaogang Shi commented on FLINK-5917:
-

[~StephanEwen] We just use the cache to avoid the costly scanning. It's 
initialized the first time the `size()` method is called. After then, the cache 
will be updated every time a new entry is inserted or an entry is removed. When 
the backend is closed, we can simply drop the cache. 

A better choice, i think, is to use a RocksDB entry to record the value of the 
`size`. We don't need to write the value into the entry everytime it's updated. 
We can update it only when taking snapshots. But this requires states to be 
aware of checkpointing which is missing in our current implementation.

> Remove MapState.size()
> --
>
> Key: FLINK-5917
> URL: https://issues.apache.org/jira/browse/FLINK-5917
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Aljoscha Krettek
>
> I'm proposing to remove {{size()}} because it is a prohibitively expensive 
> operation and users might not be aware of it. Instead of {{size()}} users can 
> use an iterator over all mappings to determine the size, when doing this they 
> will be aware of the fact that it is a costly operation.
> Right now, {{size()}} is only costly on the RocksDB state backend but I think 
> with future developments on the in-memory state backend it might also become 
> an expensive operation there.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-5907) RowCsvInputFormat bug on parsing tsv

2017-02-27 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-5907.

   Resolution: Fixed
Fix Version/s: 1.2.1
   1.3.0

Fixed for 1.2.1 with 5168b9f62a05176aca5bd3c094241daaa4d14b2e
Fixed for 1.3.0 with 1a062b796274c9f63caeb2bf12aad96e34efd0aa

> RowCsvInputFormat bug on parsing tsv
> 
>
> Key: FLINK-5907
> URL: https://issues.apache.org/jira/browse/FLINK-5907
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.2.0
>Reporter: Flavio Pompermaier
>Assignee: Kurt Young
>  Labels: csv, parsing
> Fix For: 1.3.0, 1.2.1
>
> Attachments: test.tsv
>
>
> The following snippet reproduce the problem (using the attached file as 
> input):
> {code:language=java}
> char fieldDelim = '\t';
> TypeInformation[] fieldTypes = new TypeInformation[51];
> for (int i = 0; i < fieldTypes.length; i++) {
>   fieldTypes[i] = BasicTypeInfo.STRING_TYPE_INFO;
> }
> int[] fieldMask = new int[fieldTypes.length];
> for (int i = 0; i < fieldMask.length; i++) {
>   fieldMask[i] = i;
> }
> RowCsvInputFormat csvIF = new RowCsvInputFormat(new Path(testCsv), 
> fieldTypes, "\n", fieldDelim +"", 
>fieldMask, true);
> csvIF.setNestedFileEnumeration(true);
> DataSet csv = env.createInput(csvIF);
>csv.print()
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-3475) DISTINCT aggregate function support for SQL queries

2017-02-27 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-3475.

   Resolution: Implemented
Fix Version/s: 1.3.0

Implemented for 1.3.0 with 36c9348ff06cae1fe55925bcc6081154be2f10f5

> DISTINCT aggregate function support for SQL queries
> ---
>
> Key: FLINK-3475
> URL: https://issues.apache.org/jira/browse/FLINK-3475
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Chengxiang Li
>Assignee: Zhenghua Gao
> Fix For: 1.3.0
>
>
> DISTINCT aggregate function may be able to reuse the aggregate function 
> instead of separate implementation, and let Flink runtime take care of 
> duplicate records.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3475) DISTINCT aggregate function support for SQL queries

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886763#comment-15886763
 ] 

ASF GitHub Bot commented on FLINK-3475:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3111


> DISTINCT aggregate function support for SQL queries
> ---
>
> Key: FLINK-3475
> URL: https://issues.apache.org/jira/browse/FLINK-3475
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Chengxiang Li
>Assignee: Zhenghua Gao
>
> DISTINCT aggregate function may be able to reuse the aggregate function 
> instead of separate implementation, and let Flink runtime take care of 
> duplicate records.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3417: [FLINK-5907] [java api] Fix trailing empty fields ...

2017-02-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3417


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5907) RowCsvInputFormat bug on parsing tsv

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886762#comment-15886762
 ] 

ASF GitHub Bot commented on FLINK-5907:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3417


> RowCsvInputFormat bug on parsing tsv
> 
>
> Key: FLINK-5907
> URL: https://issues.apache.org/jira/browse/FLINK-5907
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.2.0
>Reporter: Flavio Pompermaier
>Assignee: Kurt Young
>  Labels: csv, parsing
> Attachments: test.tsv
>
>
> The following snippet reproduce the problem (using the attached file as 
> input):
> {code:language=java}
> char fieldDelim = '\t';
> TypeInformation[] fieldTypes = new TypeInformation[51];
> for (int i = 0; i < fieldTypes.length; i++) {
>   fieldTypes[i] = BasicTypeInfo.STRING_TYPE_INFO;
> }
> int[] fieldMask = new int[fieldTypes.length];
> for (int i = 0; i < fieldMask.length; i++) {
>   fieldMask[i] = i;
> }
> RowCsvInputFormat csvIF = new RowCsvInputFormat(new Path(testCsv), 
> fieldTypes, "\n", fieldDelim +"", 
>fieldMask, true);
> csvIF.setNestedFileEnumeration(true);
> DataSet csv = env.createInput(csvIF);
>csv.print()
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3111: [FLINK-3475] [Table] DISTINCT aggregate function s...

2017-02-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3111


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...

2017-02-27 Thread haohui
Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r103339489
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
--- End diff --

Can you please suggest where it should be put?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886750#comment-15886750
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3314#discussion_r103339489
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/RichKeyedDeserializationSchema.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
--- End diff --

Can you please suggest where it should be put?


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5133) Support to set resource for operator in DataStream and DataSet

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886691#comment-15886691
 ] 

ASF GitHub Bot commented on FLINK-5133:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3303
  
I have merged this to my local repository.
There were some issues left, partly in the commented out code.

In particular `checkNotNull(variable !=null) does not work, because 
`variable !=null` evaluates to `Boolean(true)` or Boolean(false)` and then the 
check is that the Boolean is not null (which is always true).

I fixed that and also renamed the variables to use "resources" consistently 
as plural, and harmonized the Java getter style.
Will merge to Flink master tomorrow if the tests pass...


> Support to set resource for operator in DataStream and DataSet
> --
>
> Key: FLINK-5133
> URL: https://issues.apache.org/jira/browse/FLINK-5133
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataSet API, DataStream API
>Reporter: zhijiang
>Assignee: zhijiang
>
> This is part of the fine-grained resource configuration.
> For *DataStream*, the *setResource* API will be setted onto 
> *SingleOutputStreamOperator* similar with other existing properties like 
> parallelism, name, etc.
> For *DataSet*, the *setResource* API will be setted onto *Operator* in the 
> similar way.
> There are two parameters described with minimum *ResourceSpec* and maximum 
> *ResourceSpec* separately in the API for considering resource resize in 
> future improvements.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3303: [FLINK-5133][core] Support to set resource for operator i...

2017-02-27 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3303
  
I have merged this to my local repository.
There were some issues left, partly in the commented out code.

In particular `checkNotNull(variable !=null) does not work, because 
`variable !=null` evaluates to `Boolean(true)` or Boolean(false)` and then the 
check is that the Boolean is not null (which is always true).

I fixed that and also renamed the variables to use "resources" consistently 
as plural, and harmonized the Java getter style.
Will merge to Flink master tomorrow if the tests pass...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5501) Determine whether the job starts from last JobManager failure

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886688#comment-15886688
 ] 

ASF GitHub Bot commented on FLINK-5501:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3385
  
One test case seemed to be failing in this PR:
I have merged the PR to my local repository, fixed the test, and added some 
fixes/cleanups on top.
Will merge back to Flink master tomorrow...


> Determine whether the job starts from last JobManager failure
> -
>
> Key: FLINK-5501
> URL: https://issues.apache.org/jira/browse/FLINK-5501
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: zhijiang
>Assignee: shuai.xu
>
> When the {{JobManagerRunner}} grants leadership, it should check whether the 
> current job is already running or not. If the job is running, the 
> {{JobManager}} should reconcile itself (enter RECONCILING state) and waits 
> for the {{TaskManager}} reporting task status. Otherwise the {{JobManger}} 
> can schedule the {{ExecutionGraph}} in common way.
> The {{RunningJobsRegistry}} can provide the way to check the job running 
> status, but we should expand the current interface and fix the related 
> process to support this function.
> 1. {{RunningJobsRegistry}} sets RUNNING status after {{JobManagerRunner}} 
> granting leadership at the first time.
> 2. If the job finishes, the job status will be set FINISHED by 
> {{RunningJobsRegistry}} and the status will be deleted before exit. 
> 3. If the mini cluster starts multi {{JobManagerRunner}}, and the leader 
> {{JobManagerRunner}} already finishes the job to set the job status FINISHED, 
> other {{JobManagerRunner}} will exit after grants the leadership again.
> 4. If the {{JobManager}} fails, the job status will be still in RUNNING. So 
> if the {{JobManagerRunner}} (the previous or new one) grants leadership 
> again, it will check the job status and enters {{RECONCILING}} state.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3385: [FLINK-5501] JM use running job registry to determine whe...

2017-02-27 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3385
  
One test case seemed to be failing in this PR:
I have merged the PR to my local repository, fixed the test, and added some 
fixes/cleanups on top.
Will merge back to Flink master tomorrow...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5414) Bump up Calcite version to 1.11

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886650#comment-15886650
 ] 

ASF GitHub Bot commented on FLINK-5414:
---

GitHub user haohui opened a pull request:

https://github.com/apache/flink/pull/3426

[FLINK-5414] [table] Bump up Calcite version to 1.11

This PR resembles #3338 except that it canonizes the nullable types.

@wuchong can you please take a look?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-5414

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3426.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3426


commit 339f310dec67d40cd4ed4ecbabf09a0b0dcba518
Author: Haohui Mai 
Date:   2017-02-27T22:24:08Z

[FLINK-5414] [table] Bump up Calcite version to 1.11. (Jark Wu and Haohui 
Mai)




> Bump up Calcite version to 1.11
> ---
>
> Key: FLINK-5414
> URL: https://issues.apache.org/jira/browse/FLINK-5414
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>
> The upcoming Calcite release 1.11 has a lot of stability fixes and new 
> features. We should update it for the Table API.
> E.g. we can hopefully merge FLINK-4864



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3426: [FLINK-5414] [table] Bump up Calcite version to 1....

2017-02-27 Thread haohui
GitHub user haohui opened a pull request:

https://github.com/apache/flink/pull/3426

[FLINK-5414] [table] Bump up Calcite version to 1.11

This PR resembles #3338 except that it canonizes the nullable types.

@wuchong can you please take a look?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-5414

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3426.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3426


commit 339f310dec67d40cd4ed4ecbabf09a0b0dcba518
Author: Haohui Mai 
Date:   2017-02-27T22:24:08Z

[FLINK-5414] [table] Bump up Calcite version to 1.11. (Jark Wu and Haohui 
Mai)




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5803) Add [partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886579#comment-15886579
 ] 

ASF GitHub Bot commented on FLINK-5803:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3397
  
Hi @sunjincheng121, I opened PR #3425, which adds custom data types for 
`proctime()` and `rowtime()` and replaces the functions during translation with 
constants.

You can rebase your commit on this PR and continue working on it.
Thanks, Fabian


> Add [partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING 
> aggregation to SQL
> ---
>
> Key: FLINK-5803
> URL: https://issues.apache.org/jira/browse/FLINK-5803
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not supported (see FLINK-5654)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3397: [FLINK-5803][TableAPI] Add [partitioned] processing t...

2017-02-27 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3397
  
Hi @sunjincheng121, I opened PR #3425, which adds custom data types for 
`proctime()` and `rowtime()` and replaces the functions during translation with 
constants.

You can rebase your commit on this PR and continue working on it.
Thanks, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5921) Adapt time mode indicator functions return custom data types

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886575#comment-15886575
 ] 

ASF GitHub Bot commented on FLINK-5921:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/3425

[FLINK-5921] [table] Add custom data types for rowtime and proctime.

We add a custom data type for the result of the `rowtime()` and 
`proctime()` marker functions to easier identify the chosen time semantics in 
when translating window operations.

Both method return a constant timestamp (0L) which is injected during 
constant expression reduction.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableTimeTypes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3425.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3425


commit 6cc819b88c408de7b42a968d2a8cdb21ab927ffd
Author: Fabian Hueske 
Date:   2017-02-25T23:28:54Z

[FLINK-5921] [table] Add custom data types for rowtime and proctime.

- proctime() and rowtime() return constont timestamp (0L).




> Adapt time mode indicator functions return custom data types
> 
>
> Key: FLINK-5921
> URL: https://issues.apache.org/jira/browse/FLINK-5921
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> The functions that indicate event time ({{rowtime()}}) and processing time 
> ({{proctime()}}) are defined to return {{TIMESTAMP}}.
> These functions should be updated to return custom types in order to ease the 
> identification of the time semantics during optimization.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3425: [FLINK-5921] [table] Add custom data types for row...

2017-02-27 Thread fhueske
GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/3425

[FLINK-5921] [table] Add custom data types for rowtime and proctime.

We add a custom data type for the result of the `rowtime()` and 
`proctime()` marker functions to easier identify the chosen time semantics in 
when translating window operations.

Both method return a constant timestamp (0L) which is injected during 
constant expression reduction.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableTimeTypes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3425.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3425


commit 6cc819b88c408de7b42a968d2a8cdb21ab927ffd
Author: Fabian Hueske 
Date:   2017-02-25T23:28:54Z

[FLINK-5921] [table] Add custom data types for rowtime and proctime.

- proctime() and rowtime() return constont timestamp (0L).




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5907) RowCsvInputFormat bug on parsing tsv

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886552#comment-15886552
 ] 

ASF GitHub Bot commented on FLINK-5907:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3417
  
Merging


> RowCsvInputFormat bug on parsing tsv
> 
>
> Key: FLINK-5907
> URL: https://issues.apache.org/jira/browse/FLINK-5907
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.2.0
>Reporter: Flavio Pompermaier
>Assignee: Kurt Young
>  Labels: csv, parsing
> Attachments: test.tsv
>
>
> The following snippet reproduce the problem (using the attached file as 
> input):
> {code:language=java}
> char fieldDelim = '\t';
> TypeInformation[] fieldTypes = new TypeInformation[51];
> for (int i = 0; i < fieldTypes.length; i++) {
>   fieldTypes[i] = BasicTypeInfo.STRING_TYPE_INFO;
> }
> int[] fieldMask = new int[fieldTypes.length];
> for (int i = 0; i < fieldMask.length; i++) {
>   fieldMask[i] = i;
> }
> RowCsvInputFormat csvIF = new RowCsvInputFormat(new Path(testCsv), 
> fieldTypes, "\n", fieldDelim +"", 
>fieldMask, true);
> csvIF.setNestedFileEnumeration(true);
> DataSet csv = env.createInput(csvIF);
>csv.print()
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3417: [FLINK-5907] [java api] Fix trailing empty fields in CsvI...

2017-02-27 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3417
  
Merging


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3475) DISTINCT aggregate function support for SQL queries

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886551#comment-15886551
 ] 

ASF GitHub Bot commented on FLINK-3475:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3111
  
Merging


> DISTINCT aggregate function support for SQL queries
> ---
>
> Key: FLINK-3475
> URL: https://issues.apache.org/jira/browse/FLINK-3475
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Chengxiang Li
>Assignee: Zhenghua Gao
>
> DISTINCT aggregate function may be able to reuse the aggregate function 
> instead of separate implementation, and let Flink runtime take care of 
> duplicate records.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...

2017-02-27 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3111
  
Merging


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5570) Support register external catalog to table environment

2017-02-27 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886524#comment-15886524
 ] 

Haohui Mai commented on FLINK-5570:
---

Thanks [~fhueske] for the pointer. Just took a very quick skim -- if I 
understand correctly, [#3409|https://github.com/apache/flink/pull/3409] allows 
other users to manipulate the catalogs within Flink.

This is different from having a Metastore-like API that gives the statistics of 
the tables. Personally I think it might make sense to look into both issues 
holistically and come up with a design that works well with both the Metastore 
and HCatalog use cases.

> Support register external catalog to table environment
> --
>
> Key: FLINK-5570
> URL: https://issues.apache.org/jira/browse/FLINK-5570
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: jingzhang
>
> This issue aims to support register one or more {{ExternalCatalog}} (which is 
> referred in https://issues.apache.org/jira/browse/FLINK-5568) to 
> {{TableEnvironment}}. After registration, SQL and TableAPI queries could 
> access to tables in the external catalogs without register those tables one 
> by one to {{TableEnvironment}} beforehand.
> We plan to add two APIs in {{TableEnvironment}}:
> 1. register externalCatalog
> {code}
> def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): 
> Unit
> {code}
> 2. scan a table from registered catalog and returns the resulting {{Table}},  
> the API is very useful in TableAPI queries.
> {code}
> def scan(catalogName: String, tableIdentifier: TableIdentifier): Table
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3424: [FLINK-5928] [checkpoints] Use custom metadata file for e...

2017-02-27 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3424
  
Thanks, good and critical fix!
Looking at this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5928) Externalized checkpoints overwritting each other

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886521#comment-15886521
 ] 

ASF GitHub Bot commented on FLINK-5928:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3424
  
Thanks, good and critical fix!
Looking at this...


> Externalized checkpoints overwritting each other
> 
>
> Key: FLINK-5928
> URL: https://issues.apache.org/jira/browse/FLINK-5928
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>Priority: Critical
>
> I noticed that PR #3346 accidentally broke externalized checkpoints by using 
> a fixed meta data file name. We should restore the old behaviour with 
> creating random files and double check why no test caught this.
> This will likely superseded by upcoming changes from [~StephanEwen] to use 
> metadata streams on the JobManager.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-27 Thread Seth Wiesman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886446#comment-15886446
 ] 

Seth Wiesman commented on FLINK-5929:
-

[~aljoscha] Yes this does look like what we were discussing. Regarding cleanup 
vs using gc horizon, as a user I think I would expect there to be a cleanup 
method to be consistent with the trigger context. I already have a rudimentary 
version of this implemented for my own use so I would be happy to take this 
ticket, clean up my code, and submit a pr. 

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-27 Thread Seth Wiesman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886450#comment-15886450
 ] 

Seth Wiesman commented on FLINK-5929:
-

[~rehevkor5] Yes, state for individual window panes. 

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3314: [FLINK-3679] DeserializationSchema should handle zero or ...

2017-02-27 Thread haohui
Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3314
  
@rmetzger ping...
just wondering what do you think about all the approaches we have discussed 
here? Your comments are appreciated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886445#comment-15886445
 ] 

ASF GitHub Bot commented on FLINK-3679:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3314
  
@rmetzger ping...
just wondering what do you think about all the approaches we have discussed 
here? Your comments are appreciated.


> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5931) Make Flink highly available even if defaultFS is unavailable

2017-02-27 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886438#comment-15886438
 ] 

Haohui Mai commented on FLINK-5931:
---

I have confirmed with [~gtCarrera9] (a YARN committer and Hadoop PMC) that YARN 
can continue to operate even if the {{defaultFS}} is unavailable.

> Make Flink highly available even if defaultFS is unavailable
> 
>
> Key: FLINK-5931
> URL: https://issues.apache.org/jira/browse/FLINK-5931
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> In order to use Flink in mission-critical environments, Flink must be 
> available even if the {{defaultFS}} is unavailable.
> We have deployed HDFS in HA mode in our production environment. In our 
> experience we have experienced performance degradations and downtime when the 
> HDFS cluster is being expanded or under maintenances. Under this case it is 
> desirable to deploy jobs through alternative filesystem (e.g., S3).
> This jira is to track the improvements to Flink to enable Flink to continue 
> to operate even {{defaultFS}} is unavailable.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5931) Make Flink highly available even if defaultFS is unavailable

2017-02-27 Thread Haohui Mai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haohui Mai updated FLINK-5931:
--
Description: 
In order to use Flink in mission-critical environments, Flink must be available 
even if the {{defaultFS}} is unavailable.

We have deployed HDFS in HA mode in our production environment. In our 
experience we have experienced performance degradations and downtime when the 
HDFS cluster is being expanded or under maintenances. Under this case it is 
desirable to deploy jobs through alternative filesystem (e.g., S3).

This jira is to track the improvements to Flink to enable Flink to continue to 
operate even {{defaultFS}} is unavailable.


  was:
In order to use Flink in mission-critical environments, Flink must be available 
even if the {{defaultFS}} is unavailable.




> Make Flink highly available even if defaultFS is unavailable
> 
>
> Key: FLINK-5931
> URL: https://issues.apache.org/jira/browse/FLINK-5931
> Project: Flink
>  Issue Type: Improvement
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> In order to use Flink in mission-critical environments, Flink must be 
> available even if the {{defaultFS}} is unavailable.
> We have deployed HDFS in HA mode in our production environment. In our 
> experience we have experienced performance degradations and downtime when the 
> HDFS cluster is being expanded or under maintenances. Under this case it is 
> desirable to deploy jobs through alternative filesystem (e.g., S3).
> This jira is to track the improvements to Flink to enable Flink to continue 
> to operate even {{defaultFS}} is unavailable.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-27 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886419#comment-15886419
 ] 

Shannon Carey commented on FLINK-5929:
--

If I understand correctly, I agree this would be useful. Currently we are 
working around this limitation in order to achieve communication between the 
Trigger (per-pane state) and the WindowFunction (per-operator state) by a hack 
within the WindowFunction that looks like this (we're not on 1.2 yet so we 
haven't looked at new ways to do this yet):

{code}
  def apply(key: String, window: TimeWindow, input, out): Unit = {
val fireTimestampState: ValueState[java.lang.Long] =
  getRuntimeContext.getState[java.lang.Long](fireTimestampStateDescriptor)

if (fireTimestampState.isInstanceOf[MemValueState[String, TimeWindow, 
java.lang.Long]]) {
  fireTimestampState.asInstanceOf[MemValueState[String, TimeWindow, 
java.lang.Long]].setCurrentNamespace(window)
} else if (fireTimestampState.isInstanceOf[RocksDBValueState[String, 
TimeWindow, java.lang.Long]]) {
  fireTimestampState.asInstanceOf[RocksDBValueState[String, TimeWindow, 
java.lang.Long]].setCurrentNamespace(window)
} else if (fireTimestampState.isInstanceOf[FsValueState[String, TimeWindow, 
java.lang.Long]]) {
  fireTimestampState.asInstanceOf[FsValueState[String, TimeWindow, 
java.lang.Long]].setCurrentNamespace(window)
}
fireTimestampState.value()
...
{code}

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5931) Make Flink highly available even if defaultFS is unavailable

2017-02-27 Thread Haohui Mai (JIRA)
Haohui Mai created FLINK-5931:
-

 Summary: Make Flink highly available even if defaultFS is 
unavailable
 Key: FLINK-5931
 URL: https://issues.apache.org/jira/browse/FLINK-5931
 Project: Flink
  Issue Type: Improvement
Reporter: Haohui Mai
Assignee: Haohui Mai


In order to use Flink in mission-critical environments, Flink must be available 
even if the {{defaultFS}} is unavailable.





--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5706) Implement Flink's own S3 filesystem

2017-02-27 Thread Seth Wiesman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886417#comment-15886417
 ] 

Seth Wiesman commented on FLINK-5706:
-

[~StephanEwen] To expand on our conversation from the mailing list.

As I mentioned before, EMR handles S3 consistency by building a [consistent 
view|http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html]
 using a dynamo db table. The core idea is that we should be able to trust that 
Flink is in a consistent state, so when a file system operation is performed 
such as a rename or a delete, we should first check that the requested file 
exists and if not wait for it. However, the file may actually not exist for 
whatever external reason or it may take S3 longer than we are willing to wait 
to become consistent because S3 consistency has no upper bound. 

{code:java}
public void delete(String path) throws Exception {
for (int i = 0;  i < numPasses; i++) {
   if (exists(path)) {
   delete_impl(path);
} else {
Thread.sleep(...);
} 
}

throw new FileNotFoundException("File either does not exist or took to long 
to become consistent);
}
{code} 

This is as far as I went with my implementation and it seems to work for most 
cases but it does contains two core issues: 

1) We are not able to differentiate between inconsistent files and missing 
files. If Flink is running in real time we are probably running into a 
consistency error, but what if the user restarts from a checkpoint in the past? 
In that case the files may actually not exist leaving Flink in an inconsistent 
state which breaks the core invariant of this solution. 

2) Certain operations really do take to long to become consistent, causing the 
entire pipeline to slow down. Take the bucketing sink as an example. On 
checkpoint the current in progress file is [renamed to pending | 
https://github.com/apache/flink/blob/8bcb2ae3ccf6a58d8f42f29d67fdb7d88a95f8ed/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L563].
 This will work because S3 has read on write consistency. But after the 
checkpoint is complete the [pending file is again renamed to complete | 
https://github.com/apache/flink/blob/8bcb2ae3ccf6a58d8f42f29d67fdb7d88a95f8ed/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L655].
 S3 renames do not have consistency guarantees and so this will eventually 
become inconsistent. The problem I ran into was even with the updated fs 
implementation there would inevitably be files which took upwards of several 
minutes to become consistent. Eventually this lead to writing a custom sink 
specifically for S3 which understands what it is capable of. Ultimately I 
believe this shows that for true S3 interop certain parts of this problem will 
bleed through the file system abstraction. 

> Implement Flink's own S3 filesystem
> ---
>
> Key: FLINK-5706
> URL: https://issues.apache.org/jira/browse/FLINK-5706
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Stephan Ewen
>
> As part of the effort to make Flink completely independent from Hadoop, Flink 
> needs its own S3 filesystem implementation. Currently Flink relies on 
> Hadoop's S3a and S3n file systems.
> An own S3 file system can be implemented using the AWS SDK. As the basis of 
> the implementation, the Hadoop File System can be used (Apache Licensed, 
> should be okay to reuse some code as long as we do a proper attribution).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5668) Reduce dependency on HDFS at job startup time

2017-02-27 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886404#comment-15886404
 ] 

Haohui Mai commented on FLINK-5668:
---

[~bill.liu8904] I believe that the title of the jira is confusing. Can you 
summarize what you want to achieve. Adding a PR will also help.

[~rmetzger] regarding this issue, do you have an idea why the current 
implementation writes the configuration into a file on {{default.FS}}? What do 
you think if passing the configuration through the ``taskManagerEnv``? Any 
downside for this approach?


> Reduce dependency on HDFS at job startup time
> -
>
> Key: FLINK-5668
> URL: https://issues.apache.org/jira/browse/FLINK-5668
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: Bill Liu
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> When create a Flink cluster on Yarn,  JobManager depends on  HDFS to share  
> taskmanager-conf.yaml  with TaskManager.
> It's better to share the taskmanager-conf.yaml  on JobManager Web server 
> instead of HDFS, which could reduce the HDFS dependency  at job startup.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-27 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886378#comment-15886378
 ] 

Shannon Carey edited comment on FLINK-5929 at 2/27/17 7:40 PM:
---

Just to clarify, you're referring to the state from 
AbstractRichFunction#getRuntimeContext.getState(), right? That's the state 
keyed by the combination of event key and window operator (that's what you mean 
by "global"). What you're suggesting is adding state for the individual window 
panes, right?


was (Author: rehevkor5):
Just to clarify, you're referring to the state from 
AbstractRichFunction#getRuntimeContext.getState(), right? That's the state 
keyed by the combination of event key and window operator. What you're 
suggesting is adding state for the individual window panes, right?

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-27 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886378#comment-15886378
 ] 

Shannon Carey edited comment on FLINK-5929 at 2/27/17 7:38 PM:
---

Just to clarify, you're referring to the state from 
AbstractRichFunction#getRuntimeContext.getState(), right? That's the state 
keyed by the combination of event key and window operator. What you're 
suggesting is adding state for the individual window panes, right?


was (Author: rehevkor5):
Just to clarify, you're referring to the state from 
AbstractRichFunction#getRuntimeContext.getState(), right? That's the state 
keyed by the combination of event key and window operator. What you're 
suggesting is adding state for the individual panes within the window, right?

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5907) RowCsvInputFormat bug on parsing tsv

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886379#comment-15886379
 ] 

ASF GitHub Bot commented on FLINK-5907:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3417
  
Thanks for the fast update @KurtYoung.
+1 to merge


> RowCsvInputFormat bug on parsing tsv
> 
>
> Key: FLINK-5907
> URL: https://issues.apache.org/jira/browse/FLINK-5907
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.2.0
>Reporter: Flavio Pompermaier
>Assignee: Kurt Young
>  Labels: csv, parsing
> Attachments: test.tsv
>
>
> The following snippet reproduce the problem (using the attached file as 
> input):
> {code:language=java}
> char fieldDelim = '\t';
> TypeInformation[] fieldTypes = new TypeInformation[51];
> for (int i = 0; i < fieldTypes.length; i++) {
>   fieldTypes[i] = BasicTypeInfo.STRING_TYPE_INFO;
> }
> int[] fieldMask = new int[fieldTypes.length];
> for (int i = 0; i < fieldMask.length; i++) {
>   fieldMask[i] = i;
> }
> RowCsvInputFormat csvIF = new RowCsvInputFormat(new Path(testCsv), 
> fieldTypes, "\n", fieldDelim +"", 
>fieldMask, true);
> csvIF.setNestedFileEnumeration(true);
> DataSet csv = env.createInput(csvIF);
>csv.print()
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-27 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886378#comment-15886378
 ] 

Shannon Carey commented on FLINK-5929:
--

Just to clarify, you're referring to the state from 
AbstractRichFunction#getRuntimeContext.getState(), right? That's the state 
keyed by the combination of event key and window operator. What you're 
suggesting is adding state for the individual panes within the window, right?

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3417: [FLINK-5907] [java api] Fix trailing empty fields in CsvI...

2017-02-27 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3417
  
Thanks for the fast update @KurtYoung.
+1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-5668) Reduce dependency on HDFS at job startup time

2017-02-27 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886342#comment-15886342
 ] 

Haohui Mai edited comment on FLINK-5668 at 2/27/17 7:37 PM:


[~rmetzger] -- just want to clarify FLINK-5631 here.

YARN downloads the resources from the specified paths and localizes the 
resources on worker nodes. Note that the {{Path}} class in the Hadoop APIs 
supports specifying filesystem other than the one specified in {{default.FS}}. 
For example, {{new Path(URI.create("s3a://foo")}} specifies the a resource on 
S3, regardless what {{default.FS}} is specified. FLINK-5631 enables YARN to 
localize resources that are not stored on {{default.FS}}.

As a result I think that the current configuration should be sufficient as 
Flink properly recognizes paths that specifies different file systems.



was (Author: wheat9):
[~rmetzger] -- just want to clarify FLINK-5631 here.

YARN downloads the resources from the specified paths and localizes the 
resources on worker nodes. Note that the {{Path}} class in the Hadoop APIs 
supports specifying filesystem other than the one specified in {{default.FS}}. 
For example, {{new Path(URI.create("s3a://foo")}} specifies the a resource on 
S3, regardless what {{default.FS}} is specified. FLINK-5631 enables YARN to 
localize resources that are not stored on {{default.FS}}.

As a result I think that the current configuration should be sufficient as 
Flink probably recognizes paths that specifies different file systems.


> Reduce dependency on HDFS at job startup time
> -
>
> Key: FLINK-5668
> URL: https://issues.apache.org/jira/browse/FLINK-5668
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: Bill Liu
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> When create a Flink cluster on Yarn,  JobManager depends on  HDFS to share  
> taskmanager-conf.yaml  with TaskManager.
> It's better to share the taskmanager-conf.yaml  on JobManager Web server 
> instead of HDFS, which could reduce the HDFS dependency  at job startup.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5668) Reduce dependency on HDFS at job startup time

2017-02-27 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886342#comment-15886342
 ] 

Haohui Mai edited comment on FLINK-5668 at 2/27/17 7:37 PM:


[~rmetzger] -- just want to clarify FLINK-5631 here.

YARN downloads the resources from the specified paths and localizes the 
resources on worker nodes. Note that the {{Path}} class in the Hadoop APIs 
supports specifying filesystem other than the one specified in {{default.FS}}. 
For example, {{new Path(URI.create("s3a://foo")}} specifies the a resource on 
S3, regardless what {{default.FS}} is specified. FLINK-5631 enables YARN to 
localize resources that are not stored on {{default.FS}}.

As a result I think that the current configuration should be sufficient as 
Flink probably recognizes paths that specifies different file systems.



was (Author: wheat9):
[~rmetzger] -- just want to clarify FLINK-5631 here.

YARN downloads the resources from the specified paths and localizes the 
resources on worker nodes. Note that the {{Path}} class in the Hadoop APIs 
supports specifying filesystem other than the one specified in {{default.FS}}. 
For example, {{new Path(URI.create("s3a://foo")}} specifies the a resource on 
S3, regardless what {{default.FS}} is specified. FLINK-5631 enables YARN to 
localize resources that are not stored on {{default.FS}}.



> Reduce dependency on HDFS at job startup time
> -
>
> Key: FLINK-5668
> URL: https://issues.apache.org/jira/browse/FLINK-5668
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: Bill Liu
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> When create a Flink cluster on Yarn,  JobManager depends on  HDFS to share  
> taskmanager-conf.yaml  with TaskManager.
> It's better to share the taskmanager-conf.yaml  on JobManager Web server 
> instead of HDFS, which could reduce the HDFS dependency  at job startup.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5668) Reduce dependency on HDFS at job startup time

2017-02-27 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886342#comment-15886342
 ] 

Haohui Mai commented on FLINK-5668:
---

[~rmetzger] -- just want to clarify FLINK-5631 here.

YARN downloads the resources from the specified paths and localizes the 
resources on worker nodes. Note that the {{Path}} class in the Hadoop APIs 
supports specifying filesystem other than the one specified in {{default.FS}}. 
For example, {{new Path(URI.create("s3a://foo")}} specifies the a resource on 
S3, regardless what {{default.FS}} is specified. FLINK-5631 enables YARN to 
localize resources that are not stored on {{default.FS}}.



> Reduce dependency on HDFS at job startup time
> -
>
> Key: FLINK-5668
> URL: https://issues.apache.org/jira/browse/FLINK-5668
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: Bill Liu
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> When create a Flink cluster on Yarn,  JobManager depends on  HDFS to share  
> taskmanager-conf.yaml  with TaskManager.
> It's better to share the taskmanager-conf.yaml  on JobManager Web server 
> instead of HDFS, which could reduce the HDFS dependency  at job startup.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5923) Test instability in SavepointITCase testTriggerSavepointAndResume

2017-02-27 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886298#comment-15886298
 ] 

Nico Kruber commented on FLINK-5923:


I guess the following error may also originate from that race condition(?)

https://s3.amazonaws.com/archive.travis-ci.org/jobs/205888798/log.txt

{code}
testTriggerSavepointAndResume(org.apache.flink.test.checkpointing.SavepointITCase)
  Time elapsed: 1.581 sec  <<< ERROR!
java.io.IOException: Unable to delete file: 
/tmp/junit1592062472104041767/junit8429426931866360142/checkpoints/5ec09c5215b989bd25752be56ca02a46/chk-5/15b909b5-f375-45e5-8737-10935d77c9a4
at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2279)
at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653)
at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535)
at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270)
at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653)
at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535)
at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270)
at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653)
at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535)
at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2270)
at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1653)
at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:1535)
at 
org.apache.flink.test.checkpointing.SavepointITCase.testTriggerSavepointAndResume(SavepointITCase.java:411)
{code}

> Test instability in SavepointITCase testTriggerSavepointAndResume
> -
>
> Key: FLINK-5923
> URL: https://issues.apache.org/jira/browse/FLINK-5923
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>  Labels: test-stability
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/205042538/log.txt
> {code}
> Failed tests: 
>   SavepointITCase.testTriggerSavepointAndResume:258 Checkpoints directory not 
> cleaned up: 
> [/tmp/junit1029044621247843839/junit7338507921051602138/checkpoints/47fa12635d098bdafd52def453e6d66c/chk-4]
>  expected:<0> but was:<1>
> {code}
> I think this is due to a race in the test. When shutting down the cluster it 
> can happen that in progress checkpoints linger around.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5758) Port-range for the web interface via YARN

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886235#comment-15886235
 ] 

ASF GitHub Bot commented on FLINK-5758:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3391
  
@barcahead  Thansk for contributing this.
I can try and get to this later this week. Big pull request backlog right 
now ;-)


> Port-range for the web interface via YARN
> -
>
> Key: FLINK-5758
> URL: https://issues.apache.org/jira/browse/FLINK-5758
> Project: Flink
>  Issue Type: Sub-task
>  Components: YARN
>Affects Versions: 1.2.0, 1.1.4, 1.3.0
>Reporter: Kanstantsin Kamkou
>Assignee: Yelei Feng
>  Labels: network
>
> In case of YARN, the {{ConfigConstants.JOB_MANAGER_WEB_PORT_KEY}}   [is 
> changed to 
> 0|https://github.com/apache/flink/blob/release-1.2.0/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java#L526].
>  Please allow port ranges in this case. DevOps need that.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3391: [FLINK-5758] [yarn] support port range for web monitor

2017-02-27 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3391
  
@barcahead  Thansk for contributing this.
I can try and get to this later this week. Big pull request backlog right 
now ;-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-5668) Reduce dependency on HDFS at job startup time

2017-02-27 Thread Bill Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886220#comment-15886220
 ] 

Bill Liu edited comment on FLINK-5668 at 2/27/17 6:00 PM:
--

[~rmetzger] 
 [~wheat9] and I are working on implementing a flink job deployer  for a Yarn 
with {{HttpFs}} and {{S3}}.
The Yarn Container could resolve the {{http/s3}}  file scheme. 

We use HttpFs  instead of  HDFS to bootstrap the JobManager
Here is the code to set up the AM container (JobManager)
{code}
Path resourcePath = new Path("http://localhost:19989/flink-dist.jar;)
FileStatus fileStatus = resourcePath.getFileSystem(yarnConfiguration)
.getFileStatus(resourcePath);
LOG.info("resource {}", ConverterUtils.getYarnUrlFromPath(resourcePath));
LocalResource packageResource =
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(resourcePath),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
fileStatus.getLen(), fileStatus.getModificationTime());
LOG.info("add localresource {}", packageResource);
localResources.put("flink.jar", packageResource);
   amContainer.setLocalResources(localResources);
{code}
{{yarn.deploy.fs}}  is not a goog idea, because these bootstrap jars/files may 
be located on different filesystem.
It's better to parse the jar Path to get the underneath filesystem of jar.



was (Author: bill.liu8904):
[~rmetzger] 
 [~wheat9] and I are working on implementing a flink job deployer  for a Yarn 
with `HttpFs` and `S3`.
The Yarn Container could resolve the `http/s3`  file scheme. 

We use `HttpFs` instead of `HDFS` to bootstrap the JobManager
Here is the code to set up the AM container (JobManager)
```
Path resourcePath = new Path("http://localhost:19989/flink-dist.jar;)
FileStatus fileStatus = resourcePath.getFileSystem(yarnConfiguration)
.getFileStatus(resourcePath);
LOG.info("resource {}", ConverterUtils.getYarnUrlFromPath(resourcePath));
LocalResource packageResource =
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(resourcePath),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
fileStatus.getLen(), fileStatus.getModificationTime());
LOG.info("add localresource {}", packageResource);
localResources.put("flink.jar", packageResource);
   amContainer.setLocalResources(localResources);
```
`yarn.deploy.fs`  is not a goog idea, because these bootstrap jars/files may be 
located on different filesystem.
It's better to parse the jar Path to get the underneath filesystem of jar.


> Reduce dependency on HDFS at job startup time
> -
>
> Key: FLINK-5668
> URL: https://issues.apache.org/jira/browse/FLINK-5668
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: Bill Liu
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> When create a Flink cluster on Yarn,  JobManager depends on  HDFS to share  
> taskmanager-conf.yaml  with TaskManager.
> It's better to share the taskmanager-conf.yaml  on JobManager Web server 
> instead of HDFS, which could reduce the HDFS dependency  at job startup.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5668) Reduce dependency on HDFS at job startup time

2017-02-27 Thread Bill Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886220#comment-15886220
 ] 

Bill Liu edited comment on FLINK-5668 at 2/27/17 5:55 PM:
--

[~rmetzger] 
 [~wheat9] and I are working on implementing a flink job deployer  for a Yarn 
with `HttpFs` and `S3`.
The Yarn Container could resolve the `http/s3`  file scheme. 

We use `HttpFs` instead of `HDFS` to bootstrap the JobManager
Here is the code to set up the AM container (JobManager)
```
Path resourcePath = new Path("http://localhost:19989/flink-dist.jar;)
FileStatus fileStatus = resourcePath.getFileSystem(yarnConfiguration)
.getFileStatus(resourcePath);
LOG.info("resource {}", ConverterUtils.getYarnUrlFromPath(resourcePath));
LocalResource packageResource =
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(resourcePath),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
fileStatus.getLen(), fileStatus.getModificationTime());
LOG.info("add localresource {}", packageResource);
localResources.put("flink.jar", packageResource);
   amContainer.setLocalResources(localResources);
```
`yarn.deploy.fs`  is not a goog idea, because these bootstrap jars/files may be 
located on different filesystem.
It's better to parse the jar Path to get the underneath filesystem of jar.



was (Author: bill.liu8904):
[~rmetzger] 
 [~wheat9]] and I are working on implementing a flink job deployer  for a Yarn 
with `HttpFs` and `S3`.
The Yarn Container could resolve the `http/s3`  file scheme. 

We use `HttpFs` instead of `HDFS` to bootstrap the JobManager
Here is the code to set up the AM container (JobManager)
```
Path resourcePath = new Path("http://localhost:19989/flink-dist.jar;)
FileStatus fileStatus = resourcePath.getFileSystem(yarnConfiguration)
.getFileStatus(resourcePath);
LOG.info("resource {}", ConverterUtils.getYarnUrlFromPath(resourcePath));
LocalResource packageResource =
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(resourcePath),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
fileStatus.getLen(), fileStatus.getModificationTime());
LOG.info("add localresource {}", packageResource);
localResources.put("flink.jar", packageResource);
   amContainer.setLocalResources(localResources);
```
`yarn.deploy.fs`  is not a goog idea, because these bootstrap jars/files may be 
located on different filesystem.
It's better to parse the jar Path to get the underneath filesystem of jar.


> Reduce dependency on HDFS at job startup time
> -
>
> Key: FLINK-5668
> URL: https://issues.apache.org/jira/browse/FLINK-5668
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: Bill Liu
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> When create a Flink cluster on Yarn,  JobManager depends on  HDFS to share  
> taskmanager-conf.yaml  with TaskManager.
> It's better to share the taskmanager-conf.yaml  on JobManager Web server 
> instead of HDFS, which could reduce the HDFS dependency  at job startup.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5668) Reduce dependency on HDFS at job startup time

2017-02-27 Thread Bill Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886220#comment-15886220
 ] 

Bill Liu commented on FLINK-5668:
-

[~rmetzger] 
 [~wheat9]] and I are working on implementing a flink job deployer  for a Yarn 
with `HttpFs` and `S3`.
The Yarn Container could resolve the `http/s3`  file scheme. 

We use `HttpFs` instead of `HDFS` to bootstrap the JobManager
Here is the code to set up the AM container (JobManager)
```
Path resourcePath = new Path("http://localhost:19989/flink-dist.jar;)
FileStatus fileStatus = resourcePath.getFileSystem(yarnConfiguration)
.getFileStatus(resourcePath);
LOG.info("resource {}", ConverterUtils.getYarnUrlFromPath(resourcePath));
LocalResource packageResource =
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(resourcePath),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
fileStatus.getLen(), fileStatus.getModificationTime());
LOG.info("add localresource {}", packageResource);
localResources.put("flink.jar", packageResource);
   amContainer.setLocalResources(localResources);
```
`yarn.deploy.fs`  is not a goog idea, because these bootstrap jars/files may be 
located on different filesystem.
It's better to parse the jar Path to get the underneath filesystem of jar.


> Reduce dependency on HDFS at job startup time
> -
>
> Key: FLINK-5668
> URL: https://issues.apache.org/jira/browse/FLINK-5668
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: Bill Liu
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> When create a Flink cluster on Yarn,  JobManager depends on  HDFS to share  
> taskmanager-conf.yaml  with TaskManager.
> It's better to share the taskmanager-conf.yaml  on JobManager Web server 
> instead of HDFS, which could reduce the HDFS dependency  at job startup.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5930) Allow Access to Job Id from Runtime Context

2017-02-27 Thread Seth Wiesman (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seth Wiesman updated FLINK-5930:

Affects Version/s: (was: 1.2.1)
   (was: 1.3.0)

> Allow Access to Job Id from Runtime Context
> ---
>
> Key: FLINK-5930
> URL: https://issues.apache.org/jira/browse/FLINK-5930
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Seth Wiesman
>Priority: Minor
>  Labels: features
>
> There is no way to accessing the JobId from the RuntimeContext. This would be 
> useful for scoping custom output to a job similarly to how state is scoped to 
> the current job. The only approximation is to create a different JobId from 
> using the JobId class which can make it difficult to readily understand which 
> output corresponds to which job. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (FLINK-5849) Kafka Consumer checkpointed state may contain undefined offsets

2017-02-27 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-5849.

   Resolution: Fixed
Fix Version/s: 1.3.0

Resolved for {{master}} via 
http://git-wip-us.apache.org/repos/asf/flink/commit/ed68fed

> Kafka Consumer checkpointed state may contain undefined offsets
> ---
>
> Key: FLINK-5849
> URL: https://issues.apache.org/jira/browse/FLINK-5849
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> This is a regression due to FLINK-4280.
> In FLINK-4280, all initial offset determination was refactored to be 
> consolidated at the start of {{AbstractFetcher#runFetchLoop}}. However, this 
> caused checkpoints that were triggered before the method was ever reached to 
> contain undefined partition offsets.
> Ref:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:392)
> at 
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:209)
> at 
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:173)
> at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:32)
> at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest(KafkaConsumerTestBase.java:942)
> at 
> org.apache.flink.streaming.connectors.kafka.Kafka09ITCase.testMultipleSourcesOnePartition(Kafka09ITCase.java:76)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:915)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: Restoring from a checkpoint / 
> savepoint, but found a partition state Partition: 
> KafkaTopicPartition{topic='manyToOneTopic', partition=2}, 
> KafkaPartitionHandle=manyToOneTopic-2, offset=(not set) that does not have a 
> defined offset.
> at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(KafkaConsumerThread.java:133)
> at 
> 

[GitHub] flink pull request #3378: [FLINK-5849] [kafka] Move FlinkKafkaConsumer start...

2017-02-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3378


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5849) Kafka Consumer checkpointed state may contain undefined offsets

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886214#comment-15886214
 ] 

ASF GitHub Bot commented on FLINK-5849:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3378


> Kafka Consumer checkpointed state may contain undefined offsets
> ---
>
> Key: FLINK-5849
> URL: https://issues.apache.org/jira/browse/FLINK-5849
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> This is a regression due to FLINK-4280.
> In FLINK-4280, all initial offset determination was refactored to be 
> consolidated at the start of {{AbstractFetcher#runFetchLoop}}. However, this 
> caused checkpoints that were triggered before the method was ever reached to 
> contain undefined partition offsets.
> Ref:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:392)
> at 
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:209)
> at 
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:173)
> at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:32)
> at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest(KafkaConsumerTestBase.java:942)
> at 
> org.apache.flink.streaming.connectors.kafka.Kafka09ITCase.testMultipleSourcesOnePartition(Kafka09ITCase.java:76)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:915)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: Restoring from a checkpoint / 
> savepoint, but found a partition state Partition: 
> KafkaTopicPartition{topic='manyToOneTopic', partition=2}, 
> KafkaPartitionHandle=manyToOneTopic-2, offset=(not set) that does not have a 
> defined offset.
> at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(KafkaConsumerThread.java:133)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.(Kafka09Fetcher.java:113)
> at 
> 

[GitHub] flink issue #3378: [FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset...

2017-02-27 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3378
  
Tests pass, merging ..


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5849) Kafka Consumer checkpointed state may contain undefined offsets

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886210#comment-15886210
 ] 

ASF GitHub Bot commented on FLINK-5849:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3378
  
Tests pass, merging ..


> Kafka Consumer checkpointed state may contain undefined offsets
> ---
>
> Key: FLINK-5849
> URL: https://issues.apache.org/jira/browse/FLINK-5849
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> This is a regression due to FLINK-4280.
> In FLINK-4280, all initial offset determination was refactored to be 
> consolidated at the start of {{AbstractFetcher#runFetchLoop}}. However, this 
> caused checkpoints that were triggered before the method was ever reached to 
> contain undefined partition offsets.
> Ref:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:392)
> at 
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:209)
> at 
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:173)
> at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:32)
> at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest(KafkaConsumerTestBase.java:942)
> at 
> org.apache.flink.streaming.connectors.kafka.Kafka09ITCase.testMultipleSourcesOnePartition(Kafka09ITCase.java:76)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:915)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: Restoring from a checkpoint / 
> savepoint, but found a partition state Partition: 
> KafkaTopicPartition{topic='manyToOneTopic', partition=2}, 
> KafkaPartitionHandle=manyToOneTopic-2, offset=(not set) that does not have a 
> defined offset.
> at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.(KafkaConsumerThread.java:133)
> at 
> 

[jira] [Commented] (FLINK-5917) Remove MapState.size()

2017-02-27 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886157#comment-15886157
 ] 

Stephan Ewen commented on FLINK-5917:
-

Doesn't the cache compromise the "out-of-core" capabilities of the 
RocksDB-based MapState?

> Remove MapState.size()
> --
>
> Key: FLINK-5917
> URL: https://issues.apache.org/jira/browse/FLINK-5917
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Aljoscha Krettek
>
> I'm proposing to remove {{size()}} because it is a prohibitively expensive 
> operation and users might not be aware of it. Instead of {{size()}} users can 
> use an iterator over all mappings to determine the size, when doing this they 
> will be aware of the fact that it is a costly operation.
> Right now, {{size()}} is only costly on the RocksDB state backend but I think 
> with future developments on the in-memory state backend it might also become 
> an expensive operation there.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5930) Allow Access to Job Id from Runtime Context

2017-02-27 Thread Seth Wiesman (JIRA)
Seth Wiesman created FLINK-5930:
---

 Summary: Allow Access to Job Id from Runtime Context
 Key: FLINK-5930
 URL: https://issues.apache.org/jira/browse/FLINK-5930
 Project: Flink
  Issue Type: Improvement
  Components: Core
Affects Versions: 1.3.0, 1.2.1
Reporter: Seth Wiesman
Priority: Minor


There is no way to accessing the JobId from the RuntimeContext. This would be 
useful for scoping custom output to a job similarly to how state is scoped to 
the current job. The only approximation is to create a different JobId from 
using the JobId class which can make it difficult to readily understand which 
output corresponds to which job. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5849) Kafka Consumer checkpointed state may contain undefined offsets

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886122#comment-15886122
 ] 

ASF GitHub Bot commented on FLINK-5849:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3378
  
Thanks for the review @rmetzger!
For the moving of `open()` to before `run()`, I've included that as the 
commit tagged with FLINK-5849.
For the star import fix, I'm going to include that within a follow-up 
hotfix that cleansup all Flink Kafka connector tests of star & unused imports.

Doing one final Travis run locally and merging this to `master` once it 
turns green.


> Kafka Consumer checkpointed state may contain undefined offsets
> ---
>
> Key: FLINK-5849
> URL: https://issues.apache.org/jira/browse/FLINK-5849
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> This is a regression due to FLINK-4280.
> In FLINK-4280, all initial offset determination was refactored to be 
> consolidated at the start of {{AbstractFetcher#runFetchLoop}}. However, this 
> caused checkpoints that were triggered before the method was ever reached to 
> contain undefined partition offsets.
> Ref:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:392)
> at 
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:209)
> at 
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:173)
> at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:32)
> at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest(KafkaConsumerTestBase.java:942)
> at 
> org.apache.flink.streaming.connectors.kafka.Kafka09ITCase.testMultipleSourcesOnePartition(Kafka09ITCase.java:76)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:915)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: Restoring from a checkpoint / 
> savepoint, but found a partition state Partition: 
> 

[GitHub] flink issue #3378: [FLINK-5849] [kafka] Move FlinkKafkaConsumer start offset...

2017-02-27 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3378
  
Thanks for the review @rmetzger!
For the moving of `open()` to before `run()`, I've included that as the 
commit tagged with FLINK-5849.
For the star import fix, I'm going to include that within a follow-up 
hotfix that cleansup all Flink Kafka connector tests of star & unused imports.

Doing one final Travis run locally and merging this to `master` once it 
turns green.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-27 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-5929:
---

 Summary: Allow Access to Per-Window State in ProcessWindowFunction
 Key: FLINK-5929
 URL: https://issues.apache.org/jira/browse/FLINK-5929
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Aljoscha Krettek


Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} can 
access is scoped to the key of the window but not the window itself. That is, 
state is global across all windows for a given key.

For some use cases it is beneficial to keep state scoped to a window. For 
example, if you expect to have several {{Trigger}} firings (due to early and 
late firings) a user can keep state per window to keep some information between 
those firings.

The per-window state has to be cleaned up in some way. For this I see two 
options:
 - Keep track of all state that a user uses and clean up when we reach the 
window GC horizon.
 - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called when 
we reach the window GC horizon that users can/should use to clean up their 
state.

On the API side, we can add a method {{windowState()}} on 
{{ProcessWindowFunction.Context}} that retrieves the per-window state and 
{{globalState()}} that would allow access to the (already available) global 
state. The {{Context}} would then look like this:
{code}
/**
 * The context holding window metadata
 */
public abstract class Context {
/**
 * @return The window that is being evaluated.
 */
public abstract W window();

/**
 * State accessor for per-key and per-window state.
 */
KeyedStateStore windowState();

/**
 * State accessor for per-key global state.
 */
KeyedStateStore globalState();
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5928) Externalized checkpoints overwritting each other

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886022#comment-15886022
 ] 

ASF GitHub Bot commented on FLINK-5928:
---

GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/3424

[FLINK-5928] [checkpoints] Use custom metadata file for externalized 
checkpoints

- Adds a checkpoint coordinator test for externalized checkpoints. This was 
covered only in unit tests for the involved checkpoint components like 
PendingCheckpoint etc. This would have caught the issue.
- The fix is to not use a `_metadata` but a random 
`checkpoint_metadata-:randomSuffix` file for externalized checkpoints, because 
they are not unique per configured directory. Hopefully, we can get rid of this 
soon with @StephanEwen's refactorings.

This is based on top of #3411, which is already good to merge.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 5928-ext_chk_metadata

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3424.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3424


commit a3d2405f690e08d3de4f641428887ab04ba2ca2d
Author: Stephan Ewen 
Date:   2017-02-17T16:51:00Z

[FLINK-5822] [state backends] Make JobManager / Checkpoint Coordinator 
aware of the root state backend

commit b2f3bc41bc991e8deb22fb89822f28c75d94c8f7
Author: Stephan Ewen 
Date:   2017-02-22T21:18:50Z

[FLINK-5897] [checkpoints] Make checkpoint externalization not depend 
strictly on FileSystems

That is the first step towards checkpoints that can be externalized to 
other stores as well,
like k/v stores and databases, if supported by the state backend.

commit 537be203dab0614383645e859bbafb6ebfeb3161
Author: Ufuk Celebi 
Date:   2017-02-27T15:12:37Z

[FLINK-5928] [checkpoints] Add 
CheckpointCoordinatorExternalizedCheckpointsTest

Problem: there were only unit tests for the checkpoint instances available
that don't test the behaviour of the checkpoint coordinator with respect
to externalized checkpoints.

commit 88e4700cce630f8ae869abff22acfd46ab999aa0
Author: Ufuk Celebi 
Date:   2017-02-27T15:58:14Z

[FLINK-5928] [checkpoints] Use custom metadata file for externalized 
checkpoints




> Externalized checkpoints overwritting each other
> 
>
> Key: FLINK-5928
> URL: https://issues.apache.org/jira/browse/FLINK-5928
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>Priority: Critical
>
> I noticed that PR #3346 accidentally broke externalized checkpoints by using 
> a fixed meta data file name. We should restore the old behaviour with 
> creating random files and double check why no test caught this.
> This will likely superseded by upcoming changes from [~StephanEwen] to use 
> metadata streams on the JobManager.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3424: [FLINK-5928] [checkpoints] Use custom metadata fil...

2017-02-27 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/3424

[FLINK-5928] [checkpoints] Use custom metadata file for externalized 
checkpoints

- Adds a checkpoint coordinator test for externalized checkpoints. This was 
covered only in unit tests for the involved checkpoint components like 
PendingCheckpoint etc. This would have caught the issue.
- The fix is to not use a `_metadata` but a random 
`checkpoint_metadata-:randomSuffix` file for externalized checkpoints, because 
they are not unique per configured directory. Hopefully, we can get rid of this 
soon with @StephanEwen's refactorings.

This is based on top of #3411, which is already good to merge.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 5928-ext_chk_metadata

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3424.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3424


commit a3d2405f690e08d3de4f641428887ab04ba2ca2d
Author: Stephan Ewen 
Date:   2017-02-17T16:51:00Z

[FLINK-5822] [state backends] Make JobManager / Checkpoint Coordinator 
aware of the root state backend

commit b2f3bc41bc991e8deb22fb89822f28c75d94c8f7
Author: Stephan Ewen 
Date:   2017-02-22T21:18:50Z

[FLINK-5897] [checkpoints] Make checkpoint externalization not depend 
strictly on FileSystems

That is the first step towards checkpoints that can be externalized to 
other stores as well,
like k/v stores and databases, if supported by the state backend.

commit 537be203dab0614383645e859bbafb6ebfeb3161
Author: Ufuk Celebi 
Date:   2017-02-27T15:12:37Z

[FLINK-5928] [checkpoints] Add 
CheckpointCoordinatorExternalizedCheckpointsTest

Problem: there were only unit tests for the checkpoint instances available
that don't test the behaviour of the checkpoint coordinator with respect
to externalized checkpoints.

commit 88e4700cce630f8ae869abff22acfd46ab999aa0
Author: Ufuk Celebi 
Date:   2017-02-27T15:58:14Z

[FLINK-5928] [checkpoints] Use custom metadata file for externalized 
checkpoints




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2017-02-27 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885951#comment-15885951
 ] 

Aljoscha Krettek commented on FLINK-3674:
-

The main reason was not wanting to expose to much because it's not yet clear to 
me what would be a good interface for users. You're right, though, that we 
could add a {{cancel()}} method. The side reason was that deleting timers can 
be very expensive and users might not be aware of it and therefore write slow 
programs.

> Add an interface for Time aware User Functions
> --
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
> Fix For: 1.2.0
>
>
> I suggest to add an interface that UDFs can implement, which will let them be 
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
> void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction, 
> EventTimeFunction {
> private long currentEventTime = Long.MIN_VALUE;
> public String map(String value) {
> return value + " @ " + currentEventTime;
> }
> public void onWatermark(Watermark watermark) {
> currentEventTime = watermark.getTimestamp();
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-2922) Add Queryable Window Operator

2017-02-27 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885949#comment-15885949
 ] 

Aljoscha Krettek commented on FLINK-2922:
-

By the way, it would be as easy as adding
{code}
stateDesc.setQueryable("queryable-name");
{code}

in {{WindowOperator.java:line 354}} and similar places to make the window 
contents queryable.

> Add Queryable Window Operator
> -
>
> Key: FLINK-2922
> URL: https://issues.apache.org/jira/browse/FLINK-2922
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>  Labels: requires-design-doc
> Attachments: FLINK-2922.pdf
>
>
> The idea is to provide a window operator that allows to query the current 
> window result at any time without discarding the current result.
> For example, a user might have an aggregation window operation with tumbling 
> windows of 1 hour. Now, at any time they might be interested in the current 
> aggregated value for the currently in-flight hour window.
> The idea is to make the operator a two input operator where normal elements 
> arrive on input one while queries arrive on input two. The query stream must 
> be keyed by the same key as the input stream. If an input arrives for a key 
> the current value for that key is emitted along with the query element so 
> that the user can map the result to the query.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5524) Support early out for code generated conjunctive conditions

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885926#comment-15885926
 ] 

ASF GitHub Bot commented on FLINK-5524:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3372
  
hey @twalthr, any more comments or this can be merged


> Support early out for code generated conjunctive conditions
> ---
>
> Key: FLINK-5524
> URL: https://issues.apache.org/jira/browse/FLINK-5524
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0, 1.1.4, 1.3.0
>Reporter: Fabian Hueske
>Assignee: Kurt Young
>
> Currently, all nested conditions for a conjunctive predicate are evaluated 
> before the conjunction is checked.
> A condition like {{(v1 == v2) && (v3 < 5)}} would be compiled into
> {code}
> boolean res1;
> if (v1 == v2) {
>   res1 = true;
> } else {
>   res1 = false;
> }
> boolean res2;
> if (v3 < 5) {
>   res2 = true;
> } else {
>   res2 = false;
> }
> boolean res3;
> if (res1 && res2) {
>   res3 = true;
> } else {
>   res3 = false;
> }
> if (res3) {
>   // emit something
> }
> {code}
> It would be better to leave the generated code as early as possible, e.g., 
> with a {{return}} instead of {{res1 = false}}. The code generator needs a bit 
> of context information for that.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3372: [FLINK-5524] [table] Support early out for code generated...

2017-02-27 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3372
  
hey @twalthr, any more comments or this can be merged


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5586) Extend TableProgramsTestBase for object reuse modes

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885923#comment-15885923
 ] 

ASF GitHub Bot commented on FLINK-5586:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3339
  
hey @fhueske can you take a look at this?


> Extend TableProgramsTestBase for object reuse modes
> ---
>
> Key: FLINK-5586
> URL: https://issues.apache.org/jira/browse/FLINK-5586
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Kurt Young
>
> We should also test if all runtime operators of the Table API work correctly 
> if object reuse mode is set to true. This should be done for all 
> cluster-based ITCases, not the collection-based ones.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3339: [FLINK-5586] [table] Extend TableProgramsClusterTestBase ...

2017-02-27 Thread KurtYoung
Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/3339
  
hey @fhueske can you take a look at this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner s...

2017-02-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3418#discussion_r103206651
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
 ---
@@ -43,7 +45,13 @@ class FlatMapRunner[IN, OUT](
 LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
 val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, 
code)
 LOG.debug("Instantiating FlatMapFunction.")
-function = clazz.newInstance()
+if (null == parameterValues || null == parameterTypes) {
--- End diff --

move `function =` out of the `if` condition, i.e.,
```
function = if (null == parameterValues || null == parameterTypes) {
  clazz.newInstance()
} else {
  clazz
.getConstructor(parameterTypes: _*)
.newInstance(parameterValues: _*)
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner s...

2017-02-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3418#discussion_r103226511
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1562,6 +1561,39 @@ class CodeGenerator(
   }
 
   /**
+* Adds a reusable constructor statement with the given parameter types,
+* and the member variable's name that stores the variables passed in 
the constructor.
+*
+* @param memberVariables The member variable's name that stores the 
variables passed in
+*the constructor.
+* @param parameterTypes The parameter types to construct the function
+*/
+  def addReusableConstructorWithMemberVariables(
--- End diff --

This method is basically doing the same as `addReusableConstructor` except 
that it allows to hardcode the names of the member variables. The same 
functionality (in a safe manner) can be achieved by reusing existing code, IMO.

For instance, the code of the test could be rewritten to:

```
val params = generator.addReusableConstructor(classOf[Long])
val card = params(0)

val body =
  s"""
 |return java.lang.Long.valueOf(in1) + 
java.lang.Long.valueOf($card);
 """.stripMargin
```

We would have to remove the `null` assignment for the initially empty field 
variable in `addReusableConstructor` though (or check if it is a primitive 
type).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5776) Improve XXMapRunner support create instance by carrying constructor parameters

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885916#comment-15885916
 ] 

ASF GitHub Bot commented on FLINK-5776:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3418#discussion_r103226511
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1562,6 +1561,39 @@ class CodeGenerator(
   }
 
   /**
+* Adds a reusable constructor statement with the given parameter types,
+* and the member variable's name that stores the variables passed in 
the constructor.
+*
+* @param memberVariables The member variable's name that stores the 
variables passed in
+*the constructor.
+* @param parameterTypes The parameter types to construct the function
+*/
+  def addReusableConstructorWithMemberVariables(
--- End diff --

This method is basically doing the same as `addReusableConstructor` except 
that it allows to hardcode the names of the member variables. The same 
functionality (in a safe manner) can be achieved by reusing existing code, IMO.

For instance, the code of the test could be rewritten to:

```
val params = generator.addReusableConstructor(classOf[Long])
val card = params(0)

val body =
  s"""
 |return java.lang.Long.valueOf(in1) + 
java.lang.Long.valueOf($card);
 """.stripMargin
```

We would have to remove the `null` assignment for the initially empty field 
variable in `addReusableConstructor` though (or check if it is a primitive 
type).


> Improve XXMapRunner support create instance by carrying constructor parameters
> --
>
> Key: FLINK-5776
> URL: https://issues.apache.org/jira/browse/FLINK-5776
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> At present, MapRunner FlatMapRunner only supports create non-parameter 
> instance, but sometimes we need to carry constructor parameters to 
> instantiate, so I would like to improve XXMapRunner support create instance 
> by carrying constructor parameters.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5776) Improve XXMapRunner support create instance by carrying constructor parameters

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885917#comment-15885917
 ] 

ASF GitHub Bot commented on FLINK-5776:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3418#discussion_r103206923
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
 ---
@@ -41,7 +43,13 @@ class MapRunner[IN, OUT](
 LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
 val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, 
code)
 LOG.debug("Instantiating MapFunction.")
-function = clazz.newInstance()
+if (null == parameterValues || null == parameterTypes) {
--- End diff --

Same as `FlatMapRunner`


> Improve XXMapRunner support create instance by carrying constructor parameters
> --
>
> Key: FLINK-5776
> URL: https://issues.apache.org/jira/browse/FLINK-5776
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> At present, MapRunner FlatMapRunner only supports create non-parameter 
> instance, but sometimes we need to carry constructor parameters to 
> instantiate, so I would like to improve XXMapRunner support create instance 
> by carrying constructor parameters.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5776) Improve XXMapRunner support create instance by carrying constructor parameters

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885915#comment-15885915
 ] 

ASF GitHub Bot commented on FLINK-5776:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3418#discussion_r103206651
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala
 ---
@@ -43,7 +45,13 @@ class FlatMapRunner[IN, OUT](
 LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
 val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, 
code)
 LOG.debug("Instantiating FlatMapFunction.")
-function = clazz.newInstance()
+if (null == parameterValues || null == parameterTypes) {
--- End diff --

move `function =` out of the `if` condition, i.e.,
```
function = if (null == parameterValues || null == parameterTypes) {
  clazz.newInstance()
} else {
  clazz
.getConstructor(parameterTypes: _*)
.newInstance(parameterValues: _*)
}
```


> Improve XXMapRunner support create instance by carrying constructor parameters
> --
>
> Key: FLINK-5776
> URL: https://issues.apache.org/jira/browse/FLINK-5776
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> At present, MapRunner FlatMapRunner only supports create non-parameter 
> instance, but sometimes we need to carry constructor parameters to 
> instantiate, so I would like to improve XXMapRunner support create instance 
> by carrying constructor parameters.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3418: [FLINK-5776][Table API & SQL]Improve XXMapRunner s...

2017-02-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3418#discussion_r103206923
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
 ---
@@ -41,7 +43,13 @@ class MapRunner[IN, OUT](
 LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
 val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, 
code)
 LOG.debug("Instantiating MapFunction.")
-function = clazz.newInstance()
+if (null == parameterValues || null == parameterTypes) {
--- End diff --

Same as `FlatMapRunner`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3417: [FLINK-5907] [java api] Fix trailing empty fields ...

2017-02-27 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3417#discussion_r103226821
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
 ---
@@ -358,24 +358,27 @@ protected boolean parseRecord(Object[] holders, 
byte[] bytes, int offset, int nu
for (int field = 0, output = 0; field < fieldIncluded.length; 
field++) {

// check valid start position
-   if (startPos >= limit) {
+   if (startPos > limit || (startPos == limit && field != 
fieldIncluded.length - 1)) {
if (lenient) {
return false;
} else {
throw new ParseException("Row too 
short: " + new String(bytes, offset, numBytes));
}
}
-   
+
if (fieldIncluded[field]) {
// parse field
@SuppressWarnings("unchecked")
FieldParser parser = 
(FieldParser) this.fieldParsers[output];
Object reuse = holders[output];
startPos = 
parser.resetErrorStateAndParse(bytes, startPos, limit, this.fieldDelim, reuse);
holders[output] = parser.getLastResult();
-   
+
// check parse result
-   if (startPos < 0) {
+   if (startPos < 0 ||
+   (startPos == limit
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5907) RowCsvInputFormat bug on parsing tsv

2017-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15885905#comment-15885905
 ] 

ASF GitHub Bot commented on FLINK-5907:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3417#discussion_r103225729
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
@@ -197,6 +197,14 @@ protected boolean parseRecord(Object[] holders, byte[] 
bytes, int offset, int nu
if (startPos < 0) {
throw new 
ParseException(String.format("Unexpected parser position for column %1$s of row 
'%2$s'",
field, new String(bytes, offset, 
numBytes)));
+   } else if (startPos == limit
+   && field != fieldIncluded.length - 1
+   && 
!FieldParser.endsWithDelimiter(bytes, startPos - 1, fieldDelimiter)) {
+   if (isLenient()) {
--- End diff --

added


> RowCsvInputFormat bug on parsing tsv
> 
>
> Key: FLINK-5907
> URL: https://issues.apache.org/jira/browse/FLINK-5907
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Affects Versions: 1.2.0
>Reporter: Flavio Pompermaier
>Assignee: Kurt Young
>  Labels: csv, parsing
> Attachments: test.tsv
>
>
> The following snippet reproduce the problem (using the attached file as 
> input):
> {code:language=java}
> char fieldDelim = '\t';
> TypeInformation[] fieldTypes = new TypeInformation[51];
> for (int i = 0; i < fieldTypes.length; i++) {
>   fieldTypes[i] = BasicTypeInfo.STRING_TYPE_INFO;
> }
> int[] fieldMask = new int[fieldTypes.length];
> for (int i = 0; i < fieldMask.length; i++) {
>   fieldMask[i] = i;
> }
> RowCsvInputFormat csvIF = new RowCsvInputFormat(new Path(testCsv), 
> fieldTypes, "\n", fieldDelim +"", 
>fieldMask, true);
> csvIF.setNestedFileEnumeration(true);
> DataSet csv = env.createInput(csvIF);
>csv.print()
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5808) Missing verification for setParallelism and setMaxParallelism

2017-02-27 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-5808:

Priority: Blocker  (was: Major)

> Missing verification for setParallelism and setMaxParallelism
> -
>
> Key: FLINK-5808
> URL: https://issues.apache.org/jira/browse/FLINK-5808
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.0, 1.2.1
>
>
> When {{setParallelism()}} is called we don't verify that it is <= than max 
> parallelism. Likewise, when {{setMaxParallelism()}} is called we don't check 
> that the new value doesn't clash with a previously set parallelism.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3417: [FLINK-5907] [java api] Fix trailing empty fields ...

2017-02-27 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3417#discussion_r103225729
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
@@ -197,6 +197,14 @@ protected boolean parseRecord(Object[] holders, byte[] 
bytes, int offset, int nu
if (startPos < 0) {
throw new 
ParseException(String.format("Unexpected parser position for column %1$s of row 
'%2$s'",
field, new String(bytes, offset, 
numBytes)));
+   } else if (startPos == limit
+   && field != fieldIncluded.length - 1
+   && 
!FieldParser.endsWithDelimiter(bytes, startPos - 1, fieldDelimiter)) {
+   if (isLenient()) {
--- End diff --

added


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


  1   2   3   >