[GitHub] flink pull request #5222: Fix typo in docs/dev/api_concepts.md

2017-12-31 Thread okumin
GitHub user okumin opened a pull request:

https://github.com/apache/flink/pull/5222

Fix typo in docs/dev/api_concepts.md

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

* Remove duplicated `program`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/okumin/flink fix-typo-api-concepts

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5222.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5222


commit e50f287609be3e19d904c17fa7330b2e72088420
Author: okumin 
Date:   2018-01-01T06:58:37Z

Fix typo in docs/dev/api_concepts.md

Remove duplicated `program`.




---


[GitHub] flink pull request #5221: [hotfix] [docs] Format Scala programs in docs

2017-12-31 Thread okumin
GitHub user okumin opened a pull request:

https://github.com/apache/flink/pull/5221

[hotfix] [docs] Format Scala programs in docs

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*



* remove unneeded semicolons
* add `()` to the `print` method, since methods with side effects are conventionally invoked with `()` (see the sketch below)
* replace Java code in the Scala examples with Scala code
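
For context, a minimal sketch of the `()` convention from the second bullet (hypothetical snippet, not taken from the PR):

```scala
class Report(lines: Seq[String]) {
  def print(): Unit = lines.foreach(println) // side effect: declared and invoked with ()
  def lineCount: Int = lines.size            // pure accessor: no parentheses
}

val report = new Report(Seq("flink", "scala"))
report.print()            // the () signals the side effect to readers
println(report.lineCount) // accessor reads like a field
```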

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/okumin/flink format-scala-code

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5221.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5221


commit 96521134a0b0ac2be0b8afc7b1c01a0a0c18fe09
Author: okumin 
Date:   2018-01-01T06:25:54Z

Format Scala programs in docs

* remove unneeded semicolons
* add `()` to the `print` method, since methods with side effects are conventionally invoked with `()`
* replace Java code in the Scala examples with Scala code




---


[jira] [Closed] (FLINK-8227) Optimize the performance of SharedBufferSerializer

2017-12-31 Thread Dian Fu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-8227.
--

> Optimize the performance of SharedBufferSerializer
> --
>
> Key: FLINK-8227
> URL: https://issues.apache.org/jira/browse/FLINK-8227
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently {{SharedBufferSerializer.serialize()}} creates a HashMap and puts 
> all the {{SharedBufferEntry}} objects into it. Usually this is not a problem, 
> but we observe that in some cases the calculation of hashCode can become the 
> bottleneck: performance decreases as the number of 
> {{SharedBufferEdge}} grows. For the looping pattern {{A*}}, if the number of 
> {{SharedBufferEntry}} is {{N}}, the number of {{SharedBufferEdge}} is about 
> {{N * N}}.
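
For illustration, a minimal model of the quadratic growth described above (hypothetical code, not the actual Flink classes):

{code}
// In a looping pattern A*, each new entry can carry an edge back to every
// earlier entry, so N entries produce on the order of N * N edges.
def edgeCount(entries: Int): Int = (0 until entries).sum // 0 + 1 + ... + (N - 1)

assert(edgeCount(4) == 6)
assert(edgeCount(1000) == 499500) // one hashCode per entry and edge dominates serialize()
{code}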





[jira] [Resolved] (FLINK-8227) Optimize the performance of SharedBufferSerializer

2017-12-31 Thread Dian Fu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu resolved FLINK-8227.

Resolution: Fixed

> Optimize the performance of SharedBufferSerializer
> --
>
> Key: FLINK-8227
> URL: https://issues.apache.org/jira/browse/FLINK-8227
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently {{SharedBufferSerializer.serialize()}} creates a HashMap and puts 
> all the {{SharedBufferEntry}} objects into it. Usually this is not a problem, 
> but we observe that in some cases the calculation of hashCode can become the 
> bottleneck: performance decreases as the number of 
> {{SharedBufferEdge}} grows. For the looping pattern {{A*}}, if the number of 
> {{SharedBufferEntry}} is {{N}}, the number of {{SharedBufferEdge}} is about 
> {{N * N}}.





[jira] [Commented] (FLINK-8226) Dangling reference generated after NFA clean up timed out SharedBufferEntry

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307367#comment-16307367
 ] 

ASF GitHub Bot commented on FLINK-8226:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5141#discussion_r159149194
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 ---
@@ -191,22 +191,28 @@ public boolean isEmpty() {
 */
public boolean prune(long pruningTimestamp) {
	Iterator<Map.Entry<K, SharedBufferPage<K, V>>> iter = pages.entrySet().iterator();
-   boolean pruned = false;
+   List<SharedBufferEntry<K, V>> prunedEntries = new ArrayList<>();
 
while (iter.hasNext()) {
SharedBufferPage page = iter.next().getValue();
 
-   if (page.prune(pruningTimestamp)) {
-   pruned = true;
-   }
+   page.prune(pruningTimestamp, prunedEntries);
 
if (page.isEmpty()) {
// delete page if it is empty
iter.remove();
}
}
 
-   return pruned;
+   if (!prunedEntries.isEmpty()) {
+   for (Map.Entry<K, SharedBufferPage<K, V>> entry : pages.entrySet()) {
+   entry.getValue().removeEdges(prunedEntries);
+   }
+   prunedEntries.clear();
--- End diff --

Updated.


> Dangling reference generated after NFA clean up timed out SharedBufferEntry
> ---
>
> Key: FLINK-8226
> URL: https://issues.apache.org/jira/browse/FLINK-8226
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
> Fix For: 1.5.0, 1.4.1
>
>






[GitHub] flink pull request #5141: [FLINK-8226] [cep] Dangling reference generated af...

2017-12-31 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5141#discussion_r159149194
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 ---
@@ -191,22 +191,28 @@ public boolean isEmpty() {
 */
public boolean prune(long pruningTimestamp) {
	Iterator<Map.Entry<K, SharedBufferPage<K, V>>> iter = pages.entrySet().iterator();
-   boolean pruned = false;
+   List<SharedBufferEntry<K, V>> prunedEntries = new ArrayList<>();
 
while (iter.hasNext()) {
SharedBufferPage page = iter.next().getValue();
 
-   if (page.prune(pruningTimestamp)) {
-   pruned = true;
-   }
+   page.prune(pruningTimestamp, prunedEntries);
 
if (page.isEmpty()) {
// delete page if it is empty
iter.remove();
}
}
 
-   return pruned;
+   if (!prunedEntries.isEmpty()) {
+   for (Map.Entry<K, SharedBufferPage<K, V>> entry : pages.entrySet()) {
+   entry.getValue().removeEdges(prunedEntries);
+   }
+   prunedEntries.clear();
--- End diff --

Updated.
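
As context for the diff above, a simplified model of the two-phase pruning it implements (hypothetical types, not the actual SharedBuffer code):

```scala
// Phase 1 drops expired entries; phase 2 removes edges that would otherwise
// dangle, i.e. still point at entries that were just pruned.
case class Entry(id: Int, timestamp: Long, var edges: List[Int])

def prune(pages: Map[String, List[Entry]], cutoff: Long): Map[String, List[Entry]] = {
  val pruned = pages.values.flatten.filter(_.timestamp < cutoff).map(_.id).toSet
  val kept = pages.map { case (k, es) => k -> es.filterNot(e => pruned(e.id)) }
  kept.values.flatten.foreach(e => e.edges = e.edges.filterNot(pruned))
  kept.filter { case (_, es) => es.nonEmpty } // drop pages that became empty
}
```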


---


[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...

2017-12-31 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148351
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
   Math.log(x) / Math.log(base)
 }
   }
+
+  /**
+* Returns the Int number after the input number left shift n bits
+* @param input Int type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Int, n: Int): Int = {
+_shiftLeft(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number left shift n bits
+* @param input Long type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Long, n: Int): Long = {
+_shiftLeft(input, n).asInstanceOf[Long]
+  }
+
+  /**
+* Returns the Int number after the input number right shift n bits
+* @param input Int type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Int, n: Int): Int = {
+_shiftRight(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number right shift n bits
+* @param input Long type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Long, n: Int): Long = {
+_shiftRight(input, n).asInstanceOf[Long]
--- End diff --

input >> n
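
This and the sibling "input << n" / "input >> n" comments suggest implementing each overload directly; a sketch of that simplification (assumed, not the merged code):

```scala
object ShiftFunctions {
  // Shifting directly in each overload removes the Any-typed dispatch
  // helpers and the asInstanceOf casts.
  def shiftLeft(input: Int, n: Int): Int = input << n
  def shiftLeft(input: Long, n: Int): Long = input << n
  def shiftRight(input: Int, n: Int): Int = input >> n
  def shiftRight(input: Long, n: Int): Long = input >> n
}
```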


---


[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307349#comment-16307349
 ] 

ASF GitHub Bot commented on FLINK-8302:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148340
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
   Math.log(x) / Math.log(base)
 }
   }
+
+  /**
+* Returns the Int number after the input number left shift n bits
+* @param input Int type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Int, n: Int): Int = {
+_shiftLeft(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number left shift n bits
+* @param input Long type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Long, n: Int): Long = {
+_shiftLeft(input, n).asInstanceOf[Long]
+  }
+
+  /**
+* Returns the Int number after the input number right shift n bits
+* @param input Int type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Int, n: Int): Int = {
+_shiftRight(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number right shift n bits
+* @param input Long type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Long, n: Int): Long = {
+_shiftRight(input, n).asInstanceOf[Long]
+  }
+
+  /**
+* Returns the number after the input number left shift n bits
+* Input must be type 'Long' or type 'Int'
+*/
+  private def _shiftLeft(input: Any, n: Int): Any = {
+input match {
+  case l: jl.Long => l << n
+  case i: jl.Integer => i << n
+  case _ =>
+throw new IllegalArgumentException(
+  s"type of input in function 'shiftLeft(input, n)' must be Long 
or Integer"
+)
+}
+  }
+
+  /**
+* Returns the number after the input number right shift n bits
+* Input must be type 'Long' or type 'Int'
+*/
+  private def _shiftRight(input: Any, n: Int): Any = {
+input match {
+  case l: jl.Long => l >> n
+  case i: jl.Integer => i >> n
+  case _ =>
+throw new IllegalArgumentException(
--- End diff --

never run here?


> Support shift_left and shift_right in TableAPI
> --
>
> Key: FLINK-8302
> URL: https://issues.apache.org/jira/browse/FLINK-8302
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: DuBin
>  Labels: features
> Fix For: 1.5.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Add shift_left and shift_right support in TableAPI: shift_left(input, n) acts 
> as input << n, and shift_right(input, n) acts as input >> n.





[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307346#comment-16307346
 ] 

ASF GitHub Bot commented on FLINK-8302:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148351
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
   Math.log(x) / Math.log(base)
 }
   }
+
+  /**
+* Returns the Int number after the input number left shift n bits
+* @param input Int type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Int, n: Int): Int = {
+_shiftLeft(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number left shift n bits
+* @param input Long type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Long, n: Int): Long = {
+_shiftLeft(input, n).asInstanceOf[Long]
+  }
+
+  /**
+* Returns the Int number after the input number right shift n bits
+* @param input Int type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Int, n: Int): Int = {
+_shiftRight(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number right shift n bits
+* @param input Long type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Long, n: Int): Long = {
+_shiftRight(input, n).asInstanceOf[Long]
--- End diff --

input >> n


> Support shift_left and shift_right in TableAPI
> --
>
> Key: FLINK-8302
> URL: https://issues.apache.org/jira/browse/FLINK-8302
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: DuBin
>  Labels: features
> Fix For: 1.5.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Add shift_left and shift_right support in TableAPI: shift_left(input, n) acts 
> as input << n, and shift_right(input, n) acts as input >> n.





[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307348#comment-16307348
 ] 

ASF GitHub Bot commented on FLINK-8302:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148053
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -1216,6 +1216,69 @@ class ScalarFunctionsTest extends 
ScalarTypesTestBase {
 )
   }
 
+  @Test
+  def testShiftLeft(): Unit = {
+testSqlApi(
+  "SHIFT_LEFT(1,1)",
+  "2"
+)
+
+testSqlApi(
+  "SHIFT_LEFT(21,1)",
+  "42"
+)
+
+testSqlApi(
+  "SHIFT_LEFT(21,1)",
+  "42"
+)
+
--- End diff --

Remove duplicate test case.


> Support shift_left and shift_right in TableAPI
> --
>
> Key: FLINK-8302
> URL: https://issues.apache.org/jira/browse/FLINK-8302
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: DuBin
>  Labels: features
> Fix For: 1.5.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Add shift_left and shift_right support in TableAPI: shift_left(input, n) acts 
> as input << n, and shift_right(input, n) acts as input >> n.





[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307351#comment-16307351
 ] 

ASF GitHub Bot commented on FLINK-8302:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148349
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
   Math.log(x) / Math.log(base)
 }
   }
+
+  /**
+* Returns the Int number after the input number left shift n bits
+* @param input Int type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Int, n: Int): Int = {
+_shiftLeft(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number left shift n bits
+* @param input Long type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Long, n: Int): Long = {
+_shiftLeft(input, n).asInstanceOf[Long]
+  }
+
+  /**
+* Returns the Int number after the input number right shift n bits
+* @param input Int type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Int, n: Int): Int = {
+_shiftRight(input, n).asInstanceOf[Int]
--- End diff --

input >> n


> Support shift_left and shift_right in TableAPI
> --
>
> Key: FLINK-8302
> URL: https://issues.apache.org/jira/browse/FLINK-8302
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: DuBin
>  Labels: features
> Fix For: 1.5.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Add shift_left and shift_right support in TableAPI: shift_left(input, n) acts 
> as input << n, and shift_right(input, n) acts as input >> n.





[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...

2017-12-31 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148499
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
 ---
@@ -58,4 +58,22 @@ object ScalarSqlFunctions {
   OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)),
 SqlFunctionCategory.NUMERIC)
 
+  val SHIFT_LEFT = new SqlFunction(
+"SHIFT_LEFT",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.ARG0,
+null,
+OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC),
--- End diff --

Second operand should be  SqlTypeFamily.INTEGER.
 `OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC) -> 
OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.INTEGER) `
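
A sketch of the requested signature, with the second operand restricted to the INTEGER type family (assumed against Calcite's API, not the merged code):

```scala
import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeFamily}

val SHIFT_LEFT = new SqlFunction(
  "SHIFT_LEFT",
  SqlKind.OTHER_FUNCTION,
  ReturnTypes.ARG0,
  null,
  OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.INTEGER), // n must be integral
  SqlFunctionCategory.NUMERIC)
```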


---


[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307354#comment-16307354
 ] 

ASF GitHub Bot commented on FLINK-8302:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148352
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
   Math.log(x) / Math.log(base)
 }
   }
+
+  /**
+* Returns the Int number after the input number left shift n bits
+* @param input Int type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Int, n: Int): Int = {
+_shiftLeft(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number left shift n bits
+* @param input Long type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Long, n: Int): Long = {
+_shiftLeft(input, n).asInstanceOf[Long]
+  }
+
+  /**
+* Returns the Int number after the input number right shift n bits
+* @param input Int type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Int, n: Int): Int = {
+_shiftRight(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number right shift n bits
+* @param input Long type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Long, n: Int): Long = {
+_shiftRight(input, n).asInstanceOf[Long]
+  }
+
+  /**
+* Returns the number after the input number left shift n bits
+* Input must be type 'Long' or type 'Int'
+*/
+  private def _shiftLeft(input: Any, n: Int): Any = {
--- End diff --

remove this method? 


> Support shift_left and shift_right in TableAPI
> --
>
> Key: FLINK-8302
> URL: https://issues.apache.org/jira/browse/FLINK-8302
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: DuBin
>  Labels: features
> Fix For: 1.5.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Add shift_left and shift_right support in TableAPI: shift_left(input, n) acts 
> as input << n, and shift_right(input, n) acts as input >> n.





[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...

2017-12-31 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148345
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
   Math.log(x) / Math.log(base)
 }
   }
+
+  /**
+* Returns the Int number after the input number left shift n bits
+* @param input Int type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Int, n: Int): Int = {
+_shiftLeft(input, n).asInstanceOf[Int]
--- End diff --

input << n ?


---


[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307347#comment-16307347
 ] 

ASF GitHub Bot commented on FLINK-8302:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148345
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
   Math.log(x) / Math.log(base)
 }
   }
+
+  /**
+* Returns the Int number after the input number left shift n bits
+* @param input Int type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Int, n: Int): Int = {
+_shiftLeft(input, n).asInstanceOf[Int]
--- End diff --

input << n ?


> Support shift_left and shift_right in TableAPI
> --
>
> Key: FLINK-8302
> URL: https://issues.apache.org/jira/browse/FLINK-8302
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: DuBin
>  Labels: features
> Fix For: 1.5.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Add shift_left and shift_right support in TableAPI: shift_left(input, n) acts 
> as input << n, and shift_right(input, n) acts as input >> n.





[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307353#comment-16307353
 ] 

ASF GitHub Bot commented on FLINK-8302:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148353
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
   Math.log(x) / Math.log(base)
 }
   }
+
+  /**
+* Returns the Int number after the input number left shift n bits
+* @param input Int type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Int, n: Int): Int = {
+_shiftLeft(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number left shift n bits
+* @param input Long type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Long, n: Int): Long = {
+_shiftLeft(input, n).asInstanceOf[Long]
+  }
+
+  /**
+* Returns the Int number after the input number right shift n bits
+* @param input Int type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Int, n: Int): Int = {
+_shiftRight(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number right shift n bits
+* @param input Long type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Long, n: Int): Long = {
+_shiftRight(input, n).asInstanceOf[Long]
+  }
+
+  /**
+* Returns the number after the input number left shift n bits
+* Input must be type 'Long' or type 'Int'
+*/
+  private def _shiftLeft(input: Any, n: Int): Any = {
+input match {
+  case l: jl.Long => l << n
+  case i: jl.Integer => i << n
+  case _ =>
+throw new IllegalArgumentException(
+  s"type of input in function 'shiftLeft(input, n)' must be Long 
or Integer"
+)
+}
+  }
+
+  /**
+* Returns the number after the input number right shift n bits
+* Input must be type 'Long' or type 'Int'
+*/
+  private def _shiftRight(input: Any, n: Int): Any = {
--- End diff --

remove this method? 


> Support shift_left and shift_right in TableAPI
> --
>
> Key: FLINK-8302
> URL: https://issues.apache.org/jira/browse/FLINK-8302
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: DuBin
>  Labels: features
> Fix For: 1.5.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Add shift_left and shift_right support in TableAPI: shift_left(input, n) acts 
> as input << n, and shift_right(input, n) acts as input >> n.





[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307350#comment-16307350
 ] 

ASF GitHub Bot commented on FLINK-8302:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148346
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
   Math.log(x) / Math.log(base)
 }
   }
+
+  /**
+* Returns the Int number after the input number left shift n bits
+* @param input Int type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Int, n: Int): Int = {
+_shiftLeft(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number left shift n bits
+* @param input Long type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Long, n: Int): Long = {
+_shiftLeft(input, n).asInstanceOf[Long]
--- End diff --

input << n


> Support shift_left and shift_right in TableAPI
> --
>
> Key: FLINK-8302
> URL: https://issues.apache.org/jira/browse/FLINK-8302
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: DuBin
>  Labels: features
> Fix For: 1.5.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Add shift_left and shift_right support in TableAPI: shift_left(input, n) acts 
> as input << n, and shift_right(input, n) acts as input >> n.





[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307355#comment-16307355
 ] 

ASF GitHub Bot commented on FLINK-8302:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148499
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
 ---
@@ -58,4 +58,22 @@ object ScalarSqlFunctions {
   OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)),
 SqlFunctionCategory.NUMERIC)
 
+  val SHIFT_LEFT = new SqlFunction(
+"SHIFT_LEFT",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.ARG0,
+null,
+OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC),
--- End diff --

Second operand should be  SqlTypeFamily.INTEGER.
 `OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC) -> 
OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.INTEGER) `


> Support shift_left and shift_right in TableAPI
> --
>
> Key: FLINK-8302
> URL: https://issues.apache.org/jira/browse/FLINK-8302
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: DuBin
>  Labels: features
> Fix For: 1.5.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Add shift_left and shift_right support in TableAPI: shift_left(input, n) acts 
> as input << n, and shift_right(input, n) acts as input >> n.





[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307352#comment-16307352
 ] 

ASF GitHub Bot commented on FLINK-8302:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148500
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
 ---
@@ -58,4 +58,22 @@ object ScalarSqlFunctions {
   OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)),
 SqlFunctionCategory.NUMERIC)
 
+  val SHIFT_LEFT = new SqlFunction(
+"SHIFT_LEFT",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.ARG0,
+null,
+OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC),
+SqlFunctionCategory.NUMERIC
+  )
+
+  val SHIFT_RIGHT = new SqlFunction(
+"SHIFT_RIGHT",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.ARG0,
+null,
+OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC),
+SqlFunctionCategory.NUMERIC
--- End diff --

Second operand should be  SqlTypeFamily.INTEGER.
 `OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC) -> 
OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.INTEGER) `


> Support shift_left and shift_right in TableAPI
> --
>
> Key: FLINK-8302
> URL: https://issues.apache.org/jira/browse/FLINK-8302
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: DuBin
>  Labels: features
> Fix For: 1.5.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Add shift_left and shift_right support in TableAPI: shift_left(input, n) acts 
> as input << n, and shift_right(input, n) acts as input >> n.





[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...

2017-12-31 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148353
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
   Math.log(x) / Math.log(base)
 }
   }
+
+  /**
+* Returns the Int number after the input number left shift n bits
+* @param input Int type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Int, n: Int): Int = {
+_shiftLeft(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number left shift n bits
+* @param input Long type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Long, n: Int): Long = {
+_shiftLeft(input, n).asInstanceOf[Long]
+  }
+
+  /**
+* Returns the Int number after the input number right shift n bits
+* @param input Int type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Int, n: Int): Int = {
+_shiftRight(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number right shift n bits
+* @param input Long type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Long, n: Int): Long = {
+_shiftRight(input, n).asInstanceOf[Long]
+  }
+
+  /**
+* Returns the number after the input number left shift n bits
+* Input must be type 'Long' or type 'Int'
+*/
+  private def _shiftLeft(input: Any, n: Int): Any = {
+input match {
+  case l: jl.Long => l << n
+  case i: jl.Integer => i << n
+  case _ =>
+throw new IllegalArgumentException(
+  s"type of input in function 'shiftLeft(input, n)' must be Long 
or Integer"
+)
+}
+  }
+
+  /**
+* Returns the number after the input number right shift n bits
+* Input must be type 'Long' or type 'Int'
+*/
+  private def _shiftRight(input: Any, n: Int): Any = {
--- End diff --

remove this method? 


---


[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...

2017-12-31 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148352
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
   Math.log(x) / Math.log(base)
 }
   }
+
+  /**
+* Returns the Int number after the input number left shift n bits
+* @param input Int type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Int, n: Int): Int = {
+_shiftLeft(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number left shift n bits
+* @param input Long type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Long, n: Int): Long = {
+_shiftLeft(input, n).asInstanceOf[Long]
+  }
+
+  /**
+* Returns the Int number after the input number right shift n bits
+* @param input Int type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Int, n: Int): Int = {
+_shiftRight(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number right shift n bits
+* @param input Long type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Long, n: Int): Long = {
+_shiftRight(input, n).asInstanceOf[Long]
+  }
+
+  /**
+* Returns the number after the input number left shift n bits
+* Input must be type 'Long' or type 'Int'
+*/
+  private def _shiftLeft(input: Any, n: Int): Any = {
--- End diff --

remove this method? 


---


[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...

2017-12-31 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148346
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
   Math.log(x) / Math.log(base)
 }
   }
+
+  /**
+* Returns the Int number after the input number left shift n bits
+* @param input Int type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Int, n: Int): Int = {
+_shiftLeft(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number left shift n bits
+* @param input Long type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Long, n: Int): Long = {
+_shiftLeft(input, n).asInstanceOf[Long]
--- End diff --

input << n


---


[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...

2017-12-31 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148340
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
   Math.log(x) / Math.log(base)
 }
   }
+
+  /**
+* Returns the Int number after the input number left shift n bits
+* @param input Int type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Int, n: Int): Int = {
+_shiftLeft(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number left shift n bits
+* @param input Long type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Long, n: Int): Long = {
+_shiftLeft(input, n).asInstanceOf[Long]
+  }
+
+  /**
+* Returns the Int number after the input number right shift n bits
+* @param input Int type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Int, n: Int): Int = {
+_shiftRight(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number right shift n bits
+* @param input Long type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Long, n: Int): Long = {
+_shiftRight(input, n).asInstanceOf[Long]
+  }
+
+  /**
+* Returns the number after the input number left shift n bits
+* Input must be type 'Long' or type 'Int'
+*/
+  private def _shiftLeft(input: Any, n: Int): Any = {
+input match {
+  case l: jl.Long => l << n
+  case i: jl.Integer => i << n
+  case _ =>
+throw new IllegalArgumentException(
+  s"type of input in function 'shiftLeft(input, n)' must be Long 
or Integer"
+)
+}
+  }
+
+  /**
+* Returns the number after the input number right shift n bits
+* Input must be type 'Long' or type 'Int'
+*/
+  private def _shiftRight(input: Any, n: Int): Any = {
+input match {
+  case l: jl.Long => l >> n
+  case i: jl.Integer => i >> n
+  case _ =>
+throw new IllegalArgumentException(
--- End diff --

never run here?


---


[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...

2017-12-31 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148053
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -1216,6 +1216,69 @@ class ScalarFunctionsTest extends 
ScalarTypesTestBase {
 )
   }
 
+  @Test
+  def testShiftLeft(): Unit = {
+testSqlApi(
+  "SHIFT_LEFT(1,1)",
+  "2"
+)
+
+testSqlApi(
+  "SHIFT_LEFT(21,1)",
+  "42"
+)
+
+testSqlApi(
+  "SHIFT_LEFT(21,1)",
+  "42"
+)
+
--- End diff --

Remove duplicate test case.


---


[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...

2017-12-31 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148349
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
   Math.log(x) / Math.log(base)
 }
   }
+
+  /**
+* Returns the Int number after the input number left shift n bits
+* @param input Int type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Int, n: Int): Int = {
+_shiftLeft(input, n).asInstanceOf[Int]
+  }
+
+  /**
+* Returns the Long number after the input number left shift n bits
+* @param input Long type
+* @param n
+* @return input << n
+*/
+  def shiftLeft(input: Long, n: Int): Long = {
+_shiftLeft(input, n).asInstanceOf[Long]
+  }
+
+  /**
+* Returns the Int number after the input number right shift n bits
+* @param input Int type
+* @param n
+* @return input >> n
+*/
+  def shiftRight(input: Int, n: Int): Int = {
+_shiftRight(input, n).asInstanceOf[Int]
--- End diff --

input >> n


---


[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...

2017-12-31 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148500
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
 ---
@@ -58,4 +58,22 @@ object ScalarSqlFunctions {
   OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)),
 SqlFunctionCategory.NUMERIC)
 
+  val SHIFT_LEFT = new SqlFunction(
+"SHIFT_LEFT",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.ARG0,
+null,
+OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC),
+SqlFunctionCategory.NUMERIC
+  )
+
+  val SHIFT_RIGHT = new SqlFunction(
+"SHIFT_RIGHT",
+SqlKind.OTHER_FUNCTION,
+ReturnTypes.ARG0,
+null,
+OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC),
+SqlFunctionCategory.NUMERIC
--- End diff --

Second operand should be  SqlTypeFamily.INTEGER.
 `OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC) -> 
OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.INTEGER) `


---


[jira] [Commented] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase

2017-12-31 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307281#comment-16307281
 ] 

Ted Yu commented on FLINK-6105:
---

In 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
 :
{code}
  try {
Thread.sleep(500);
  } catch (InterruptedException e1) {
// ignore it
  }
{code}

> Properly handle InterruptedException in HadoopInputFormatBase
> -
>
> Key: FLINK-6105
> URL: https://issues.apache.org/jira/browse/FLINK-6105
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Ted Yu
>Assignee: mingleizhang
>
> When catching InterruptedException, we should throw InterruptedIOException 
> instead of IOException.
> The following example is from HadoopInputFormatBase :
> {code}
> try {
>   splits = this.mapreduceInputFormat.getSplits(jobContext);
> } catch (InterruptedException e) {
>   throw new IOException("Could not get Splits.", e);
> }
> {code}
> There may be other places where IOE is thrown.
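
For reference, a sketch of the handling the issue asks for: restore the interrupt flag and surface an InterruptedIOException rather than swallowing the exception or wrapping it in a plain IOException (hypothetical helper, not the actual fix):

{code}
import java.io.InterruptedIOException

def sleepInterruptibly(millis: Long): Unit = {
  try {
    Thread.sleep(millis)
  } catch {
    case e: InterruptedException =>
      Thread.currentThread().interrupt() // keep the thread's interrupt status set
      val iioe = new InterruptedIOException("Interrupted while sleeping")
      iioe.initCause(e)
      throw iioe
  }
}
{code}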





[jira] [Commented] (FLINK-8333) Split command options from deployment options in CliFrontend

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307274#comment-16307274
 ] 

ASF GitHub Bot commented on FLINK-8333:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5220

[FLINK-8333] [flip6] Separate deployment options from command options

## What is the purpose of the change

This commit separates the parsing of command options and deployment options 
into two
steps. This makes it easier to make the CustomCommandLines non-static.

Moreover, this commit moves the CliFrontend into the cli sub package.

This PR is based on #5219. 
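
A minimal sketch of the two-step idea, assuming Apache Commons CLI and hypothetical option names (not the actual Flink parser):

```scala
import org.apache.commons.cli.{DefaultParser, Options}

def parse(args: Array[String]): Unit = {
  // Step 1: parse only the generic command options, stopping at the
  // first token the command parser does not recognize.
  val commandOptions = new Options().addOption("h", "help", false, "show help")
  val commandLine = new DefaultParser().parse(commandOptions, args, true) // stopAtNonOption

  // Step 2: hand the leftover tokens to the active custom command line,
  // which contributes the deployment options.
  val deploymentOptions = new Options().addOption("m", "jobmanager", true, "JobManager address")
  val deploymentLine = new DefaultParser().parse(deploymentOptions, commandLine.getArgs)
  println(deploymentLine.getOptionValue("m"))
}
```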

## Brief change log

- Move `CliFrontend` to `org.apache.flink.client.cli`
- Decouple creation of `RunOptions`, `ListOptions`, etc. from 
`CliFrontendParser`

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink splitCliOptions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5220.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5220


commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b
Author: Till Rohrmann 
Date:   2017-12-07T12:57:24Z

[FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of 
YarnClusterClient

Introduce YarnApplicationStatusMonitor which does the Yarn 
ApplicationStatus polling in
the FlinkYarnSessionCli. This decouples the YarnClusterClient from the 
actual communication
with Yarn and, thus, gives a better separation of concerns.

commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44
Author: Till Rohrmann 
Date:   2017-12-20T15:43:21Z

[FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor

Moves the YarnClient from the YarnClusterClient to the 
AbstractYarnClusterDescriptor.
This makes the latter responsible for the lifecycle management of the 
client and gives
a better separation of concerns.

commit 0cd22bf559eb820f3e2d381686752f583f4f16ff
Author: Till Rohrmann 
Date:   2017-12-18T17:59:30Z

[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

Move the savepoint disposal logic from the CliFrontend into the 
ClusterClient. This gives
a better separation of concerns and allows the CliFrontend to be used with 
different
ClusterClient implementations.

commit 2b4c1efb79ffdfd1423240047dce8e2f038958c7
Author: Till Rohrmann 
Date:   2017-12-20T16:19:59Z

[FLINK-8333] [flip6] Separate deployment options from command options

This commit separates the parsing of command options and deployment options 
into two
steps. This makes it easier to make the CustomCommandLines non-static.

Moreover, this commit moves the CliFrontend into the cli sub package.




> Split command options from deployment options in CliFrontend
> 
>
> Key: FLINK-8333
> URL: https://issues.apache.org/jira/browse/FLINK-8333
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to better support different {{CustomCommandLines}} we should split 
> the command and deployment option parsing in the {{CliFrontend}}.





[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...

2017-12-31 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5220

[FLINK-8333] [flip6] Separate deployment options from command options

## What is the purpose of the change

This commit separates the parsing of command options and deployment options 
into two
steps. This makes it easier to make the CustomCommandLines non-static.

Moreover, this commit moves the CliFrontend into the cli sub package.

This PR is based on #5219. 

## Brief change log

- Move `CliFrontend` to `org.apache.flink.client.cli`
- Decouple creation of `RunOptions`, `ListOptions`, etc. from 
`CliFrontendParser`

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink splitCliOptions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5220.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5220


commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b
Author: Till Rohrmann 
Date:   2017-12-07T12:57:24Z

[FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of 
YarnClusterClient

Introduce YarnApplicationStatusMonitor which does the Yarn 
ApplicationStatus polling in
the FlinkYarnSessionCli. This decouples the YarnClusterClient from the 
actual communication
with Yarn and, thus, gives a better separation of concerns.

commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44
Author: Till Rohrmann 
Date:   2017-12-20T15:43:21Z

[FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor

Moves the YarnClient from the YarnClusterClient to the 
AbstractYarnClusterDescriptor.
This makes the latter responsible for the lifecycle management of the 
client and gives
a better separation of concerns.

commit 0cd22bf559eb820f3e2d381686752f583f4f16ff
Author: Till Rohrmann 
Date:   2017-12-18T17:59:30Z

[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

Move the savepoint disposal logic from the CliFrontend into the 
ClusterClient. This gives
a better separation of concerns and allows the CliFrontend to be used with 
different
ClusterClient implementations.

commit 2b4c1efb79ffdfd1423240047dce8e2f038958c7
Author: Till Rohrmann 
Date:   2017-12-20T16:19:59Z

[FLINK-8333] [flip6] Separate deployment options from command options

This commit separates the parsing of command options and deployment options 
into two
steps. This makes it easier to make the CustomCommandLines non-static.

Moreover, this commit moves the CliFrontend into the cli sub package.




---


[jira] [Created] (FLINK-8333) Split command options from deployment options in CliFrontend

2017-12-31 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8333:


 Summary: Split command options from deployment options in 
CliFrontend
 Key: FLINK-8333
 URL: https://issues.apache.org/jira/browse/FLINK-8333
 Project: Flink
  Issue Type: Improvement
  Components: Client
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


In order to better support different {{CustomCommandLines}} we should split the 
command and deployment option parsing in the {{CliFrontend}}.





[jira] [Commented] (FLINK-8332) Move dispose savepoint into ClusterClient

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307269#comment-16307269
 ] 

ASF GitHub Bot commented on FLINK-8332:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5219

[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

## What is the purpose of the change

Move the savepoint disposal logic from the CliFrontend into the 
ClusterClient. This gives
a better separation of concerns and allows the CliFrontend to be used with 
different
ClusterClient implementations.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink refactorSavepointCommand

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5219.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5219


commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b
Author: Till Rohrmann 
Date:   2017-12-07T12:57:24Z

[FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of 
YarnClusterClient

Introduce YarnApplicationStatusMonitor which does the Yarn 
ApplicationStatus polling in
the FlinkYarnSessionCli. This decouples the YarnClusterClient from the 
actual communication
with Yarn and, thus, gives a better separation of concerns.

commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44
Author: Till Rohrmann 
Date:   2017-12-20T15:43:21Z

[FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor

Moves the YarnClient from the YarnClusterClient to the 
AbstractYarnClusterDescriptor.
This makes the latter responsible for the lifecycle management of the 
client and gives
a better separation of concerns.

commit 0cd22bf559eb820f3e2d381686752f583f4f16ff
Author: Till Rohrmann 
Date:   2017-12-18T17:59:30Z

[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

Move the savepoint disposal logic from the CliFrontend into the 
ClusterClient. This gives
a better separation of concerns and allows the CliFrontend to be used with 
different
ClusterClient implementations.




> Move dispose savepoint into ClusterClient
> -
>
> Key: FLINK-8332
> URL: https://issues.apache.org/jira/browse/FLINK-8332
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the {{CliFrontend}} sends the command for disposing a savepoint. 
> In order to better abstract this functionality we should move it to the 
> {{ClusterClient}}. That way we can have different implementations of the 
> {{ClusterClient}} (Flip-6 and old code) which are used by the same 
> {{CliFrontend}}.





[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...

2017-12-31 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5219

[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

## What is the purpose of the change

Move the savepoint disposal logic from the CliFrontend into the 
ClusterClient. This gives
a better separation of concerns and allows the CliFrontend to be used with 
different
ClusterClient implementations.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink refactorSavepointCommand

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5219.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5219


commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b
Author: Till Rohrmann 
Date:   2017-12-07T12:57:24Z

[FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of 
YarnClusterClient

Introduce YarnApplicationStatusMonitor which does the Yarn 
ApplicationStatus polling in
the FlinkYarnSessionCli. This decouples the YarnClusterClient from the 
actual communication
with Yarn and, thus, gives a better separation of concerns.

commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44
Author: Till Rohrmann 
Date:   2017-12-20T15:43:21Z

[FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor

Moves the YarnClient from the YarnClusterClient to the 
AbstractYarnClusterDescriptor.
This makes the latter responsible for the lifecycle management of the 
client and gives
a better separation of concerns.

commit 0cd22bf559eb820f3e2d381686752f583f4f16ff
Author: Till Rohrmann 
Date:   2017-12-18T17:59:30Z

[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

Move the savepoint disposal logic from the CliFrontend into the 
ClusterClient. This gives
a better separation of concerns and allows the CliFrontend to be used with 
different
ClusterClient implementations.




---


[jira] [Created] (FLINK-8332) Move dispose savepoint into ClusterClient

2017-12-31 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8332:


 Summary: Move dispose savepoint into ClusterClient
 Key: FLINK-8332
 URL: https://issues.apache.org/jira/browse/FLINK-8332
 Project: Flink
  Issue Type: Improvement
  Components: Client
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


Currently, the {{CliFrontend}} sends the command for disposing a savepoint. In 
order to better abstract this functionality, we should move it to the 
{{ClusterClient}}. That way, different implementations of the {{ClusterClient}} 
(Flip-6 and the old code) can be used by the same {{CliFrontend}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8329) Move YarnClient out of YarnClusterClient

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307268#comment-16307268
 ] 

ASF GitHub Bot commented on FLINK-8329:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5216
  
Corrected the checkstyle violations.


> Move YarnClient out of YarnClusterClient
> 
>
> Key: FLINK-8329
> URL: https://issues.apache.org/jira/browse/FLINK-8329
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Move the {{YarnClient}} from the {{YarnClusterClient}} to the 
> {{AbstractYarnClusterDescriptor}}, which will be responsible for the lifecycle 
> management of the {{YarnClient}}. This change is a clean-up task that will 
> better structure the client code.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5216: [FLINK-8329] [flip6] Move YarnClient to AbstractYarnClust...

2017-12-31 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5216
  
Corrected the checkstyle violations.


---


[jira] [Closed] (FLINK-8330) Remove FlinkYarnCLI

2017-12-31 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8330.

Resolution: Fixed

Removed via ce62945ae6c1b1878bebc9a528e63ef7a54cb897

> Remove FlinkYarnCLI
> ---
>
> Key: FLINK-8330
> URL: https://issues.apache.org/jira/browse/FLINK-8330
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Remove the {{FlinkYarnCLI}} because it is no longer needed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8330) Remove FlinkYarnCLI

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307246#comment-16307246
 ] 

ASF GitHub Bot commented on FLINK-8330:
---

Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/5217


> Remove FlinkYarnCLI
> ---
>
> Key: FLINK-8330
> URL: https://issues.apache.org/jira/browse/FLINK-8330
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Remove the {{FlinkYarnCLI}} because it is no longer needed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5217: [FLINK-8330] [flip6] Remove FlinkYarnCLI

2017-12-31 Thread tillrohrmann
Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/5217


---


[jira] [Commented] (FLINK-8330) Remove FlinkYarnCLI

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307243#comment-16307243
 ] 

ASF GitHub Bot commented on FLINK-8330:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5217
  
Thanks for the review @GJL. Merging this PR.


> Remove FlinkYarnCLI
> ---
>
> Key: FLINK-8330
> URL: https://issues.apache.org/jira/browse/FLINK-8330
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Remove the {{FlinkYarnCLI}} because it is no longer needed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5217: [FLINK-8330] [flip6] Remove FlinkYarnCLI

2017-12-31 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5217
  
Thanks for the review @GJL. Merging this PR.


---


[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'

2017-12-31 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307241#comment-16307241
 ] 

Till Rohrmann commented on FLINK-8268:
--

Just for the record: Another test failure on Travis: 
https://travis-ci.org/tillrohrmann/flink/jobs/322955365

> Test instability for 'TwoPhaseCommitSinkFunctionTest'
> -
>
> Key: FLINK-8268
> URL: https://issues.apache.org/jira/browse/FLINK-8268
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stephan Ewen
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: test-stability
>
> The following exception / failure message occurs.
> {code}
> Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< 
> FAILURE! - in 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest
> testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest)
>   Time elapsed: 0.068 sec  <<< ERROR!
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>   at java.io.FileOutputStream.writeBytes(Native Method)
>   at java.io.FileOutputStream.write(FileOutputStream.java:326)
>   at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>   at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
>   at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)
>   at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
>   at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
>   at java.io.BufferedWriter.flush(BufferedWriter.java:254)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>   at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest.testIgnoreCommitExceptionDuringRecovery(TwoPhaseCommitSinkFunctionTest.java:208)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-8171) Remove work arounds in Flip6LocalStreamEnvironment

2017-12-31 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8171.

Resolution: Fixed

Fixed via 8e7a71c053135fdce4073eaf8022d5727d87b5fa

> Remove work arounds in Flip6LocalStreamEnvironment
> --
>
> Key: FLINK-8171
> URL: https://issues.apache.org/jira/browse/FLINK-8171
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> After adding FLINK-7956, it is no longer necessary for the 
> {{Flip6LocalStreamEnvironment}} to wait for the registration of TaskManagers 
> before submitting a job. Moreover, it is now also possible to use slot sharing 
> when submitting jobs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8171) Remove work arounds in Flip6LocalStreamEnvironment

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307238#comment-16307238
 ] 

ASF GitHub Bot commented on FLINK-8171:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5101


> Remove work arounds in Flip6LocalStreamEnvironment
> --
>
> Key: FLINK-8171
> URL: https://issues.apache.org/jira/browse/FLINK-8171
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> After adding FLINK-7956, it is no longer necessary for the 
> {{Flip6LocalStreamEnvironment}} to wait for the registration of TaskManagers 
> before submitting a job. Moreover, it is now also possible to use slot sharing 
> when submitting jobs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5101: [FLINK-8171] [flip6] Remove work arounds from Flip...

2017-12-31 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5101


---


[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307217#comment-16307217
 ] 

ASF GitHub Bot commented on FLINK-8331:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/5218
  
@fhueske thanks for your explanation, that makes sense to me. I updated the PR. 
The changes are as follows:

- Revert the change of `RowCsvInputFormatTest.testTailingEmptyFields`.
- Unify the `parseField` behavior when `startPos == limit`, with `StringParser` 
as the exception: `StringParser` sets the error state `EMPTY_COLUMN`, sets an 
empty string as `result`, and returns `limit` as the index; all other parsers 
set the error state `EMPTY_COLUMN` and return `-1` as the index (see the 
sketch below).

Best, Jincheng
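
To illustrate the unified contract (a sketch only, not the code from this 
PR): a numeric parser flags `EMPTY_COLUMN` and returns `-1` for an empty 
field instead of failing later with a `NumberFormatException`. The 
`FieldParser` signatures below follow the ones visible in the diffs above; 
the single-byte delimiter handling is simplified for brevity.

{code}
import java.nio.charset.StandardCharsets;

import org.apache.flink.types.parser.FieldParser;

/** Sketch of the described behavior for a numeric parser; not the actual DoubleParser. */
public class SketchDoubleParser extends FieldParser<Double> {

    private double result;

    @Override
    public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Double reusable) {
        if (startPos == limit) {
            setErrorState(ParseErrorState.EMPTY_COLUMN);
            return -1; // empty trailing field: reject instead of parsing ""
        }

        // scan until the (single-byte) delimiter or the end of the record
        int i = startPos;
        while (i < limit && bytes[i] != delimiter[0]) {
            i++;
        }

        if (i == startPos) {
            setErrorState(ParseErrorState.EMPTY_COLUMN);
            return -1; // delimiter follows immediately: empty field
        }

        try {
            result = Double.parseDouble(
                new String(bytes, startPos, i - startPos, StandardCharsets.US_ASCII));
        } catch (NumberFormatException e) {
            setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
            return -1;
        }
        return (i < limit) ? i + 1 : limit; // skip the delimiter if present
    }

    @Override
    public Double createValue() {
        return 0.0;
    }

    @Override
    public Double getLastResult() {
        return result;
    }
}
{code}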


> FieldParsers do not correctly set EMPT_COLUMN error state
> -
>
> Key: FLINK-8331
> URL: https://issues.apache.org/jira/browse/FLINK-8331
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0, 1.4.1
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Some {{FieldParser}} do not correctly set the EMPTY_COLUMN error state if a 
> field is empty.
> Instead, they try to parse the field value from an empty String which fails, 
> e.g., in case of the {{DoubleParser}} with a {{NumberFormatException}}.
> The {{RowCsvInputFormat}} has a flag to interpret empty fields as {{null}} 
> values. The implementation requires that all {{FieldParser}} correctly return 
> the EMPTY_COLUMN error state in case of an empty field.
> Affected {{FieldParser}}:
> - BigDecParser
> - BigIntParser
> - DoubleParser
> - FloatParser
> - SqlDateParser
> - SqlTimeParser
> - SqlTimestampParser



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5218: [FLINK-8331][core] FieldParser do not correctly set EMPT_...

2017-12-31 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/5218
  
@fhueske thanks for your explanation, that makes sense to me. I updated the PR. 
The changes are as follows:

- Revert the change of `RowCsvInputFormatTest.testTailingEmptyFields`.
- Unify the `parseField` behavior when `startPos == limit`, with `StringParser` 
as the exception: `StringParser` sets the error state `EMPTY_COLUMN`, sets an 
empty string as `result`, and returns `limit` as the index; all other parsers 
set the error state `EMPTY_COLUMN` and return `-1` as the index.

Best, Jincheng


---


[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307201#comment-16307201
 ] 

ASF GitHub Bot commented on FLINK-8331:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159139198
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---
@@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() {
@Test
public void testEmptyFieldInIsolation() {
try {
-   String [] emptyStrings = new String[] {"|"};
+   FieldParser parser = getParser();
+
+   byte[] bytes = 
"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
--- End diff --

Yes, we can keep `testEmptyFieldInIsolation`.


> FieldParsers do not correctly set EMPT_COLUMN error state
> -
>
> Key: FLINK-8331
> URL: https://issues.apache.org/jira/browse/FLINK-8331
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0, 1.4.1
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Some {{FieldParser}} do not correctly set the EMPTY_COLUMN error state if a 
> field is empty.
> Instead, they try to parse the field value from an empty String which fails, 
> e.g., in case of the {{DoubleParser}} with a {{NumberFormatException}}.
> The {{RowCsvInputFormat}} has a flag to interpret empty fields as {{null}} 
> values. The implementation requires that all {{FieldParser}} correctly return 
> the EMPTY_COLUMN error state in case of an empty field.
> Affected {{FieldParser}}:
> - BigDecParser
> - BigIntParser
> - DoubleParser
> - FloatParser
> - SqlDateParser
> - SqlTimeParser
> - SqlTimestampParser



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-31 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159139198
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---
@@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() {
@Test
public void testEmptyFieldInIsolation() {
try {
-   String [] emptyStrings = new String[] {"|"};
+   FieldParser parser = getParser();
+
+   byte[] bytes = 
"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
--- End diff --

Yes, we can keep `testEmptyFieldInIsolation`.


---


[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307198#comment-16307198
 ] 

ASF GitHub Bot commented on FLINK-8331:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159139167
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -362,61 +365,95 @@ public void readStringFieldsWithTrailingDelimiters() 
throws Exception {
 
@Test
public void testTailingEmptyFields() throws Exception {
-   String fileContent = "abc|-def|-ghijk\n" +
-   "abc|-def|-\n" +
-   "abc|-|-\n" +
-   "|-|-|-\n" +
-   "|-|-\n" +
-   "abc|-def\n";
 
-   FileInputSplit split = createTempFile(fileContent);
-
-   TypeInformation[] fieldTypes = new TypeInformation[]{
-   BasicTypeInfo.STRING_TYPE_INFO,
-   BasicTypeInfo.STRING_TYPE_INFO,
-   BasicTypeInfo.STRING_TYPE_INFO};
-
-   RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
fieldTypes, "\n", "|");
-   format.setFieldDelimiter("|-");
-   format.configure(new Configuration());
-   format.open(split);
-
-   Row result = new Row(3);
-
-   result = format.nextRecord(result);
-   assertNotNull(result);
-   assertEquals("abc", result.getField(0));
-   assertEquals("def", result.getField(1));
-   assertEquals("ghijk", result.getField(2));
+   List> dataList 
= new java.util.ArrayList<>();
+
+   // test String
+   dataList.add(new Tuple4<>(BasicTypeInfo.STRING_TYPE_INFO, 
"bdc", "bdc", ""));
+   // test BigInt
+   dataList.add(new Tuple4<>(BasicTypeInfo.BIG_INT_TYPE_INFO,
--- End diff --

Ooh, you are right. That makes sense to me. :)


> FieldParsers do not correctly set EMPT_COLUMN error state
> -
>
> Key: FLINK-8331
> URL: https://issues.apache.org/jira/browse/FLINK-8331
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0, 1.4.1
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Some {{FieldParser}} do not correctly set the EMPTY_COLUMN error state if a 
> field is empty.
> Instead, they try to parse the field value from an empty String which fails, 
> e.g., in case of the {{DoubleParser}} with a {{NumberFormatException}}.
> The {{RowCsvInputFormat}} has a flag to interpret empty fields as {{null}} 
> values. The implementation requires that all {{FieldParser}} correctly return 
> the EMPTY_COLUMN error state in case of an empty field.
> Affected {{FieldParser}}:
> - BigDecParser
> - BigIntParser
> - DoubleParser
> - FloatParser
> - SqlDateParser
> - SqlTimeParser
> - SqlTimestampParser



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-31 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159139167
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -362,61 +365,95 @@ public void readStringFieldsWithTrailingDelimiters() 
throws Exception {
 
@Test
public void testTailingEmptyFields() throws Exception {
-   String fileContent = "abc|-def|-ghijk\n" +
-   "abc|-def|-\n" +
-   "abc|-|-\n" +
-   "|-|-|-\n" +
-   "|-|-\n" +
-   "abc|-def\n";
 
-   FileInputSplit split = createTempFile(fileContent);
-
-   TypeInformation[] fieldTypes = new TypeInformation[]{
-   BasicTypeInfo.STRING_TYPE_INFO,
-   BasicTypeInfo.STRING_TYPE_INFO,
-   BasicTypeInfo.STRING_TYPE_INFO};
-
-   RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
fieldTypes, "\n", "|");
-   format.setFieldDelimiter("|-");
-   format.configure(new Configuration());
-   format.open(split);
-
-   Row result = new Row(3);
-
-   result = format.nextRecord(result);
-   assertNotNull(result);
-   assertEquals("abc", result.getField(0));
-   assertEquals("def", result.getField(1));
-   assertEquals("ghijk", result.getField(2));
+   List> dataList 
= new java.util.ArrayList<>();
+
+   // test String
+   dataList.add(new Tuple4<>(BasicTypeInfo.STRING_TYPE_INFO, 
"bdc", "bdc", ""));
+   // test BigInt
+   dataList.add(new Tuple4<>(BasicTypeInfo.BIG_INT_TYPE_INFO,
--- End diff --

Ooh, you are right. That makes sense to me. :)


---


[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307168#comment-16307168
 ] 

ASF GitHub Bot commented on FLINK-8331:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159137345
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---
@@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() {
@Test
public void testEmptyFieldInIsolation() {
try {
-   String [] emptyStrings = new String[] {"|"};
+   FieldParser parser = getParser();
+
+   byte[] bytes = 
"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 0, bytes.length, 
new byte[]{'|'}, parser.createValue());
+
+   assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+   if (this.allowsEmptyField()) {
+   assertTrue("Parser declared the empty string as 
invalid.", numRead != -1);
+   assertEquals("Invalid number of bytes read 
returned.", bytes.length, numRead);
+   } else {
+   assertTrue("Parser accepted the empty string.", 
numRead == -1);
+   }
+   } catch (Exception e) {
+   System.err.println(e.getMessage());
+   e.printStackTrace();
+   fail("Test erroneous: " + e.getMessage());
+   }
+   }
+
+   @Test
+   public void testTailingEmptyField() {
+   try {
+   String[] tailingEmptyFieldStrings = new String[]{"a|", 
"a||"};
 
FieldParser parser = getParser();
 
-   for (String emptyString : emptyStrings) {
-   byte[] bytes = 
emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET);
-   int numRead = parser.parseField(bytes, 0, 
bytes.length, new byte[]{'|'}, parser.createValue());
+   for (String tailingEmptyFieldString : 
tailingEmptyFieldStrings) {
+   byte[] bytes = 
tailingEmptyFieldString.getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 1, 
bytes.length, new byte[]{'|'}, parser.createValue());
 

assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
 
-   if(this.allowsEmptyField()) {
+   if (this.allowsEmptyField()) {
--- End diff --

Yes, you are right. I misinterpreted the `allowsEmptyField()` method. We 
need to keep it.


> FieldParsers do not correctly set EMPT_COLUMN error state
> -
>
> Key: FLINK-8331
> URL: https://issues.apache.org/jira/browse/FLINK-8331
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0, 1.4.1
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Some {{FieldParser}} do not correctly set the EMPTY_COLUMN error state if a 
> field is empty.
> Instead, they try to parse the field value from an empty String which fails, 
> e.g., in case of the {{DoubleParser}} with a {{NumberFormatException}}.
> The {{RowCsvInputFormat}} has a flag to interpret empty fields as {{null}} 
> values. The implementation requires that all {{FieldParser}} correctly return 
> the EMPTY_COLUMN error state in case of an empty field.
> Affected {{FieldParser}}:
> - BigDecParser
> - BigIntParser
> - DoubleParser
> - FloatParser
> - SqlDateParser
> - SqlTimeParser
> - SqlTimestampParser



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-31 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159137345
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---
@@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() {
@Test
public void testEmptyFieldInIsolation() {
try {
-   String [] emptyStrings = new String[] {"|"};
+   FieldParser parser = getParser();
+
+   byte[] bytes = 
"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 0, bytes.length, 
new byte[]{'|'}, parser.createValue());
+
+   assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+   if (this.allowsEmptyField()) {
+   assertTrue("Parser declared the empty string as 
invalid.", numRead != -1);
+   assertEquals("Invalid number of bytes read 
returned.", bytes.length, numRead);
+   } else {
+   assertTrue("Parser accepted the empty string.", 
numRead == -1);
+   }
+   } catch (Exception e) {
+   System.err.println(e.getMessage());
+   e.printStackTrace();
+   fail("Test erroneous: " + e.getMessage());
+   }
+   }
+
+   @Test
+   public void testTailingEmptyField() {
+   try {
+   String[] tailingEmptyFieldStrings = new String[]{"a|", 
"a||"};
 
FieldParser parser = getParser();
 
-   for (String emptyString : emptyStrings) {
-   byte[] bytes = 
emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET);
-   int numRead = parser.parseField(bytes, 0, 
bytes.length, new byte[]{'|'}, parser.createValue());
+   for (String tailingEmptyFieldString : 
tailingEmptyFieldStrings) {
+   byte[] bytes = 
tailingEmptyFieldString.getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 1, 
bytes.length, new byte[]{'|'}, parser.createValue());
 

assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
 
-   if(this.allowsEmptyField()) {
+   if (this.allowsEmptyField()) {
--- End diff --

Yes, you are right. I misinterpreted the `allowsEmptyField()` method. We 
need to keep it.


---


[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307162#comment-16307162
 ] 

ASF GitHub Bot commented on FLINK-8331:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159136839
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---
@@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() {
@Test
public void testEmptyFieldInIsolation() {
try {
-   String [] emptyStrings = new String[] {"|"};
+   FieldParser parser = getParser();
+
+   byte[] bytes = 
"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 0, bytes.length, 
new byte[]{'|'}, parser.createValue());
+
+   assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+   if (this.allowsEmptyField()) {
+   assertTrue("Parser declared the empty string as 
invalid.", numRead != -1);
+   assertEquals("Invalid number of bytes read 
returned.", bytes.length, numRead);
+   } else {
+   assertTrue("Parser accepted the empty string.", 
numRead == -1);
+   }
+   } catch (Exception e) {
+   System.err.println(e.getMessage());
+   e.printStackTrace();
+   fail("Test erroneous: " + e.getMessage());
+   }
+   }
+
+   @Test
+   public void testTailingEmptyField() {
+   try {
+   String[] tailingEmptyFieldStrings = new String[]{"a|", 
"a||"};
 
FieldParser parser = getParser();
 
-   for (String emptyString : emptyStrings) {
-   byte[] bytes = 
emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET);
-   int numRead = parser.parseField(bytes, 0, 
bytes.length, new byte[]{'|'}, parser.createValue());
+   for (String tailingEmptyFieldString : 
tailingEmptyFieldStrings) {
+   byte[] bytes = 
tailingEmptyFieldString.getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 1, 
bytes.length, new byte[]{'|'}, parser.createValue());
 

assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
 
-   if(this.allowsEmptyField()) {
+   if (this.allowsEmptyField()) {
--- End diff --

Oh, I see you added 4 test files (`QuotedStringParserTest`, 
`QuotedStringValueParserTest`, `UnquotedStringParserTest`, 
`UnquotedStringValueParserTest`) in which `allowsEmptyField` is true, 
so I kept `allowsEmptyField` in `ParserTestBase`. 


> FieldParsers do not correctly set EMPT_COLUMN error state
> -
>
> Key: FLINK-8331
> URL: https://issues.apache.org/jira/browse/FLINK-8331
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0, 1.4.1
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Some {{FieldParser}} do not correctly set the EMPTY_COLUMN error state if a 
> field is empty.
> Instead, they try to parse the field value from an empty String which fails, 
> e.g., in case of the {{DoubleParser}} with a {{NumberFormatException}}.
> The {{RowCsvInputFormat}} has a flag to interpret empty fields as {{null}} 
> values. The implementation requires that all {{FieldParser}} correctly return 
> the EMPTY_COLUMN error state in case of an empty field.
> Affected {{FieldParser}}:
> - BigDecParser
> - BigIntParser
> - DoubleParser
> - FloatParser
> - SqlDateParser
> - SqlTimeParser
> - SqlTimestampParser



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-31 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159136839
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---
@@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() {
@Test
public void testEmptyFieldInIsolation() {
try {
-   String [] emptyStrings = new String[] {"|"};
+   FieldParser parser = getParser();
+
+   byte[] bytes = 
"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 0, bytes.length, 
new byte[]{'|'}, parser.createValue());
+
+   assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+   if (this.allowsEmptyField()) {
+   assertTrue("Parser declared the empty string as 
invalid.", numRead != -1);
+   assertEquals("Invalid number of bytes read 
returned.", bytes.length, numRead);
+   } else {
+   assertTrue("Parser accepted the empty string.", 
numRead == -1);
+   }
+   } catch (Exception e) {
+   System.err.println(e.getMessage());
+   e.printStackTrace();
+   fail("Test erroneous: " + e.getMessage());
+   }
+   }
+
+   @Test
+   public void testTailingEmptyField() {
+   try {
+   String[] tailingEmptyFieldStrings = new String[]{"a|", 
"a||"};
 
FieldParser parser = getParser();
 
-   for (String emptyString : emptyStrings) {
-   byte[] bytes = 
emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET);
-   int numRead = parser.parseField(bytes, 0, 
bytes.length, new byte[]{'|'}, parser.createValue());
+   for (String tailingEmptyFieldString : 
tailingEmptyFieldStrings) {
+   byte[] bytes = 
tailingEmptyFieldString.getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 1, 
bytes.length, new byte[]{'|'}, parser.createValue());
 

assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
 
-   if(this.allowsEmptyField()) {
+   if (this.allowsEmptyField()) {
--- End diff --

Oh, I see you added 4 test files (`QuotedStringParserTest`, 
`QuotedStringValueParserTest`, `UnquotedStringParserTest`, 
`UnquotedStringValueParserTest`) in which `allowsEmptyField` is true, 
so I kept `allowsEmptyField` in `ParserTestBase`. 


---


[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307153#comment-16307153
 ] 

ASF GitHub Bot commented on FLINK-8331:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159135908
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---
@@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() {
@Test
public void testEmptyFieldInIsolation() {
try {
-   String [] emptyStrings = new String[] {"|"};
+   FieldParser parser = getParser();
+
+   byte[] bytes = 
"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 0, bytes.length, 
new byte[]{'|'}, parser.createValue());
+
+   assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+   if (this.allowsEmptyField()) {
+   assertTrue("Parser declared the empty string as 
invalid.", numRead != -1);
+   assertEquals("Invalid number of bytes read 
returned.", bytes.length, numRead);
+   } else {
+   assertTrue("Parser accepted the empty string.", 
numRead == -1);
+   }
+   } catch (Exception e) {
+   System.err.println(e.getMessage());
+   e.printStackTrace();
+   fail("Test erroneous: " + e.getMessage());
+   }
+   }
+
+   @Test
+   public void testTailingEmptyField() {
+   try {
+   String[] tailingEmptyFieldStrings = new String[]{"a|", 
"a||"};
 
FieldParser parser = getParser();
 
-   for (String emptyString : emptyStrings) {
-   byte[] bytes = 
emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET);
-   int numRead = parser.parseField(bytes, 0, 
bytes.length, new byte[]{'|'}, parser.createValue());
+   for (String tailingEmptyFieldString : 
tailingEmptyFieldStrings) {
+   byte[] bytes = 
tailingEmptyFieldString.getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 1, 
bytes.length, new byte[]{'|'}, parser.createValue());
 

assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
 
-   if(this.allowsEmptyField()) {
+   if (this.allowsEmptyField()) {
--- End diff --

We should remove the `allowsEmptyField()` method from `ParserTestBase`. All 
parsers have to support empty fields.


> FieldParsers do not correctly set EMPT_COLUMN error state
> -
>
> Key: FLINK-8331
> URL: https://issues.apache.org/jira/browse/FLINK-8331
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0, 1.4.1
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Some {{FieldParser}} do not correctly set the EMPTY_COLUMN error state if a 
> field is empty.
> Instead, they try to parse the field value from an empty String which fails, 
> e.g., in case of the {{DoubleParser}} with a {{NumberFormatException}}.
> The {{RowCsvInputFormat}} has a flag to interpret empty fields as {{null}} 
> values. The implementation requires that all {{FieldParser}} correctly return 
> the EMPTY_COLUMN error state in case of an empty field.
> Affected {{FieldParser}}:
> - BigDecParser
> - BigIntParser
> - DoubleParser
> - FloatParser
> - SqlDateParser
> - SqlTimeParser
> - SqlTimestampParser



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307155#comment-16307155
 ] 

ASF GitHub Bot commented on FLINK-8331:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159136001
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -362,61 +365,95 @@ public void readStringFieldsWithTrailingDelimiters() 
throws Exception {
 
@Test
public void testTailingEmptyFields() throws Exception {
-   String fileContent = "abc|-def|-ghijk\n" +
-   "abc|-def|-\n" +
-   "abc|-|-\n" +
-   "|-|-|-\n" +
-   "|-|-\n" +
-   "abc|-def\n";
 
-   FileInputSplit split = createTempFile(fileContent);
-
-   TypeInformation[] fieldTypes = new TypeInformation[]{
-   BasicTypeInfo.STRING_TYPE_INFO,
-   BasicTypeInfo.STRING_TYPE_INFO,
-   BasicTypeInfo.STRING_TYPE_INFO};
-
-   RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
fieldTypes, "\n", "|");
-   format.setFieldDelimiter("|-");
-   format.configure(new Configuration());
-   format.open(split);
-
-   Row result = new Row(3);
-
-   result = format.nextRecord(result);
-   assertNotNull(result);
-   assertEquals("abc", result.getField(0));
-   assertEquals("def", result.getField(1));
-   assertEquals("ghijk", result.getField(2));
+   List> dataList 
= new java.util.ArrayList<>();
+
+   // test String
+   dataList.add(new Tuple4<>(BasicTypeInfo.STRING_TYPE_INFO, 
"bdc", "bdc", ""));
+   // test BigInt
+   dataList.add(new Tuple4<>(BasicTypeInfo.BIG_INT_TYPE_INFO,
--- End diff --

I think we can extend this test but should not exhaustively test all types. 
It's the responsibility of the different `FieldParser` implementations to 
implement the empty-field logic correctly. Hence, this should be tested in the 
field parser tests and not here.


> FieldParsers do not correctly set EMPT_COLUMN error state
> -
>
> Key: FLINK-8331
> URL: https://issues.apache.org/jira/browse/FLINK-8331
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0, 1.4.1
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Some {{FieldParser}} do not correctly set the EMPTY_COLUMN error state if a 
> field is empty.
> Instead, they try to parse the field value from an empty String which fails, 
> e.g., in case of the {{DoubleParser}} with a {{NumberFormatException}}.
> The {{RowCsvInputFormat}} has a flag to interpret empty fields as {{null}} 
> values. The implementation requires that all {{FieldParser}} correctly return 
> the EMPTY_COLUMN error state in case of an empty field.
> Affected {{FieldParser}}:
> - BigDecParser
> - BigIntParser
> - DoubleParser
> - FloatParser
> - SqlDateParser
> - SqlTimeParser
> - SqlTimestampParser



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307154#comment-16307154
 ] 

ASF GitHub Bot commented on FLINK-8331:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159135944
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---
@@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() {
@Test
public void testEmptyFieldInIsolation() {
try {
-   String [] emptyStrings = new String[] {"|"};
+   FieldParser parser = getParser();
+
+   byte[] bytes = 
"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
--- End diff --

This is the same case as `"a|"` in `testTailingEmptyField()`. We only need 
one of these.


> FieldParsers do not correctly set EMPT_COLUMN error state
> -
>
> Key: FLINK-8331
> URL: https://issues.apache.org/jira/browse/FLINK-8331
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0, 1.4.1
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Some {{FieldParser}} do not correctly set the EMPTY_COLUMN error state if a 
> field is empty.
> Instead, they try to parse the field value from an empty String which fails, 
> e.g., in case of the {{DoubleParser}} with a {{NumberFormatException}}.
> The {{RowCsvInputFormat}} has a flag to interpret empty fields as {{null}} 
> values. The implementation requires that all {{FieldParser}} correctly return 
> the EMPTY_COLUMN error state in case of an empty field.
> Affected {{FieldParser}}:
> - BigDecParser
> - BigIntParser
> - DoubleParser
> - FloatParser
> - SqlDateParser
> - SqlTimeParser
> - SqlTimestampParser



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-31 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159135944
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---
@@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() {
@Test
public void testEmptyFieldInIsolation() {
try {
-   String [] emptyStrings = new String[] {"|"};
+   FieldParser parser = getParser();
+
+   byte[] bytes = 
"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
--- End diff --

This is the same case as `"a|"` in `testTailingEmptyField()`. We only need 
one of these.


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-31 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159135908
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---
@@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() {
@Test
public void testEmptyFieldInIsolation() {
try {
-   String [] emptyStrings = new String[] {"|"};
+   FieldParser parser = getParser();
+
+   byte[] bytes = 
"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 0, bytes.length, 
new byte[]{'|'}, parser.createValue());
+
+   assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+   if (this.allowsEmptyField()) {
+   assertTrue("Parser declared the empty string as 
invalid.", numRead != -1);
+   assertEquals("Invalid number of bytes read 
returned.", bytes.length, numRead);
+   } else {
+   assertTrue("Parser accepted the empty string.", 
numRead == -1);
+   }
+   } catch (Exception e) {
+   System.err.println(e.getMessage());
+   e.printStackTrace();
+   fail("Test erroneous: " + e.getMessage());
+   }
+   }
+
+   @Test
+   public void testTailingEmptyField() {
+   try {
+   String[] tailingEmptyFieldStrings = new String[]{"a|", 
"a||"};
 
FieldParser parser = getParser();
 
-   for (String emptyString : emptyStrings) {
-   byte[] bytes = 
emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET);
-   int numRead = parser.parseField(bytes, 0, 
bytes.length, new byte[]{'|'}, parser.createValue());
+   for (String tailingEmptyFieldString : 
tailingEmptyFieldStrings) {
+   byte[] bytes = 
tailingEmptyFieldString.getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 1, 
bytes.length, new byte[]{'|'}, parser.createValue());
 

assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
 
-   if(this.allowsEmptyField()) {
+   if (this.allowsEmptyField()) {
--- End diff --

We should remove the `allowsEmptyField()` method from `ParserTestBase`. All 
parsers have to support empty fields.


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-31 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159136001
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -362,61 +365,95 @@ public void readStringFieldsWithTrailingDelimiters() 
throws Exception {
 
@Test
public void testTailingEmptyFields() throws Exception {
-   String fileContent = "abc|-def|-ghijk\n" +
-   "abc|-def|-\n" +
-   "abc|-|-\n" +
-   "|-|-|-\n" +
-   "|-|-\n" +
-   "abc|-def\n";
 
-   FileInputSplit split = createTempFile(fileContent);
-
-   TypeInformation[] fieldTypes = new TypeInformation[]{
-   BasicTypeInfo.STRING_TYPE_INFO,
-   BasicTypeInfo.STRING_TYPE_INFO,
-   BasicTypeInfo.STRING_TYPE_INFO};
-
-   RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
fieldTypes, "\n", "|");
-   format.setFieldDelimiter("|-");
-   format.configure(new Configuration());
-   format.open(split);
-
-   Row result = new Row(3);
-
-   result = format.nextRecord(result);
-   assertNotNull(result);
-   assertEquals("abc", result.getField(0));
-   assertEquals("def", result.getField(1));
-   assertEquals("ghijk", result.getField(2));
+   List> dataList 
= new java.util.ArrayList<>();
+
+   // test String
+   dataList.add(new Tuple4<>(BasicTypeInfo.STRING_TYPE_INFO, 
"bdc", "bdc", ""));
+   // test BigInt
+   dataList.add(new Tuple4<>(BasicTypeInfo.BIG_INT_TYPE_INFO,
--- End diff --

I think we can extend this test but should not exhaustively test all types. 
It's the responsibility of the different `FieldParser` implementations to 
implement the empty-field logic correctly. Hence, this should be tested in the 
field parser tests and not here.


---


[jira] [Commented] (FLINK-7155) Add Influxdb metrics reporter

2017-12-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307152#comment-16307152
 ] 

ASF GitHub Bot commented on FLINK-7155:
---

Github user gobozov commented on a diff in the pull request:

https://github.com/apache/flink/pull/4299#discussion_r159136038
  
--- Diff: 
flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/InfluxdbReporter.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+import org.apache.flink.metrics.MetricConfig;
+
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.ScheduledReporter;
+import metrics_influxdb.HttpInfluxdbProtocol;
+import 
metrics_influxdb.api.measurements.CategoriesMetricMeasurementTransformer;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class acts as a factory for the {@link 
metrics_influxdb.InfluxdbReporter} and allows using it as a Flink
+ * reporter.
+ */
+@PublicEvolving
+public class InfluxdbReporter extends ScheduledDropwizardReporter {
+
+   public static final String ARG_USER = "user";
+   public static final String ARG_PASSWORD = "password";
+   public static final String ARG_DB = "db";
+
+   @Override
+   public ScheduledReporter getReporter(final MetricConfig config) {
+   final String host = config.getString(ARG_HOST, "localhost");
+   final Integer port = config.getInteger(ARG_PORT, 
HttpInfluxdbProtocol.DEFAULT_PORT);
+   final String user = config.getString(ARG_USER, null);
+   final String password = config.getString(ARG_PASSWORD, null);
+   final String db = config.getString(ARG_DB, "flink");
+   final String conversionRate = 
config.getString(ARG_CONVERSION_RATE, null);
+   final String conversionDuration = 
config.getString(ARG_CONVERSION_DURATION, null);
+
+   final metrics_influxdb.InfluxdbReporter.Builder builder =
+   metrics_influxdb.InfluxdbReporter.forRegistry(registry);
+
+   builder.protocol(new HttpInfluxdbProtocol(host, port, user, 
password, db));
--- End diff --

We built the same thing for our needs using the same InfluxDB reporter. The 
disadvantage is that it does not support HTTPS; we modified it to support an 
`SSLContext`. I also don't see a way to provide tags to the reporter builder.


> Add Influxdb metrics reporter
> -
>
> Key: FLINK-7155
> URL: https://issues.apache.org/jira/browse/FLINK-7155
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Patrick Lucas
>Assignee: Patrick Lucas
>
> [~jgrier] has a [simple Influxdb metrics reporter for 
> Flink|https://github.com/jgrier/flink-stuff/tree/master/flink-influx-reporter]
>  that is a thin wrapper around [a lightweight, public-domain Influxdb 
> reporter|https://github.com/davidB/metrics-influxdb] for Codahale metrics.
> We can implement this very easily in Java in the same way as 
> flink-metrics-graphite.
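
For reference, activating such a reporter would presumably follow Flink's 
usual `metrics.reporter.*` configuration pattern. A sketch of a 
flink-conf.yaml entry, with key names inferred from the argument constants 
in the diff above (an illustrative guess, not final documentation):

{code}
metrics.reporters: influxdb
metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.user: flink
metrics.reporter.influxdb.password: secret
metrics.reporter.influxdb.db: flink
{code}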



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4299: [FLINK-7155] [metrics] Add Influxdb reporter

2017-12-31 Thread gobozov
Github user gobozov commented on a diff in the pull request:

https://github.com/apache/flink/pull/4299#discussion_r159136038
  
--- Diff: 
flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/InfluxdbReporter.java
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
+import org.apache.flink.metrics.MetricConfig;
+
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.ScheduledReporter;
+import metrics_influxdb.HttpInfluxdbProtocol;
+import 
metrics_influxdb.api.measurements.CategoriesMetricMeasurementTransformer;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class acts as a factory for the {@link 
metrics_influxdb.InfluxdbReporter} and allows using it as a Flink
+ * reporter.
+ */
+@PublicEvolving
+public class InfluxdbReporter extends ScheduledDropwizardReporter {
+
+   public static final String ARG_USER = "user";
+   public static final String ARG_PASSWORD = "password";
+   public static final String ARG_DB = "db";
+
+   @Override
+   public ScheduledReporter getReporter(final MetricConfig config) {
+   final String host = config.getString(ARG_HOST, "localhost");
+   final Integer port = config.getInteger(ARG_PORT, 
HttpInfluxdbProtocol.DEFAULT_PORT);
+   final String user = config.getString(ARG_USER, null);
+   final String password = config.getString(ARG_PASSWORD, null);
+   final String db = config.getString(ARG_DB, "flink");
+   final String conversionRate = 
config.getString(ARG_CONVERSION_RATE, null);
+   final String conversionDuration = 
config.getString(ARG_CONVERSION_DURATION, null);
+
+   final metrics_influxdb.InfluxdbReporter.Builder builder =
+   metrics_influxdb.InfluxdbReporter.forRegistry(registry);
+
+   builder.protocol(new HttpInfluxdbProtocol(host, port, user, 
password, db));
--- End diff --

We built the same thing for our needs using the same InfluxDB reporter. The 
disadvantage is that it does not support HTTPS; we modified it to support an 
`SSLContext`. I also don't see a way to provide tags to the reporter builder.


---