[GitHub] flink pull request #5222: Fix typo in docs/dev/api_concepts.md
GitHub user okumin opened a pull request: https://github.com/apache/flink/pull/5222 Fix typo in docs/dev/api_concepts.md *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* * Remove duplicated `program`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/okumin/flink fix-typo-api-concepts Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5222.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5222 commit e50f287609be3e19d904c17fa7330b2e72088420 Author: okumin Date: 2018-01-01T06:58:37Z Fix typo in docs/dev/api_concepts.md Remove duplicated `program`. ---
[GitHub] flink pull request #5221: [hotfix] [docs] Format Scala programs in docs
GitHub user okumin opened a pull request: https://github.com/apache/flink/pull/5221 [hotfix] [docs] Format Scala programs in docs * remove unneeded semi-colons * add `()` to `print` method * typically, methods with some side-effects are invoked with `()` * replace some Java codes in Scala examples into Scala codes You can merge this pull request into a Git repository by running: $ git pull https://github.com/okumin/flink format-scala-code Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5221.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5221 commit 96521134a0b0ac2be0b8afc7b1c01a0a0c18fe09 Author: okumin Date: 2018-01-01T06:25:54Z Format Scala programs in docs * remove unneeded semi-colons * add `()` to `print` method * typically, methods with some side-effects are invoked with `()` * replace some Java codes in Scala examples into Scala codes ---
[jira] [Closed] (FLINK-8227) Optimize the performance of SharedBufferSerializer
[ https://issues.apache.org/jira/browse/FLINK-8227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu closed FLINK-8227. -- > Optimize the performance of SharedBufferSerializer > -- > > Key: FLINK-8227 > URL: https://issues.apache.org/jira/browse/FLINK-8227 > Project: Flink > Issue Type: Bug > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > Currently {{SharedBufferSerializer.serialize()}} will create a HashMap and > put all the {{SharedBufferEntry}} into it. Usually this is not a problem. But > we observe that in some cases the calculation of hashCode may become the > bottleneck. The performance will decrease as the number of > {{SharedBufferEdge}} increases. For looping pattern {{A*}}, if the number of > {{SharedBufferEntry}} is {{N}}, the number of {{SharedBufferEdge}} is about > {{N * N}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-8227) Optimize the performance of SharedBufferSerializer
[ https://issues.apache.org/jira/browse/FLINK-8227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu resolved FLINK-8227. Resolution: Fixed > Optimize the performance of SharedBufferSerializer > -- > > Key: FLINK-8227 > URL: https://issues.apache.org/jira/browse/FLINK-8227 > Project: Flink > Issue Type: Bug > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > Currently {{SharedBufferSerializer.serialize()}} will create a HashMap and > put all the {{SharedBufferEntry}} into it. Usually this is not a problem. But > we observe that in some cases the calculation of hashCode may become the > bottleneck. The performance will decrease as the number of > {{SharedBufferEdge}} increases. For looping pattern {{A*}}, if the number of > {{SharedBufferEntry}} is {{N}}, the number of {{SharedBufferEdge}} is about > {{N * N}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
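The bottleneck described in the ticket comes from keying a `HashMap` on entries whose `hashCode` must walk all of their edges, which is quadratic for a looping pattern. One common remedy is to assign dense integer IDs by object identity instead, so the cost per entry is O(1) regardless of edge count. The sketch below uses a hypothetical `Entry` stand-in, not Flink's actual `SharedBufferEntry` or the fix that was merged:

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical stand-in for a buffer entry whose content-based hashCode
// would be expensive (it would traverse all outgoing edges).
class Entry {
    @Override
    public int hashCode() {
        // IdentityHashMap never calls this; a content-based map would.
        throw new AssertionError("content hashCode should not be called");
    }
}

public class IdentitySerializationSketch {
    // Assign a dense integer ID to each distinct entry by reference
    // identity. IdentityHashMap hashes with System.identityHashCode,
    // so the cost no longer grows with the number of edges per entry.
    static Map<Entry, Integer> assignIds(Entry[] entries) {
        Map<Entry, Integer> ids = new IdentityHashMap<>();
        for (Entry e : entries) {
            ids.putIfAbsent(e, ids.size());
        }
        return ids;
    }

    public static void main(String[] args) {
        Entry a = new Entry();
        Entry b = new Entry();
        Map<Entry, Integer> ids = assignIds(new Entry[] {a, b, a});
        System.out.println(ids.get(a) + "," + ids.get(b)); // prints 0,1
    }
}
```

The IDs can then be written in place of object references during serialization; the throwing `hashCode` above demonstrates that the identity-keyed map never touches the expensive content hash.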
[jira] [Commented] (FLINK-8226) Dangling reference generated after NFA clean up timed out SharedBufferEntry
[ https://issues.apache.org/jira/browse/FLINK-8226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307367#comment-16307367 ] ASF GitHub Bot commented on FLINK-8226: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/5141#discussion_r159149194 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java --- @@ -191,22 +191,28 @@ public boolean isEmpty() { */ public boolean prune(long pruningTimestamp) { Iterator<Map.Entry<K, SharedBufferPage<K, V>>> iter = pages.entrySet().iterator(); - boolean pruned = false; + List<SharedBufferEntry<K, V>> prunedEntries = new ArrayList<>(); while (iter.hasNext()) { SharedBufferPage<K, V> page = iter.next().getValue(); - if (page.prune(pruningTimestamp)) { - pruned = true; - } + page.prune(pruningTimestamp, prunedEntries); if (page.isEmpty()) { // delete page if it is empty iter.remove(); } } - return pruned; + if (!prunedEntries.isEmpty()) { + for (Map.Entry<K, SharedBufferPage<K, V>> entry : pages.entrySet()) { + entry.getValue().removeEdges(prunedEntries); + } + prunedEntries.clear(); --- End diff -- Updated. > Dangling reference generated after NFA clean up timed out SharedBufferEntry > --- > > Key: FLINK-8226 > URL: https://issues.apache.org/jira/browse/FLINK-8226 > Project: Flink > Issue Type: Bug > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > Fix For: 1.5.0, 1.4.1 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5141: [FLINK-8226] [cep] Dangling reference generated af...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/5141#discussion_r159149194 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java --- @@ -191,22 +191,28 @@ public boolean isEmpty() { */ public boolean prune(long pruningTimestamp) { Iterator<Map.Entry<K, SharedBufferPage<K, V>>> iter = pages.entrySet().iterator(); - boolean pruned = false; + List<SharedBufferEntry<K, V>> prunedEntries = new ArrayList<>(); while (iter.hasNext()) { SharedBufferPage<K, V> page = iter.next().getValue(); - if (page.prune(pruningTimestamp)) { - pruned = true; - } + page.prune(pruningTimestamp, prunedEntries); if (page.isEmpty()) { // delete page if it is empty iter.remove(); } } - return pruned; + if (!prunedEntries.isEmpty()) { + for (Map.Entry<K, SharedBufferPage<K, V>> entry : pages.entrySet()) { + entry.getValue().removeEdges(prunedEntries); + } + prunedEntries.clear(); --- End diff -- Updated. ---
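The diff under review fixes the dangling-reference bug by splitting pruning into two phases: first collect every expired entry while dropping it from its page, then sweep all surviving entries and remove edges that still point at a pruned entry. The stripped-down sketch below illustrates that scheme with simplified placeholder types (plain lists and a `String`-keyed page map), not the actual `SharedBuffer` classes:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Minimal stand-ins: each page is a list of entries; an entry has a
// timestamp and edges that may reference entries on other pages.
public class PruneSketch {
    static class Entry {
        final long timestamp;
        final List<Entry> edges = new ArrayList<>();
        Entry(long ts) { this.timestamp = ts; }
    }

    static void prune(Map<String, List<Entry>> pages, long pruningTimestamp) {
        // Phase 1: drop expired entries from every page, remembering them.
        List<Entry> pruned = new ArrayList<>();
        Iterator<Map.Entry<String, List<Entry>>> iter = pages.entrySet().iterator();
        while (iter.hasNext()) {
            List<Entry> page = iter.next().getValue();
            page.removeIf(e -> {
                boolean expired = e.timestamp < pruningTimestamp;
                if (expired) {
                    pruned.add(e);
                }
                return expired;
            });
            if (page.isEmpty()) {
                iter.remove(); // delete page if it is empty
            }
        }
        // Phase 2: remove edges in surviving entries that still point at
        // a pruned entry, so no dangling reference remains.
        for (List<Entry> page : pages.values()) {
            for (Entry e : page) {
                e.edges.removeAll(pruned);
            }
        }
    }
}
```

Without the second phase, a surviving entry whose edge targets a pruned entry would keep that object reachable (and later serializable) even though its page no longer owns it, which is exactly the dangling reference FLINK-8226 describes.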
[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5202#discussion_r159148351 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala --- @@ -109,4 +110,75 @@ object ScalarFunctions { Math.log(x) / Math.log(base) } } + + /** +* Returns the Int number after the input number left shift n bits +* @param input Int type +* @param n +* @return input << n +*/ + def shiftLeft(input: Int, n: Int): Int = { +_shiftLeft(input, n).asInstanceOf[Int] + } + + /** +* Returns the Long number after the input number left shift n bits +* @param input Long type +* @param n +* @return input << n +*/ + def shiftLeft(input: Long, n: Int): Long = { +_shiftLeft(input, n).asInstanceOf[Long] + } + + /** +* Returns the Int number after the input number right shift n bits +* @param input Int type +* @param n +* @return input >> n +*/ + def shiftRight(input: Int, n: Int): Int = { +_shiftRight(input, n).asInstanceOf[Int] + } + + /** +* Returns the Long number after the input number right shift n bits +* @param input Long type +* @param n +* @return input >> n +*/ + def shiftRight(input: Long, n: Int): Long = { +_shiftRight(input, n).asInstanceOf[Long] --- End diff -- input >> n ---
[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307349#comment-16307349 ] ASF GitHub Bot commented on FLINK-8302: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5202#discussion_r159148340 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala --- @@ -109,4 +110,75 @@ object ScalarFunctions { Math.log(x) / Math.log(base) } } + + /** +* Returns the Int number after the input number left shift n bits +* @param input Int type +* @param n +* @return input << n +*/ + def shiftLeft(input: Int, n: Int): Int = { +_shiftLeft(input, n).asInstanceOf[Int] + } + + /** +* Returns the Long number after the input number left shift n bits +* @param input Long type +* @param n +* @return input << n +*/ + def shiftLeft(input: Long, n: Int): Long = { +_shiftLeft(input, n).asInstanceOf[Long] + } + + /** +* Returns the Int number after the input number right shift n bits +* @param input Int type +* @param n +* @return input >> n +*/ + def shiftRight(input: Int, n: Int): Int = { +_shiftRight(input, n).asInstanceOf[Int] + } + + /** +* Returns the Long number after the input number right shift n bits +* @param input Long type +* @param n +* @return input >> n +*/ + def shiftRight(input: Long, n: Int): Long = { +_shiftRight(input, n).asInstanceOf[Long] + } + + /** +* Returns the number after the input number left shift n bits +* Input must be type 'Long' or type 'Int' +*/ + private def _shiftLeft(input: Any, n: Int): Any = { +input match { + case l: jl.Long => l << n + case i: jl.Integer => i << n + case _ => +throw new IllegalArgumentException( + s"type of input in function 'shiftLeft(input, n)' must be Long or Integer" +) +} + } + + /** +* Returns the number after the input number right shift n bits +* Input must be type 'Long' or type 'Int' +*/ + private def _shiftRight(input: Any, n: Int): Any 
= { +input match { + case l: jl.Long => l >> n + case i: jl.Integer => i >> n + case _ => +throw new IllegalArgumentException( --- End diff -- never run here? > Support shift_left and shift_right in TableAPI > -- > > Key: FLINK-8302 > URL: https://issues.apache.org/jira/browse/FLINK-8302 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: DuBin > Labels: features > Fix For: 1.5.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > Add shift_left and shift_right support in TableAPI, shift_left(input, n) act > as input << n, shift_right(input, n) act as input >> n. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
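The reviewer's "never run here?" points out that with one typed overload per supported type, the `IllegalArgumentException` branch of the shared `Any`-based helper is unreachable: overload resolution already guarantees the argument is an `Int` or a `Long`, so the helper (and its cast back via `asInstanceOf`) can be dropped entirely. A hedged Java sketch of that simplification, not the Flink code itself:

```java
// With one overload per supported type there is no runtime type check:
// the compiler selects the right method at the call site, so the
// "unsupported type" error branch of an Any-based helper can never run.
public class ShiftFunctions {
    public static int shiftLeft(int input, int n) {
        return input << n;
    }

    public static long shiftLeft(long input, int n) {
        return input << n;
    }

    public static int shiftRight(int input, int n) {
        return input >> n;
    }

    public static long shiftRight(long input, int n) {
        return input >> n;
    }
}
```

For example, `shiftLeft(21, 1)` yields `42`, matching the `SHIFT_LEFT(21,1)` test case quoted elsewhere in the review.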
[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307346#comment-16307346 ] ASF GitHub Bot commented on FLINK-8302: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5202#discussion_r159148351 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala --- @@ -109,4 +110,75 @@ object ScalarFunctions { Math.log(x) / Math.log(base) } } + + /** +* Returns the Int number after the input number left shift n bits +* @param input Int type +* @param n +* @return input << n +*/ + def shiftLeft(input: Int, n: Int): Int = { +_shiftLeft(input, n).asInstanceOf[Int] + } + + /** +* Returns the Long number after the input number left shift n bits +* @param input Long type +* @param n +* @return input << n +*/ + def shiftLeft(input: Long, n: Int): Long = { +_shiftLeft(input, n).asInstanceOf[Long] + } + + /** +* Returns the Int number after the input number right shift n bits +* @param input Int type +* @param n +* @return input >> n +*/ + def shiftRight(input: Int, n: Int): Int = { +_shiftRight(input, n).asInstanceOf[Int] + } + + /** +* Returns the Long number after the input number right shift n bits +* @param input Long type +* @param n +* @return input >> n +*/ + def shiftRight(input: Long, n: Int): Long = { +_shiftRight(input, n).asInstanceOf[Long] --- End diff -- input >> n > Support shift_left and shift_right in TableAPI > -- > > Key: FLINK-8302 > URL: https://issues.apache.org/jira/browse/FLINK-8302 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: DuBin > Labels: features > Fix For: 1.5.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > Add shift_left and shift_right support in TableAPI, shift_left(input, n) act > as input << n, shift_right(input, n) act as input >> n. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307348#comment-16307348 ] ASF GitHub Bot commented on FLINK-8302: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5202#discussion_r159148053 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala --- @@ -1216,6 +1216,69 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { ) } + @Test + def testShiftLeft(): Unit = { +testSqlApi( + "SHIFT_LEFT(1,1)", + "2" +) + +testSqlApi( + "SHIFT_LEFT(21,1)", + "42" +) + +testSqlApi( + "SHIFT_LEFT(21,1)", + "42" +) + --- End diff -- Remove duplicate test case. > Support shift_left and shift_right in TableAPI > -- > > Key: FLINK-8302 > URL: https://issues.apache.org/jira/browse/FLINK-8302 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: DuBin > Labels: features > Fix For: 1.5.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > Add shift_left and shift_right support in TableAPI, shift_left(input, n) act > as input << n, shift_right(input, n) act as input >> n. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307351#comment-16307351 ] ASF GitHub Bot commented on FLINK-8302: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5202#discussion_r159148349 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala --- @@ -109,4 +110,75 @@ object ScalarFunctions { Math.log(x) / Math.log(base) } } + + /** +* Returns the Int number after the input number left shift n bits +* @param input Int type +* @param n +* @return input << n +*/ + def shiftLeft(input: Int, n: Int): Int = { +_shiftLeft(input, n).asInstanceOf[Int] + } + + /** +* Returns the Long number after the input number left shift n bits +* @param input Long type +* @param n +* @return input << n +*/ + def shiftLeft(input: Long, n: Int): Long = { +_shiftLeft(input, n).asInstanceOf[Long] + } + + /** +* Returns the Int number after the input number right shift n bits +* @param input Int type +* @param n +* @return input >> n +*/ + def shiftRight(input: Int, n: Int): Int = { +_shiftRight(input, n).asInstanceOf[Int] --- End diff -- input >> n > Support shift_left and shift_right in TableAPI > -- > > Key: FLINK-8302 > URL: https://issues.apache.org/jira/browse/FLINK-8302 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: DuBin > Labels: features > Fix For: 1.5.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > Add shift_left and shift_right support in TableAPI, shift_left(input, n) act > as input << n, shift_right(input, n) act as input >> n. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5202#discussion_r159148499 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala --- @@ -58,4 +58,22 @@ object ScalarSqlFunctions { OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)), SqlFunctionCategory.NUMERIC) + val SHIFT_LEFT = new SqlFunction( +"SHIFT_LEFT", +SqlKind.OTHER_FUNCTION, +ReturnTypes.ARG0, +null, +OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC), --- End diff -- Second operand should be SqlTypeFamily.INTEGER. `OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC) -> OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.INTEGER) ` ---
[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307354#comment-16307354 ] ASF GitHub Bot commented on FLINK-8302: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5202#discussion_r159148352 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala --- @@ -109,4 +110,75 @@ object ScalarFunctions { Math.log(x) / Math.log(base) } } + + /** +* Returns the Int number after the input number left shift n bits +* @param input Int type +* @param n +* @return input << n +*/ + def shiftLeft(input: Int, n: Int): Int = { +_shiftLeft(input, n).asInstanceOf[Int] + } + + /** +* Returns the Long number after the input number left shift n bits +* @param input Long type +* @param n +* @return input << n +*/ + def shiftLeft(input: Long, n: Int): Long = { +_shiftLeft(input, n).asInstanceOf[Long] + } + + /** +* Returns the Int number after the input number right shift n bits +* @param input Int type +* @param n +* @return input >> n +*/ + def shiftRight(input: Int, n: Int): Int = { +_shiftRight(input, n).asInstanceOf[Int] + } + + /** +* Returns the Long number after the input number right shift n bits +* @param input Long type +* @param n +* @return input >> n +*/ + def shiftRight(input: Long, n: Int): Long = { +_shiftRight(input, n).asInstanceOf[Long] + } + + /** +* Returns the number after the input number left shift n bits +* Input must be type 'Long' or type 'Int' +*/ + private def _shiftLeft(input: Any, n: Int): Any = { --- End diff -- remove this method? 
> Support shift_left and shift_right in TableAPI > -- > > Key: FLINK-8302 > URL: https://issues.apache.org/jira/browse/FLINK-8302 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: DuBin > Labels: features > Fix For: 1.5.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > Add shift_left and shift_right support in TableAPI, shift_left(input, n) act > as input << n, shift_right(input, n) act as input >> n. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5202#discussion_r159148345 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala --- @@ -109,4 +110,75 @@ object ScalarFunctions { Math.log(x) / Math.log(base) } } + + /** +* Returns the Int number after the input number left shift n bits +* @param input Int type +* @param n +* @return input << n +*/ + def shiftLeft(input: Int, n: Int): Int = { +_shiftLeft(input, n).asInstanceOf[Int] --- End diff -- input << n ? ---
[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307347#comment-16307347 ] ASF GitHub Bot commented on FLINK-8302: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5202#discussion_r159148345 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala --- @@ -109,4 +110,75 @@ object ScalarFunctions { Math.log(x) / Math.log(base) } } + + /** +* Returns the Int number after the input number left shift n bits +* @param input Int type +* @param n +* @return input << n +*/ + def shiftLeft(input: Int, n: Int): Int = { +_shiftLeft(input, n).asInstanceOf[Int] --- End diff -- input << n ? > Support shift_left and shift_right in TableAPI > -- > > Key: FLINK-8302 > URL: https://issues.apache.org/jira/browse/FLINK-8302 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: DuBin > Labels: features > Fix For: 1.5.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > Add shift_left and shift_right support in TableAPI, shift_left(input, n) act > as input << n, shift_right(input, n) act as input >> n. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307353#comment-16307353 ] ASF GitHub Bot commented on FLINK-8302: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5202#discussion_r159148353 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala --- @@ -109,4 +110,75 @@ object ScalarFunctions { Math.log(x) / Math.log(base) } } + + /** +* Returns the Int number after the input number left shift n bits +* @param input Int type +* @param n +* @return input << n +*/ + def shiftLeft(input: Int, n: Int): Int = { +_shiftLeft(input, n).asInstanceOf[Int] + } + + /** +* Returns the Long number after the input number left shift n bits +* @param input Long type +* @param n +* @return input << n +*/ + def shiftLeft(input: Long, n: Int): Long = { +_shiftLeft(input, n).asInstanceOf[Long] + } + + /** +* Returns the Int number after the input number right shift n bits +* @param input Int type +* @param n +* @return input >> n +*/ + def shiftRight(input: Int, n: Int): Int = { +_shiftRight(input, n).asInstanceOf[Int] + } + + /** +* Returns the Long number after the input number right shift n bits +* @param input Long type +* @param n +* @return input >> n +*/ + def shiftRight(input: Long, n: Int): Long = { +_shiftRight(input, n).asInstanceOf[Long] + } + + /** +* Returns the number after the input number left shift n bits +* Input must be type 'Long' or type 'Int' +*/ + private def _shiftLeft(input: Any, n: Int): Any = { +input match { + case l: jl.Long => l << n + case i: jl.Integer => i << n + case _ => +throw new IllegalArgumentException( + s"type of input in function 'shiftLeft(input, n)' must be Long or Integer" +) +} + } + + /** +* Returns the number after the input number right shift n bits +* Input must be type 'Long' or type 'Int' +*/ + private def _shiftRight(input: Any, n: Int): Any 
= { --- End diff -- remove this method? > Support shift_left and shift_right in TableAPI > -- > > Key: FLINK-8302 > URL: https://issues.apache.org/jira/browse/FLINK-8302 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: DuBin > Labels: features > Fix For: 1.5.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > Add shift_left and shift_right support in TableAPI, shift_left(input, n) act > as input << n, shift_right(input, n) act as input >> n. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307350#comment-16307350 ] ASF GitHub Bot commented on FLINK-8302: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5202#discussion_r159148346 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala --- @@ -109,4 +110,75 @@ object ScalarFunctions { Math.log(x) / Math.log(base) } } + + /** +* Returns the Int number after the input number left shift n bits +* @param input Int type +* @param n +* @return input << n +*/ + def shiftLeft(input: Int, n: Int): Int = { +_shiftLeft(input, n).asInstanceOf[Int] + } + + /** +* Returns the Long number after the input number left shift n bits +* @param input Long type +* @param n +* @return input << n +*/ + def shiftLeft(input: Long, n: Int): Long = { +_shiftLeft(input, n).asInstanceOf[Long] --- End diff -- input << n > Support shift_left and shift_right in TableAPI > -- > > Key: FLINK-8302 > URL: https://issues.apache.org/jira/browse/FLINK-8302 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: DuBin > Labels: features > Fix For: 1.5.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > Add shift_left and shift_right support in TableAPI, shift_left(input, n) act > as input << n, shift_right(input, n) act as input >> n. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307355#comment-16307355 ] ASF GitHub Bot commented on FLINK-8302: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5202#discussion_r159148499 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala --- @@ -58,4 +58,22 @@ object ScalarSqlFunctions { OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)), SqlFunctionCategory.NUMERIC) + val SHIFT_LEFT = new SqlFunction( +"SHIFT_LEFT", +SqlKind.OTHER_FUNCTION, +ReturnTypes.ARG0, +null, +OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC), --- End diff -- Second operand should be SqlTypeFamily.INTEGER. `OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC) -> OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.INTEGER) ` > Support shift_left and shift_right in TableAPI > -- > > Key: FLINK-8302 > URL: https://issues.apache.org/jira/browse/FLINK-8302 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: DuBin > Labels: features > Fix For: 1.5.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > Add shift_left and shift_right support in TableAPI, shift_left(input, n) act > as input << n, shift_right(input, n) act as input >> n. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8302) Support shift_left and shift_right in TableAPI
[ https://issues.apache.org/jira/browse/FLINK-8302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307352#comment-16307352 ] ASF GitHub Bot commented on FLINK-8302: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5202#discussion_r159148500 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala --- @@ -58,4 +58,22 @@ object ScalarSqlFunctions { OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)), SqlFunctionCategory.NUMERIC) + val SHIFT_LEFT = new SqlFunction( +"SHIFT_LEFT", +SqlKind.OTHER_FUNCTION, +ReturnTypes.ARG0, +null, +OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC), +SqlFunctionCategory.NUMERIC + ) + + val SHIFT_RIGHT = new SqlFunction( +"SHIFT_RIGHT", +SqlKind.OTHER_FUNCTION, +ReturnTypes.ARG0, +null, +OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC), +SqlFunctionCategory.NUMERIC --- End diff -- Second operand should be SqlTypeFamily.INTEGER. `OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC) -> OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.INTEGER) ` > Support shift_left and shift_right in TableAPI > -- > > Key: FLINK-8302 > URL: https://issues.apache.org/jira/browse/FLINK-8302 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: DuBin > Labels: features > Fix For: 1.5.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > Add shift_left and shift_right support in TableAPI, shift_left(input, n) act > as input << n, shift_right(input, n) act as input >> n. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5202#discussion_r159148353 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala --- @@ -109,4 +110,75 @@ object ScalarFunctions { Math.log(x) / Math.log(base) } } + + /** +* Returns the Int number after the input number left shift n bits +* @param input Int type +* @param n +* @return input << n +*/ + def shiftLeft(input: Int, n: Int): Int = { +_shiftLeft(input, n).asInstanceOf[Int] + } + + /** +* Returns the Long number after the input number left shift n bits +* @param input Long type +* @param n +* @return input << n +*/ + def shiftLeft(input: Long, n: Int): Long = { +_shiftLeft(input, n).asInstanceOf[Long] + } + + /** +* Returns the Int number after the input number right shift n bits +* @param input Int type +* @param n +* @return input >> n +*/ + def shiftRight(input: Int, n: Int): Int = { +_shiftRight(input, n).asInstanceOf[Int] + } + + /** +* Returns the Long number after the input number right shift n bits +* @param input Long type +* @param n +* @return input >> n +*/ + def shiftRight(input: Long, n: Int): Long = { +_shiftRight(input, n).asInstanceOf[Long] + } + + /** +* Returns the number after the input number left shift n bits +* Input must be type 'Long' or type 'Int' +*/ + private def _shiftLeft(input: Any, n: Int): Any = { +input match { + case l: jl.Long => l << n + case i: jl.Integer => i << n + case _ => +throw new IllegalArgumentException( + s"type of input in function 'shiftLeft(input, n)' must be Long or Integer" +) +} + } + + /** +* Returns the number after the input number right shift n bits +* Input must be type 'Long' or type 'Int' +*/ + private def _shiftRight(input: Any, n: Int): Any = { --- End diff -- remove this method? ---
[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148352

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
       Math.log(x) / Math.log(base)
     }
   }
+
+  /**
+    * Returns the Int number after the input number left shift n bits
+    * @param input Int type
+    * @param n
+    * @return input << n
+    */
+  def shiftLeft(input: Int, n: Int): Int = {
+    _shiftLeft(input, n).asInstanceOf[Int]
+  }
+
+  /**
+    * Returns the Long number after the input number left shift n bits
+    * @param input Long type
+    * @param n
+    * @return input << n
+    */
+  def shiftLeft(input: Long, n: Int): Long = {
+    _shiftLeft(input, n).asInstanceOf[Long]
+  }
+
+  /**
+    * Returns the Int number after the input number right shift n bits
+    * @param input Int type
+    * @param n
+    * @return input >> n
+    */
+  def shiftRight(input: Int, n: Int): Int = {
+    _shiftRight(input, n).asInstanceOf[Int]
+  }
+
+  /**
+    * Returns the Long number after the input number right shift n bits
+    * @param input Long type
+    * @param n
+    * @return input >> n
+    */
+  def shiftRight(input: Long, n: Int): Long = {
+    _shiftRight(input, n).asInstanceOf[Long]
+  }
+
+  /**
+    * Returns the number after the input number left shift n bits
+    * Input must be type 'Long' or type 'Int'
+    */
+  private def _shiftLeft(input: Any, n: Int): Any = {
--- End diff --

remove this method?

---
[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148346

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
       Math.log(x) / Math.log(base)
     }
   }
+
+  /**
+    * Returns the Int number after the input number left shift n bits
+    * @param input Int type
+    * @param n
+    * @return input << n
+    */
+  def shiftLeft(input: Int, n: Int): Int = {
+    _shiftLeft(input, n).asInstanceOf[Int]
+  }
+
+  /**
+    * Returns the Long number after the input number left shift n bits
+    * @param input Long type
+    * @param n
+    * @return input << n
+    */
+  def shiftLeft(input: Long, n: Int): Long = {
+    _shiftLeft(input, n).asInstanceOf[Long]
--- End diff --

input << n

---
[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148340

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
       Math.log(x) / Math.log(base)
     }
   }
+
+  /**
+    * Returns the Int number after the input number left shift n bits
+    * @param input Int type
+    * @param n
+    * @return input << n
+    */
+  def shiftLeft(input: Int, n: Int): Int = {
+    _shiftLeft(input, n).asInstanceOf[Int]
+  }
+
+  /**
+    * Returns the Long number after the input number left shift n bits
+    * @param input Long type
+    * @param n
+    * @return input << n
+    */
+  def shiftLeft(input: Long, n: Int): Long = {
+    _shiftLeft(input, n).asInstanceOf[Long]
+  }
+
+  /**
+    * Returns the Int number after the input number right shift n bits
+    * @param input Int type
+    * @param n
+    * @return input >> n
+    */
+  def shiftRight(input: Int, n: Int): Int = {
+    _shiftRight(input, n).asInstanceOf[Int]
+  }
+
+  /**
+    * Returns the Long number after the input number right shift n bits
+    * @param input Long type
+    * @param n
+    * @return input >> n
+    */
+  def shiftRight(input: Long, n: Int): Long = {
+    _shiftRight(input, n).asInstanceOf[Long]
+  }
+
+  /**
+    * Returns the number after the input number left shift n bits
+    * Input must be type 'Long' or type 'Int'
+    */
+  private def _shiftLeft(input: Any, n: Int): Any = {
+    input match {
+      case l: jl.Long => l << n
+      case i: jl.Integer => i << n
+      case _ =>
+        throw new IllegalArgumentException(
+          s"type of input in function 'shiftLeft(input, n)' must be Long or Integer"
+        )
+    }
+  }
+
+  /**
+    * Returns the number after the input number right shift n bits
+    * Input must be type 'Long' or type 'Int'
+    */
+  private def _shiftRight(input: Any, n: Int): Any = {
+    input match {
+      case l: jl.Long => l >> n
+      case i: jl.Integer => i >> n
+      case _ =>
+        throw new IllegalArgumentException(
--- End diff --

never run here?

---
[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148053

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ---
@@ -1216,6 +1216,69 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
     )
   }
 
+  @Test
+  def testShiftLeft(): Unit = {
+    testSqlApi(
+      "SHIFT_LEFT(1,1)",
+      "2"
+    )
+
+    testSqlApi(
+      "SHIFT_LEFT(21,1)",
+      "42"
+    )
+
+    testSqlApi(
+      "SHIFT_LEFT(21,1)",
+      "42"
+    )
+
--- End diff --

Remove duplicate test case.

---
[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148349

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ---
@@ -109,4 +110,75 @@ object ScalarFunctions {
       Math.log(x) / Math.log(base)
     }
   }
+
+  /**
+    * Returns the Int number after the input number left shift n bits
+    * @param input Int type
+    * @param n
+    * @return input << n
+    */
+  def shiftLeft(input: Int, n: Int): Int = {
+    _shiftLeft(input, n).asInstanceOf[Int]
+  }
+
+  /**
+    * Returns the Long number after the input number left shift n bits
+    * @param input Long type
+    * @param n
+    * @return input << n
+    */
+  def shiftLeft(input: Long, n: Int): Long = {
+    _shiftLeft(input, n).asInstanceOf[Long]
+  }
+
+  /**
+    * Returns the Int number after the input number right shift n bits
+    * @param input Int type
+    * @param n
+    * @return input >> n
+    */
+  def shiftRight(input: Int, n: Int): Int = {
+    _shiftRight(input, n).asInstanceOf[Int]
--- End diff --

input >> n

---
[GitHub] flink pull request #5202: [FLINK-8302][table]Add SHIFT_LEFT and SHIFT_RIGHT ...
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5202#discussion_r159148500

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala ---
@@ -58,4 +58,22 @@ object ScalarSqlFunctions {
       OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)),
     SqlFunctionCategory.NUMERIC)
 
+  val SHIFT_LEFT = new SqlFunction(
+    "SHIFT_LEFT",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.ARG0,
+    null,
+    OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC),
+    SqlFunctionCategory.NUMERIC
+  )
+
+  val SHIFT_RIGHT = new SqlFunction(
+    "SHIFT_RIGHT",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.ARG0,
+    null,
+    OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC),
+    SqlFunctionCategory.NUMERIC
--- End diff --

Second operand should be SqlTypeFamily.INTEGER.
`OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC) -> OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.INTEGER)`

---
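The review comments above boil down to two points: the dynamically typed `_shiftLeft`/`_shiftRight` helpers are unnecessary once the public overloads are statically typed (their catch-all `IllegalArgumentException` branch can never be reached), and each overload can simply return `input << n` / `input >> n` directly. A minimal sketch of that simplification (illustrative Java, not Flink's actual Scala `ScalarFunctions` code):

```java
// Hypothetical simplified version of the overloads under review: with
// statically typed overloads there is no need for a dynamically typed
// _shiftLeft/_shiftRight helper, and that helper's catch-all
// IllegalArgumentException branch could never run anyway.
public final class ShiftFunctions {

    public static int shiftLeft(int input, int n) {
        return input << n;
    }

    public static long shiftLeft(long input, int n) {
        return input << n;
    }

    // >> is the arithmetic shift, so the sign bit is preserved.
    public static int shiftRight(int input, int n) {
        return input >> n;
    }

    public static long shiftRight(long input, int n) {
        return input >> n;
    }
}
```

This also lines up with the last comment: the shift count is naturally an integer (`n` here), mirroring the suggested `SqlTypeFamily.INTEGER` for the second operand.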
[jira] [Commented] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase
[ https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307281#comment-16307281 ]

Ted Yu commented on FLINK-6105:
---

In flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java :
{code}
try {
  Thread.sleep(500);
} catch (InterruptedException e1) {
  // ignore it
}
{code}

> Properly handle InterruptedException in HadoopInputFormatBase
> -
>
> Key: FLINK-6105
> URL: https://issues.apache.org/jira/browse/FLINK-6105
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Reporter: Ted Yu
> Assignee: mingleizhang
>
> When catching InterruptedException, we should throw InterruptedIOException
> instead of IOException.
> The following example is from HadoopInputFormatBase :
> {code}
> try {
>   splits = this.mapreduceInputFormat.getSplits(jobContext);
> } catch (InterruptedException e) {
>   throw new IOException("Could not get Splits.", e);
> }
> {code}
> There may be other places where IOE is thrown.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
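The fix the issue asks for can be sketched as follows (an assumed shape, not the actual `HadoopInputFormatBase` code): convert the `InterruptedException` into an `InterruptedIOException` and restore the thread's interrupt flag, instead of swallowing it or wrapping it in a plain `IOException`:

```java
import java.io.IOException;
import java.io.InterruptedIOException;

public final class InterruptExample {

    /** Stand-in for the Hadoop call that may be interrupted (assumed shape). */
    public interface SplitSource {
        Object getSplits() throws InterruptedException;
    }

    public static Object getSplitsSafely(SplitSource source) throws IOException {
        try {
            return source.getSplits();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack still see it...
            Thread.currentThread().interrupt();
            // ...and signal the interruption through the IO exception hierarchy
            // instead of a plain IOException.
            InterruptedIOException ioe = new InterruptedIOException("Could not get splits.");
            ioe.initCause(e);
            throw ioe;
        }
    }
}
```

`InterruptedIOException` extends `IOException`, so existing callers that catch `IOException` keep working, while callers that care about interruption can detect it.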
[jira] [Commented] (FLINK-8333) Split command options from deployment options in CliFrontend
[ https://issues.apache.org/jira/browse/FLINK-8333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307274#comment-16307274 ] ASF GitHub Bot commented on FLINK-8333: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5220 [FLINK-8333] [flip6] Separate deployment options from command options ## What is the purpose of the change This commit separates the parsing of command options and deployment options into two steps. This makes it easier to make the CustomCommandLines non-static. Moreover, this commit moves the CliFrontend into the cli sub package. This PR is based on #5219. ## Brief change log - Move `CliFrontend` to `org.apache.flink.client.cli` - Decouple creation of `RunOptions`, `ListOptions`, etc. from `CliFrontendParser` ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink splitCliOptions

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5220.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5220

commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b
Author: Till Rohrmann
Date: 2017-12-07T12:57:24Z

[FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of YarnClusterClient

Introduce YarnApplicationStatusMonitor which does the Yarn ApplicationStatus polling in the FlinkYarnSessionCli. This decouples the YarnClusterClient from the actual communication with Yarn and, thus, gives a better separation of concerns.

commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44
Author: Till Rohrmann
Date: 2017-12-20T15:43:21Z

[FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor

Moves the YarnClient from the YarnClusterClient to the AbstractYarnClusterDescriptor. This makes the latter responsible for the lifecycle management of the client and gives a better separation of concerns.

commit 0cd22bf559eb820f3e2d381686752f583f4f16ff
Author: Till Rohrmann
Date: 2017-12-18T17:59:30Z

[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

Move the savepoint disposal logic from the CliFrontend into the ClusterClient. This gives a better separation of concerns and allows the CliFrontend to be used with different ClusterClient implementations.

commit 2b4c1efb79ffdfd1423240047dce8e2f038958c7
Author: Till Rohrmann
Date: 2017-12-20T16:19:59Z

[FLINK-8333] [flip6] Separate deployment options from command options

This commit separates the parsing of command options and deployment options into two steps. This makes it easier to make the CustomCommandLines non-static. Moreover, this commit moves the CliFrontend into the cli sub package.
> Split command options from deployment options in CliFrontend > > > Key: FLINK-8333 > URL: https://issues.apache.org/jira/browse/FLINK-8333 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > In order to better support different {{CustomCommandLines}} we should split > the command and deployment option parsing in the {{CliFrontend}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5220: [FLINK-8333] [flip6] Separate deployment options f...
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5220

[FLINK-8333] [flip6] Separate deployment options from command options

## What is the purpose of the change

This commit separates the parsing of command options and deployment options into two steps. This makes it easier to make the CustomCommandLines non-static. Moreover, this commit moves the CliFrontend into the cli sub package.

This PR is based on #5219.

## Brief change log

- Move `CliFrontend` to `org.apache.flink.client.cli`
- Decouple creation of `RunOptions`, `ListOptions`, etc. from `CliFrontendParser`

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink splitCliOptions

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5220.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5220

commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b
Author: Till Rohrmann
Date: 2017-12-07T12:57:24Z

[FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of YarnClusterClient

Introduce YarnApplicationStatusMonitor which does the Yarn ApplicationStatus polling in the FlinkYarnSessionCli.
This decouples the YarnClusterClient from the actual communication with Yarn and, thus, gives a better separation of concerns. commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44 Author: Till Rohrmann Date: 2017-12-20T15:43:21Z [FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor Moves the YarnClient from the YarnClusterClient to the AbstractYarnClusterDescriptor. This makes the latter responsible for the lifecycle management of the client and gives a better separation of concerns. commit 0cd22bf559eb820f3e2d381686752f583f4f16ff Author: Till Rohrmann Date: 2017-12-18T17:59:30Z [FLINK-8332] [flip6] Move savepoint dispose into ClusterClient Move the savepoint disposal logic from the CliFrontend into the ClusterClient. This gives a better separation of concerns and allows the CliFrontend to be used with different ClusterClient implementations. commit 2b4c1efb79ffdfd1423240047dce8e2f038958c7 Author: Till Rohrmann Date: 2017-12-20T16:19:59Z [FLINK-8333] [flip6] Separate deployment options from command options This commit separates the parsing of command options and deployment options into two steps. This makes it easier to make the CustomCommandLines non-static. Moreover, this commit moves the CliFrontend into the cli sub package. ---
[jira] [Created] (FLINK-8333) Split command options from deployment options in CliFrontend
Till Rohrmann created FLINK-8333: Summary: Split command options from deployment options in CliFrontend Key: FLINK-8333 URL: https://issues.apache.org/jira/browse/FLINK-8333 Project: Flink Issue Type: Improvement Components: Client Affects Versions: 1.5.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.5.0 In order to better support different {{CustomCommandLines}} we should split the command and deployment option parsing in the {{CliFrontend}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8332) Move dispose savepoint into ClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307269#comment-16307269 ]

ASF GitHub Bot commented on FLINK-8332:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5219

[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

## What is the purpose of the change

Move the savepoint disposal logic from the CliFrontend into the ClusterClient. This gives a better separation of concerns and allows the CliFrontend to be used with different ClusterClient implementations.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink refactorSavepointCommand

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5219.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5219

commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b
Author: Till Rohrmann
Date: 2017-12-07T12:57:24Z

[FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of YarnClusterClient

Introduce YarnApplicationStatusMonitor which does the Yarn ApplicationStatus polling in the FlinkYarnSessionCli.
This decouples the YarnClusterClient from the actual communication with Yarn and, thus, gives a better separation of concerns. commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44 Author: Till Rohrmann Date: 2017-12-20T15:43:21Z [FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor Moves the YarnClient from the YarnClusterClient to the AbstractYarnClusterDescriptor. This makes the latter responsible for the lifecycle management of the client and gives a better separation of concerns. commit 0cd22bf559eb820f3e2d381686752f583f4f16ff Author: Till Rohrmann Date: 2017-12-18T17:59:30Z [FLINK-8332] [flip6] Move savepoint dispose into ClusterClient Move the savepoint disposal logic from the CliFrontend into the ClusterClient. This gives a better separation of concerns and allows the CliFrontend to be used with different ClusterClient implementations. > Move dispose savepoint into ClusterClient > - > > Key: FLINK-8332 > URL: https://issues.apache.org/jira/browse/FLINK-8332 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > Currently, the {{CliFrontend}} sends the command for disposing a savepoint. > In order to better abstract this functionality we should move it to the > {{ClusterClient}}. That way we can have different implementations of the > {{ClusterClient}} (Flip-6 and old code) which are used by the same > {{CliFrontend}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5219: [FLINK-8332] [flip6] Move savepoint dispose into C...
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5219

[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

## What is the purpose of the change

Move the savepoint disposal logic from the CliFrontend into the ClusterClient. This gives a better separation of concerns and allows the CliFrontend to be used with different ClusterClient implementations.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink refactorSavepointCommand

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5219.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5219

commit c48324702b0ebff3fd7a16e480ead90fc4c9a30b
Author: Till Rohrmann
Date: 2017-12-07T12:57:24Z

[FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of YarnClusterClient

Introduce YarnApplicationStatusMonitor which does the Yarn ApplicationStatus polling in the FlinkYarnSessionCli. This decouples the YarnClusterClient from the actual communication with Yarn and, thus, gives a better separation of concerns.
commit 783bd1daf36b34a74cbe718a52a1043ba38d5a44 Author: Till Rohrmann Date: 2017-12-20T15:43:21Z [FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor Moves the YarnClient from the YarnClusterClient to the AbstractYarnClusterDescriptor. This makes the latter responsible for the lifecycle management of the client and gives a better separation of concerns. commit 0cd22bf559eb820f3e2d381686752f583f4f16ff Author: Till Rohrmann Date: 2017-12-18T17:59:30Z [FLINK-8332] [flip6] Move savepoint dispose into ClusterClient Move the savepoint disposal logic from the CliFrontend into the ClusterClient. This gives a better separation of concerns and allows the CliFrontend to be used with different ClusterClient implementations. ---
[jira] [Created] (FLINK-8332) Move dispose savepoint into ClusterClient
Till Rohrmann created FLINK-8332: Summary: Move dispose savepoint into ClusterClient Key: FLINK-8332 URL: https://issues.apache.org/jira/browse/FLINK-8332 Project: Flink Issue Type: Improvement Components: Client Affects Versions: 1.5.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.5.0 Currently, the {{CliFrontend}} sends the command for disposing a savepoint. In order to better abstract this functionality we should move it to the {{ClusterClient}}. That way we can have different implementations of the {{ClusterClient}} (Flip-6 and old code) which are used by the same {{CliFrontend}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8329) Move YarnClient out of YarnClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307268#comment-16307268 ] ASF GitHub Bot commented on FLINK-8329: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5216 Corrected the checkstyle violations. > Move YarnClient out of YarnClusterClient > > > Key: FLINK-8329 > URL: https://issues.apache.org/jira/browse/FLINK-8329 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > Move the {{YarnClient}} from the {{YarnClusterClient}} to the > {{AbstractYarnClusterDescriptor}} which will be responsible for the lifecycle > management of the {{YarnClient}}. This change is a clean up task which will > better structure the client code. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5216: [FLINK-8329] [flip6] Move YarnClient to AbstractYarnClust...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5216 Corrected the checkstyle violations. ---
[jira] [Closed] (FLINK-8330) Remove FlinkYarnCLI
[ https://issues.apache.org/jira/browse/FLINK-8330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8330. Resolution: Fixed Removed via ce62945ae6c1b1878bebc9a528e63ef7a54cb897 > Remove FlinkYarnCLI > --- > > Key: FLINK-8330 > URL: https://issues.apache.org/jira/browse/FLINK-8330 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > Remove the {{FlinkYarnCLI}} because it is no longer needed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8330) Remove FlinkYarnCLI
[ https://issues.apache.org/jira/browse/FLINK-8330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307246#comment-16307246 ] ASF GitHub Bot commented on FLINK-8330: --- Github user tillrohrmann closed the pull request at: https://github.com/apache/flink/pull/5217 > Remove FlinkYarnCLI > --- > > Key: FLINK-8330 > URL: https://issues.apache.org/jira/browse/FLINK-8330 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > Remove the {{FlinkYarnCLI}} because it is no longer needed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5217: [FLINK-8330] [flip6] Remove FlinkYarnCLI
Github user tillrohrmann closed the pull request at: https://github.com/apache/flink/pull/5217 ---
[jira] [Commented] (FLINK-8330) Remove FlinkYarnCLI
[ https://issues.apache.org/jira/browse/FLINK-8330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307243#comment-16307243 ] ASF GitHub Bot commented on FLINK-8330: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5217 Thanks for the review @GJL. Merging this PR. > Remove FlinkYarnCLI > --- > > Key: FLINK-8330 > URL: https://issues.apache.org/jira/browse/FLINK-8330 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > Remove the {{FlinkYarnCLI}} because it is no longer needed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5217: [FLINK-8330] [flip6] Remove FlinkYarnCLI
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5217 Thanks for the review @GJL. Merging this PR. ---
[jira] [Commented] (FLINK-8268) Test instability for 'TwoPhaseCommitSinkFunctionTest'
[ https://issues.apache.org/jira/browse/FLINK-8268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307241#comment-16307241 ] Till Rohrmann commented on FLINK-8268: -- Just for the record: Another test failure on Travis: https://travis-ci.org/tillrohrmann/flink/jobs/322955365 > Test instability for 'TwoPhaseCommitSinkFunctionTest' > - > > Key: FLINK-8268 > URL: https://issues.apache.org/jira/browse/FLINK-8268 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stephan Ewen >Assignee: Piotr Nowojski >Priority: Critical > Labels: test-stability > > The following exception / failure message occurs. > {code} > Tests run: 5, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.824 sec <<< > FAILURE! - in > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest > testIgnoreCommitExceptionDuringRecovery(org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest) > Time elapsed: 0.068 sec <<< ERROR! > java.lang.Exception: Could not complete snapshot 0 for operator MockTask > (1/1). 
> at java.io.FileOutputStream.writeBytes(Native Method) > at java.io.FileOutputStream.write(FileOutputStream.java:326) > at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) > at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291) > at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295) > at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141) > at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229) > at java.io.BufferedWriter.flush(BufferedWriter.java:254) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:313) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest$FileBasedSinkFunction.preCommit(TwoPhaseCommitSinkFunctionTest.java:288) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:290) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459) > at > org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunctionTest.testIgnoreCommitExceptionDuringRecovery(TwoPhaseCommitSinkFunctionTest.java:208) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8171) Remove work arounds in Flip6LocalStreamEnvironment
[ https://issues.apache.org/jira/browse/FLINK-8171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8171. Resolution: Fixed Fixed via 8e7a71c053135fdce4073eaf8022d5727d87b5fa > Remove work arounds in Flip6LocalStreamEnvironment > -- > > Key: FLINK-8171 > URL: https://issues.apache.org/jira/browse/FLINK-8171 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > After adding FLINK-7956, it is no longer necessary that the > {{Flip6LocalStreamEnvironment}} waits for the registration of TaskManagers > before submitting a job. Moreover, it is also possible to use slot sharing > when submitting jobs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8171) Remove work arounds in Flip6LocalStreamEnvironment
[ https://issues.apache.org/jira/browse/FLINK-8171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307238#comment-16307238 ] ASF GitHub Bot commented on FLINK-8171: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5101 > Remove work arounds in Flip6LocalStreamEnvironment > -- > > Key: FLINK-8171 > URL: https://issues.apache.org/jira/browse/FLINK-8171 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > After adding FLINK-7956, it is no longer necessary that the > {{Flip6LocalStreamEnvironment}} waits for the registration of TaskManagers > before submitting a job. Moreover, it is also possible to use slot sharing > when submitting jobs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5101: [FLINK-8171] [flip6] Remove work arounds from Flip...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5101 ---
[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state
[ https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307217#comment-16307217 ] ASF GitHub Bot commented on FLINK-8331: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/5218 @fhueske thanks for your explanation, makes sense to me. I updated the PR. The changes are as follows:
- Revert the change of `RowCsvInputFormatTest.testTailingEmptyFields`.
- Unify the `parseField` behavior when startPos = limit, except for StringParser: `StringParser` sets the error state `EMPTY_COLUMN`, sets an empty string as `result`, and returns the `limit` value as the index, while the other parsers set the error state `EMPTY_COLUMN` and return `-1` as the index.
Best, Jincheng
> FieldParsers do not correctly set EMPT_COLUMN error state
> Key: FLINK-8331
> URL: https://issues.apache.org/jira/browse/FLINK-8331
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.5.0, 1.4.1
> Reporter: Fabian Hueske
> Assignee: sunjincheng
>
> Some {{FieldParser}} do not correctly set the EMPTY_COLUMN error state if a field is empty. Instead, they try to parse the field value from an empty String which fails, e.g., in case of the {{DoubleParser}} with a {{NumberFormatException}}. The {{RowCsvInputFormat}} has a flag to interpret empty fields as {{null}} values. The implementation requires that all {{FieldParser}} correctly return the EMPTY_COLUMN error state in case of an empty field.
> Affected {{FieldParser}}:
> - BigDecParser
> - BigIntParser
> - DoubleParser
> - FloatParser
> - SqlDateParser
> - SqlTimeParser
> - SqlTimestampParser
[GitHub] flink issue #5218: [FLINK-8331][core] FieldParser do not correctly set EMPT_...
Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/5218 @fhueske thanks for your explanation, makes sense to me. I updated the PR. The changes are as follows:
- Revert the change of `RowCsvInputFormatTest.testTailingEmptyFields`.
- Unify the `parseField` behavior when startPos = limit, except for StringParser: `StringParser` sets the error state `EMPTY_COLUMN`, sets an empty string as `result`, and returns the `limit` value as the index, while the other parsers set the error state `EMPTY_COLUMN` and return `-1` as the index.
Best, Jincheng ---
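The unified empty-field contract described in the comment above can be sketched as follows. These are simplified stand-in parsers for illustration only, not Flink's actual `FieldParser` subclasses; the class and method names here are assumptions made for the sketch.

```java
// Sketch of the EMPTY_COLUMN contract described above: on an empty field,
// a string-style parser yields an empty result and returns the limit as
// the index, while other parsers flag EMPTY_COLUMN and return -1.
public class EmptyColumnContract {
    enum ErrorState { NONE, EMPTY_COLUMN }

    static ErrorState lastError = ErrorState.NONE;

    // Numeric-style parser: empty field -> EMPTY_COLUMN, index -1.
    static int parseNumericField(byte[] bytes, int startPos, int limit) {
        if (startPos == limit) {
            lastError = ErrorState.EMPTY_COLUMN;
            return -1;
        }
        lastError = ErrorState.NONE;
        return limit; // real numeric parsing elided in this sketch
    }

    // String-style parser: empty field -> EMPTY_COLUMN, empty string as
    // the result, and the limit returned as the index.
    static int parseStringField(byte[] bytes, int startPos, int limit,
                                StringBuilder result) {
        result.setLength(0);
        if (startPos == limit) {
            lastError = ErrorState.EMPTY_COLUMN;
            return limit;
        }
        lastError = ErrorState.NONE;
        result.append(new String(bytes, startPos, limit - startPos));
        return limit;
    }
}
```

The asymmetry mirrors the PR description: callers that treat empty fields as `null` can detect the error state in both cases, but only the string-style parser still produces a (legal, empty) value.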
[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state
[ https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307201#comment-16307201 ] ASF GitHub Bot commented on FLINK-8331: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5218#discussion_r159139198 --- Diff: flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java --- @@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() { @Test public void testEmptyFieldInIsolation() { try { - String [] emptyStrings = new String[] {"|"}; + FieldParser parser = getParser(); + + byte[] bytes = "|".getBytes(ConfigConstants.DEFAULT_CHARSET); --- End diff -- Yes, we can keep `testEmptyFieldInIsolation`.
[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5218#discussion_r159139198 --- Diff: flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java --- @@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() { @Test public void testEmptyFieldInIsolation() { try { - String [] emptyStrings = new String[] {"|"}; + FieldParser parser = getParser(); + + byte[] bytes = "|".getBytes(ConfigConstants.DEFAULT_CHARSET); --- End diff -- Yes, we can keep `testEmptyFieldInIsolation`. ---
[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state
[ https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307198#comment-16307198 ] ASF GitHub Bot commented on FLINK-8331: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5218#discussion_r159139167 --- Diff: flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java --- @@ -362,61 +365,95 @@ public void readStringFieldsWithTrailingDelimiters() throws Exception { @Test public void testTailingEmptyFields() throws Exception { - String fileContent = "abc|-def|-ghijk\n" + - "abc|-def|-\n" + - "abc|-|-\n" + - "|-|-|-\n" + - "|-|-\n" + - "abc|-def\n"; - FileInputSplit split = createTempFile(fileContent); - - TypeInformation[] fieldTypes = new TypeInformation[]{ - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO}; - - RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|"); - format.setFieldDelimiter("|-"); - format.configure(new Configuration()); - format.open(split); - - Row result = new Row(3); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("abc", result.getField(0)); - assertEquals("def", result.getField(1)); - assertEquals("ghijk", result.getField(2)); + List> dataList = new java.util.ArrayList<>(); + + // test String + dataList.add(new Tuple4<>(BasicTypeInfo.STRING_TYPE_INFO, "bdc", "bdc", "")); + // test BigInt + dataList.add(new Tuple4<>(BasicTypeInfo.BIG_INT_TYPE_INFO, --- End diff -- Ooh, you are right. Makes sense to me. :)
[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5218#discussion_r159139167 --- Diff: flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java --- @@ -362,61 +365,95 @@ public void readStringFieldsWithTrailingDelimiters() throws Exception { @Test public void testTailingEmptyFields() throws Exception { - String fileContent = "abc|-def|-ghijk\n" + - "abc|-def|-\n" + - "abc|-|-\n" + - "|-|-|-\n" + - "|-|-\n" + - "abc|-def\n"; - FileInputSplit split = createTempFile(fileContent); - - TypeInformation[] fieldTypes = new TypeInformation[]{ - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO}; - - RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|"); - format.setFieldDelimiter("|-"); - format.configure(new Configuration()); - format.open(split); - - Row result = new Row(3); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("abc", result.getField(0)); - assertEquals("def", result.getField(1)); - assertEquals("ghijk", result.getField(2)); + List> dataList = new java.util.ArrayList<>(); + + // test String + dataList.add(new Tuple4<>(BasicTypeInfo.STRING_TYPE_INFO, "bdc", "bdc", "")); + // test BigInt + dataList.add(new Tuple4<>(BasicTypeInfo.BIG_INT_TYPE_INFO, --- End diff -- ooh,you are right. make sense to me. :) ---
[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state
[ https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307168#comment-16307168 ] ASF GitHub Bot commented on FLINK-8331: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5218#discussion_r159137345 --- Diff: flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java --- @@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() { @Test public void testEmptyFieldInIsolation() { try { - String [] emptyStrings = new String[] {"|"}; + FieldParser parser = getParser(); + + byte[] bytes = "|".getBytes(ConfigConstants.DEFAULT_CHARSET); + int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue()); + + assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState()); + + if (this.allowsEmptyField()) { + assertTrue("Parser declared the empty string as invalid.", numRead != -1); + assertEquals("Invalid number of bytes read returned.", bytes.length, numRead); + } else { + assertTrue("Parser accepted the empty string.", numRead == -1); + } + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test erroneous: " + e.getMessage()); + } + } + + @Test + public void testTailingEmptyField() { + try { + String[] tailingEmptyFieldStrings = new String[]{"a|", "a||"}; FieldParser parser = getParser(); - for (String emptyString : emptyStrings) { - byte[] bytes = emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET); - int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue()); + for (String tailingEmptyFieldString : tailingEmptyFieldStrings) { + byte[] bytes = tailingEmptyFieldString.getBytes(ConfigConstants.DEFAULT_CHARSET); + int numRead = parser.parseField(bytes, 1, bytes.length, new byte[]{'|'}, parser.createValue()); assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState()); - 
if(this.allowsEmptyField()) { + if (this.allowsEmptyField()) { --- End diff -- Yes, you are right. I misinterpreted the `allowsEmptyField()` method. We need to keep it.
[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5218#discussion_r159137345 --- Diff: flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java --- @@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() { @Test public void testEmptyFieldInIsolation() { try { - String [] emptyStrings = new String[] {"|"}; + FieldParser parser = getParser(); + + byte[] bytes = "|".getBytes(ConfigConstants.DEFAULT_CHARSET); + int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue()); + + assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState()); + + if (this.allowsEmptyField()) { + assertTrue("Parser declared the empty string as invalid.", numRead != -1); + assertEquals("Invalid number of bytes read returned.", bytes.length, numRead); + } else { + assertTrue("Parser accepted the empty string.", numRead == -1); + } + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test erroneous: " + e.getMessage()); + } + } + + @Test + public void testTailingEmptyField() { + try { + String[] tailingEmptyFieldStrings = new String[]{"a|", "a||"}; FieldParser parser = getParser(); - for (String emptyString : emptyStrings) { - byte[] bytes = emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET); - int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue()); + for (String tailingEmptyFieldString : tailingEmptyFieldStrings) { + byte[] bytes = tailingEmptyFieldString.getBytes(ConfigConstants.DEFAULT_CHARSET); + int numRead = parser.parseField(bytes, 1, bytes.length, new byte[]{'|'}, parser.createValue()); assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState()); - if(this.allowsEmptyField()) { + if (this.allowsEmptyField()) { --- End diff -- Yes, you are right. I misinterpreted the `allowsEmptyField()` method. We need to keep it. ---
[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state
[ https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307162#comment-16307162 ] ASF GitHub Bot commented on FLINK-8331: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5218#discussion_r159136839 --- Diff: flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java --- @@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() { @Test public void testEmptyFieldInIsolation() { try { - String [] emptyStrings = new String[] {"|"}; + FieldParser parser = getParser(); + + byte[] bytes = "|".getBytes(ConfigConstants.DEFAULT_CHARSET); + int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue()); + + assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState()); + + if (this.allowsEmptyField()) { + assertTrue("Parser declared the empty string as invalid.", numRead != -1); + assertEquals("Invalid number of bytes read returned.", bytes.length, numRead); + } else { + assertTrue("Parser accepted the empty string.", numRead == -1); + } + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test erroneous: " + e.getMessage()); + } + } + + @Test + public void testTailingEmptyField() { + try { + String[] tailingEmptyFieldStrings = new String[]{"a|", "a||"}; FieldParser parser = getParser(); - for (String emptyString : emptyStrings) { - byte[] bytes = emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET); - int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue()); + for (String tailingEmptyFieldString : tailingEmptyFieldStrings) { + byte[] bytes = tailingEmptyFieldString.getBytes(ConfigConstants.DEFAULT_CHARSET); + int numRead = parser.parseField(bytes, 1, bytes.length, new byte[]{'|'}, parser.createValue()); assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState()); - 
if(this.allowsEmptyField()) { + if (this.allowsEmptyField()) { --- End diff -- Oh, I found you added 4 test files (`QuotedStringParserTest`, `QuotedStringValueParserTest`, `UnquotedStringParserTest`, `UnquotedStringValueParserTest`) where `allowsEmptyField` is true, so I kept `allowsEmptyField` in `ParserTestBase`.
[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5218#discussion_r159136839 --- Diff: flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java --- @@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() { @Test public void testEmptyFieldInIsolation() { try { - String [] emptyStrings = new String[] {"|"}; + FieldParser parser = getParser(); + + byte[] bytes = "|".getBytes(ConfigConstants.DEFAULT_CHARSET); + int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue()); + + assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState()); + + if (this.allowsEmptyField()) { + assertTrue("Parser declared the empty string as invalid.", numRead != -1); + assertEquals("Invalid number of bytes read returned.", bytes.length, numRead); + } else { + assertTrue("Parser accepted the empty string.", numRead == -1); + } + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test erroneous: " + e.getMessage()); + } + } + + @Test + public void testTailingEmptyField() { + try { + String[] tailingEmptyFieldStrings = new String[]{"a|", "a||"}; FieldParser parser = getParser(); - for (String emptyString : emptyStrings) { - byte[] bytes = emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET); - int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue()); + for (String tailingEmptyFieldString : tailingEmptyFieldStrings) { + byte[] bytes = tailingEmptyFieldString.getBytes(ConfigConstants.DEFAULT_CHARSET); + int numRead = parser.parseField(bytes, 1, bytes.length, new byte[]{'|'}, parser.createValue()); assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState()); - if(this.allowsEmptyField()) { + if (this.allowsEmptyField()) { --- End diff -- Oh, I find you added 4 test file ( `QuotedStringParserTest` | `QuotedStringValueParserTest` 
|`UnquotedStringParserTest`|`UnquotedStringValueParserTest`) `allowsEmptyField` is true, so I kept this `allowsEmptyField` in `ParserTestBase`. ---
[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state
[ https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307153#comment-16307153 ] ASF GitHub Bot commented on FLINK-8331: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5218#discussion_r159135908 --- Diff: flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java --- @@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() { @Test public void testEmptyFieldInIsolation() { try { - String [] emptyStrings = new String[] {"|"}; + FieldParser parser = getParser(); + + byte[] bytes = "|".getBytes(ConfigConstants.DEFAULT_CHARSET); + int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue()); + + assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState()); + + if (this.allowsEmptyField()) { + assertTrue("Parser declared the empty string as invalid.", numRead != -1); + assertEquals("Invalid number of bytes read returned.", bytes.length, numRead); + } else { + assertTrue("Parser accepted the empty string.", numRead == -1); + } + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test erroneous: " + e.getMessage()); + } + } + + @Test + public void testTailingEmptyField() { + try { + String[] tailingEmptyFieldStrings = new String[]{"a|", "a||"}; FieldParser parser = getParser(); - for (String emptyString : emptyStrings) { - byte[] bytes = emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET); - int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue()); + for (String tailingEmptyFieldString : tailingEmptyFieldStrings) { + byte[] bytes = tailingEmptyFieldString.getBytes(ConfigConstants.DEFAULT_CHARSET); + int numRead = parser.parseField(bytes, 1, bytes.length, new byte[]{'|'}, parser.createValue()); assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState()); - 
if(this.allowsEmptyField()) { + if (this.allowsEmptyField()) { --- End diff -- We should remove the `allowsEmptyField()` method from `ParserTestBase`. All parsers have to support empty fields.
[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state
[ https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307155#comment-16307155 ] ASF GitHub Bot commented on FLINK-8331: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5218#discussion_r159136001 --- Diff: flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java --- @@ -362,61 +365,95 @@ public void readStringFieldsWithTrailingDelimiters() throws Exception { @Test public void testTailingEmptyFields() throws Exception { - String fileContent = "abc|-def|-ghijk\n" + - "abc|-def|-\n" + - "abc|-|-\n" + - "|-|-|-\n" + - "|-|-\n" + - "abc|-def\n"; - FileInputSplit split = createTempFile(fileContent); - - TypeInformation[] fieldTypes = new TypeInformation[]{ - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO}; - - RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|"); - format.setFieldDelimiter("|-"); - format.configure(new Configuration()); - format.open(split); - - Row result = new Row(3); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("abc", result.getField(0)); - assertEquals("def", result.getField(1)); - assertEquals("ghijk", result.getField(2)); + List> dataList = new java.util.ArrayList<>(); + + // test String + dataList.add(new Tuple4<>(BasicTypeInfo.STRING_TYPE_INFO, "bdc", "bdc", "")); + // test BigInt + dataList.add(new Tuple4<>(BasicTypeInfo.BIG_INT_TYPE_INFO, --- End diff -- I think we can extend this test but should not exhaustively test all types. It's the responsibility of the different `FieldParser` to implement the empty field logic correctly. Hence, this should be tested in the field parser tests and not here. 
[jira] [Commented] (FLINK-8331) FieldParsers do not correctly set EMPT_COLUMN error state
[ https://issues.apache.org/jira/browse/FLINK-8331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307154#comment-16307154 ] ASF GitHub Bot commented on FLINK-8331: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5218#discussion_r159135944 --- Diff: flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java --- @@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() { @Test public void testEmptyFieldInIsolation() { try { - String [] emptyStrings = new String[] {"|"}; + FieldParser parser = getParser(); + + byte[] bytes = "|".getBytes(ConfigConstants.DEFAULT_CHARSET); --- End diff -- this is the same case as `"a|"` in `testTailingEmptyField()`. We only need one of these.
[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5218#discussion_r159135944 --- Diff: flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java --- @@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() { @Test public void testEmptyFieldInIsolation() { try { - String [] emptyStrings = new String[] {"|"}; + FieldParser parser = getParser(); + + byte[] bytes = "|".getBytes(ConfigConstants.DEFAULT_CHARSET); --- End diff -- this is the same case as `"a|"` in `testTailingEmptyField()`. We only need one of these. ---
[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5218#discussion_r159135908 --- Diff: flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java --- @@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() { @Test public void testEmptyFieldInIsolation() { try { - String [] emptyStrings = new String[] {"|"}; + FieldParser parser = getParser(); + + byte[] bytes = "|".getBytes(ConfigConstants.DEFAULT_CHARSET); + int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue()); + + assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState()); + + if (this.allowsEmptyField()) { + assertTrue("Parser declared the empty string as invalid.", numRead != -1); + assertEquals("Invalid number of bytes read returned.", bytes.length, numRead); + } else { + assertTrue("Parser accepted the empty string.", numRead == -1); + } + } catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test erroneous: " + e.getMessage()); + } + } + + @Test + public void testTailingEmptyField() { + try { + String[] tailingEmptyFieldStrings = new String[]{"a|", "a||"}; FieldParser parser = getParser(); - for (String emptyString : emptyStrings) { - byte[] bytes = emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET); - int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue()); + for (String tailingEmptyFieldString : tailingEmptyFieldStrings) { + byte[] bytes = tailingEmptyFieldString.getBytes(ConfigConstants.DEFAULT_CHARSET); + int numRead = parser.parseField(bytes, 1, bytes.length, new byte[]{'|'}, parser.createValue()); assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState()); - if(this.allowsEmptyField()) { + if (this.allowsEmptyField()) { --- End diff -- We should remove the `allowsEmptyField()` method from `ParserTestBase`. All parsers have to support empty fields. ---
[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5218#discussion_r159136001 --- Diff: flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java --- @@ -362,61 +365,95 @@ public void readStringFieldsWithTrailingDelimiters() throws Exception { @Test public void testTailingEmptyFields() throws Exception { - String fileContent = "abc|-def|-ghijk\n" + - "abc|-def|-\n" + - "abc|-|-\n" + - "|-|-|-\n" + - "|-|-\n" + - "abc|-def\n"; - FileInputSplit split = createTempFile(fileContent); - - TypeInformation[] fieldTypes = new TypeInformation[]{ - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO}; - - RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|"); - format.setFieldDelimiter("|-"); - format.configure(new Configuration()); - format.open(split); - - Row result = new Row(3); - - result = format.nextRecord(result); - assertNotNull(result); - assertEquals("abc", result.getField(0)); - assertEquals("def", result.getField(1)); - assertEquals("ghijk", result.getField(2)); + List> dataList = new java.util.ArrayList<>(); + + // test String + dataList.add(new Tuple4<>(BasicTypeInfo.STRING_TYPE_INFO, "bdc", "bdc", "")); + // test BigInt + dataList.add(new Tuple4<>(BasicTypeInfo.BIG_INT_TYPE_INFO, --- End diff -- I think we can extend this test but should not exhaustively test all types. It's the responsibility of the different `FieldParser` to implement the empty field logic correctly. Hence, this should be tested in the field parser tests and not here. ---
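The design choice fhueske argues for above (verify the empty-field contract once per `FieldParser`, instead of exhaustively re-testing every type through `RowCsvInputFormat`) can be sketched as a small parameterized check. This is an illustrative stand-alone sketch, not Flink's `ParserTestBase`; the names and the functional-interface signature are assumptions.

```java
import java.util.function.BiFunction;

// Per-parser testing approach: one reusable check that feeds an empty
// field to any parser and asserts the EMPTY_COLUMN outcome, so the
// input-format tests don't have to enumerate all field types.
public class PerParserEmptyFieldCheck {
    static final int EMPTY_COLUMN = -1;

    // A "parser" here is just (bytes, startPos) -> consumed count,
    // with -1 signalling the EMPTY_COLUMN error state.
    static boolean handlesEmptyField(BiFunction<byte[], Integer, Integer> parser) {
        byte[] field = "|".getBytes(); // empty field followed by the delimiter
        return parser.apply(field, 0) == EMPTY_COLUMN;
    }

    // Hypothetical numeric-style parser honouring the contract.
    static int doubleLikeParser(byte[] bytes, int start) {
        if (start >= bytes.length || bytes[start] == '|') {
            return EMPTY_COLUMN;
        }
        return bytes.length; // real parsing elided in this sketch
    }
}
```

Each concrete parser test would then call `handlesEmptyField` with its own parser, which keeps the empty-field logic tested where it is implemented.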
[jira] [Commented] (FLINK-7155) Add Influxdb metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-7155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307152#comment-16307152 ] ASF GitHub Bot commented on FLINK-7155: --- Github user gobozov commented on a diff in the pull request: https://github.com/apache/flink/pull/4299#discussion_r159136038 --- Diff: flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/InfluxdbReporter.java --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.influxdb; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.dropwizard.ScheduledDropwizardReporter; +import org.apache.flink.metrics.MetricConfig; + +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.ScheduledReporter; +import metrics_influxdb.HttpInfluxdbProtocol; +import metrics_influxdb.api.measurements.CategoriesMetricMeasurementTransformer; + +import java.util.concurrent.TimeUnit; + +/** + * This class acts as a factory for the {@link metrics_influxdb.InfluxdbReporter} and allows using it as a Flink + * reporter. 
+ */ +@PublicEvolving +public class InfluxdbReporter extends ScheduledDropwizardReporter { + + public static final String ARG_USER = "user"; + public static final String ARG_PASSWORD = "password"; + public static final String ARG_DB = "db"; + + @Override + public ScheduledReporter getReporter(final MetricConfig config) { + final String host = config.getString(ARG_HOST, "localhost"); + final Integer port = config.getInteger(ARG_PORT, HttpInfluxdbProtocol.DEFAULT_PORT); + final String user = config.getString(ARG_USER, null); + final String password = config.getString(ARG_PASSWORD, null); + final String db = config.getString(ARG_DB, "flink"); + final String conversionRate = config.getString(ARG_CONVERSION_RATE, null); + final String conversionDuration = config.getString(ARG_CONVERSION_DURATION, null); + + final metrics_influxdb.InfluxdbReporter.Builder builder = + metrics_influxdb.InfluxdbReporter.forRegistry(registry); + + builder.protocol(new HttpInfluxdbProtocol(host, port, user, password, db)); --- End diff -- We built the same thing for our needs using the same InfluxDb reporter. The disadvantage is that it does not support HTTPS, so we modified it to support an sslContext. I also don't see a way to provide tags to the reporter builder. > Add Influxdb metrics reporter > - > > Key: FLINK-7155 > URL: https://issues.apache.org/jira/browse/FLINK-7155 > Project: Flink > Issue Type: Improvement > Components: Metrics > Reporter: Patrick Lucas > Assignee: Patrick Lucas > > [~jgrier] has a [simple Influxdb metrics reporter for > Flink|https://github.com/jgrier/flink-stuff/tree/master/flink-influx-reporter] > that is a thin wrapper around [a lightweight, public-domain Influxdb > reporter|https://github.com/davidB/metrics-influxdb] for Codahale metrics. > We can implement this very easily in Java in the same way as > flink-metrics-graphite. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
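The `getReporter` method in the diff is essentially a read-config-with-defaults factory: each connection setting falls back to a default ("localhost", the InfluxDB default port, the "flink" database) when the key is absent. A minimal sketch of that pattern, using `java.util.Properties` as a stand-in for Flink's `MetricConfig` (keys and defaults mirror the diff; the port value 8086 is InfluxDB's default HTTP port):

```java
import java.util.Properties;

// Sketch of the read-config-with-defaults pattern used by the reporter
// factory above. java.util.Properties stands in for Flink's MetricConfig;
// the keys and defaults mirror the diff.
public class ReporterConfigDemo {
    static final String ARG_HOST = "host";
    static final String ARG_PORT = "port";
    static final String ARG_USER = "user";
    static final String ARG_PASSWORD = "password";
    static final String ARG_DB = "db";

    // Return the configured value, or the default when the key is absent.
    static String getString(Properties config, String key, String dflt) {
        return config.getProperty(key, dflt);
    }

    static int getInteger(Properties config, String key, int dflt) {
        String v = config.getProperty(key);
        return v == null ? dflt : Integer.parseInt(v);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(ARG_HOST, "influx.internal");  // only host is set explicitly

        String host = getString(config, ARG_HOST, "localhost");
        int port = getInteger(config, ARG_PORT, 8086);  // InfluxDB's default HTTP port
        String db = getString(config, ARG_DB, "flink");

        System.out.println(host + ":" + port + "/" + db);  // influx.internal:8086/flink
    }
}
```

The review comments about HTTPS and tags fit this picture: both would be additional config keys read here and passed to the protocol/builder, which the wrapped reporter library did not expose at the time.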