[jira] [Commented] (FLINK-6930) Selecting window start / end on row-based Tumble/Slide window causes NPE

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051384#comment-16051384
 ] 

ASF GitHub Bot commented on FLINK-6930:
---

Github user hustfxj commented on the issue:

https://github.com/apache/flink/pull/4133
  
+1


> Selecting window start / end on row-based Tumble/Slide window causes NPE
> 
>
> Key: FLINK-6930
> URL: https://issues.apache.org/jira/browse/FLINK-6930
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>
> Selecting the start and end properties of a row-based window causes a 
> NullPointerException.
> The following program:
> {code}
> val windowedTable = table
>   .window(Tumble over 2.rows on 'proctime as 'w)
>   .groupBy('w, 'string)
>   .select('string as 'n, 'int.count as 'cnt, 'w.start as 's, 'w.end as 'e)
> {code}
> causes 
> {code}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.calcite.runtime.SqlFunctions.toLong(SqlFunctions.java:1556)
>   at 
> org.apache.calcite.runtime.SqlFunctions.toLong(SqlFunctions.java:1551)
>   at DataStreamCalcRule$40.processElement(Unknown Source)
>   at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:67)
>   at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>   at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:890)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:868)
>   at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>   at 
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:75)
>   at 
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:37)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:599)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:456)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> We should validate that the start and end window properties are not accessed 
> if the window is defined on row-counts.
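
A minimal Scala sketch of the kind of validation described above; everything here is illustrative and self-contained, not the actual Table API internals:

{code}
// Hypothetical, self-contained model: WindowSpec, WindowProperty and validate
// are made-up names used only to show where such a check would sit.
object RowWindowValidation {

  sealed trait WindowSpec
  case object RowCountWindow extends WindowSpec
  case object TimeWindow extends WindowSpec

  sealed trait WindowProperty
  case object WindowStart extends WindowProperty
  case object WindowEnd extends WindowProperty

  def validate(window: WindowSpec, selected: Seq[WindowProperty]): Unit = {
    // Row-count windows have no well-defined start/end timestamp, so selecting
    // these properties should fail at validation time rather than with an NPE
    // at runtime.
    if (window == RowCountWindow &&
        selected.exists(p => p == WindowStart || p == WindowEnd)) {
      throw new IllegalArgumentException(
        "Window start and end are not available on row-count windows.")
    }
  }

  // Mirrors selecting 'w.start on "Tumble over 2.rows":
  // validate(RowCountWindow, Seq(WindowStart))  // throws
}
{code}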



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4133: [FLINK-6930] [table] Forbid selecting window start/end on...

2017-06-15 Thread hustfxj
Github user hustfxj commented on the issue:

https://github.com/apache/flink/pull/4133
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-6930) Selecting window start / end on row-based Tumble/Slide window causes NPE

2017-06-15 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-6930:
--

Assignee: Jark Wu

> Selecting window start / end on row-based Tumble/Slide window causes NPE
> 
>
> Key: FLINK-6930
> URL: https://issues.apache.org/jira/browse/FLINK-6930
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>
> Selecting the start and end properties of a row-based window causes a 
> NullPointerException.
> The following program:
> {code}
> val windowedTable = table
>   .window(Tumble over 2.rows on 'proctime as 'w)
>   .groupBy('w, 'string)
>   .select('string as 'n, 'int.count as 'cnt, 'w.start as 's, 'w.end as 'e)
> {code}
> causes 
> {code}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.calcite.runtime.SqlFunctions.toLong(SqlFunctions.java:1556)
>   at 
> org.apache.calcite.runtime.SqlFunctions.toLong(SqlFunctions.java:1551)
>   at DataStreamCalcRule$40.processElement(Unknown Source)
>   at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:67)
>   at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>   at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:890)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:868)
>   at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>   at 
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:75)
>   at 
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:37)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:599)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:456)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> We should validate that the start and end window properties are not accessed 
> if the window is defined on row-counts.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-6925) Add CONCAT/CONCAT_WS supported in SQL

2017-06-15 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng reassigned FLINK-6925:
--

Assignee: sunjincheng

> Add CONCAT/CONCAT_WS supported in SQL
> -
>
> Key: FLINK-6925
> URL: https://issues.apache.org/jira/browse/FLINK-6925
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> CONCAT(str1,str2,...) returns the string that results from concatenating the 
> arguments. It may have one or more arguments. If all arguments are nonbinary 
> strings, the result is a nonbinary string. If the arguments include any 
> binary strings, the result is a binary string. A numeric argument is 
> converted to its equivalent nonbinary string form.
> CONCAT() returns NULL if any argument is NULL.
> * Syntax:
> CONCAT(str1,str2,...) 
> * Arguments
> ** str1,str2,... -
> * Return Types
>   string
> * Example:
>   CONCAT('F', 'lin', 'k') -> 'Flink'
>   CONCAT('M', NULL, 'L') -> NULL
>   CONCAT(14.3) -> '14.3'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat]
> CONCAT_WS() stands for Concatenate With Separator and is a special form of 
> CONCAT(). The first argument is the separator for the rest of the arguments. 
> The separator is added between the strings to be concatenated. The separator 
> can be a string, as can the rest of the arguments. If the separator is NULL, 
> the result is NULL.
> * Syntax:
> CONCAT_WS(separator,str1,str2,...)
> * Arguments
> ** separator -
> ** str1,str2,... -
> * Return Types
>   string
> * Example:
>   CONCAT_WS(',','First name','Second name','Last Name') -> 'First name,Second 
> name,Last Name'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat-ws]
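
A minimal Scala sketch of the NULL semantics described above; these helpers exist only for explanation and are not the eventual code-generated built-ins:

{code}
object ConcatSemantics {

  // CONCAT returns NULL if any argument is NULL, otherwise the concatenation.
  def concat(args: String*): String =
    if (args.contains(null)) null
    else args.mkString

  // CONCAT_WS returns NULL only if the separator is NULL; NULL arguments after
  // the separator are skipped.
  def concatWs(separator: String, args: String*): String =
    if (separator == null) null
    else args.filter(_ != null).mkString(separator)

  // concat("F", "lin", "k")                                  == "Flink"
  // concat("M", null, "L")                                   == null
  // concatWs(",", "First name", "Second name", "Last Name")  == "First name,Second name,Last Name"
}
{code}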



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6934) Consider moving LRUCache class

2017-06-15 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051346#comment-16051346
 ] 

mingleizhang commented on FLINK-6934:
-

I would just suggest removing it. It belongs to the util tools but has not been 
used for almost 3 years. If we remove {{LRUCache}}, we should also remove the 
{{LRUCacheMap}} class. [~uce]  [~Zentol] What do you think?

> Consider moving LRUCache class
> --
>
> Key: FLINK-6934
> URL: https://issues.apache.org/jira/browse/FLINK-6934
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: mingleizhang
>Assignee: mingleizhang
>
> The LRUCache class is not used any more, so I would suggest removing it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-6934) Consider moving LRUCache class

2017-06-15 Thread mingleizhang (JIRA)
mingleizhang created FLINK-6934:
---

 Summary: Consider moving LRUCache class
 Key: FLINK-6934
 URL: https://issues.apache.org/jira/browse/FLINK-6934
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Reporter: mingleizhang
Assignee: mingleizhang


The LRUCache class is not used any more, so I would suggest removing it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6930) Selecting window start / end on row-based Tumble/Slide window causes NPE

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051342#comment-16051342
 ] 

ASF GitHub Bot commented on FLINK-6930:
---

GitHub user wuchong opened a pull request:

https://github.com/apache/flink/pull/4133

[FLINK-6930] [table] Forbid selecting window start/end on row-based 
Tumble/Slide windows

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wuchong/flink forbid-rowbased-window-start

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4133.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4133


commit 4cf422acd25b18ebc285ace542d49ebc646707db
Author: Jark Wu 
Date:   2017-06-16T03:49:31Z

[FLINK-6930] [table] Forbid selecting window start/end on row-based 
Tumble/Slide windows




> Selecting window start / end on row-based Tumble/Slide window causes NPE
> 
>
> Key: FLINK-6930
> URL: https://issues.apache.org/jira/browse/FLINK-6930
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Fabian Hueske
>
> Selecting the start and end properties of a row-based window causes a 
> NullPointerException.
> The following program:
> {code}
> val windowedTable = table
>   .window(Tumble over 2.rows on 'proctime as 'w)
>   .groupBy('w, 'string)
>   .select('string as 'n, 'int.count as 'cnt, 'w.start as 's, 'w.end as 'e)
> {code}
> causes 
> {code}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.calcite.runtime.SqlFunctions.toLong(SqlFunctions.java:1556)
>   at 
> org.apache.calcite.runtime.SqlFunctions.toLong(SqlFunctions.java:1551)
>   at DataStreamCalcRule$40.processElement(Unknown Source)
>   at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:67)
>   at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>   at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:890)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:868)
>   at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>   at 
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:75)
>   at 
> org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:37)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:599)
>   at 
> 

[GitHub] flink pull request #4133: [FLINK-6930] [table] Forbid selecting window start...

2017-06-15 Thread wuchong
GitHub user wuchong opened a pull request:

https://github.com/apache/flink/pull/4133

[FLINK-6930] [table] Forbid selecting window start/end on row-based 
Tumble/Slide windows

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wuchong/flink forbid-rowbased-window-start

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4133.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4133


commit 4cf422acd25b18ebc285ace542d49ebc646707db
Author: Jark Wu 
Date:   2017-06-16T03:49:31Z

[FLINK-6930] [table] Forbid selecting window start/end on row-based 
Tumble/Slide windows




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6933) Refactor NFACompiler to reduce code duplication

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051335#comment-16051335
 ] 

ASF GitHub Bot commented on FLINK-6933:
---

GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/4132

[FLINK-6933] [cep] Refactor NFACompiler to reduce code duplication

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink FLINK-6933

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4132.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4132


commit 97da09af4828661fcde22c9d09aee69cfa7d7ac5
Author: Dian Fu 
Date:   2017-06-16T03:43:04Z

[FLINK-6933] [cep] Refactor NFACompiler to reduce code duplication




> Refactor NFACompiler to reduce code duplication
> ---
>
> Key: FLINK-6933
> URL: https://issues.apache.org/jira/browse/FLINK-6933
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> I find that part of the code in NFACompiler is duplicated; this JIRA tries to 
> eliminate the code duplication.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4132: [FLINK-6933] [cep] Refactor NFACompiler to reduce ...

2017-06-15 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/4132

[FLINK-6933] [cep] Refactor NFACompiler to reduce code duplication

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink FLINK-6933

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4132.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4132


commit 97da09af4828661fcde22c9d09aee69cfa7d7ac5
Author: Dian Fu 
Date:   2017-06-16T03:43:04Z

[FLINK-6933] [cep] Refactor NFACompiler to reduce code duplication




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6933) Refactor NFACompiler to reduce code duplication

2017-06-15 Thread Dian Fu (JIRA)
Dian Fu created FLINK-6933:
--

 Summary: Refactor NFACompiler to reduce code duplication
 Key: FLINK-6933
 URL: https://issues.apache.org/jira/browse/FLINK-6933
 Project: Flink
  Issue Type: Bug
  Components: CEP
Reporter: Dian Fu
Assignee: Dian Fu


I find that part of the code in NFACompiler is duplicated; this JIRA tries to 
eliminate the code duplication.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6891) Add LOG(X) supported in SQL

2017-06-15 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6891:
---
Description: 
LOG(X), LOG(B,X)

If called with one parameter, this function returns the natural logarithm of X. 
If X is less than or equal to 0.0E0, the function returns NULL and (as of MySQL 
5.7.4) a warning “Invalid argument for logarithm” is reported.
The inverse of this function (when called with a single argument) is the EXP() 
function.

If called with two parameters, this function returns the logarithm of X to the 
base B. If X is less than or equal to 0, or if B is less than or equal to 1, 
then NULL is returned.
* Example:
 LOG(2) -> 0.69314718055995
 LOG(-2) -> NULL
 LOG(2,65536) -> 16
 LOG(10,100) -> 2
 LOG(1,100) -> NULL

* See more:
** [MySQL| 
https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log]

--NOTE-- In this JIRA, the NULL case will throw IllegalArgumentException.



  was:
LOG(X), LOG(B,X)

If called with one parameter, this function returns the natural logarithm of X. 
If X is less than or equal to 0.0E0, the function returns NULL and (as of MySQL 
5.7.4) a warning “Invalid argument for logarithm” is reported.
The inverse of this function (when called with a single argument) is the EXP() 
function.

If called with two parameters, this function returns the logarithm of X to the 
base B. If X is less than or equal to 0, or if B is less than or equal to 1, 
then NULL is returned.
* Example:
 LOG(2) -> 0.69314718055995
 LOG(-2) -> NULL
 LOG(2,65536) -> 16
 LOG(10,100) -> 2
 LOG(1,100) -> NULL

* See more:
** [MySQL| 
https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log]

--NOTE-- In this JIRA. NULL case will thow IllegalArgumentException.




> Add LOG(X) supported in SQL
> ---
>
> Key: FLINK-6891
> URL: https://issues.apache.org/jira/browse/FLINK-6891
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> LOG(X), LOG(B,X)
> If called with one parameter, this function returns the natural logarithm of 
> X. If X is less than or equal to 0.0E0, the function returns NULL and (as of 
> MySQL 5.7.4) a warning “Invalid argument for logarithm” is reported.
> The inverse of this function (when called with a single argument) is the 
> EXP() function.
> If called with two parameters, this function returns the logarithm of X to 
> the base B. If X is less than or equal to 0, or if B is less than or equal to 
> 1, then NULL is returned.
> * Example:
>  LOG(2) -> 0.69314718055995
>  LOG(-2) -> NULL
>  LOG(2,65536) -> 16
>  LOG(10,100) -> 2
>  LOG(1,100) -> NULL
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log]
> --NOTE-- In this JIRA, the NULL case will throw IllegalArgumentException.
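
A minimal Scala sketch of the semantics above, with the MySQL NULL cases mapped to IllegalArgumentException as the note describes; the object and method names are illustrative only:

{code}
object LogSemantics {

  // LOG(X): natural logarithm of X, the inverse of EXP().
  def log(x: Double): Double = {
    if (x <= 0.0) throw new IllegalArgumentException("x must be positive")
    math.log(x)
  }

  // LOG(B, X): logarithm of X to base B.
  def log(base: Double, x: Double): Double = {
    if (x <= 0.0 || base <= 1.0)
      throw new IllegalArgumentException(
        "x must be positive and the base must be greater than 1")
    math.log(x) / math.log(base)
  }

  // log(2.0)          ~= 0.69314718055995
  // log(2.0, 65536.0) ~= 16.0
  // log(10.0, 100.0)  ~= 2.0
}
{code}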



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6891) Add LOG(X) supported in SQL

2017-06-15 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6891:
---
Description: 
LOG(X), LOG(B,X)

If called with one parameter, this function returns the natural logarithm of X. 
If X is less than or equal to 0.0E0, the function returns NULL and (as of MySQL 
5.7.4) a warning “Invalid argument for logarithm” is reported.
The inverse of this function (when called with a single argument) is the EXP() 
function.

If called with two parameters, this function returns the logarithm of X to the 
base B. If X is less than or equal to 0, or if B is less than or equal to 1, 
then NULL is returned.
* Example:
 LOG(2) -> 0.69314718055995
 LOG(-2) -> NULL
 LOG(2,65536) -> 16
 LOG(10,100) -> 2
 LOG(1,100) -> NULL

* See more:
** [MySQL| 
https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log]

--NOTE-- In this JIRA, the NULL case will throw IllegalArgumentException.



  was:
LOG(X), LOG(B,X)

If called with one parameter, this function returns the natural logarithm of X. 
If X is less than or equal to 0.0E0, the function returns NULL and (as of MySQL 
5.7.4) a warning “Invalid argument for logarithm” is reported.
The inverse of this function (when called with a single argument) is the EXP() 
function.

If called with two parameters, this function returns the logarithm of X to the 
base B. If X is less than or equal to 0, or if B is less than or equal to 1, 
then NULL is returned.
* Example:
 LOG(2) -> 0.69314718055995
 LOG(-2) -> NULL
 LOG(2,65536) -> 16
 LOG(10,100) -> 2
 LOG(1,100) -> NULL

* See more:
** [MySQL| 
https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log]

--NOTE-- In this JIRA. we only implement LOG(X). 




> Add LOG(X) supported in SQL
> ---
>
> Key: FLINK-6891
> URL: https://issues.apache.org/jira/browse/FLINK-6891
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> LOG(X), LOG(B,X)
> If called with one parameter, this function returns the natural logarithm of 
> X. If X is less than or equal to 0.0E0, the function returns NULL and (as of 
> MySQL 5.7.4) a warning “Invalid argument for logarithm” is reported.
> The inverse of this function (when called with a single argument) is the 
> EXP() function.
> If called with two parameters, this function returns the logarithm of X to 
> the base B. If X is less than or equal to 0, or if B is less than or equal to 
> 1, then NULL is returned.
> * Example:
>  LOG(2) -> 0.69314718055995
>  LOG(-2) -> NULL
>  LOG(2,65536) -> 16
>  LOG(10,100) -> 2
>  LOG(1,100) -> NULL
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log]
> --NOTE-- In this JIRA, the NULL case will throw IllegalArgumentException.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051320#comment-16051320
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4111
  
Travis reported a bug about composite types. I fixed it and am waiting for the 
new CI run to pass.


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests the 
> ArrayIndexOutOfBoundsException is caused by the input type being a POJO type rather than Row?
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List<Content> data = new ArrayList<Content>();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream<Content> stream = env.fromCollection(data);
> DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink<Row> windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
> super();
> }
> public Content(long recordTime, String urlKey) {
> super();
> this.recordTime = recordTime;
> this.urlKey = urlKey;
> }
> public String getUrlKey() {
> return urlKey;
> }
> public void setUrlKey(String urlKey) {
> this.urlKey = urlKey;
> }
> public long getRecordTime() {
> return recordTime;
> }
> public void setRecordTime(long recordTime) {
> this.recordTime = recordTime;
> }
> public long getHttpGetMessageCount() {
> return httpGetMessageCount;
> }
> public void setHttpGetMessageCount(long httpGetMessageCount) {
> this.httpGetMessageCount = httpGetMessageCount;
> }
> public long getHttpPostMessageCount() {
> return httpPostMessageCount;
> }
> public void setHttpPostMessageCount(long httpPostMessageCount) {
> this.httpPostMessageCount = httpPostMessageCount;
> }
> public long getUplink() {
> return uplink;
> }
> public void setUplink(long uplink) {
> this.uplink = uplink;
> }
> public long 

[GitHub] flink issue #4111: [FLINK-6896][table] Fix generate PojoType input result ex...

2017-06-15 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4111
  
Travis reported a bug about composite types. I fixed it and am waiting for the 
new CI run to pass.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6932) Update the inaccessible Dataflow Model paper link

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16051269#comment-16051269
 ] 

ASF GitHub Bot commented on FLINK-6932:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/4131#discussion_r122348284
  
--- Diff: docs/dev/event_time.md ---
@@ -146,7 +146,7 @@ to use timestamp assignment and watermark generation in 
the Flink DataStream API
 *Note: Flink implements many techniques from the Dataflow Model. For a 
good introduction to event time and watermarks, have a look at the articles 
below.*
 
   - [Streaming 
101](https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101) by 
Tyler Akidau
-  - The [Dataflow Model 
paper](https://static.googleusercontent.com/media/research.google.com/en/pubs/archive/43864.pdf)
--- End diff --

Yes, exactly. I will make this change.


> Update the inaccessible Dataflow Model paper link
> -
>
> Key: FLINK-6932
> URL: https://issues.apache.org/jira/browse/FLINK-6932
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: mingleizhang
>Assignee: mingleizhang
>  Labels: None
>
>  I tried to access the Dataflow Model paper link under 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html],
>  but it gives a 404 error instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4131: [FLINK-6932] [doc] Update inaccessible Dataflow Mo...

2017-06-15 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/4131#discussion_r122348284
  
--- Diff: docs/dev/event_time.md ---
@@ -146,7 +146,7 @@ to use timestamp assignment and watermark generation in 
the Flink DataStream API
 *Note: Flink implements many techniques from the Dataflow Model. For a 
good introduction to event time and watermarks, have a look at the articles 
below.*
 
   - [Streaming 
101](https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101) by 
Tyler Akidau
-  - The [Dataflow Model 
paper](https://static.googleusercontent.com/media/research.google.com/en/pubs/archive/43864.pdf)
--- End diff --

Yes, exactly. I will make this change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-6310) LocalExecutor#endSession() uses wrong lock for synchronization

2017-06-15 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-6310:
--
Description: 
Here is related code:

{code}
  public void endSession(JobID jobID) throws Exception {
synchronized (LocalExecutor.class) {
  LocalFlinkMiniCluster flink = this.flink;
{code}

In other places, lock field is used for synchronization:
{code}
  public void start() throws Exception {
synchronized (lock) {
{code}

  was:
Here is related code:
{code}
  public void endSession(JobID jobID) throws Exception {
synchronized (LocalExecutor.class) {
  LocalFlinkMiniCluster flink = this.flink;
{code}

In other places, lock field is used for synchronization:
{code}
  public void start() throws Exception {
synchronized (lock) {
{code}


> LocalExecutor#endSession() uses wrong lock for synchronization
> --
>
> Key: FLINK-6310
> URL: https://issues.apache.org/jira/browse/FLINK-6310
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Ted Yu
>
> Here is related code:
> {code}
>   public void endSession(JobID jobID) throws Exception {
> synchronized (LocalExecutor.class) {
>   LocalFlinkMiniCluster flink = this.flink;
> {code}
> In other places, lock field is used for synchronization:
> {code}
>   public void start() throws Exception {
> synchronized (lock) {
> {code}
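
A minimal Scala sketch of the underlying problem, for illustration only (the actual LocalExecutor is Java): locking on the class object in one method and on the {{lock}} field in another gives no mutual exclusion between them.

{code}
// Illustrative only: both methods need to synchronize on the same monitor.
class Executor {
  private val lock = new Object

  def start(): Unit = lock.synchronized {
    // ... bring up the mini cluster
  }

  // If this method synchronized on the class object instead (as endSession()
  // does today via LocalExecutor.class), it could interleave freely with start().
  def endSession(): Unit = lock.synchronized {
    // ... tear down the session state
  }
}
{code}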



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase

2017-06-15 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-6105:
--
Description: 
When catching InterruptedException, we should throw InterruptedIOException 
instead of IOException.

The following example is from HadoopInputFormatBase :
{code}
try {
  splits = this.mapreduceInputFormat.getSplits(jobContext);
} catch (InterruptedException e) {
  throw new IOException("Could not get Splits.", e);
}
{code}
There may be other places where IOE is thrown.

  was:
When catching InterruptedException, we should throw InterruptedIOException 
instead of IOException.

The following example is from HadoopInputFormatBase :
{code}
try {
  splits = this.mapreduceInputFormat.getSplits(jobContext);
} catch (InterruptedException e) {
  throw new IOException("Could not get Splits.", e);
}
{code}

There may be other places where IOE is thrown.


> Properly handle InterruptedException in HadoopInputFormatBase
> -
>
> Key: FLINK-6105
> URL: https://issues.apache.org/jira/browse/FLINK-6105
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Ted Yu
>
> When catching InterruptedException, we should throw InterruptedIOException 
> instead of IOException.
> The following example is from HadoopInputFormatBase :
> {code}
> try {
>   splits = this.mapreduceInputFormat.getSplits(jobContext);
> } catch (InterruptedException e) {
>   throw new IOException("Could not get Splits.", e);
> }
> {code}
> There may be other places where IOE is thrown.
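
A minimal Scala sketch of the suggested handling, for illustration only (HadoopInputFormatBase itself is Java); the helper name is hypothetical:

{code}
import java.io.InterruptedIOException

object InterruptHandling {

  // Restore the interrupt flag and rethrow as InterruptedIOException so that
  // callers can tell interruption apart from ordinary I/O failures.
  def rethrowInterruptedAsIO[T](message: String)(body: => T): T =
    try {
      body
    } catch {
      case e: InterruptedException =>
        Thread.currentThread().interrupt()
        val iioe = new InterruptedIOException(message)
        iioe.initCause(e)
        throw iioe
    }

  // Usage sketch, mirroring the snippet above:
  // val splits = rethrowInterruptedAsIO("Could not get Splits.") {
  //   mapreduceInputFormat.getSplits(jobContext)
  // }
}
{code}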



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-6928) Kafka sink: default topic should not need to exist

2017-06-15 Thread Erik van Oosten (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050925#comment-16050925
 ] 

Erik van Oosten edited comment on FLINK-6928 at 6/15/17 6:39 PM:
-

In my ideal world method {{getTargetTopic}} would be removed from 
{{*SerializationSchema}} and moved to a new interface, e.g. 
{{DestinationTopic}}.
Then there are two constructor variants for {{FlinkKafkaProducer}}: one would 
take a topic ({{String}}), the other would take a {{DestinationTopic}}. Both 
would have the simplified {{*SerializationSchema}} as argument. To make things 
simple internally, the first variant could wrap the topic in an implementation 
of {{DestinationTopic}} that always returns the same topic.


was (Author: erikvanoosten):
In my ideal world method {{getTargetTopic}} would be removed from 
{{SerializationSchema}} and moved to a new interface, e.g. {{DestinationTopic}}.
Then there are two constructor variants for {{FlinkKafkaProducer}}: one would 
take a topic ({{String}}), the other would take a {{DestinationTopic}}. Both 
would have the simplified {{SerializationSchema}} as argument. To make things 
simple internally, the first variant could wrap the topic in an implementation 
of {{DestinationTopic}} that always returns the same topic.

> Kafka sink: default topic should not need to exist
> --
>
> Key: FLINK-6928
> URL: https://issues.apache.org/jira/browse/FLINK-6928
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Erik van Oosten
>
> When using a Kafka sink, the defaultTopic needs to exist even when it is 
> never used. It would be nice if fetching partition information for the 
> default topic would be delayed until the moment a topic is actually used.
> Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
> default topic.
> In addition, it would be nice if we could signal that the defaultTopic is not 
> needed by passing {{null}}. Currently, a value for the defaultTopic is 
> required.
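
A minimal Scala sketch of the interface split described in the comment above; the {{DestinationTopic}} name follows that comment, everything else is an assumption for illustration:

{code}
// Illustrative only: topic routing moves out of the serialization schema into
// its own abstraction, and a fixed default topic becomes one implementation.
trait DestinationTopic[T] extends Serializable {
  def targetTopic(element: T): String
}

// Wrapping a plain topic name keeps a String-based constructor variant working
// while the producer itself only depends on DestinationTopic.
class FixedTopic[T](topic: String) extends DestinationTopic[T] {
  override def targetTopic(element: T): String = topic
}
{code}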



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6928) Kafka sink: default topic should not need to exist

2017-06-15 Thread Erik van Oosten (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050925#comment-16050925
 ] 

Erik van Oosten commented on FLINK-6928:


In my ideal world method {{getTargetTopic}} would be removed from 
{{SerializationSchema}} and moved to a new interface, e.g. {{DestinationTopic}}.
Then there are two constructor variants for {{FlinkKafkaProducer}}: one would 
take a topic ({{String}}), the other would take a {{DestinationTopic}}. Both 
would have the simplified {{SerializationSchema}} as argument. To make things 
simple internally, the first variant could wrap the topic in an implementation 
of {{DestinationTopic}} that always returns the same topic.

> Kafka sink: default topic should not need to exist
> --
>
> Key: FLINK-6928
> URL: https://issues.apache.org/jira/browse/FLINK-6928
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Erik van Oosten
>
> When using a Kafka sink, the defaultTopic needs to exist even when it is 
> never used. It would be nice if fetching partition information for the 
> default topic would be delayed until the moment a topic is actually used.
> Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
> default topic.
> In addition, it would be nice if we could signal that the defaultTopic is not 
> needed by passing {{null}}. Currently, a value for the defaultTopic is 
> required.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6932) Update the inaccessible Dataflow Model paper link

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050830#comment-16050830
 ] 

ASF GitHub Bot commented on FLINK-6932:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/4131#discussion_r122263994
  
--- Diff: docs/dev/event_time.md ---
@@ -146,7 +146,7 @@ to use timestamp assignment and watermark generation in 
the Flink DataStream API
 *Note: Flink implements many techniques from the Dataflow Model. For a 
good introduction to event time and watermarks, have a look at the articles 
below.*
 
   - [Streaming 
101](https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101) by 
Tyler Akidau
-  - The [Dataflow Model 
paper](https://static.googleusercontent.com/media/research.google.com/en/pubs/archive/43864.pdf)
--- End diff --

Looks like this should have been 
`https://research.google.com/pubs/archive/43864.pdf` before the redirect 
(linked from `https://research.google.com/pubs/pub43864.html`)


> Update the inaccessible Dataflow Model paper link
> -
>
> Key: FLINK-6932
> URL: https://issues.apache.org/jira/browse/FLINK-6932
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: mingleizhang
>Assignee: mingleizhang
>  Labels: None
>
>  I tried to access the Dataflow Model paper link under 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html],
>  but it gives a 404 error instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4131: [FLINK-6932] [doc] Update inaccessible Dataflow Mo...

2017-06-15 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/4131#discussion_r122263994
  
--- Diff: docs/dev/event_time.md ---
@@ -146,7 +146,7 @@ to use timestamp assignment and watermark generation in 
the Flink DataStream API
 *Note: Flink implements many techniques from the Dataflow Model. For a 
good introduction to event time and watermarks, have a look at the articles 
below.*
 
   - [Streaming 
101](https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101) by 
Tyler Akidau
-  - The [Dataflow Model 
paper](https://static.googleusercontent.com/media/research.google.com/en/pubs/archive/43864.pdf)
--- End diff --

Looks like this should have been 
`https://research.google.com/pubs/archive/43864.pdf` before the redirect 
(linked from `https://research.google.com/pubs/pub43864.html`)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6932) Update the inaccessible Dataflow Model paper link

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050785#comment-16050785
 ] 

ASF GitHub Bot commented on FLINK-6932:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/4131

[FLINK-6932] [doc] Update inaccessible Dataflow Model paper link

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-6932

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4131.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4131


commit eb4ad3ece3ca8196035e13e191b22e90510b182b
Author: zhangminglei 
Date:   2017-06-15T17:04:06Z

[FLINK-6932] [doc] Update inaccessible Dataflow Model paper link




> Update the inaccessible Dataflow Model paper link
> -
>
> Key: FLINK-6932
> URL: https://issues.apache.org/jira/browse/FLINK-6932
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: mingleizhang
>Assignee: mingleizhang
>  Labels: None
>
>  I tried to access the Dataflow Model paper link under 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html],
>  but it gives a 404 error instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4131: [FLINK-6932] [doc] Update inaccessible Dataflow Mo...

2017-06-15 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/4131

[FLINK-6932] [doc] Update inaccessible Dataflow Model paper link

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-6932

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4131.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4131


commit eb4ad3ece3ca8196035e13e191b22e90510b182b
Author: zhangminglei 
Date:   2017-06-15T17:04:06Z

[FLINK-6932] [doc] Update inaccessible Dataflow Model paper link




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6693) Support DATE_FORMAT function in the Table / SQL API

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050782#comment-16050782
 ] 

ASF GitHub Bot commented on FLINK-6693:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4078#discussion_r122255933
  
--- Diff: flink-libraries/flink-table/pom.xml ---
@@ -110,6 +110,12 @@ under the License.
 compile
 
 
+   
+   joda-time
+   joda-time
+   provided
--- End diff --

Why is `joda-time` provided? Is it a transitive dependency?


> Support DATE_FORMAT function in the Table / SQL API
> ---
>
> Key: FLINK-6693
> URL: https://issues.apache.org/jira/browse/FLINK-6693
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be quite handy to support the {{DATE_FORMAT}} function in Flink for 
> various date / time related operations.
> The specification of the {{DATE_FORMAT}} function can be found in 
> https://prestodb.io/docs/current/functions/datetime.html.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6693) Support DATE_FORMAT function in the Table / SQL API

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050783#comment-16050783
 ] 

ASF GitHub Bot commented on FLINK-6693:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4078#discussion_r122240819
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/DateTimeFunctions.java
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime;
+
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.DateTimeFormatterBuilder;
+
+/**
+ * Built-in scalar functions for date time related operations.
+ */
+public class DateTimeFunctions {
--- End diff --

We try to keep the code base of the Table API consistently in Scala. There 
are a few classes in Java which are mostly copied from Calcite.
Can you port this class?


> Support DATE_FORMAT function in the Table / SQL API
> ---
>
> Key: FLINK-6693
> URL: https://issues.apache.org/jira/browse/FLINK-6693
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be quite handy to support the {{DATE_FORMAT}} function in Flink for 
> various date / time related operations.
> The specification of the {{DATE_FORMAT}} function can be found in 
> https://prestodb.io/docs/current/functions/datetime.html.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6693) Support DATE_FORMAT function in the Table / SQL API

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050784#comment-16050784
 ] 

ASF GitHub Bot commented on FLINK-6693:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4078#discussion_r122254102
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.ExpressionTestBase
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class DateTimeFunctionTest extends ExpressionTestBase {
+
+  @Test
+  def testDateFormat(): Unit = {
+    testAllApis(
+      DateFormat('f0, "%Y"),
--- End diff --

I think it would be good to have a pattern that uses all features. 
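For example, something along these lines (a sketch only; the expected string naturally depends on the timestamp bound to {{f0}} in the test data):

{code}
testSqlApi(
  "DATE_FORMAT(f0, '%Y-%m-%d %H:%i:%s')",
  "1990-10-14 12:10:10")  // placeholder expected value, exercising date and time specifiers
{code}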


> Support DATE_FORMAT function in the Table / SQL API
> ---
>
> Key: FLINK-6693
> URL: https://issues.apache.org/jira/browse/FLINK-6693
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be quite handy to support the {{DATE_FORMAT}} function in Flink for 
> various date / time related operations.
> The specification of the {{DATE_FORMAT}} function can be found at 
> https://prestodb.io/docs/current/functions/datetime.html.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4078: [FLINK-6693] [table] Support DATE_FORMAT function ...

2017-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4078#discussion_r122254102
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.ExpressionTestBase
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class DateTimeFunctionTest extends ExpressionTestBase {
+
+  @Test
+  def testDateFormat(): Unit = {
+    testAllApis(
+      DateFormat('f0, "%Y"),
--- End diff --

I think it would be good to have a pattern that uses all features. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6693) Support DATE_FORMAT function in the Table / SQL API

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050781#comment-16050781
 ] 

ASF GitHub Bot commented on FLINK-6693:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4078#discussion_r122255414
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/DateTimeFunctions.java
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime;
+
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.DateTimeFormatterBuilder;
+
+/**
+ * Built-in scalar functions for date time related operations.
+ */
+public class DateTimeFunctions {
+   private static final int PIVOT_YEAR = 2020;
+
+	private static final ThreadLocalCache<String, DateTimeFormatter> DATETIME_FORMATTER_CACHE =
+		new ThreadLocalCache<String, DateTimeFormatter>(100) {
+			@Override
+			protected DateTimeFormatter getNewInstance(String format) {
+				return createDateTimeFormatter(format);
+			}
+		};
+
+	public static String dateFormat(long ts, String formatString) {
+		DateTimeFormatter formatter = DATETIME_FORMATTER_CACHE.get(formatString);
--- End diff --

Would it make sense to add a shortcut during code-gen if the pattern is a 
string literal (this should be the most common case) to avoid the lookup?
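Roughly, the shape of the generated code for a literal pattern could be as follows (a sketch, not the actual CodeGenerator output; Joda's {{DateTimeFormat.forPattern}} only stands in for the real {{DATE_FORMAT}} pattern translation):

{code}
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}

// when the pattern is a string literal, resolve the formatter once per operator
// (e.g. in open()) instead of going through DATETIME_FORMATTER_CACHE per record
val formatter: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd")

def formatRecord(ts: Long): String = formatter.print(ts)  // no per-record cache lookup
{code}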


> Support DATE_FORMAT function in the Table / SQL API
> ---
>
> Key: FLINK-6693
> URL: https://issues.apache.org/jira/browse/FLINK-6693
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> It would be quite handy to support the {{DATE_FORMAT}} function in Flink for 
> various date / time related operations.
> The specification of the {{DATE_FORMAT}} function can be found at 
> https://prestodb.io/docs/current/functions/datetime.html.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4078: [FLINK-6693] [table] Support DATE_FORMAT function ...

2017-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4078#discussion_r122240819
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/DateTimeFunctions.java
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime;
+
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.DateTimeFormatterBuilder;
+
+/**
+ * Built-in scalar functions for date time related operations.
+ */
+public class DateTimeFunctions {
--- End diff --

We try to keep the code base of the Table API consistently in Scala. There 
are a few classes in Java which are mostly copied from Calcite.
Can you port this class?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4078: [FLINK-6693] [table] Support DATE_FORMAT function ...

2017-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4078#discussion_r122255933
  
--- Diff: flink-libraries/flink-table/pom.xml ---
@@ -110,6 +110,12 @@ under the License.
 			<scope>compile</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+			<scope>provided</scope>
--- End diff --

Why is `joda-time` provided? Is it a transitive dependency?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4078: [FLINK-6693] [table] Support DATE_FORMAT function ...

2017-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4078#discussion_r122255414
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/DateTimeFunctions.java
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime;
+
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.DateTimeFormatterBuilder;
+
+/**
+ * Built-in scalar functions for date time related operations.
+ */
+public class DateTimeFunctions {
+   private static final int PIVOT_YEAR = 2020;
+
+	private static final ThreadLocalCache<String, DateTimeFormatter> DATETIME_FORMATTER_CACHE =
+		new ThreadLocalCache<String, DateTimeFormatter>(100) {
+			@Override
+			protected DateTimeFormatter getNewInstance(String format) {
+				return createDateTimeFormatter(format);
+			}
+		};
+
+	public static String dateFormat(long ts, String formatString) {
+		DateTimeFormatter formatter = DATETIME_FORMATTER_CACHE.get(formatString);
--- End diff --

Would it make sense to add a shortcut during code-gen if the pattern is a 
string literal (this should be the most common case) to avoid the lookup?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6932) Update the inaccessible Dataflow Model paper link

2017-06-15 Thread mingleizhang (JIRA)
mingleizhang created FLINK-6932:
---

 Summary: Update the inaccessible Dataflow Model paper link
 Key: FLINK-6932
 URL: https://issues.apache.org/jira/browse/FLINK-6932
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: mingleizhang
Assignee: mingleizhang


 I tried to access the [#Dataflow Model paper] link under 
[https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html],
 but it gives me a 404 error instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6932) Update the inaccessible Dataflow Model paper link

2017-06-15 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-6932:

Description:  I tried to access the Dataflow Model paper link which under 
[https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html],
 then it gives me an error [ 404 ] instead.  (was:  I tried to access the 
[#Dataflow Model paper] link which under 
[https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html],
 then it gives me an error [ 404 ] instead.)

> Update the inaccessible Dataflow Model paper link
> -
>
> Key: FLINK-6932
> URL: https://issues.apache.org/jira/browse/FLINK-6932
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: mingleizhang
>Assignee: mingleizhang
>  Labels: None
>
>  I tried to access the Dataflow Model paper link under 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html],
>  but it gives me a 404 error instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4641) Support branching CEP patterns

2017-06-15 Thread Alexander Chermenin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050551#comment-16050551
 ] 

Alexander Chermenin commented on FLINK-4641:


Hi [~dian.fu]! Yes of course, you're welcome! Unfortunately I don't have enough 
time to do it.

> Support branching CEP patterns 
> ---
>
> Key: FLINK-4641
> URL: https://issues.apache.org/jira/browse/FLINK-4641
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: Till Rohrmann
>Assignee: Alexander Chermenin
>
> We should add support for branching CEP patterns to the Pattern API. 
> {code}
>     |--> B --|
>     |        |
> A --         --> D
>     |        |
>     |--> C --|
> {code}
> This feature will require changes to the {{Pattern}} class and the 
> {{NFACompiler}}.
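For illustration only, a purely hypothetical sketch of how such a branching pattern might be expressed; neither {{branch}} nor {{merge}} exists in today's {{Pattern}} API:

{code}
// hypothetical API -- branch() and merge() do not exist today
val pattern = Pattern.begin[Event]("A")
  .branch(
    Pattern.begin[Event]("B"),
    Pattern.begin[Event]("C"))
  .merge("D")   // both branches converge into D
{code}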



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050537#comment-16050537
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122212537
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Yes, @wuchong please merge :-)


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a POJO type rather than Row.
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List<Content> data = new ArrayList<Content>();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream<Content> stream = env.fromCollection(data);
> DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
> super();
> }
> public Content(long recordTime, String urlKey) {
> super();
> 

[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122212537
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Yes, @wuchong please merge :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6886) Fix Timestamp field can not be selected in event time case when toDataStream[T], `T` not a `Row` Type.

2017-06-15 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050535#comment-16050535
 ] 

Fabian Hueske commented on FLINK-6886:
--

Maybe there's another way to fix this problem. I played a bit around and found 
the following:

The following Table API query is executed correctly:

{code}
val table = stream.toTable(tEnv, 'l, 'i, 'n, 'proctime.proctime)

val windowedTable = table
  .window(Tumble over 2.seconds on 'proctime as 'w)
  .groupBy('w, 'n)
  .select('n, 'i.count as 'cnt, 'w.start as 's, 'w.end as 'e)
val results = windowedTable.toAppendStream[MP](queryConfig)

// POJO

class MP(var s: Timestamp, var e: Timestamp, var cnt: Long, var n: String) {
  def this() { this(null, null, 0, null) }
  override def toString: String = s"$n,${s.toString},${e.toString},$cnt"
}
{code}

whereas the equivalent SQL query fails with the reported exception ("The field 
types of physical and logical row types do not match")

{code}
val sqlTable = tEnv.sql(
  s"""SELECT TUMBLE_START(proctime, INTERVAL '2' SECOND) AS s,
|  TUMBLE_END(proctime, INTERVAL '2' SECOND) AS e,
|  n,
|  COUNT(i) as cnt
|FROM $table
|GROUP BY n, TUMBLE(proctime, INTERVAL '2' SECOND)
|
  """.stripMargin)

val results = sqlTable.toAppendStream[MP](queryConfig)
{code}

The plans of both queries look similar, but the SQL plan seems to lack the 
correct final projection:

{code}
// Table API plan
== Abstract Syntax Tree ==
LogicalProject(n=[$0], cnt=[AS($1, 'cnt')], s=[AS($2, 's')], e=[AS($3, 'e')])
  LogicalWindowAggregate(group=[{0}], TMP_0=[COUNT($1)])
LogicalProject(n=[$2], i=[$1], proctime=[$3])
  LogicalTableScan(table=[[_DataStreamTable_0]])

== Optimized Logical Plan ==
DataStreamCalc(select=[n, TMP_0 AS cnt, TMP_1 AS s, TMP_2 AS e])
  DataStreamGroupWindowAggregate(groupBy=[n], window=[TumblingGroupWindow('w, 
'proctime, 2000.millis)], select=[n, COUNT(i) AS TMP_0, start('w) AS TMP_1, 
end('w) AS TMP_2])
DataStreamCalc(select=[n, i, proctime])
  DataStreamScan(table=[[_DataStreamTable_0]])

// SQL plans
== Abstract Syntax Tree ==
LogicalProject(s=[TUMBLE_START($1)], e=[TUMBLE_END($1)], n=[$0], cnt=[$2])
  LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
LogicalProject(n=[$2], $f1=[TUMBLE($3, 2000)], i=[$1])
  LogicalTableScan(table=[[UnnamedTable$3]])

== Optimized Logical Plan ==
DataStreamCalc(select=[w$start, w$end, n, cnt])
  DataStreamGroupWindowAggregate(groupBy=[n], window=[TumblingGroupWindow('w$, 
'proctime, 2000.millis)], select=[n, COUNT(i) AS cnt, start('w$) AS w$start, 
end('w$) AS w$end])
DataStreamCalc(select=[n, proctime, i])
  DataStreamScan(table=[[_DataStreamTable_0]])
{code}

So this doesn't seem to be a fundamental issue with the time attributes or 
window properties but rather an issue in the SQL optimization.

What do you think [~sunjincheng121] and [~jark]?

> Fix Timestamp field can not be selected in event time case when  
> toDataStream[T], `T` not a `Row` Type.
> ---
>
> Key: FLINK-6886
> URL: https://issues.apache.org/jira/browse/FLINK-6886
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently, for event-time windows (group/over), when the `SELECT` clause 
> contains a `Timestamp` field and the result is converted via toDataStream[T] 
> where `T` is not a `Row` type (such as a `PojoType`), an exception is thrown. 
> This JIRA will fix that bug. For example:
> Group Window on SQL:
> {code}
> SELECT name, max(num) as myMax, TUMBLE_START(rowtime, INTERVAL '5' SECOND) as 
> winStart,TUMBLE_END(rowtime, INTERVAL '5' SECOND) as winEnd FROM T1 GROUP BY 
> name, TUMBLE(rowtime, INTERVAL '5' SECOND)
> {code}
> Throw Exception:
> {code}
> org.apache.flink.table.api.TableException: The field types of physical and 
> logical row types do not match.This is a bug and should not happen. Please 
> file an issue.
>   at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
>   at 
> org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:721)
>   at 
> org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:247)
>   at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:647)
> {code}
> In fact, once this exception is solved, subsequent exceptions are thrown. The 
> real reason is a bug in the {{TableEnvironment#generateRowConverterFunction}} 
> method, which this JIRA will fix.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4130: [FLINK-6773] [checkpoint] Introduce compression (snappy) ...

2017-06-15 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4130
  
CC @tillrohrmann 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6773) Use compression (e.g. snappy) for full check/savepoints

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050508#comment-16050508
 ] 

ASF GitHub Bot commented on FLINK-6773:
---

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/4130

[FLINK-6773] [checkpoint] Introduce compression (snappy) for keyed st…

This PR introduces optional snappy compression for the keyed state in full 
checkpoints and savepoints. The feature can be activated through a flag in 
{{ExecutionConfig}}.

In the future, we can also support user-defined compression schemes, which 
will also require an upgrade and compatibility story, as described in 
FLINK-6931.
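For reference, a usage sketch, assuming the flag ends up as a {{setUseSnapshotCompression}} setter on {{ExecutionConfig}} (the exact name is defined by this PR):

{code}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// assumed flag name; enables snappy compression of keyed state in full checkpoints/savepoints
env.getConfig.setUseSnapshotCompression(true)
{code}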

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink compressedKeyGroups

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4130.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4130






> Use compression (e.g. snappy) for full check/savepoints
> ---
>
> Key: FLINK-6773
> URL: https://issues.apache.org/jira/browse/FLINK-6773
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> We could use compression (e.g. snappy stream compression) to decrease the 
> size of our full checkpoints and savepoints. From some initial experiments, I 
> think there is great potential to achieve compression rates around 30-50%. 
> Given those numbers, I think this is very low hanging fruit to implement.
> One point to consider in the implementation is that compression blocks should 
> respect key-groups, i.e. typically it should make sense to compress per 
> key-group.
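A conceptual sketch of compressing per key-group, using snappy-java's framed streams as one possible choice (the names below are illustrative, not the actual backend code):

{code}
import java.io.OutputStream
import org.xerial.snappy.SnappyFramedOutputStream

// compress each key-group as its own block so it stays independently readable
def writeKeyGroup(out: OutputStream)(writeState: OutputStream => Unit): Unit = {
  val compressed = new SnappyFramedOutputStream(out)
  writeState(compressed)   // serialize this key-group's state into the compressed stream
  compressed.flush()       // flush the frame; the underlying stream stays open for the next group
}
{code}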



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6773) Use compression (e.g. snappy) for full check/savepoints

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050513#comment-16050513
 ] 

ASF GitHub Bot commented on FLINK-6773:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4130
  
CC @tillrohrmann 


> Use compression (e.g. snappy) for full check/savepoints
> ---
>
> Key: FLINK-6773
> URL: https://issues.apache.org/jira/browse/FLINK-6773
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> We could use compression (e.g. snappy stream compression) to decrease the 
> size of our full checkpoints and savepoints. From some initial experiments, I 
> think there is great potential to achieve compression rates around 30-50%. 
> Given those numbers, I think this is very low hanging fruit to implement.
> One point to consider in the implementation is that compression blocks should 
> respect key-groups, i.e. typically it should make sense to compress per 
> key-group.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4130: [FLINK-6773] [checkpoint] Introduce compression (s...

2017-06-15 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/4130

[FLINK-6773] [checkpoint] Introduce compression (snappy) for keyed st…

This PR introduces optional snappy compression for the keyed state in full 
checkpoints and savepoints. The feature can be activated through a flag in 
{{ExecutionConfig}}.

In the future, we can also support user-defined compression schemes, which 
will also require an upgrade and compatibility story, as described in 
FLINK-6931.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink compressedKeyGroups

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4130.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4130






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050502#comment-16050502
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122205799
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Thanks @wuchong, I checked it on my side. Nice. 
+1

Best,
SunJincheng


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a POJO type rather than Row.
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List<Content> data = new ArrayList<Content>();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream<Content> stream = env.fromCollection(data);
> DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
> super();
> }
> public Content(long 

[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122205799
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Thanks @wuchong, I checked it on my side. Nice. 
+1

Best,
SunJincheng


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6931) Support custom compression formats for checkpoints (+Upgrade/Compatibility)

2017-06-15 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-6931:
-

 Summary: Support custom compression formats for checkpoints 
(+Upgrade/Compatibility)
 Key: FLINK-6931
 URL: https://issues.apache.org/jira/browse/FLINK-6931
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Reporter: Stefan Richter


With FLINK-6773, we introduced optional snappy compression for keyed state in 
full checkpoints and savepoints. We should offer users a way to register their 
own compression formats with the {{ExecutionConfig}}. For this, we should also 
have a compatibility story, very similar to what 
{{TypeSerializerConfigSnapshot}} does for type serializers.
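Purely as an illustration of the direction, a hypothetical shape of such a hook; the trait and the registration call below do not exist, and the names are placeholders:

{code}
import java.io.{InputStream, OutputStream}

// hypothetical user-facing interface for pluggable (de)compression of checkpoint streams
trait CheckpointCompressionFormat extends Serializable {
  def compress(out: OutputStream): OutputStream
  def decompress(in: InputStream): InputStream
}

// hypothetical registration, analogous to registering custom serializers:
// env.getConfig.setCheckpointCompressionFormat(new MySnappyFormat)
{code}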



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6836) Failing YARNSessionCapacitySchedulerITCase.testTaskManagerFailure

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050495#comment-16050495
 ] 

ASF GitHub Bot commented on FLINK-6836:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4120
  
+1 to merge.


> Failing YARNSessionCapacitySchedulerITCase.testTaskManagerFailure
> -
>
> Key: FLINK-6836
> URL: https://issues.apache.org/jira/browse/FLINK-6836
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The master is currently unstable. The 
> {{YARNSessionCapacitySchedulerITCase.testTaskManagerFailure}} fails with 
> Hadoop version {{2.6.5}}, {{2.7.3}} and {{2.8.0}}.
> See this build [1] for example.
> [1] https://travis-ci.org/apache/flink/builds/238720589



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4120: [FLINK-6836] [tests] Fix YARNSessionCapacitySchedulerITCa...

2017-06-15 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4120
  
+1 to merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050474#comment-16050474
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122201344
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

@fhueske thanks for your review. So if the Travis build passes, I will merge the 
code?


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a POJO type rather than Row.
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List<Content> data = new ArrayList<Content>();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream<Content> stream = env.fromCollection(data);
> DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
> super();
> }
> public Content(long recordTime, 

[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122201344
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

@fhueske thanks for your review. So if the Travis build passes, I will merge the 
code?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6930) Selecting window start / end on row-based Tumble/Slide window causes NPE

2017-06-15 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-6930:


 Summary: Selecting window start / end on row-based Tumble/Slide 
window causes NPE
 Key: FLINK-6930
 URL: https://issues.apache.org/jira/browse/FLINK-6930
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.3.0, 1.4.0
Reporter: Fabian Hueske


Selecting the start and end properties of a row-based window causes a 
NullPointerException.
The following program:

{code}
val windowedTable = table
  .window(Tumble over 2.rows on 'proctime as 'w)
  .groupBy('w, 'string)
  .select('string as 'n, 'int.count as 'cnt, 'w.start as 's, 'w.end as 'e)
{code}

causes 

{code}
Caused by: java.lang.NullPointerException
at 
org.apache.calcite.runtime.SqlFunctions.toLong(SqlFunctions.java:1556)
at 
org.apache.calcite.runtime.SqlFunctions.toLong(SqlFunctions.java:1551)
at DataStreamCalcRule$40.processElement(Unknown Source)
at 
org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:67)
at 
org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:890)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:868)
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at 
org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:75)
at 
org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:37)
at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:599)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:456)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
{code}

We should validate that the start and end window properties are not accessed if 
the window is defined on row-counts.
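A rough sketch of the kind of validation meant here (the type names only approximate the Table API internals, and {{isRowCountLiteral}} is a hypothetical helper):

{code}
def validateWindowProperties(window: LogicalWindow, props: Seq[WindowProperty]): Unit =
  window match {
    case TumblingGroupWindow(_, _, size) if isRowCountLiteral(size) =>
      // reject 'w.start / 'w.end for count-based windows
      if (props.exists(p => p.isInstanceOf[WindowStart] || p.isInstanceOf[WindowEnd])) {
        throw new ValidationException(
          "Window start and end properties are not available for row-count windows.")
      }
    case _ => // time-based windows support start and end
  }
{code}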



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5908) Blob Cache can (rarely) get corrupted on failed blob downloads

2017-06-15 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050456#comment-16050456
 ] 

Nico Kruber commented on FLINK-5908:


Let's solve this during the rewrite of the BLOB store with FLIP-19.

> Blob Cache can (rarely) get corrupted on failed blob downloads
> --
>
> Key: FLINK-5908
> URL: https://issues.apache.org/jira/browse/FLINK-5908
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Network
>Affects Versions: 1.2.0, 1.3.0, 1.4.0
>Reporter: Stephan Ewen
>
> The Blob Cache downloads files directly to the target file location.
> While it tries to clean up failed attempts, there is a chance that this 
> cleanup does not complete.
> In that case, we have a corrupt file at the target location. The blob cache 
> then assumes that it already has the file cached and future requests 
> do not attempt to re-download the file.
> The fix would be to download to a temp file name, validate the integrity, and 
> rename to the target file path when the validation succeeds.
> The validation for "content addressable" could even include validating the 
> checksum hash.
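A sketch of that fix using standard {{java.nio}} APIs; the {{fetch}} callback and the SHA-1 check stand in for the actual download and validation logic:

{code}
import java.nio.file.{Files, Path, StandardCopyOption}
import java.security.MessageDigest

def downloadBlob(fetch: Path => Unit, target: Path, expectedSha1: Array[Byte]): Unit = {
  val tmp = Files.createTempFile(target.getParent, "blob-", ".tmp")
  try {
    fetch(tmp)  // write the downloaded bytes into the temp file
    val actual = MessageDigest.getInstance("SHA-1").digest(Files.readAllBytes(tmp))
    require(java.util.Arrays.equals(actual, expectedSha1), "blob checksum mismatch")
    // only a fully validated file ever becomes visible under the target path
    Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE)
  } finally {
    Files.deleteIfExists(tmp)  // clean up if download or validation failed
  }
}
{code}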



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-5908) Blob Cache can (rarely) get corrupted on failed blob downloads

2017-06-15 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-5908:
---
Component/s: Network

> Blob Cache can (rarely) get corrupted on failed blob downloads
> --
>
> Key: FLINK-5908
> URL: https://issues.apache.org/jira/browse/FLINK-5908
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Network
>Affects Versions: 1.2.0, 1.3.0, 1.4.0
>Reporter: Stephan Ewen
>
> The Blob Cache downloads files directly to the target file location.
> While it tries to clean up failed attempts, there is a chance that this 
> cleanup does not complete.
> In that case, we have a corrupt file at the target location. The blob cache 
> then assumes that it already has the file cached and future requests 
> do not attempt to re-download the file.
> The fix would be to download to a temp file name, validate the integrity, and 
> rename to the target file path when the validation succeeds.
> The validation for "content addressable" could even include validating the 
> checksum hash.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-5908) Blob Cache can (rarely) get corrupted on failed blob downloads

2017-06-15 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-5908:
---
Affects Version/s: 1.4.0
   1.3.0

> Blob Cache can (rarely) get corrupted on failed blob downloads
> --
>
> Key: FLINK-5908
> URL: https://issues.apache.org/jira/browse/FLINK-5908
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Network
>Affects Versions: 1.2.0, 1.3.0, 1.4.0
>Reporter: Stephan Ewen
>
> The Blob Cache downloads files directly to the target file location.
> While it tries to clean up failed attempts, there is a chance that this 
> cleanup does not complete.
> In that case, we have a corrupt file at the target location. The blob cache 
> then assumes that it already has the file cached and future requests 
> do not attempt to re-download the file.
> The fix would be to download to a temp file name, validate the integrity, and 
> rename to the target file path when the validation succeeds.
> The validation for "content addressable" could even include validating the 
> checksum hash.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-5908) Blob Cache can (rarely) get corrupted on failed blob downloads

2017-06-15 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-5908:
---
Fix Version/s: (was: 1.4.0)
   (was: 1.2.2)

> Blob Cache can (rarely) get corrupted on failed blob downloads
> --
>
> Key: FLINK-5908
> URL: https://issues.apache.org/jira/browse/FLINK-5908
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>
> The Blob Cache downloads files directly to the target file location.
> While it tries to clean up failed attempts, there is a chance that this 
> cleanup does not complete.
> In that case, we have a corrupt file at the target location. The blob cache 
> then assumes that it already has the file cached and future requests 
> do not attempt to re-download the file.
> The fix would be to download to a temp file name, validate the integrity, and 
> rename to the target file path when the validation succeeds.
> The validation for "content addressable" could even include validating the 
> checksum hash.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6046) Add support for oversized messages during deployment

2017-06-15 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050450#comment-16050450
 ] 

Nico Kruber commented on FLINK-6046:


Since the broken cleanup of the current blob store is a deal breaker for 
offloading oversized messages, we should revisit this issue after finishing 
FLIP-19.

> Add support for oversized messages during deployment
> 
>
> Key: FLINK-6046
> URL: https://issues.apache.org/jira/browse/FLINK-6046
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> This is the non-FLIP6 version of FLINK-4346, restricted to deployment 
> messages:
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than {{akka.framesize}} as may happen for task deployments via the 
> {{TaskDeploymentDescriptor}}.
> We should use the {{BlobServer}} to offload big data items (if possible) and 
> make use of any potential distributed file system behind. This way, not only 
> do we avoid the akka framesize restriction, but may also be able to speed up 
> deployment.
> I suggest the following changes:
>   - the sender, i.e. the {{Execution}} class, tries to store the serialized 
> job information and serialized task information (if oversized) from the 
> {{TaskDeploymentDescriptor}} (tdd) on the {{BlobServer}} as a single 
> {{NAME_ADDRESSABLE}} blob under its job ID (if this does not work, we send 
> the whole tdd as usual via akka)
>   - if stored in a blob, these data items are removed from the tdd
>   - the receiver, i.e. the {{TaskManager}} class, tries to retrieve any 
> offloaded data after receiving the {{TaskDeploymentDescriptor}} from akka; it 
> re-assembles the original tdd
>   - the stored blob may be deleted after re-assembly of the tdd
> Further (future) changes may include:
>   - separating the serialized job information and serialized task information 
> into two files and re-use the first one for all tasks
>   - not re-deploying these two during job recovery (if possible)
>   - then, as all other {{NAME_ADDRESSABLE}} blobs, these offloaded blobs may 
> be removed when the job enters a final state instead
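Purely illustrative, a sketch of the sender-side decision described above; {{OffloadedBlobStore}} and its methods are placeholders, not the real {{BlobServer}}/{{BlobClient}} API:

{code}
import org.apache.flink.api.common.JobID

trait OffloadedBlobStore {
  def put(jobId: JobID, key: String, data: Array[Byte]): Unit   // placeholder API
}

// offload the serialized task information if it would exceed the Akka frame size,
// otherwise keep it inline in the TaskDeploymentDescriptor
def maybeOffload(
    store: OffloadedBlobStore,
    jobId: JobID,
    serializedTaskInfo: Array[Byte],
    maxFrameSize: Int): Option[String] = {
  if (serializedTaskInfo.length > maxFrameSize) {
    val key = s"tdd-$jobId"
    store.put(jobId, key, serializedTaskInfo)
    Some(key)          // the tdd then only carries the key
  } else {
    None               // small enough to ship via Akka directly
  }
}
{code}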



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6046) Add support for oversized messages during deployment

2017-06-15 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-6046:
---
Component/s: Network

> Add support for oversized messages during deployment
> 
>
> Key: FLINK-6046
> URL: https://issues.apache.org/jira/browse/FLINK-6046
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> This is the non-FLIP6 version of FLINK-4346, restricted to deployment 
> messages:
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than {{akka.framesize}} as may happen for task deployments via the 
> {{TaskDeploymentDescriptor}}.
> We should use the {{BlobServer}} to offload big data items (if possible) and 
> make use of any potential distributed file system behind. This way, not only 
> do we avoid the akka framesize restriction, but may also be able to speed up 
> deployment.
> I suggest the following changes:
>   - the sender, i.e. the {{Execution}} class, tries to store the serialized 
> job information and serialized task information (if oversized) from the 
> {{TaskDeploymentDescriptor}} (tdd) on the {{BlobServer}} as a single 
> {{NAME_ADDRESSABLE}} blob under its job ID (if this does not work, we send 
> the whole tdd as usual via akka)
>   - if stored in a blob, these data items are removed from the tdd
>   - the receiver, i.e. the {{TaskManager}} class, tries to retrieve any 
> offloaded data after receiving the {{TaskDeploymentDescriptor}} from akka; it 
> re-assembles the original tdd
>   - the stored blob may be deleted after re-assembly of the tdd
> Further (future) changes may include:
>   - separating the serialized job information and serialized task information 
> into two files and re-using the first one for all tasks
>   - not re-deploying these two during job recovery (if possible)
>   - then, like all other {{NAME_ADDRESSABLE}} blobs, these offloaded blobs may 
> be removed when the job enters a final state instead



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122194058
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

I checked the usages of `generateFieldAccess()` and the changes seem to be OK.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050423#comment-16050423
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122194058
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

I checked the usages of `generateFieldAccess()` and the changes seem to be OK.


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a POJO type, not Row.
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List data = new ArrayList();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream stream = env.fromCollection(data);
> DataStream stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
> super();
> }
> public Content(long recordTime, String 

[jira] [Updated] (FLINK-5991) Expose Broadcast Operator State through public APIs

2017-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-5991:
---
Labels: api-breaking api-deprecation  (was: )

> Expose Broadcast Operator State through public APIs
> ---
>
> Key: FLINK-5991
> URL: https://issues.apache.org/jira/browse/FLINK-5991
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> The broadcast operator state functionality was added in FLINK-5265; it just 
> hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are blocked on 
> broadcast state: rescalable Kinesis / Kafka consumers with shard / partition 
> discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast 
> state for the 1.3 release as well.
> This JIRA also serves to discuss how we want to expose it.
> To initiate the discussion, I propose:
> 1. For the more powerful {{CheckpointedFunction}}, add the following to the 
> {{OperatorStateStore}} interface:
> {code}
>  ListState getBroadcastOperatorState(ListStateDescriptor 
> stateDescriptor);
>  ListState 
> getBroadcastSerializableListState(String stateName);
> {code}
> 2. For a simpler {{ListCheckpointed}} variant, we probably should have a 
> separate {{BroadcastListCheckpointed}} interface.
> Extending {{ListCheckpointed}} to let the user define the list state type as 
> either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if we can 
> rely on a contract that the value doesn't change. Alternatively, we could 
> expose a defining method (e.g. {{getListStateType()}}) that is called only 
> once in the operator. This would break user code, but can be considered 
> because the interface is marked as {{PublicEvolving}}.
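To make option 1 a bit more concrete, a hypothetical usage sketch follows. {{getBroadcastOperatorState}} is only the method proposed above and not part of the current public API; its generic signature and the partition-assignment helper are assumptions made for this illustration:

{code:title=BroadcastStateUsageSketch.java|borderStyle=solid}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class DiscoveringSourceSketch implements CheckpointedFunction {

    // broadcast list state: on restore, every subtask sees the complete partition list
    // and can re-select its share, e.g. after a rescale (Kinesis / Kafka discovery case)
    private transient ListState<String> broadcastPartitions;
    private final List<String> localPartitions = new ArrayList<>();

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // proposed method from this JIRA, not an existing API
        broadcastPartitions = context.getOperatorStateStore()
            .getBroadcastOperatorState(new ListStateDescriptor<>("partitions", String.class));

        localPartitions.clear();
        for (String partition : broadcastPartitions.get()) {
            if (isAssignedToThisSubtask(partition)) {
                localPartitions.add(partition);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        broadcastPartitions.clear();
        for (String partition : localPartitions) {
            broadcastPartitions.add(partition);
        }
    }

    private boolean isAssignedToThisSubtask(String partition) {
        // placeholder for a deterministic assignment,
        // e.g. partition.hashCode() % parallelism == subtaskIndex
        return true;
    }
}
{code}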



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-5991) Expose Broadcast Operator State through public APIs

2017-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-5991:
---
Labels:   (was: api-breaking api-deprecation)

> Expose Broadcast Operator State through public APIs
> ---
>
> Key: FLINK-5991
> URL: https://issues.apache.org/jira/browse/FLINK-5991
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> The broadcast operator state functionality was added in FLINK-5265; it just 
> hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are blocked on 
> broadcast state: rescalable Kinesis / Kafka consumers with shard / partition 
> discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast 
> state for the 1.3 release as well.
> This JIRA also serves to discuss how we want to expose it.
> To initiate the discussion, I propose:
> 1. For the more powerful {{CheckpointedFunction}}, add the following to the 
> {{OperatorStateStore}} interface:
> {code}
>  ListState getBroadcastOperatorState(ListStateDescriptor 
> stateDescriptor);
>  ListState 
> getBroadcastSerializableListState(String stateName);
> {code}
> 2. For a simpler {{ListCheckpointed}} variant, we probably should have a 
> separate {{BroadcastListCheckpointed}} interface.
> Extending {{ListCheckpointed}} to let the user define the list state type as 
> either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if we can 
> rely on a contract that the value doesn't change. Alternatively, we could 
> expose a defining method (e.g. {{getListStateType()}}) that is called only 
> once in the operator. This would break user code, but can be considered 
> because the interface is marked as {{PublicEvolving}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6682) Improve error message in case parallelism exceeds maxParallelism

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050370#comment-16050370
 ] 

ASF GitHub Bot commented on FLINK-6682:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4125
  
Hi @zentol. Please help review when you are free. Should I add some extra 
information? Thanks.


> Improve error message in case parallelism exceeds maxParallelism
> 
>
> Key: FLINK-6682
> URL: https://issues.apache.org/jira/browse/FLINK-6682
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Assignee: mingleizhang
>
> When restoring a job with a parallelism that exceeds the maxParallelism, we're 
> not providing a useful error message, as all you get is an 
> IllegalArgumentException:
> {code}
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed
> at 
> org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:343)
> at 
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
> ... 22 more
> Caused by: java.lang.IllegalArgumentException
> at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> at 
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.createKeyGroupPartitions(StateAssignmentOperation.java:449)
> at 
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:117)
> at 
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:102)
> at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1038)
> at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1101)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
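For reference, the improvement boils down to attaching a message to the failing precondition. The snippet below is only a sketch of that kind of change (the class and message text are made up here), not the actual patch in PR #4125:

{code:title=ErrorMessageSketch.java|borderStyle=solid}
import org.apache.flink.util.Preconditions;

public class ErrorMessageSketch {

    static void checkParallelism(int parallelism, int maxParallelism) {
        // before (roughly): Preconditions.checkArgument(parallelism <= maxParallelism);
        // after: the same check, but with a message that explains what went wrong
        Preconditions.checkArgument(
            parallelism <= maxParallelism,
            "The parallelism (" + parallelism + ") of the restored job must not exceed the "
                + "operator's maximum parallelism (" + maxParallelism + ").");
    }
}
{code}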



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4125: [FLINK-6682] [checkpoints] Improve error message in case ...

2017-06-15 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4125
  
Hi @zentol. Please help review when you are free. Should I add some extra 
information? Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050358#comment-16050358
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122180812
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

I think your branch looks good @wuchong. I tried a similar thing but did 
not change the `generateFieldAccess()` methods because I was afraid of side 
effects if we use the mapping for any kind of type (instead of just POJOs).


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a POJO type, not Row.
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List data = new ArrayList();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream stream = env.fromCollection(data);
> DataStream stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
>   

[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122180812
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

I think your branch looks good @wuchong. I tried a similar thing but did 
not change the `generateFieldAccess()` methods because I was afraid of side 
effects if we use the mapping for any kind of type (instead of just POJOs).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050341#comment-16050341
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122178152
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Emm... but it works fine on my machine. I pushed it to my branch, can you 
check it out?
https://github.com/wuchong/flink/commit/ad97e84ca561ea32d2ce5e0779f4aff6429b5523


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a POJO type, not Row.
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List data = new ArrayList();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream stream = env.fromCollection(data);
> DataStream stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public 

[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122178152
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Emm... but it works fine on my machine. I pushed it to my branch, can you 
check it out?
https://github.com/wuchong/flink/commit/ad97e84ca561ea32d2ce5e0779f4aff6429b5523


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050332#comment-16050332
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122176847
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

@wuchong I think the current PR works well, so I suggest opening a 
new PR to improve it.
What do you think? @fhueske @wuchong 


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a POJO type, not Row.
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List data = new ArrayList();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream stream = env.fromCollection(data);
> DataStream stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
> 

[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122176847
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

@wuchong I think the current PR works well, so I suggest opening a 
new PR to improve it.
What do you think? @fhueske @wuchong 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050326#comment-16050326
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122176275
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

@fhueske @wuchong thanks for trying to improve this PR. @wuchong With your 
solution, I also got two errors when running `TimeAttributesITCase`. Have 
you run all the tests locally? 



> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a POJO type, not Row.
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List data = new ArrayList();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream stream = env.fromCollection(data);
> DataStream stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long 

[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122176275
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

@fhueske @wuchong thanks for trying to improve this PR. @wuchong With your 
solution, I also got two errors when running `TimeAttributesITCase`. Have 
you run all the tests locally? 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122168658
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

I think I tried that but it caused a couple of tests to fail.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050272#comment-16050272
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122168658
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

I think I tried that but it caused a couple of tests to fail.


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a POJO type, not Row.
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List data = new ArrayList();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream stream = env.fromCollection(data);
> DataStream stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
> super();
> }
> public Content(long recordTime, String urlKey) {
> super();
> this.recordTime = recordTime;
> this.urlKey = urlKey;
> }
> public String getUrlKey() {
> return urlKey;
> }
> public void setUrlKey(String urlKey) {
> this.urlKey = urlKey;
> }
> public long getRecordTime() {
> return recordTime;
> }
> public void setRecordTime(long recordTime) {
> this.recordTime = recordTime;
> }
> public long getHttpGetMessageCount() {
> return httpGetMessageCount;
> }
> 

[jira] [Updated] (FLINK-6917) Introduce test base for end-to-end testing serializer config snapshotting, restoring, and compatibility check roundtrips

2017-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-6917:
---
Labels: ta  (was: )

> Introduce test base for end-to-end testing serializer config snapshotting, 
> restoring, and compatibility check roundtrips 
> -
>
> Key: FLINK-6917
> URL: https://issues.apache.org/jira/browse/FLINK-6917
> Project: Flink
>  Issue Type: Test
>  Components: State Backends, Checkpointing, Tests, Type Serialization 
> System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>  Labels: ta
>
> Currently, we only have end-to-end tests of the serializer snapshotting and 
> restore roundtrip for the {{PojoSerializer}}, {{KryoSerializer}}, and Scala 
> type serializers.
> They are all written differently with varying coverage of behavioural tests, 
> and scattered in several different test classes.
> This JIRA tracks introducing a common test base for the serialization 
> roundtrip for all serializers in Flink, and also activating it for all 
> serializers.
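A rough outline of what such a common test base could look like is sketched below. The compatibility-related names ({{snapshotConfiguration()}}, {{ensureCompatibility()}}, {{CompatibilityResult}}) are taken from the 1.3 serializer API as I recall them, and the assertions are deliberately simplified:

{code:title=SerializerRoundtripTestBase.java|borderStyle=solid}
import static org.junit.Assert.assertFalse;

import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.junit.Test;

/** Sketch of a shared roundtrip test base; concrete subclasses plug in their serializer. */
public abstract class SerializerRoundtripTestBase<T> {

    /** The serializer under test, e.g. a PojoSerializer, KryoSerializer, or a Scala serializer. */
    protected abstract TypeSerializer<T> createSerializer();

    @Test
    public void testConfigSnapshotRoundtrip() throws Exception {
        TypeSerializer<T> serializer = createSerializer();

        // 1. take a config snapshot, as a checkpoint would
        TypeSerializerConfigSnapshot configSnapshot = serializer.snapshotConfiguration();

        // (a complete test base would also write and read the snapshot through the
        //  serialization proxies here, to cover the persistence path)

        // 2. on "restore", a freshly created serializer must accept the old snapshot
        TypeSerializer<T> restoredSerializer = createSerializer();
        CompatibilityResult<T> result = restoredSerializer.ensureCompatibility(configSnapshot);

        assertFalse("serializer must be compatible with its own config snapshot",
            result.isRequiresMigration());
    }
}
{code}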



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050271#comment-16050271
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122168412
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Right, I noticed that as well


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a POJO type, not Row.
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List data = new ArrayList();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream stream = env.fromCollection(data);
> DataStream stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
> super();
> }
> public Content(long recordTime, String urlKey) {
> super();
> 

[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122168412
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Right, I noticed that as well


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-6917) Introduce test base for end-to-end testing serializer config snapshotting, restoring, and compatibility check roundtrips

2017-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-6917:
---
Labels:   (was: ta)

> Introduce test base for end-to-end testing serializer config snapshotting, 
> restoring, and compatibility check roundtrips 
> -
>
> Key: FLINK-6917
> URL: https://issues.apache.org/jira/browse/FLINK-6917
> Project: Flink
>  Issue Type: Test
>  Components: State Backends, Checkpointing, Tests, Type Serialization 
> System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Currently, we only have end-to-end tests of the serializer snapshotting and 
> restore roundtrip for the {{PojoSerializer}}, {{KryoSerializer}}, and Scala 
> type serializers.
> They are all written differently with varying coverage of behavioural tests, 
> and scattered in several different test classes.
> This JIRA tracks introducing a common test base for the serialization 
> roundtrip for all serializers in Flink, and also activating it for all 
> serializers.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-6929) Add documentation for Table API OVER windows

2017-06-15 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-6929:


 Summary: Add documentation for Table API OVER windows
 Key: FLINK-6929
 URL: https://issues.apache.org/jira/browse/FLINK-6929
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Table API & SQL
Affects Versions: 1.3.1, 1.4.0
Reporter: Fabian Hueske


The Table API documentation is currently lacking a description of OVER windows.
The page has a placeholder section with a TODO: 
{{./docs/dev/table/tableApi.md}}.
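For context, the missing section would describe queries along the following lines. This is only an illustrative fragment (1.3 string-expression syntax as I recall it; imports are omitted and {{orders}} is assumed to be an existing {{Table}} with {{user}}, {{amount}} and {{rowtime}} fields):

{code:title=OverWindowExample.java|borderStyle=solid}
// sliding row-count OVER window: per user, aggregate over the 10 preceding rows
Table result = orders
    .window(Over.partitionBy("user").orderBy("rowtime").preceding("10.rows").as("w"))
    .select("user, amount.sum over w as totalAmount");
{code}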



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-5354) Split up Table API documentation into multiple pages

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-5354.

   Resolution: Done
Fix Version/s: 1.4.0
   1.3.1

Done for 1.3.1 with 398012aa165d5d6cee9982475ea9cded60ae6ae3
Done for 1.4.0 with a5d93a56cb37e691ec9bb06d17c76151e7619267

> Split up Table API documentation into multiple pages 
> -
>
> Key: FLINK-5354
> URL: https://issues.apache.org/jira/browse/FLINK-5354
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Table API & SQL
>Reporter: Timo Walther
>Assignee: Fabian Hueske
> Fix For: 1.3.1, 1.4.0
>
>
> The Table API documentation page is quite large at the moment. We should 
> split it up into multiple pages:
> Here is my suggestion:
> - Overview (Datatypes, Config, Registering Tables, Examples)
> - TableSources and Sinks
> - Table API
> - SQL
> - Functions



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6751) Table API / SQL Docs: UDFs Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-6751:
-
Description: 
Update and extend the documentation of UDFs in the Table API / SQL: 
{{./docs/dev/table/udfs.md}}
Missing sections:

- Registration of UDFs
- UDAGGs


  was:Update and refine {{./docs/dev/table/udfs.md}} in feature branch 
https://github.com/apache/flink/tree/tableDocs


> Table API / SQL Docs: UDFs Page
> ---
>
> Key: FLINK-6751
> URL: https://issues.apache.org/jira/browse/FLINK-6751
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Shaoxuan Wang
>
> Update and extend the documentation of UDFs in the Table API / SQL: 
> {{./docs/dev/table/udfs.md}}
> Missing sections:
> - Registration of UDFs
> - UDAGGs
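For orientation, the "Registration of UDFs" part would cover something like the snippet below (standard 1.3 scalar-function registration; the {{HashCode}} function, the {{orders}} table and its {{text}} column are made up for this example):

{code:title=UdfRegistrationExample.java|borderStyle=solid}
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class UdfRegistrationExample {

    /** A simple scalar UDF. */
    public static class HashCode extends ScalarFunction {
        public int eval(String s) {
            return s.hashCode();
        }
    }

    /** Registers the UDF under a name and uses it in a Table API expression. */
    public static Table applyHashCode(StreamTableEnvironment tableEnv, Table orders) {
        tableEnv.registerFunction("hashCode", new HashCode());
        return orders.select("text, text.hashCode() as hash");
    }
}
{code}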



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-6750:
-
Description: 
Update and refine the documentation about TableSources and TableSinks.
There are a few TODOs left in {{./docs/dev/table/sourceSinks.md}}

  was:Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch 
https://github.com/apache/flink/tree/tableDocs


> Table API / SQL Docs: Table Sources & Sinks Page
> 
>
> Key: FLINK-6750
> URL: https://issues.apache.org/jira/browse/FLINK-6750
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
> Fix For: 1.3.1, 1.4.0
>
>
> Update and refine the documentation about TableSources and TableSinks.
> There are a few TODOs left in {{./docs/dev/table/sourceSinks.md}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6751) Table API / SQL Docs: UDFs Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-6751:
-
Issue Type: Task  (was: Sub-task)
Parent: (was: FLINK-5354)

> Table API / SQL Docs: UDFs Page
> ---
>
> Key: FLINK-6751
> URL: https://issues.apache.org/jira/browse/FLINK-6751
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Shaoxuan Wang
>
> Update and refine {{./docs/dev/table/udfs.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-6750:
-
Issue Type: Task  (was: Sub-task)
Parent: (was: FLINK-5354)

> Table API / SQL Docs: Table Sources & Sinks Page
> 
>
> Key: FLINK-6750
> URL: https://issues.apache.org/jira/browse/FLINK-6750
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
> Fix For: 1.3.1, 1.4.0
>
>
> Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6747) Table API / SQL Docs: Streaming Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-6747:
-
Description: 
Extend the {{./docs/dev/table/streaming.md}} page.
Missing sections:

- Dynamic Tables
- QueryConfiguration (state retention time)

  was:Update and refine {{./docs/dev/table/streaming.md}} in feature branch 
https://github.com/apache/flink/tree/tableDocs


> Table API / SQL Docs: Streaming Page
> 
>
> Key: FLINK-6747
> URL: https://issues.apache.org/jira/browse/FLINK-6747
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> Extend the {{./docs/dev/table/streaming.md}} page.
> Missing sections:
> - Dynamic Tables
> - QueryConfiguration (state retention time)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6747) Table API / SQL Docs: Streaming Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-6747:
-
Issue Type: Task  (was: Sub-task)
Parent: (was: FLINK-5354)

> Table API / SQL Docs: Streaming Page
> 
>
> Key: FLINK-6747
> URL: https://issues.apache.org/jira/browse/FLINK-6747
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> Update and refine {{./docs/dev/table/streaming.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-6747) Table API / SQL Docs: Streaming Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-6747:


Assignee: Fabian Hueske

> Table API / SQL Docs: Streaming Page
> 
>
> Key: FLINK-6747
> URL: https://issues.apache.org/jira/browse/FLINK-6747
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> Update and refine {{./docs/dev/table/streaming.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Issue Comment Deleted] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-6750:
-
Comment: was deleted

(was: Done for 1.3.1 with 89850f21dc596b2b845ce98433263496e512ef54
Done for 1.4.0 with 232481572bb48e82880afdb2f7237af08a8404b5)

> Table API / SQL Docs: Table Sources & Sinks Page
> 
>
> Key: FLINK-6750
> URL: https://issues.apache.org/jira/browse/FLINK-6750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
> Fix For: 1.3.1, 1.4.0
>
>
> Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-6748) Table API / SQL Docs: Table API Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-6748.

   Resolution: Done
Fix Version/s: 1.4.0
   1.3.1

Done for 1.3.1 with 89850f21dc596b2b845ce98433263496e512ef54
Done for 1.4.0 with 232481572bb48e82880afdb2f7237af08a8404b5

> Table API / SQL Docs: Table API Page
> 
>
> Key: FLINK-6748
> URL: https://issues.apache.org/jira/browse/FLINK-6748
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
> Fix For: 1.3.1, 1.4.0
>
>
> Update and refine {{./docs/dev/table/tableApi.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reopened FLINK-6750:
--

> Table API / SQL Docs: Table Sources & Sinks Page
> 
>
> Key: FLINK-6750
> URL: https://issues.apache.org/jira/browse/FLINK-6750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
> Fix For: 1.3.1, 1.4.0
>
>
> Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske resolved FLINK-6750.
--
   Resolution: Fixed
Fix Version/s: 1.4.0
   1.3.1

Done for 1.3.1 with 89850f21dc596b2b845ce98433263496e512ef54
Done for 1.4.0 with 232481572bb48e82880afdb2f7237af08a8404b5

> Table API / SQL Docs: Table Sources & Sinks Page
> 
>
> Key: FLINK-6750
> URL: https://issues.apache.org/jira/browse/FLINK-6750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
> Fix For: 1.3.1, 1.4.0
>
>
> Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6748) Table API / SQL Docs: Table API Page

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050243#comment-16050243
 ] 

ASF GitHub Bot commented on FLINK-6748:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4093


> Table API / SQL Docs: Table API Page
> 
>
> Key: FLINK-6748
> URL: https://issues.apache.org/jira/browse/FLINK-6748
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>
> Update and refine {{./docs/dev/table/tableApi.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4094: [FLINK-6750] [table] [docs] Rework Table Sources &...

2017-06-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4094


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050244#comment-16050244
 ] 

ASF GitHub Bot commented on FLINK-6750:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4094


> Table API / SQL Docs: Table Sources & Sinks Page
> 
>
> Key: FLINK-6750
> URL: https://issues.apache.org/jira/browse/FLINK-6750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>
> Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4093: [FLINK-6748] [table] [docs] Reworked Table API Pag...

2017-06-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4093


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6836) Failing YARNSessionCapacitySchedulerITCase.testTaskManagerFailure

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050238#comment-16050238
 ] 

ASF GitHub Bot commented on FLINK-6836:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4120
  
I first thought I had to downgrade because of the aforementioned problem. 
However, Hadoop 2.8.0 needs at least httpclient `4.5.2`. Luckily, the problem 
with the `URLEncodedUtils#parse` method was fixed in `4.5.3`, so I bumped the 
version again.

Concerning moving the dependency management to `flink-shaded-hadoop`: I think 
this is unfortunately not possible. Due to Maven's immutable reactor builds, 
`flink-yarn-tests` pulls all dependencies from `flink-shaded-hadoop` and 
`flink-shaded-yarn-tests`. This also means that, depending on the classpath 
order, we either use the shaded Yarn mini-cluster or not. In the latter case we 
have to make sure that we are using httpclient `4.5.3`, which is only the case 
if the dependency management is defined in the parent pom.

As a side remark, given this situation with Maven, I'm not sure whether we 
actually need the `flink-shaded-yarn-tests` module.

A proper solution would be to fix the problem with the immutable builds. For 
example, everything works properly if we first install the shaded jars and then 
build `flink-yarn-tests`; Maven then resolves all dependencies correctly. In 
that case, I think it should be possible to move the dependency management into 
`flink-shaded-hadoop`.


> Failing YARNSessionCapacitySchedulerITCase.testTaskManagerFailure
> -
>
> Key: FLINK-6836
> URL: https://issues.apache.org/jira/browse/FLINK-6836
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The master is currently unstable. The 
> {{YARNSessionCapacitySchedulerITCase.testTaskManagerFailure}} fails with 
> Hadoop version {{2.6.5}}, {{2.7.3}} and {{2.8.0}}.
> See this build [1] for example.
> [1] https://travis-ci.org/apache/flink/builds/238720589



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4120: [FLINK-6836] [tests] Fix YARNSessionCapacitySchedulerITCa...

2017-06-15 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4120
  
I first thought I had to downgrade because of the aforementioned problem. 
However, Hadoop 2.8.0 needs at least httpclient `4.5.2`. Luckily, the problem 
with the `URLEncodedUtils#parse` method was fixed in `4.5.3`, so I bumped the 
version again.

Concerning moving the dependency management to `flink-shaded-hadoop`: I think 
this is unfortunately not possible. Due to Maven's immutable reactor builds, 
`flink-yarn-tests` pulls all dependencies from `flink-shaded-hadoop` and 
`flink-shaded-yarn-tests`. This also means that, depending on the classpath 
order, we either use the shaded Yarn mini-cluster or not. In the latter case we 
have to make sure that we are using httpclient `4.5.3`, which is only the case 
if the dependency management is defined in the parent pom.

As a side remark, given this situation with Maven, I'm not sure whether we 
actually need the `flink-shaded-yarn-tests` module.

A proper solution would be to fix the problem with the immutable builds. For 
example, everything works properly if we first install the shaded jars and then 
build `flink-yarn-tests`; Maven then resolves all dependencies correctly. In 
that case, I think it should be possible to move the dependency management into 
`flink-shaded-hadoop`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6602) Table source with defined time attributes allows empty string

2017-06-15 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050224#comment-16050224
 ] 

Fabian Hueske commented on FLINK-6602:
--

The fix looks good! Please open a pull request with that :-)

I also gave you contributor permissions for the Flink JIRA.
You can now also assign other issues to yourself.

Thank you, Fabian
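
For illustration only, here is a minimal Scala sketch of the kind of check this 
issue asks for. The helper name and the use of a plain IllegalArgumentException 
are assumptions made for the example, not the actual fix:

{code}
// Hypothetical sketch: reject null or empty attribute names declared via
// DefinedRowtimeAttribute / DefinedProctimeAttribute before they are used.
object TimeAttributeValidation {
  def validateTimeAttribute(attributeName: String, kind: String): Unit = {
    if (attributeName == null || attributeName.trim.isEmpty) {
      throw new IllegalArgumentException(
        s"The declared $kind attribute must not be null or an empty string.")
    }
  }
}
{code}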

> Table source with defined time attributes allows empty string
> -
>
> Key: FLINK-6602
> URL: https://issues.apache.org/jira/browse/FLINK-6602
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Zhe Li
> Attachments: getRowType.png
>
>
> {{DefinedRowtimeAttribute}} and {{DefinedProctimeAttribute}} are not checked 
> for empty strings.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-6602) Table source with defined time attributes allows empty string

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-6602:


Assignee: Zhe Li

> Table source with defined time attributes allows empty string
> -
>
> Key: FLINK-6602
> URL: https://issues.apache.org/jira/browse/FLINK-6602
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Zhe Li
> Attachments: getRowType.png
>
>
> {{DefinedRowtimeAttribute}} and {{DefinedProctimeAttribute}} are not checked 
> for empty strings.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050205#comment-16050205
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122152659
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Maybe a better solution is to change this code block into

```scala
val input1AccessExprs = for (i <- input1Mapping.indices)
  yield generateInputAccess(input1, input1Term, i, input1Mapping)

val input2AccessExprs = input2 match {
  case Some(ti) => for (i <- input2Mapping.indices)
    yield generateInputAccess(ti, input2Term, i, input2Mapping)

  case None => Seq() // add nothing
}
```


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. From debugging, it looks like the 
> ArrayIndexOutOfBoundsException is caused by the input type being a POJO type, not Row.
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List<Content> data = new ArrayList<>();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream<Content> stream = env.fromCollection(data);
> DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
> super();
> }
> public Content(long recordTime, String urlKey) {
> super();
> this.recordTime = recordTime;
> this.urlKey = urlKey;
> }
> public String getUrlKey() {
> return urlKey;
> }
> public void 

[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050206#comment-16050206
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122141845
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

`input2` should be `ti`; here `input2` is an `Option`, which can't be a 
`PojoTypeInfo`.
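
For readability, a hedged sketch of what this review comment suggests, reusing 
the identifiers from the diff above; the actual change in the pull request may 
differ:

```scala
// Sketch only: decide the POJO vs. non-POJO handling on `ti`, the unwrapped
// TypeInformation, rather than on the Option `input2`.
val input2AccessExprs = input2 match {
  case Some(ti: PojoTypeInfo[_]) =>
    for (i <- input2Mapping.indices)
      yield generateInputAccess(ti, input2Term, i, input2Mapping)
  case Some(ti) =>
    for (i <- 0 until ti.getArity if input2Mapping.contains(i))
      yield generateInputAccess(ti, input2Term, i, input2Mapping)
  case None => Seq() // no second input
}
```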


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. From debugging, it looks like the 
> ArrayIndexOutOfBoundsException is caused by the input type being a POJO type, not Row.
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List<Content> data = new ArrayList<>();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream<Content> stream = env.fromCollection(data);
> DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
> super();
> }
> public Content(long recordTime, 
