[GitHub] flink issue #4129: [FLINK-6918] [tests] Harden AbstractOperatorRestoreTestBa...

2017-06-15 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4129
  
+1 to merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6918) Failing tests: ChainLengthDecreaseTest and ChainLengthIncreaseTest

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050095#comment-16050095
 ] 

ASF GitHub Bot commented on FLINK-6918:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4129
  
+1 to merge.


> Failing tests: ChainLengthDecreaseTest and ChainLengthIncreaseTest
> --
>
> Key: FLINK-6918
> URL: https://issues.apache.org/jira/browse/FLINK-6918
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Greg Hogan
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> While running {{mvn clean verify}} on Linux with {{commit 
> 3bad77c0ae932a926260b769efb151a89fc309ab}}.
> {noformat}
> Tests in error:
>   
> ChainLengthDecreaseTest>AbstractOperatorRestoreTestBase.testMigrationAndRestore:164->AbstractOperatorRestoreTestBase.migrateJob:202
>  »
>   
> ChainLengthIncreaseTest>AbstractOperatorRestoreTestBase.testMigrationAndRestore:164->AbstractOperatorRestoreTestBase.migrateJob:202
>  »
> {noformat}
> {noformat}
> Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 6.497 sec <<< 
> FAILURE! - in 
> org.apache.flink.test.state.operator.restore.unkeyed.ChainLengthDecreaseTest
> testMigrationAndRestore[Migrate Savepoint: 
> nonKeyed-flink1.3](org.apache.flink.test.state.operator.restore.unkeyed.ChainLengthDecreaseTest)
>   Time elapsed: 0.361 sec  <<< ERROR!
> java.lang.Exception: java.lang.Exception: Failed to trigger savepoint.
>   at 
> org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase.migrateJob(AbstractOperatorRestoreTestBase.java:202)
>   at 
> org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase.testMigrationAndRestore(AbstractOperatorRestoreTestBase.java:164)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at 
> org.apache.maven.surefire.booter.For

[jira] [Created] (FLINK-6925) Add CONCAT/CONCAT_WS supported in SQL

2017-06-15 Thread sunjincheng (JIRA)
sunjincheng created FLINK-6925:
--

 Summary: Add CONCAT/CONCAT_WS supported in SQL
 Key: FLINK-6925
 URL: https://issues.apache.org/jira/browse/FLINK-6925
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Affects Versions: 1.4.0
Reporter: sunjincheng


CONCAT(str1,str2,...): Returns the string that results from concatenating the 
arguments. May have one or more arguments. If all arguments are nonbinary 
strings, the result is a nonbinary string. If the arguments include any binary 
strings, the result is a binary string. A numeric argument is converted to its 
equivalent nonbinary string form.

CONCAT() returns NULL if any argument is NULL.

* Syntax:
CONCAT(str1,str2,...) 

* Arguments
** str1,str2,... -

* Return Types
  string

* Example:
  CONCAT('F', 'lin', 'k') -> 'Flink'
  CONCAT('M', NULL, 'L') -> NULL
  CONCAT(14.3) -> '14.3'

* See more:
** [MySQL| 
https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat]
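
The CONCAT() behaviour described above can be sketched in plain Java as follows. This is only an illustration of the NULL and numeric-conversion rules; the class and method names are hypothetical and are not part of Flink or of any proposed implementation.

```java
final class ConcatSketch {

    /**
     * Hypothetical helper: concatenates the string form of every argument.
     * Returns null as soon as any argument is null, mirroring
     * "CONCAT() returns NULL if any argument is NULL".
     */
    static String concat(Object... args) {
        StringBuilder result = new StringBuilder();
        for (Object arg : args) {
            if (arg == null) {
                return null;
            }
            // Numeric arguments are converted to their string form.
            result.append(arg);
        }
        return result.toString();
    }

    public static void main(String[] unused) {
        System.out.println(concat("F", "lin", "k"));
        System.out.println(concat("M", null, "L"));
        System.out.println(concat(14.3));
    }
}
```

The three calls in main correspond to the three examples listed in the issue.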


CONCAT_WS() stands for Concatenate With Separator and is a special form of 
CONCAT(). The first argument is the separator for the rest of the arguments. 
The separator is added between the strings to be concatenated. The separator 
can be a string, as can the rest of the arguments. If the separator is NULL, 
the result is NULL.

* Syntax:
CONCAT_WS(separator,str1,str2,...)

* Arguments
** separator -
** str1,str2,... -

* Return Types
  string

* Example:
  CONCAT_WS(',','First name','Second name','Last Name') -> 'First name,Second 
name,Last Name'

* See more:
** [MySQL| 
https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat-ws]
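
A minimal Java sketch of the CONCAT_WS() rules above, with hypothetical names. One assumption goes beyond the text of this issue: MySQL skips NULL arguments that follow the separator, and the sketch does the same. Whether Flink should behave that way is not stated here.

```java
import java.util.StringJoiner;

final class ConcatWsSketch {

    /**
     * Hypothetical helper: joins the arguments with the given separator.
     * A null separator yields null. Null arguments after the separator are
     * skipped (an assumption borrowed from MySQL, not from this issue).
     */
    static String concatWs(String separator, String... args) {
        if (separator == null) {
            return null;
        }
        StringJoiner joiner = new StringJoiner(separator);
        for (String arg : args) {
            if (arg != null) {
                joiner.add(arg);
            }
        }
        return joiner.toString();
    }

    public static void main(String[] unused) {
        System.out.println(concatWs(",", "First name", "Second name", "Last Name"));
    }
}
```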



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-6926) Add MD5/SHA1/SHA2 supported in SQL

2017-06-15 Thread sunjincheng (JIRA)
sunjincheng created FLINK-6926:
--

 Summary: Add MD5/SHA1/SHA2 supported in SQL
 Key: FLINK-6926
 URL: https://issues.apache.org/jira/browse/FLINK-6926
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Affects Versions: 1.4.0
Reporter: sunjincheng


MD5(str): Calculates an MD5 128-bit checksum for the string. The value is 
returned as a string of 32 hexadecimal digits, or NULL if the argument was 
NULL. The return value can, for example, be used as a hash key. See the notes 
at the beginning of this section about storing hash values efficiently.

The return value is a nonbinary string in the connection character set.

* Example:
 MD5('testing') -> 'ae2b1fca515949e5d54fb22b8ed95575'

* See more:
** [MySQL| 
https://dev.mysql.com/doc/refman/5.7/en/encryption-functions.html#function_sha1]


SHA1(str), SHA(str): Calculates an SHA-1 160-bit checksum for the string, as 
described in RFC 3174 (Secure Hash Algorithm). The value is returned as a 
string of 40 hexadecimal digits, or NULL if the argument was NULL. One of the 
possible uses for this function is as a hash key. See the notes at the 
beginning of this section about storing hash values efficiently. You can also 
use SHA1() as a cryptographic function for storing passwords. SHA() is 
synonymous with SHA1().

The return value is a nonbinary string in the connection character set.

* Example:
  SHA1('abc') -> 'a9993e364706816aba3e25717850c26c9cd0d89d'



SHA2(str, hash_length): Calculates the SHA-2 family of hash functions (SHA-224, 
SHA-256, SHA-384, and SHA-512). The first argument is the cleartext string to 
be hashed. The second argument indicates the desired bit length of the result, 
which must have a value of 224, 256, 384, 512, or 0 (which is equivalent to 
256). If either argument is NULL or the hash length is not one of the permitted 
values, the return value is NULL. Otherwise, the function result is a hash 
value containing the desired number of bits. See the notes at the beginning of 
this section about storing hash values efficiently.

The return value is a nonbinary string in the connection character set.

* Example:
SHA2('abc', 224) -> '23097d223405d8228642a477bda255b32aadbce4bda0b3f7e36c9da7'
* See more:
** [MySQL| 
https://dev.mysql.com/doc/refman/5.7/en/encryption-functions.html#function_sha2]
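
The three functions can be approximated with the JDK's java.security.MessageDigest, as in the sketch below. The class and method names are hypothetical, and hashing the UTF-8 bytes of the string is an assumption; the issue does not say which character set Flink would use.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class HashSketch {

    /** Hex-encodes the digest of the input; a null input yields null. */
    static String hexDigest(String algorithm, String input) {
        if (input == null) {
            return null;
        }
        try {
            byte[] digest = MessageDigest.getInstance(algorithm)
                    .digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    static String md5(String input) {
        return hexDigest("MD5", input);
    }

    static String sha1(String input) {
        return hexDigest("SHA-1", input);
    }

    /** Returns null for a null argument or an unsupported hash length. */
    static String sha2(String input, Integer hashLength) {
        if (input == null || hashLength == null) {
            return null;
        }
        switch (hashLength) {
            case 224:
                return hexDigest("SHA-224", input);
            case 0: // 0 is treated as 256, as described above
            case 256:
                return hexDigest("SHA-256", input);
            case 384:
                return hexDigest("SHA-384", input);
            case 512:
                return hexDigest("SHA-512", input);
            default:
                return null;
        }
    }

    public static void main(String[] unused) {
        System.out.println(md5("testing"));
        System.out.println(sha1("abc"));
        System.out.println(sha2("abc", 224));
    }
}
```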





[jira] [Updated] (FLINK-6891) Add LOG supported in SQL

2017-06-15 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6891:
---
Description: 
LOG(X), LOG(B,X)

If called with one parameter, this function returns the natural logarithm of X. 
If X is less than or equal to 0.0E0, the function returns NULL and (as of MySQL 
5.7.4) a warning “Invalid argument for logarithm” is reported.
The inverse of this function (when called with a single argument) is the EXP() 
function.

If called with two parameters, this function returns the logarithm of X to the 
base B. If X is less than or equal to 0, or if B is less than or equal to 1, 
then NULL is returned.
* Example:
 LOG(2) -> 0.69314718055995
 LOG(-2) -> NULL
 LOG(2,65536) -> 16
 LOG(10,100) -> 2
 LOG(1,100) -> NULL

* See more:
** [MySQL| 
https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log]



  was:
LOG(N): The single-parameter version of the function returns the natural 
logarithm of N; when called with two arguments, it returns the logarithm of N 
to the given base. 
* Syntax:
LOG ( float_expression [, base ] )  

* Arguments
**float_expression:  Is an expression of type float or of a type that can be 
implicitly converted to float.
**base: Optional integer argument that sets the base for the logarithm.

* Return Types
  float

* Example:
  LOG(10) -> 2.30

* See more:
** [MSQL|https://docs.microsoft.com/en-us/sql/t-sql/functions/log-transact-sql]
** [MySQL| 
https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log]




> Add LOG supported in SQL
> 
>
> Key: FLINK-6891
> URL: https://issues.apache.org/jira/browse/FLINK-6891
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> LOG(X), LOG(B,X)
> If called with one parameter, this function returns the natural logarithm of 
> X. If X is less than or equal to 0.0E0, the function returns NULL and (as of 
> MySQL 5.7.4) a warning “Invalid argument for logarithm” is reported.
> The inverse of this function (when called with a single argument) is the 
> EXP() function.
> If called with two parameters, this function returns the logarithm of X to 
> the base B. If X is less than or equal to 0, or if B is less than or equal to 
> 1, then NULL is returned.
> * Example:
>  LOG(2) -> 0.69314718055995
>  LOG(-2) -> NULL
>  LOG(2,65536) -> 16
>  LOG(10,100) -> 2
>  LOG(1,100) -> NULL
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log]
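
The NULL rules for LOG(X) and LOG(B,X) quoted above can be sketched in plain Java as follows. The names are hypothetical, and a Java null stands in for SQL NULL.

```java
final class LogSketch {

    /** Natural logarithm of x; null if x is null or x <= 0. */
    static Double log(Double x) {
        if (x == null || x <= 0.0) {
            return null;
        }
        return Math.log(x);
    }

    /** Logarithm of x to the given base; null if x <= 0 or base <= 1. */
    static Double log(Double base, Double x) {
        if (base == null || x == null || x <= 0.0 || base <= 1.0) {
            return null;
        }
        // Change of base: log_b(x) = ln(x) / ln(b).
        return Math.log(x) / Math.log(base);
    }

    public static void main(String[] unused) {
        System.out.println(log(2.0));
        System.out.println(log(-2.0));
        System.out.println(log(2.0, 65536.0));
        System.out.println(log(10.0, 100.0));
        System.out.println(log(1.0, 100.0));
    }
}
```

Because the two-argument form divides two floating-point logarithms, results such as LOG(2,65536) should be compared against 16 with a small tolerance rather than for exact equality.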





[jira] [Created] (FLINK-6927) Support pattern group in CEP

2017-06-15 Thread Dian Fu (JIRA)
Dian Fu created FLINK-6927:
--

 Summary: Support pattern group in CEP
 Key: FLINK-6927
 URL: https://issues.apache.org/jira/browse/FLINK-6927
 Project: Flink
  Issue Type: Bug
  Components: CEP
Reporter: Dian Fu
Assignee: Dian Fu


We should add support for pattern group. This would enrich the set of supported 
patterns. For example, users can write patterns like this with this feature 
available:
{code}
 A --> (B --> C.times(3)).optional() --> D
{code}
or
{code}
A --> (B --> C).times(3) --> D
{code}
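
To make the intended meaning of the two examples concrete, here is a toy Java model that expands a grouped pattern into the event sequences it accepts. It is not the Flink CEP API; every name in it is hypothetical, and it ignores contiguity, so events that real CEP would allow between matches are not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class PatternGroupSketch {

    /** A pattern node that can list every event sequence it accepts. */
    interface Node {
        List<List<String>> expand();
    }

    static Node event(String name) {
        return () -> Collections.singletonList(Collections.singletonList(name));
    }

    /** Sequence of parts: every combination of one expansion per part. */
    static Node seq(Node... parts) {
        return () -> {
            List<List<String>> result = new ArrayList<>();
            result.add(new ArrayList<>());
            for (Node part : parts) {
                List<List<String>> next = new ArrayList<>();
                for (List<String> prefix : result) {
                    for (List<String> suffix : part.expand()) {
                        List<String> joined = new ArrayList<>(prefix);
                        joined.addAll(suffix);
                        next.add(joined);
                    }
                }
                result = next;
            }
            return result;
        };
    }

    /** Repeats a node (which may itself be a group) exactly n times. */
    static Node times(Node node, int n) {
        Node[] copies = new Node[n];
        Arrays.fill(copies, node);
        return seq(copies);
    }

    /** Accepts either nothing or whatever the node accepts. */
    static Node optional(Node node) {
        return () -> {
            List<List<String>> result = new ArrayList<>();
            result.add(new ArrayList<>());
            result.addAll(node.expand());
            return result;
        };
    }

    public static void main(String[] unused) {
        // A --> (B --> C).times(3) --> D
        Node repeatedGroup = seq(
                event("A"), times(seq(event("B"), event("C")), 3), event("D"));
        System.out.println(repeatedGroup.expand());

        // A --> (B --> C.times(3)).optional() --> D
        Node optionalGroup = seq(
                event("A"),
                optional(seq(event("B"), times(event("C"), 3))),
                event("D"));
        System.out.println(optionalGroup.expand());
    }
}
```

Under this model the quantifier applies to the whole parenthesised group, which is the capability the issue asks for.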






[GitHub] flink issue #4120: [FLINK-6836] [tests] Fix YARNSessionCapacitySchedulerITCa...

2017-06-15 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4120
  
The PR description says that you're downgrading the dependency versions, 
but with the second commit you're actually upgrading them.

Also, couldn't we move the dependency management into `flink-shaded-hadoop`?




[jira] [Commented] (FLINK-6836) Failing YARNSessionCapacitySchedulerITCase.testTaskManagerFailure

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050134#comment-16050134
 ] 

ASF GitHub Bot commented on FLINK-6836:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4120
  
The PR description says that you're downgrading the dependency versions, 
but with the second commit you're actually upgrading them.

Also, couldn't we move the dependency management into `flink-shaded-hadoop`?


> Failing YARNSessionCapacitySchedulerITCase.testTaskManagerFailure
> -
>
> Key: FLINK-6836
> URL: https://issues.apache.org/jira/browse/FLINK-6836
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The master is currently unstable. The 
> {{YARNSessionCapacitySchedulerITCase.testTaskManagerFailure}} fails with 
> Hadoop version {{2.6.5}}, {{2.7.3}} and {{2.8.0}}.
> See this build [1] for example.
> [1] https://travis-ci.org/apache/flink/builds/238720589





[jira] [Commented] (FLINK-6263) Leader error in Kafka producer on leader change (broker restart/failrue)

2017-06-15 Thread Konstantin Lalafaryan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050148#comment-16050148
 ] 

Konstantin Lalafaryan commented on FLINK-6263:
--

What is the status of this bug? We have the same problem in our Flink job.

> Leader error in Kafka producer on leader change (broker restart/failrue)
> 
>
> Key: FLINK-6263
> URL: https://issues.apache.org/jira/browse/FLINK-6263
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>
> We have observed the following error in the Kafka producer
> java.lang.Exception: Failed to send data to Kafka: This server is not the 
> leader for that topic-partition.
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:376)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:293)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: 
> This server is not the leader for that topic-partition.





[jira] [Updated] (FLINK-6891) Add LOG(X) supported in SQL

2017-06-15 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6891:
---
Summary: Add LOG(X) supported in SQL  (was: Add LOG supported in SQL)

> Add LOG(X) supported in SQL
> ---
>
> Key: FLINK-6891
> URL: https://issues.apache.org/jira/browse/FLINK-6891
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> LOG(X), LOG(B,X)
> If called with one parameter, this function returns the natural logarithm of 
> X. If X is less than or equal to 0.0E0, the function returns NULL and (as of 
> MySQL 5.7.4) a warning “Invalid argument for logarithm” is reported.
> The inverse of this function (when called with a single argument) is the 
> EXP() function.
> If called with two parameters, this function returns the logarithm of X to 
> the base B. If X is less than or equal to 0, or if B is less than or equal to 
> 1, then NULL is returned.
> * Example:
>  LOG(2) -> 0.69314718055995
>  LOG(-2) -> NULL
>  LOG(2,65536) -> 16
>  LOG(10,100) -> 2
>  LOG(1,100) -> NULL
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log]
> --NOTE-- In this JIRA, we only implement LOG(X).





[jira] [Updated] (FLINK-6891) Add LOG supported in SQL

2017-06-15 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6891:
---
Description: 
LOG(X), LOG(B,X)

If called with one parameter, this function returns the natural logarithm of X. 
If X is less than or equal to 0.0E0, the function returns NULL and (as of MySQL 
5.7.4) a warning “Invalid argument for logarithm” is reported.
The inverse of this function (when called with a single argument) is the EXP() 
function.

If called with two parameters, this function returns the logarithm of X to the 
base B. If X is less than or equal to 0, or if B is less than or equal to 1, 
then NULL is returned.
* Example:
 LOG(2) -> 0.69314718055995
 LOG(-2) -> NULL
 LOG(2,65536) -> 16
 LOG(10,100) -> 2
 LOG(1,100) -> NULL

* See more:
** [MySQL| 
https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log]

--NOTE-- In this JIRA, we only implement LOG(X).



  was:
LOG(X), LOG(B,X)

If called with one parameter, this function returns the natural logarithm of X. 
If X is less than or equal to 0.0E0, the function returns NULL and (as of MySQL 
5.7.4) a warning “Invalid argument for logarithm” is reported.
The inverse of this function (when called with a single argument) is the EXP() 
function.

If called with two parameters, this function returns the logarithm of X to the 
base B. If X is less than or equal to 0, or if B is less than or equal to 1, 
then NULL is returned.
* Example:
 LOG(2) -> 0.69314718055995
 LOG(-2) -> NULL
 LOG(2,65536) -> 16
 LOG(10,100) -> 2
 LOG(1,100) -> NULL

* See more:
** [MySQL| 
https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log]




> Add LOG supported in SQL
> 
>
> Key: FLINK-6891
> URL: https://issues.apache.org/jira/browse/FLINK-6891
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> LOG(X), LOG(B,X)
> If called with one parameter, this function returns the natural logarithm of 
> X. If X is less than or equal to 0.0E0, the function returns NULL and (as of 
> MySQL 5.7.4) a warning “Invalid argument for logarithm” is reported.
> The inverse of this function (when called with a single argument) is the 
> EXP() function.
> If called with two parameters, this function returns the logarithm of X to 
> the base B. If X is less than or equal to 0, or if B is less than or equal to 
> 1, then NULL is returned.
> * Example:
>  LOG(2) -> 0.69314718055995
>  LOG(-2) -> NULL
>  LOG(2,65536) -> 16
>  LOG(10,100) -> 2
>  LOG(1,100) -> NULL
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log]
> --NOTE-- In this JIRA, we only implement LOG(X).





[jira] [Created] (FLINK-6928) Kafka source: default topic needs to exist

2017-06-15 Thread Erik van Oosten (JIRA)
Erik van Oosten created FLINK-6928:
--

 Summary: Kafka source: default topic needs to exist
 Key: FLINK-6928
 URL: https://issues.apache.org/jira/browse/FLINK-6928
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.2.1, 1.3.0
Reporter: Erik van Oosten


When using a Kafka source, the defaultTopic needs to exist even when it is 
never used. It would be nice if fetching partition information for the default 
topic would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
default topic.
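
The deferred lookup asked for here could look roughly like the sketch below: partition metadata is fetched the first time a topic is written to and cached afterwards, so open() never touches a topic that is not used. The class, its fields and the fetcher function are all hypothetical stand-ins; this is not how FlinkKafkaProducerBase is implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class LazyPartitionLookupSketch {

    // Stand-in for a metadata request to the brokers (hypothetical).
    private final Function<String, int[]> metadataFetcher;
    private final Map<String, int[]> partitionsByTopic = new HashMap<>();

    // Counts real metadata requests, only so the laziness can be observed.
    int fetchCount = 0;

    LazyPartitionLookupSketch(Function<String, int[]> metadataFetcher) {
        this.metadataFetcher = metadataFetcher;
    }

    /** Deliberately does not fetch metadata for any topic. */
    void open() {
    }

    /** Fetches partitions for a topic on first use and caches them. */
    int[] partitionsFor(String topic) {
        return partitionsByTopic.computeIfAbsent(topic, t -> {
            fetchCount++;
            return metadataFetcher.apply(t);
        });
    }

    public static void main(String[] unused) {
        LazyPartitionLookupSketch lookup =
                new LazyPartitionLookupSketch(topic -> new int[] {0, 1, 2});
        lookup.open();
        System.out.println("fetches after open: " + lookup.fetchCount);
        lookup.partitionsFor("orders");
        lookup.partitionsFor("orders");
        System.out.println("fetches after two writes: " + lookup.fetchCount);
    }
}
```

With this shape, a default topic that is configured but never written to is never looked up, so it would not need to exist.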





[jira] [Updated] (FLINK-6928) Kafka source: default topic should not need to exist

2017-06-15 Thread Erik van Oosten (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik van Oosten updated FLINK-6928:
---
Summary: Kafka source: default topic should not need to exist  (was: Kafka 
source: default topic needs to exist)

> Kafka source: default topic should not need to exist
> 
>
> Key: FLINK-6928
> URL: https://issues.apache.org/jira/browse/FLINK-6928
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Erik van Oosten
>
> When using a Kafka source, the defaultTopic needs to exist even when it is 
> never used. It would be nice if fetching partition information for the 
> default topic would be delayed until the moment a topic is actually used.
> Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
> default topic.





[jira] [Assigned] (FLINK-6773) Use compression (e.g. snappy) for full check/savepoints

2017-06-15 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter reassigned FLINK-6773:
-

Assignee: Stefan Richter

> Use compression (e.g. snappy) for full check/savepoints
> ---
>
> Key: FLINK-6773
> URL: https://issues.apache.org/jira/browse/FLINK-6773
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> We could use compression (e.g. snappy stream compression) to decrease the 
> size of our full checkpoints and savepoints. From some initial experiments, I 
> think there is great potential to achieve compression rates around 30-50%. 
> Given those numbers, I think this is very low hanging fruit to implement.
> One point to consider in the implementation is that compression blocks should 
> respect key-groups, i.e. typically it should make sense to compress per 
> key-group.
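
The point about compression blocks respecting key-groups can be illustrated with the sketch below, which compresses each key-group's bytes as an independent stream so that a single key-group can be read back without touching the others. Snappy is not part of the JDK, so GZIP is used purely as a stand-in. All names are hypothetical and the code is unrelated to Flink's actual state backend classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class KeyGroupCompressionSketch {

    /** Compresses every key-group separately; index i stays key-group i. */
    static byte[][] compressPerKeyGroup(byte[][] keyGroups) {
        byte[][] compressed = new byte[keyGroups.length][];
        for (int i = 0; i < keyGroups.length; i++) {
            compressed[i] = compress(keyGroups[i]);
        }
        return compressed;
    }

    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            gzip.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed here, so the compressed data is complete.
        return bytes.toByteArray();
    }

    static byte[] decompress(byte[] compressed) {
        try (GZIPInputStream gzip =
                new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int read;
            while ((read = gzip.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] unused) {
        byte[][] keyGroups = {
            "state of key-group 0".getBytes(),
            "state of key-group 1".getBytes()
        };
        byte[][] compressed = compressPerKeyGroup(keyGroups);
        // Only key-group 1 is decompressed; key-group 0 is never read.
        System.out.println(new String(decompress(compressed[1])));
    }
}
```

Keeping one compressed stream per key-group means a restore that needs only some key-groups, for example after rescaling, does not have to decompress the rest.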





[jira] [Updated] (FLINK-6928) Kafka sink: default topic should not need to exist

2017-06-15 Thread Erik van Oosten (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik van Oosten updated FLINK-6928:
---
Description: 
When using a Kafka sink, the defaultTopic needs to exist even when it is never 
used. It would be nice if fetching partition information for the default topic 
would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
default topic.

  was:
When using a Kafka source, the defaultTopic needs to exist even when it is 
never used. It would be nice if fetching partition information for the default 
topic would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
default topic.


> Kafka sink: default topic should not need to exist
> --
>
> Key: FLINK-6928
> URL: https://issues.apache.org/jira/browse/FLINK-6928
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Erik van Oosten
>
> When using a Kafka sink, the defaultTopic needs to exist even when it is 
> never used. It would be nice if fetching partition information for the 
> default topic would be delayed until the moment a topic is actually used.
> Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
> default topic.





[jira] [Updated] (FLINK-6928) Kafka sink: default topic should not need to exist

2017-06-15 Thread Erik van Oosten (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik van Oosten updated FLINK-6928:
---
Summary: Kafka sink: default topic should not need to exist  (was: Kafka 
source: default topic should not need to exist)

> Kafka sink: default topic should not need to exist
> --
>
> Key: FLINK-6928
> URL: https://issues.apache.org/jira/browse/FLINK-6928
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Erik van Oosten
>
> When using a Kafka source, the defaultTopic needs to exist even when it is 
> never used. It would be nice if fetching partition information for the 
> default topic would be delayed until the moment a topic is actually used.
> Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
> default topic.





[jira] [Updated] (FLINK-6928) Kafka sink: default topic should not need to exist

2017-06-15 Thread Erik van Oosten (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik van Oosten updated FLINK-6928:
---
Description: 
When using a Kafka sink, the defaultTopic needs to exist even when it is never 
used. It would be nice if fetching partition information for the default topic 
would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
default topic.

In addition, it would be nice if we could signal that the defaultTopic is not 
needed by passing {{null}}. Currently, a value for the defaultTopic is required.

  was:
When using a Kafka sink, the defaultTopic needs to exist even when it is never 
used. It would be nice if fetching partition information for the default topic 
would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
default topic.

It would be nice if we could signal that the defaultTopic is not needed by 
passing {{null}}. Currently, a value for the defaultTopic is required.


> Kafka sink: default topic should not need to exist
> --
>
> Key: FLINK-6928
> URL: https://issues.apache.org/jira/browse/FLINK-6928
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Erik van Oosten
>
> When using a Kafka sink, the defaultTopic needs to exist even when it is 
> never used. It would be nice if fetching partition information for the 
> default topic would be delayed until the moment a topic is actually used.
> Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
> default topic.
> In addition, it would be nice if we could signal that the defaultTopic is not 
> needed by passing {{null}}. Currently, a value for the defaultTopic is 
> required.





[jira] [Updated] (FLINK-6928) Kafka sink: default topic should not need to exist

2017-06-15 Thread Erik van Oosten (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik van Oosten updated FLINK-6928:
---
Description: 
When using a Kafka sink, the defaultTopic needs to exist even when it is never 
used. It would be nice if fetching partition information for the default topic 
would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
default topic.

It would be nice if we could signal that the defaultTopic is not needed by 
passing {{null}}. Currently, a value for the defaultTopic is required.

  was:
When using a Kafka sink, the defaultTopic needs to exist even when it is never 
used. It would be nice if fetching partition information for the default topic 
would be delayed until the moment a topic is actually used.

Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
default topic.


> Kafka sink: default topic should not need to exist
> --
>
> Key: FLINK-6928
> URL: https://issues.apache.org/jira/browse/FLINK-6928
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Erik van Oosten
>
> When using a Kafka sink, the defaultTopic needs to exist even when it is 
> never used. It would be nice if fetching partition information for the 
> default topic would be delayed until the moment a topic is actually used.
> Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
> default topic.
> It would be nice if we could signal that the defaultTopic is not needed by 
> passing {{null}}. Currently, a value for the defaultTopic is required.





[jira] [Commented] (FLINK-6263) Leader error in Kafka producer on leader change (broker restart/failrue)

2017-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050152#comment-16050152
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6263:


Could you briefly describe how / when this error occurred? Were the Kafka 
brokers undergoing some operation?
It could make sense to introduce some pluggable error handler for these cases.

> Leader error in Kafka producer on leader change (broker restart/failrue)
> 
>
> Key: FLINK-6263
> URL: https://issues.apache.org/jira/browse/FLINK-6263
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>
> We have observed the following error in the Kafka producer
> java.lang.Exception: Failed to send data to Kafka: This server is not the 
> leader for that topic-partition.
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:376)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:293)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: 
> This server is not the leader for that topic-partition.





[jira] [Commented] (FLINK-6928) Kafka sink: default topic should not need to exist

2017-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050157#comment-16050157
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6928:


I agree that the {{defaultTopic}} configuration is a bit "off" w.r.t. dynamic 
topics in the {{KeyedSerializationSchema}}.

Taking a step back, it might make sense to:

1. Make the support for routing to multiple dynamic topics more prominent in 
the first-level API of the producer.

2. Make setting the default an explicit option. e.g.
{code}
FlinkKafkaProducer prod = new FlinkKafkaProducer(serSchema, props, partitioner);
// Optional setting. If set, we fetch metadata for it in open.
prod.setDefaultTopic("");
{code}

What do you think?
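
A minimal sketch of the behavior discussed in this thread, under stated assumptions: `LazyPartitionLookup` is a hypothetical helper (not a connector class), and the `metadataFetcher` function stands in for whatever request returns the partitions of a topic. Metadata for the default topic is fetched in `open()` only if a default topic was set; every other topic is looked up on first use.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper; not part of the Flink Kafka connector. */
final class LazyPartitionLookup {
    private final String defaultTopic; // may be null: no default topic configured
    private final Function<String, int[]> metadataFetcher; // stand-in for a metadata request
    private final Map<String, int[]> partitionsByTopic = new HashMap<>();

    LazyPartitionLookup(String defaultTopic, Function<String, int[]> metadataFetcher) {
        this.defaultTopic = defaultTopic;
        this.metadataFetcher = metadataFetcher;
    }

    /** Fetches metadata eagerly only when a default topic was explicitly set. */
    void open() {
        if (defaultTopic != null) {
            partitionsFor(defaultTopic);
        }
    }

    /** Returns the partitions of a topic, fetching them on first use and caching them. */
    int[] partitionsFor(String topic) {
        return partitionsByTopic.computeIfAbsent(topic, metadataFetcher);
    }

    int cachedTopicCount() {
        return partitionsByTopic.size();
    }
}
```

The design choice here is that an unused default topic never triggers a metadata request, so it does not need to exist.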

> Kafka sink: default topic should not need to exist
> --
>
> Key: FLINK-6928
> URL: https://issues.apache.org/jira/browse/FLINK-6928
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Erik van Oosten
>
> When using a Kafka sink, the defaultTopic needs to exist even when it is 
> never used. It would be nice if fetching partition information for the 
> default topic would be delayed until the moment a topic is actually used.
> Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the 
> default topic.
> In addition, it would be nice if we could signal that the defaultTopic is not 
> needed by passing {{null}}. Currently, a value for the defaultTopic is 
> required.





[GitHub] flink pull request #3219: [FLINK-5659] Harden FileBaseUtils#deleteFileOrDire...

2017-06-15 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/3219




[jira] [Commented] (FLINK-5659) FileBaseUtils#deleteFileOrDirectory not thread-safe on Windows

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050158#comment-16050158
 ] 

ASF GitHub Bot commented on FLINK-5659:
---

Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/3219


> FileBaseUtils#deleteFileOrDirectory not thread-safe on Windows
> --
>
> Key: FLINK-5659
> URL: https://issues.apache.org/jira/browse/FLINK-5659
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Local Runtime
>Affects Versions: 1.2.0, 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
>
> The {code}FileBaseUtils#deleteFileOrDirectory{code} is not thread-safe on 
> Windows.
> First you will run into AccessDeniedExceptions since one thread tried to 
> delete a file while another thread was already doing that, for which the file 
> has to be opened.
> Once you resolve those exceptions (by catching them and double-checking whether 
> the file still exists), you run into DirectoryNotEmptyExceptions since there 
> is some wacky timing/visibility issue when deleting files concurrently.
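
The two failure modes described in the issue can be tolerated with a retry loop. The following is only a sketch of that idea, not the change made in the pull request: the class name `ConcurrentDeleteSketch` and the `maxAttempts` parameter are assumptions, and it uses only `java.nio.file` calls from the JDK.

```java
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

/** Hypothetical sketch of a delete that tolerates concurrent deleters. */
final class ConcurrentDeleteSketch {

    /** Deletes a file or directory tree, retrying when another thread interferes. */
    static void deleteFileOrDirectory(Path path, int maxAttempts) throws IOException {
        for (int attempt = 1; ; attempt++) {
            try {
                deleteRecursively(path);
                return;
            } catch (AccessDeniedException | DirectoryNotEmptyException e) {
                // Another thread may have finished the job in the meantime.
                if (Files.notExists(path)) {
                    return;
                }
                if (attempt >= maxAttempts) {
                    throw e;
                }
            }
        }
    }

    private static void deleteRecursively(Path path) throws IOException {
        if (Files.isDirectory(path, LinkOption.NOFOLLOW_LINKS)) {
            try (DirectoryStream<Path> children = Files.newDirectoryStream(path)) {
                for (Path child : children) {
                    deleteRecursively(child);
                }
            } catch (NoSuchFileException e) {
                // The directory vanished while it was being listed: nothing left to do.
                return;
            }
        }
        // deleteIfExists does not fail if a concurrent deleter removed the path first.
        Files.deleteIfExists(path);
    }
}
```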





[jira] [Closed] (FLINK-6776) Use skip instead of seek for small forward repositioning in DFS streams

2017-06-15 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-6776.
-
   Resolution: Implemented
Fix Version/s: 1.4.0

Implemented in 9141379f6d

> Use skip instead of seek for small forward repositioning in DFS streams
> ---
>
> Key: FLINK-6776
> URL: https://issues.apache.org/jira/browse/FLINK-6776
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Minor
> Fix For: 1.4.0
>
>
> Reading checkpoint meta data and finding key-groups in restores sometimes 
> require seeking in input streams. Currently, we always use a seek, even for 
> small position changes. As small true seeks are far more expensive than small 
> reads/skips, we should just skip over small gaps instead of performing the 
> seek.
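
The idea described above can be sketched as follows. This is an illustration under assumptions, not the merged code: `SeekableInput` and `CountingInput` are hypothetical stand-ins for a DFS input stream, and the 1 MiB threshold `MIN_SKIP_BYTES` is an assumed value. Small forward moves are served by skipping, while backward or large moves fall back to a true seek.

```java
/** Hypothetical sketch: skip over small forward gaps, seek otherwise. */
final class SkipOrSeekSketch {

    /** Minimal stand-in for a seekable DFS input stream. */
    interface SeekableInput {
        long getPos();
        void seek(long pos);
        long skip(long n);
    }

    /** Assumed threshold below which skipping is considered cheaper than seeking. */
    static final long MIN_SKIP_BYTES = 1024L * 1024L;

    static void seek(SeekableInput in, long seekPos) {
        long delta = seekPos - in.getPos();
        if (delta > 0L && delta <= MIN_SKIP_BYTES) {
            // skip() may skip fewer bytes than requested, so loop until done.
            long remaining = delta;
            while (remaining > 0L) {
                long skipped = in.skip(remaining);
                if (skipped <= 0L) {
                    in.seek(seekPos); // no progress: fall back to a true seek
                    return;
                }
                remaining -= skipped;
            }
        } else if (delta != 0L) {
            // Negative or large deltas always use a true seek.
            in.seek(seekPos);
        }
    }

    /** In-memory input that counts how often each operation is used. */
    static final class CountingInput implements SeekableInput {
        long pos;
        int seeks;
        int skips;

        @Override public long getPos() { return pos; }
        @Override public void seek(long newPos) { pos = newPos; seeks++; }
        @Override public long skip(long n) { pos += n; skips++; return n; }
    }
}
```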





[GitHub] flink issue #4019: [FLINK-6776] [runtime] Use skip instead of seek for small...

2017-06-15 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4019
  
Merged in 9141379f6d.




[GitHub] flink pull request #4019: [FLINK-6776] [runtime] Use skip instead of seek fo...

2017-06-15 Thread StefanRRichter
Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/4019




[GitHub] flink pull request #:

2017-06-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:


https://github.com/apache/flink/commit/9141379f6d2654886d48154b453170cc23b89a87#commitcomment-22558742
  
In 
flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
 on line 75:
What about negative seek values here?




[GitHub] flink pull request #:

2017-06-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:


https://github.com/apache/flink/commit/9141379f6d2654886d48154b453170cc23b89a87#commitcomment-22558746
  
Good change, but I think this could use a guarding unit test.




[jira] [Commented] (FLINK-6776) Use skip instead of seek for small forward repositioning in DFS streams

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050179#comment-16050179
 ] 

ASF GitHub Bot commented on FLINK-6776:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4019
  
Merged in 9141379f6d.


> Use skip instead of seek for small forward repositioning in DFS streams
> ---
>
> Key: FLINK-6776
> URL: https://issues.apache.org/jira/browse/FLINK-6776
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Minor
> Fix For: 1.4.0
>
>
> Reading checkpoint meta data and finding key-groups in restores sometimes 
> require seeking in input streams. Currently, we always use a seek, even for 
> small position changes. As small true seeks are far more expensive than small 
> reads/skips, we should just skip over small gaps instead of performing the 
> seek.





[GitHub] flink pull request #:

2017-06-15 Thread StefanRRichter
Github user StefanRRichter commented on the pull request:


https://github.com/apache/flink/commit/9141379f6d2654886d48154b453170cc23b89a87#commitcomment-22558757
  
In 
flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
 on line 75:
They will trigger `forceSeek`, as they should.




[jira] [Commented] (FLINK-6776) Use skip instead of seek for small forward repositioning in DFS streams

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050180#comment-16050180
 ] 

ASF GitHub Bot commented on FLINK-6776:
---

Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/4019


> Use skip instead of seek for small forward repositioning in DFS streams
> ---
>
> Key: FLINK-6776
> URL: https://issues.apache.org/jira/browse/FLINK-6776
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Minor
> Fix For: 1.4.0
>
>
> Reading checkpoint meta data and finding key-groups in restores sometimes 
> require seeking in input streams. Currently, we always use a seek, even for 
> small position changes. As small true seeks are far more expensive than small 
> reads/skips, we should just skip over small gaps instead of performing the 
> seek.





[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122152659
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Maybe a better solution is to change this code block into

```scala
val input1AccessExprs = for (i <- input1Mapping.indices)
  yield generateInputAccess(input1, input1Term, i, input1Mapping)

val input2AccessExprs = input2 match {
  case Some(ti) => for (i <- input2Mapping.indices)
    yield generateInputAccess(ti, input2Term, i, input2Mapping)

  case None => Seq() // add nothing
}
```




[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050206#comment-16050206
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122141845
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

`input2` should be `ti`; here `input2` is an `Option`, which can't be a 
`PojoTypeInfo`.
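
The pitfall named in this review comment is not specific to Scala. As a rough Java analogue (with hypothetical names, and `Integer` standing in for `PojoTypeInfo`), a type test against an `Optional` wrapper can never match the wrapped type; the test has to be applied to the contained value.

```java
import java.util.Optional;

/** Hypothetical illustration of testing the wrapper instead of the wrapped value. */
final class OptionalTypeCheckSketch {

    /** Mirrors the bug: the wrapper itself is never an Integer, so this is always false. */
    static boolean checksWrapper(Optional<?> input) {
        Object wrapper = input;
        return wrapper instanceof Integer;
    }

    /** Mirrors the fix: unwrap first, then test the contained value. */
    static boolean checksValue(Optional<?> input) {
        return input.isPresent() && input.get() instanceof Integer;
    }
}
```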


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a Pojo type rather than Row.
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List data = new ArrayList();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream stream = env.fromCollection(data);
> DataStream stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
> super();
> }
> public Content(

[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122141845
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

`input2` should be `ti`; here `input2` is an `Option`, which can't be a 
`PojoTypeInfo`.




[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050205#comment-16050205
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122152659
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Maybe a better solution is to change this code block into

```scala
val input1AccessExprs = for (i <- input1Mapping.indices)
  yield generateInputAccess(input1, input1Term, i, input1Mapping)

val input2AccessExprs = input2 match {
  case Some(ti) => for (i <- input2Mapping.indices)
    yield generateInputAccess(ti, input2Term, i, input2Mapping)

  case None => Seq() // add nothing
}
```


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a Pojo type rather than Row.
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List data = new ArrayList();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream stream = env.fromCollection(data);
> DataStream stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
> super();
> }
> public Content(long recordTime, String urlKey) {
> super();
> this.recordTime = recordTime;
> this.urlKey = urlKey;
> }
> public String getUrlKey() {
> return urlKey;
> }
> publ

[jira] [Assigned] (FLINK-6602) Table source with defined time attributes allows empty string

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-6602:


Assignee: Zhe Li

> Table source with defined time attributes allows empty string
> -
>
> Key: FLINK-6602
> URL: https://issues.apache.org/jira/browse/FLINK-6602
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Zhe Li
> Attachments: getRowType.png
>
>
> {{DefinedRowtimeAttribute}} and {{DefinedProctimeAttribute}} are not checked 
> for empty strings.
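
The missing check could look roughly like the sketch below. It is not the fix attached to the issue: `TimeAttributeValidationSketch` and `validateAttributeName` are hypothetical names, and the sketch assumes that a `null` name means "no time attribute defined" while an empty or blank name is an error.

```java
/** Hypothetical validation helper for time attribute names. */
final class TimeAttributeValidationSketch {

    /**
     * Returns the name unchanged if it is acceptable.
     * Assumption: null means that no attribute is defined and is therefore allowed.
     */
    static String validateAttributeName(String kind, String name) {
        if (name != null && name.trim().isEmpty()) {
            throw new IllegalArgumentException(
                "The name of the " + kind + " attribute must not be empty.");
        }
        return name;
    }
}
```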





[jira] [Commented] (FLINK-6602) Table source with defined time attributes allows empty string

2017-06-15 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050224#comment-16050224
 ] 

Fabian Hueske commented on FLINK-6602:
--

The fix looks good! Please open a pull request with that :-)

I also gave you contributor permissions for the Flink JIRA.
You can now also assign other issues to yourself.

Thank you, Fabian

> Table source with defined time attributes allows empty string
> -
>
> Key: FLINK-6602
> URL: https://issues.apache.org/jira/browse/FLINK-6602
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Zhe Li
> Attachments: getRowType.png
>
>
> {{DefinedRowtimeAttribute}} and {{DefinedProctimeAttribute}} are not checked 
> for empty strings.





[GitHub] flink issue #4120: [FLINK-6836] [tests] Fix YARNSessionCapacitySchedulerITCa...

2017-06-15 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4120
  
I first thought I had to downgrade, because of the aforementioned problem. 
However, Hadoop 2.8.0 needs at least httpclient `4.5.2`. Luckily, they fixed 
the problem with the `URLEncodedUtils#parse` method in `4.5.3`. Therefore I 
bumped the version again.

Concerning moving the dependency management to `flink-shaded-hadoop`: I 
think this is unfortunately not possible since, due to the immutable reactor builds 
from Maven, `flink-yarn-tests` pulls all dependencies from 
`flink-shaded-hadoop` and `flink-shaded-yarn-tests`. This also means that 
depending on the classpath order, we either use the shaded Yarn mini-cluster or 
not. In the latter case we have to make sure that we are using `httpclient` 
`4.5.3` which is only the case if we define the dependency management in the 
parent pom.

As a side remark, given this situation with maven, I'm not sure whether we 
actually need the `flink-shaded-yarn-tests` module.

A proper solution would be fixing the problem with the immutable builds. 
For example, everything works properly if we first install the shaded jars and 
then build the flink-yarn-tests. Then maven will properly resolve all 
dependencies. In this case, I think it should be possible to move the 
dependency management into `flink-shaded-hadoop`.




[jira] [Commented] (FLINK-6836) Failing YARNSessionCapacitySchedulerITCase.testTaskManagerFailure

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050238#comment-16050238
 ] 

ASF GitHub Bot commented on FLINK-6836:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4120
  
I first thought I had to downgrade, because of the aforementioned problem. 
However, Hadoop 2.8.0 needs at least httpclient `4.5.2`. Luckily, they fixed 
the problem with the `URLEncodedUtils#parse` method in `4.5.3`. Therefore I 
bumped the version again.

Concerning moving the dependency management to `flink-shaded-hadoop`: I 
think this is unfortunately not possible since, due to the immutable reactor builds 
from Maven, `flink-yarn-tests` pulls all dependencies from 
`flink-shaded-hadoop` and `flink-shaded-yarn-tests`. This also means that 
depending on the classpath order, we either use the shaded Yarn mini-cluster or 
not. In the latter case we have to make sure that we are using `httpclient` 
`4.5.3` which is only the case if we define the dependency management in the 
parent pom.

As a side remark, given this situation with maven, I'm not sure whether we 
actually need the `flink-shaded-yarn-tests` module.

A proper solution would be fixing the problem with the immutable builds. 
For example, everything works properly if we first install the shaded jars and 
then build the flink-yarn-tests. Then maven will properly resolve all 
dependencies. In this case, I think it should be possible to move the 
dependency management into `flink-shaded-hadoop`.


> Failing YARNSessionCapacitySchedulerITCase.testTaskManagerFailure
> -
>
> Key: FLINK-6836
> URL: https://issues.apache.org/jira/browse/FLINK-6836
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The master is currently unstable. The 
> {{YARNSessionCapacitySchedulerITCase.testTaskManagerFailure}} fails with 
> Hadoop version {{2.6.5}}, {{2.7.3}} and {{2.8.0}}.
> See this build [1] for example.
> [1] https://travis-ci.org/apache/flink/builds/238720589





[GitHub] flink pull request #4093: [FLINK-6748] [table] [docs] Reworked Table API Pag...

2017-06-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4093




issues@flink.apache.org

2017-06-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4094




[jira] [Commented] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050244#comment-16050244
 ] 

ASF GitHub Bot commented on FLINK-6750:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4094


> Table API / SQL Docs: Table Sources & Sinks Page
> 
>
> Key: FLINK-6750
> URL: https://issues.apache.org/jira/browse/FLINK-6750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>
> Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs





[jira] [Commented] (FLINK-6748) Table API / SQL Docs: Table API Page

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050243#comment-16050243
 ] 

ASF GitHub Bot commented on FLINK-6748:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4093


> Table API / SQL Docs: Table API Page
> 
>
> Key: FLINK-6748
> URL: https://issues.apache.org/jira/browse/FLINK-6748
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>
> Update and refine {{./docs/dev/table/tableApi.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs





[jira] [Resolved] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske resolved FLINK-6750.
--
   Resolution: Fixed
Fix Version/s: 1.4.0
   1.3.1

Done for 1.3.1 with 89850f21dc596b2b845ce98433263496e512ef54
Done for 1.4.0 with 232481572bb48e82880afdb2f7237af08a8404b5

> Table API / SQL Docs: Table Sources & Sinks Page
> 
>
> Key: FLINK-6750
> URL: https://issues.apache.org/jira/browse/FLINK-6750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
> Fix For: 1.3.1, 1.4.0
>
>
> Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs





[jira] [Reopened] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reopened FLINK-6750:
--

> Table API / SQL Docs: Table Sources & Sinks Page
> 
>
> Key: FLINK-6750
> URL: https://issues.apache.org/jira/browse/FLINK-6750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
> Fix For: 1.3.1, 1.4.0
>
>
> Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs





[jira] [Issue Comment Deleted] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-6750:
-
Comment: was deleted

(was: Done for 1.3.1 with 89850f21dc596b2b845ce98433263496e512ef54
Done for 1.4.0 with 232481572bb48e82880afdb2f7237af08a8404b5)

> Table API / SQL Docs: Table Sources & Sinks Page
> 
>
> Key: FLINK-6750
> URL: https://issues.apache.org/jira/browse/FLINK-6750
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
> Fix For: 1.3.1, 1.4.0
>
>
> Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs





[jira] [Closed] (FLINK-6748) Table API / SQL Docs: Table API Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-6748.

   Resolution: Done
Fix Version/s: 1.4.0
   1.3.1

Done for 1.3.1 with 89850f21dc596b2b845ce98433263496e512ef54
Done for 1.4.0 with 232481572bb48e82880afdb2f7237af08a8404b5

> Table API / SQL Docs: Table API Page
> 
>
> Key: FLINK-6748
> URL: https://issues.apache.org/jira/browse/FLINK-6748
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
> Fix For: 1.3.1, 1.4.0
>
>
> Update and refine {{./docs/dev/table/tableApi.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs





[jira] [Assigned] (FLINK-6747) Table API / SQL Docs: Streaming Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-6747:


Assignee: Fabian Hueske

> Table API / SQL Docs: Streaming Page
> 
>
> Key: FLINK-6747
> URL: https://issues.apache.org/jira/browse/FLINK-6747
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> Update and refine {{./docs/dev/table/streaming.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs





[jira] [Updated] (FLINK-6747) Table API / SQL Docs: Streaming Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-6747:
-
Issue Type: Task  (was: Sub-task)
Parent: (was: FLINK-5354)

> Table API / SQL Docs: Streaming Page
> 
>
> Key: FLINK-6747
> URL: https://issues.apache.org/jira/browse/FLINK-6747
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> Update and refine {{./docs/dev/table/streaming.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs





[jira] [Updated] (FLINK-6747) Table API / SQL Docs: Streaming Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-6747:
-
Description: 
Extend {{./docs/dev/table/streaming.md}} page.
Missing are sections about

- Dynamic Tables
- QueryConfiguration (state retention time)

  was:Update and refine {{./docs/dev/table/streaming.md}} in feature branch 
https://github.com/apache/flink/tree/tableDocs


> Table API / SQL Docs: Streaming Page
> 
>
> Key: FLINK-6747
> URL: https://issues.apache.org/jira/browse/FLINK-6747
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> Extend {{./docs/dev/table/streaming.md}} page.
> Missing are sections about
> - Dynamic Tables
> - QueryConfiguration (state retention time)





[jira] [Updated] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-6750:
-
Issue Type: Task  (was: Sub-task)
Parent: (was: FLINK-5354)

> Table API / SQL Docs: Table Sources & Sinks Page
> 
>
> Key: FLINK-6750
> URL: https://issues.apache.org/jira/browse/FLINK-6750
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
> Fix For: 1.3.1, 1.4.0
>
>
> Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs





[jira] [Updated] (FLINK-6751) Table API / SQL Docs: UDFs Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-6751:
-
Issue Type: Task  (was: Sub-task)
Parent: (was: FLINK-5354)

> Table API / SQL Docs: UDFs Page
> ---
>
> Key: FLINK-6751
> URL: https://issues.apache.org/jira/browse/FLINK-6751
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Shaoxuan Wang
>
> Update and refine {{./docs/dev/table/udfs.md}} in feature branch 
> https://github.com/apache/flink/tree/tableDocs





[jira] [Updated] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-6750:
-
Description: 
Update and refine the documentation about TableSources and TableSinks.
There are a few TODOs left in {{./docs/dev/table/sourceSinks.md}}

  was:Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch 
https://github.com/apache/flink/tree/tableDocs


> Table API / SQL Docs: Table Sources & Sinks Page
> 
>
> Key: FLINK-6750
> URL: https://issues.apache.org/jira/browse/FLINK-6750
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
> Fix For: 1.3.1, 1.4.0
>
>
> Update and refine the documentation about TableSources and TableSinks.
> There are a few TODOs left in {{./docs/dev/table/sourceSinks.md}}





[jira] [Updated] (FLINK-6751) Table API / SQL Docs: UDFs Page

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-6751:
-
Description: 
Update and extend the documentation of UDFs in the Table API / SQL: 
{{./docs/dev/table/udfs.md}}
Missing sections:

- Registration of UDFs
- UDAGGs


  was:Update and refine {{./docs/dev/table/udfs.md}} in feature branch 
https://github.com/apache/flink/tree/tableDocs


> Table API / SQL Docs: UDFs Page
> ---
>
> Key: FLINK-6751
> URL: https://issues.apache.org/jira/browse/FLINK-6751
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Shaoxuan Wang
>
> Update and extend the documentation of UDFs in the Table API / SQL: 
> {{./docs/dev/table/udfs.md}}
> Missing sections:
> - Registration of UDFs
> - UDAGGs





[jira] [Closed] (FLINK-5354) Split up Table API documentation into multiple pages

2017-06-15 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-5354.

   Resolution: Done
Fix Version/s: 1.4.0
   1.3.1

Done for 1.3.1 with 398012aa165d5d6cee9982475ea9cded60ae6ae3
Done for 1.4.0 with a5d93a56cb37e691ec9bb06d17c76151e7619267

> Split up Table API documentation into multiple pages 
> -
>
> Key: FLINK-5354
> URL: https://issues.apache.org/jira/browse/FLINK-5354
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Table API & SQL
>Reporter: Timo Walther
>Assignee: Fabian Hueske
> Fix For: 1.3.1, 1.4.0
>
>
> The Table API documentation page is quite large at the moment. We should 
> split it up into multiple pages:
> Here is my suggestion:
> - Overview (Datatypes, Config, Registering Tables, Examples)
> - TableSources and Sinks
> - Table API
> - SQL
> - Functions





[jira] [Created] (FLINK-6929) Add documentation for Table API OVER windows

2017-06-15 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-6929:


 Summary: Add documentation for Table API OVER windows
 Key: FLINK-6929
 URL: https://issues.apache.org/jira/browse/FLINK-6929
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Table API & SQL
Affects Versions: 1.3.1, 1.4.0
Reporter: Fabian Hueske


The Table API documentation is currently lacking a description of OVER windows.
The page has a placeholder section with a TODO: 
{{./docs/dev/table/tableApi.md}}.





[jira] [Updated] (FLINK-6917) Introduce test base for end-to-end testing serializer config snapshotting, restoring, and compatibility check roundtrips

2017-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-6917:
---
Labels:   (was: ta)

> Introduce test base for end-to-end testing serializer config snapshotting, 
> restoring, and compatibility check roundtrips 
> -
>
> Key: FLINK-6917
> URL: https://issues.apache.org/jira/browse/FLINK-6917
> Project: Flink
>  Issue Type: Test
>  Components: State Backends, Checkpointing, Tests, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Currently, we only have end-to-end tests of the serializer snapshotting and 
> restore roundtrip for the {{PojoSerializer}}, {{KryoSerializer}}, and Scala 
> type serializers.
> They are all written differently with varying coverage of behavioural tests, 
> and scattered in several different test classes.
> This JIRA tracks introducing a common test base for the serialization 
> roundtrip for all serializers in Flink, and also activating it for all 
> serializers.





[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122168412
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Right, I noticed that as well
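
For readers following the diff, here is a minimal illustrative sketch (not part of the PR) of how the two loop shapes above can select different index sets. The class name, method names, and sample values are hypothetical, and the sketch makes no claim about what `generateInputAccess` does with each index.

```java
import java.util.ArrayList;
import java.util.List;

public class MappingLoopSketch {

    // Mirrors the removed loop shape: every i in [0, arity) that occurs
    // as a value in the mapping array.
    public static List<Integer> indicesFilteredByArity(int arity, int[] mapping) {
        List<Integer> indices = new ArrayList<>();
        for (int i = 0; i < arity; i++) {
            if (contains(mapping, i)) {
                indices.add(i);
            }
        }
        return indices;
    }

    // Mirrors the added Pojo branch: every position of the mapping array.
    public static List<Integer> indicesByMappingLength(int[] mapping) {
        List<Integer> indices = new ArrayList<>();
        for (int i = 0; i < mapping.length; i++) {
            indices.add(i);
        }
        return indices;
    }

    private static boolean contains(int[] values, int wanted) {
        for (int v : values) {
            if (v == wanted) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int arity = 4;              // hypothetical number of physical fields
        int[] mapping = {2, 0, 3};  // hypothetical logical-to-physical mapping

        System.out.println(indicesFilteredByArity(arity, mapping)); // [0, 2, 3]
        System.out.println(indicesByMappingLength(mapping));        // [0, 1, 2]
    }
}
```

With these made-up values the first loop yields the field indices that appear in the mapping, while the second yields positions within the mapping, so the two only coincide when the mapping is the identity.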




[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050271#comment-16050271
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122168412
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Right, I noticed that as well


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a Pojo type, not Row?
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List data = new ArrayList();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream stream = env.fromCollection(data);
> DataStream stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
> super();
> }
> public Content(long recordTime, String urlKey) {
> super();

[jira] [Updated] (FLINK-6917) Introduce test base for end-to-end testing serializer config snapshotting, restoring, and compatibility check roundtrips

2017-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-6917:
---
Labels: ta  (was: )

> Introduce test base for end-to-end testing serializer config snapshotting, 
> restoring, and compatibility check roundtrips 
> -
>
> Key: FLINK-6917
> URL: https://issues.apache.org/jira/browse/FLINK-6917
> Project: Flink
>  Issue Type: Test
>  Components: State Backends, Checkpointing, Tests, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>  Labels: ta
>
> Currently, we only have end-to-end tests of the serializer snapshotting and 
> restore roundtrip for the {{PojoSerializer}}, {{KryoSerializer}}, and Scala 
> type serializers.
> They are all written differently with varying coverage of behavioural tests, 
> and scattered in several different test classes.
> This JIRA tracks introducing a common test base for the serialization 
> roundtrip for all serializers in Flink, and also activating it for all 
> serializers.





[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122168658
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

I think I tried that, but it caused a couple of tests to fail.




[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050272#comment-16050272
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122168658
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

I think I tried that, but it caused a couple of tests to fail.


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a Pojo type, not Row?
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List data = new ArrayList();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream stream = env.fromCollection(data);
> DataStream stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
> super();
> }
> public Content(long recordTime, String urlKey) {
> super();
> this.recordTime = recordTime;
> this.urlKey = urlKey;
> }
> public String getUrlKey() {
> return urlKey;
> }
> public void setUrlKey(String urlKey) {
> this.urlKey = urlKey;
> }
> public long getRecordTime() {
> return recordTime;
> }
> public void setRecordTime(long recordTime) {
> this.recordTime = recordTime;
> }
> public long getHttpGetMessageCount() {
> return httpGetMessageCount;
> }

[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122176275
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

@fhueske @wuchong thanks for trying to improve this PR. @wuchong, with your 
solution I also got two errors on my side when I ran `TimeAttributesITCase`. Have 
you run all the tests locally?





[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050326#comment-16050326
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122176275
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

@fhueske @wuchong thanks for trying to improve this PR. @wuchong, with your 
solution I also got two errors on my side when I ran `TimeAttributesITCase`. Have 
you run all the tests locally?



> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a Pojo type, not Row?
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List data = new ArrayList();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream stream = env.fromCollection(data);
> DataStream stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private l

[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122176847
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

@wuchong I think the current PR works well, so I suggest you open a 
new PR to improve it.
What do you think, @fhueske @wuchong?




[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050332#comment-16050332
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122176847
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

@wuchong I think the current PR works well, so I suggest you open a 
new PR to improve it.
What do you think, @fhueske @wuchong?


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a Pojo type rather than Row?
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List data = new ArrayList();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream stream = env.fromCollection(data);
> DataStream stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
>

[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122178152
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Hmm, but it works fine in my local environment. I pushed it to my branch; can you 
check it out?
https://github.com/wuchong/flink/commit/ad97e84ca561ea32d2ce5e0779f4aff6429b5523




[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050341#comment-16050341
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122178152
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Hmm, but it works fine in my local environment. I pushed it to my branch; can you 
check it out?
https://github.com/wuchong/flink/commit/ad97e84ca561ea32d2ce5e0779f4aff6429b5523


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a Pojo type rather than Row?
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List data = new ArrayList();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream stream = env.fromCollection(data);
> DataStream stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> 

[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122180812
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

I think your branch looks good, @wuchong. I tried a similar thing but did 
not change the `generateFieldAccess()` methods because I was afraid of side 
effects if we use the mapping for all kinds of types (instead of just POJOs).
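
To make the difference between the two loop shapes in the diff concrete, here is a minimal Java sketch that only computes which indices each loop visits. It is an illustration under assumptions: the class `MappingLoops`, its method names, and the sample arity and mapping values are hypothetical, and it does not reproduce what `generateInputAccess` does with each index.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; it does not call Flink's generateInputAccess().
final class MappingLoops {

    /** Old loop shape: indices 0 until arity that also occur in the mapping. */
    static List<Integer> byArity(int arity, int[] mapping) {
        List<Integer> visited = new ArrayList<>();
        for (int i = 0; i < arity; i++) {
            for (int m : mapping) {
                if (m == i) {
                    visited.add(i);
                    break;
                }
            }
        }
        return visited;
    }

    /** New loop shape for POJO inputs: indices 0 until mapping.length. */
    static List<Integer> byMappingLength(int[] mapping) {
        List<Integer> visited = new ArrayList<>();
        for (int i = 0; i < mapping.length; i++) {
            visited.add(i);
        }
        return visited;
    }
}
```

With a made-up arity of 4 and mapping `{2, 0, 5}`, the first loop visits 0 and 2, while the second visits 0, 1 and 2, so the two shapes can yield a different number of access expressions.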




[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050358#comment-16050358
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122180812
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

I think your branch looks good, @wuchong. I tried a similar thing but did 
not change the `generateFieldAccess()` methods because I was afraid of side 
effects if we use the mapping for all kinds of types (instead of just POJOs).


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a Pojo type rather than Row?
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List data = new ArrayList();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream stream = env.fromCollection(data);
> DataStream stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long sta

[GitHub] flink issue #4125: [FLINK-6682] [checkpoints] Improve error message in case ...

2017-06-15 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4125
  
Hi @zentol. Please help review this when you are free. Should I add any extra 
information? Thanks.




[jira] [Commented] (FLINK-6682) Improve error message in case parallelism exceeds maxParallelism

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050370#comment-16050370
 ] 

ASF GitHub Bot commented on FLINK-6682:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4125
  
Hi @zentol. Please help review this when you are free. Should I add any extra 
information? Thanks.


> Improve error message in case parallelism exceeds maxParallelism
> 
>
> Key: FLINK-6682
> URL: https://issues.apache.org/jira/browse/FLINK-6682
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Assignee: mingleizhang
>
> When restoring a job with a parallelism that exceeds the maxParallelism we're 
> not providing a useful error message, as all you get is an 
> IllegalArgumentException:
> {code}
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed
> at 
> org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:343)
> at 
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
> ... 22 more
> Caused by: java.lang.IllegalArgumentException
> at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> at 
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.createKeyGroupPartitions(StateAssignmentOperation.java:449)
> at 
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:117)
> at 
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:102)
> at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1038)
> at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1101)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
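
A check along the following lines would make the failure above self-explanatory. This is only a minimal sketch: the class `ParallelismCheck`, the method `checkParallelism`, the exception type, and the message wording are assumptions for illustration, not the code in `StateAssignmentOperation` or in the pull request.

```java
// Hypothetical sketch: this is not Flink's StateAssignmentOperation code.
final class ParallelismCheck {

    /**
     * Fails with a message that names the operator and both values,
     * instead of a bare IllegalArgumentException.
     */
    static void checkParallelism(String operatorName, int parallelism, int maxParallelism) {
        if (parallelism > maxParallelism) {
            throw new IllegalStateException(
                "Cannot restore operator '" + operatorName + "': parallelism " + parallelism
                    + " exceeds the max parallelism " + maxParallelism + ".");
        }
    }

    public static void main(String[] args) {
        checkParallelism("map", 4, 128); // within bounds, passes silently
        try {
            checkParallelism("map", 256, 128);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is that the message carries the operator name and both numbers, so the user does not have to read the stack trace to guess which precondition failed.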



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-5991) Expose Broadcast Operator State through public APIs

2017-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-5991:
---
Labels:   (was: api-breaking api-deprecation)

> Expose Broadcast Operator State through public APIs
> ---
>
> Key: FLINK-5991
> URL: https://issues.apache.org/jira/browse/FLINK-5991
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> The broadcast operator state functionality was added in FLINK-5265, it just 
> hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are pending on 
> broadcast state: rescalable Kinesis / Kafka consumers with shard / partition 
> discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast 
> state for the 1.3 release also.
> This JIRA also serves the purpose to discuss how we want to expose it.
> To initiate the discussion, I propose:
> 1. For the more powerful {{CheckpointedFunction}}, add the following to the 
> {{OperatorStateStore}} interface:
> {code}
>  ListState getBroadcastOperatorState(ListStateDescriptor 
> stateDescriptor);
>  ListState 
> getBroadcastSerializableListState(String stateName);
> {code}
> 2. For a simpler {{ListCheckpointed}} variant, we probably should have a 
> separate {{BroadcastListCheckpointed}} interface.
> Extending {{ListCheckpointed}} to let the user define the list state 
> type as either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if 
> we can rely on a contract that the value doesn't change. Or we expose a 
> defining method (e.g. {{getListStateType()}}) that is called only once in the 
> operator. This would break user code, but can be considered because it is 
> marked as {{PublicEvolving}}.
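
The difference between the two list state types named in the proposal can be sketched without any Flink dependency. The sketch below assumes that {{BROADCAST}} means every subtask receives the complete list on restore and that {{PARTITIONABLE}} means the elements are split across subtasks. The enum `ListStateType`, the class `ListStateRedistribution`, and the round-robin split are illustrative assumptions, not Flink types or Flink's actual redistribution scheme.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these types are Flink classes.
enum ListStateType { PARTITIONABLE, BROADCAST }

final class ListStateRedistribution {

    /** Returns the state elements each of the subtasks would see after a restore. */
    static <T> List<List<T>> redistribute(List<T> state, int parallelism, ListStateType type) {
        List<List<T>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < state.size(); i++) {
            if (type == ListStateType.BROADCAST) {
                // Every subtask gets every element.
                for (List<T> subtask : perSubtask) {
                    subtask.add(state.get(i));
                }
            } else {
                // Assumed round-robin split; each element goes to exactly one subtask.
                perSubtask.get(i % parallelism).add(state.get(i));
            }
        }
        return perSubtask;
    }
}
```

Under these assumptions, a consumer that restores with {{BROADCAST}} state can see all previously known shards or partitions on every subtask and decide locally which ones it owns, which is the use case the description mentions for the Kinesis and Kafka consumers.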





[jira] [Updated] (FLINK-5991) Expose Broadcast Operator State through public APIs

2017-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-5991:
---
Labels: api-breaking api-deprecation  (was: )

> Expose Broadcast Operator State through public APIs
> ---
>
> Key: FLINK-5991
> URL: https://issues.apache.org/jira/browse/FLINK-5991
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> The broadcast operator state functionality was added in FLINK-5265, it just 
> hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are pending on 
> broadcast state: rescalable Kinesis / Kafka consumers with shard / partition 
> discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast 
> state for the 1.3 release also.
> This JIRA also serves the purpose to discuss how we want to expose it.
> To initiate the discussion, I propose:
> 1. For the more powerful {{CheckpointedFunction}}, add the following to the 
> {{OperatorStateStore}} interface:
> {code}
>  ListState getBroadcastOperatorState(ListStateDescriptor 
> stateDescriptor);
>  ListState 
> getBroadcastSerializableListState(String stateName);
> {code}
> 2. For a simpler {{ListCheckpointed}} variant, we probably should have a 
> separate {{BroadcastListCheckpointed}} interface.
> Extending {{ListCheckpointed}} to let the user define the list state 
> type as either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if 
> we can rely on a contract that the value doesn't change. Or we expose a 
> defining method (e.g. {{getListStateType()}}) that is called only once in the 
> operator. This would break user code, but can be considered because it is 
> marked as {{PublicEvolving}}.





[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050423#comment-16050423
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122194058
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

I checked the usages of `generateFieldAccess()` and the changes seem to be OK.


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Debugging suggests that the 
> ArrayIndexOutOfBoundsException is caused by the input type being a Pojo type rather than Row?
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List data = new ArrayList();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream stream = env.fromCollection(data);
> DataStream stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
> super();
> }
> public Content(long recor

[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122194058
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if 
input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if 
input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

I checked the usages of `generateFieldAccess()` and the changes seem to be OK.




[jira] [Updated] (FLINK-6046) Add support for oversized messages during deployment

2017-06-15 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-6046:
---
Component/s: Network

> Add support for oversized messages during deployment
> 
>
> Key: FLINK-6046
> URL: https://issues.apache.org/jira/browse/FLINK-6046
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> This is the non-FLIP6 version of FLINK-4346, restricted to deployment 
> messages:
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than {{akka.framesize}} as may happen for task deployments via the 
> {{TaskDeploymentDescriptor}}.
> We should use the {{BlobServer}} to offload big data items (if possible) and 
> make use of any potential distributed file system behind. This way, not only 
> do we avoid the akka framesize restriction, but may also be able to speed up 
> deployment.
> I suggest the following changes:
>   - the sender, i.e. the {{Execution}} class, tries to store the serialized 
> job information and serialized task information (if oversized) from the 
> {{TaskDeploymentDescriptor}} (tdd) on the {{BlobServer}} as a single 
> {{NAME_ADDRESSABLE}} blob under its job ID (if this does not work, we send 
> the whole tdd as usual via akka)
>   - if stored in a blob, these data items are removed from the tdd
>   - the receiver, i.e. the {{TaskManager}} class, tries to retrieve any 
> offloaded data after receiving the {{TaskDeploymentDescriptor}} from akka; it 
> re-assembles the original tdd
>   - the stored blob may be deleted after re-assembly of the tdd
> Further (future) changes may include:
>   - separating the serialized job information and serialized task information 
> into two files and re-use the first one for all tasks
>   - not re-deploying these two during job recovery (if possible)
>   - then, as all other {{NAME_ADDRESSABLE}} blobs, these offloaded blobs may 
> be removed when the job enters a final state instead
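
As a rough illustration of the flow proposed above (not Flink code), the following sketch offloads a payload only when it exceeds a frame-size limit and re-assembles it on the receiving side. The names `InMemoryBlobStore`, `Descriptor`, `prepare`, `reassemble` and the blob key format are hypothetical and chosen only for this example; the real `BlobServer` and `TaskDeploymentDescriptor` APIs are not shown, and the fallback to sending everything via akka when the upload fails is omitted.

```java
import java.util.HashMap;
import java.util.Map;

final class OversizedOffload {

    /** Hypothetical stand-in for the BlobServer: a plain in-memory byte store. */
    static final class InMemoryBlobStore {
        private final Map<String, byte[]> blobs = new HashMap<>();

        void put(String key, byte[] data) { blobs.put(key, data); }
        byte[] get(String key) { return blobs.get(key); }
        void delete(String key) { blobs.remove(key); }
        boolean contains(String key) { return blobs.containsKey(key); }
    }

    /** Simplified descriptor: holds either the payload itself or the key of the offloaded blob. */
    static final class Descriptor {
        final byte[] inlinePayload; // null if the payload was offloaded
        final String blobKey;       // null if the payload travels inline

        Descriptor(byte[] inlinePayload, String blobKey) {
            this.inlinePayload = inlinePayload;
            this.blobKey = blobKey;
        }
    }

    /** Sender side: offload only if the payload would not fit into one frame. */
    static Descriptor prepare(String jobId, byte[] payload, int maxFrameSize, InMemoryBlobStore store) {
        if (payload.length <= maxFrameSize) {
            return new Descriptor(payload, null);
        }
        String key = jobId + "/deployment-info"; // assumed naming scheme
        store.put(key, payload);
        return new Descriptor(null, key);
    }

    /** Receiver side: fetch offloaded data, restore the payload, then delete the blob. */
    static byte[] reassemble(Descriptor descriptor, InMemoryBlobStore store) {
        if (descriptor.blobKey == null) {
            return descriptor.inlinePayload;
        }
        byte[] payload = store.get(descriptor.blobKey);
        store.delete(descriptor.blobKey);
        return payload;
    }
}
```

Deleting the blob right after re-assembly mirrors the last bullet of the proposal; the "future changes" variant would instead leave cleanup to the job's final state.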



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6046) Add support for oversized messages during deployment

2017-06-15 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050450#comment-16050450
 ] 

Nico Kruber commented on FLINK-6046:


Since the broken cleanup in the current blob store is a deal breaker for
offloading oversized messages, we should revamp this issue after finishing
FLIP-19.

> Add support for oversized messages during deployment
> 
>
> Key: FLINK-6046
> URL: https://issues.apache.org/jira/browse/FLINK-6046
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> This is the non-FLIP6 version of FLINK-4346, restricted to deployment 
> messages:
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than {{akka.framesize}} as may happen for task deployments via the 
> {{TaskDeploymentDescriptor}}.
> We should use the {{BlobServer}} to offload big data items (if possible) and 
> make use of any potential distributed file system behind. This way, not only 
> do we avoid the akka framesize restriction, but may also be able to speed up 
> deployment.
> I suggest the following changes:
>   - the sender, i.e. the {{Execution}} class, tries to store the serialized 
> job information and serialized task information (if oversized) from the 
> {{TaskDeploymentDescriptor}} (tdd) on the {{BlobServer}} as a single 
> {{NAME_ADDRESSABLE}} blob under its job ID (if this does not work, we send 
> the whole tdd as usual via akka)
>   - if stored in a blob, these data items are removed from the tdd
>   - the receiver, i.e. the {{TaskManager}} class, tries to retrieve any 
> offloaded data after receiving the {{TaskDeploymentDescriptor}} from akka; it 
> re-assembles the original tdd
>   - the stored blob may be deleted after re-assembly of the tdd
> Further (future) changes may include:
>   - separating the serialized job information and serialized task information 
> into two files and re-use the first one for all tasks
>   - not re-deploying these two during job recovery (if possible)
>   - then, as all other {{NAME_ADDRESSABLE}} blobs, these offloaded blobs may 
> be removed when the job enters a final state instead





[jira] [Updated] (FLINK-5908) Blob Cache can (rarely) get corrupted on failed blob downloads

2017-06-15 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-5908:
---
Fix Version/s: (was: 1.4.0)
   (was: 1.2.2)

> Blob Cache can (rarely) get corrupted on failed blob downloads
> --
>
> Key: FLINK-5908
> URL: https://issues.apache.org/jira/browse/FLINK-5908
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>
> The Blob Cache downloads files directly to the target file location.
> While it tries to clean up failed attempts, there is a chance that this
> cleanup does not complete.
> In that case, we have a corrupt file at the target location. The blob cache
> then assumes that it already has the file cached and future requests
> do not attempt to re-download the file.
> The fix would be to download to a temp file name, validate the integrity, and 
> rename to the target file path when the validation succeeds.
> The validation for "content addressable" could even include validating the 
> checksum hash.
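
The fix described above (download to a temp file, validate, then rename) could look roughly like the sketch below. It is only an illustration using JDK classes; `SafeBlobDownload` and its `download` method are hypothetical names, not part of Flink's blob cache, and SHA-1 is merely an assumed checksum algorithm.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

/** Hypothetical helper, not Flink's BlobCache API. */
final class SafeBlobDownload {

    /**
     * Streams the source into a temp file next to the target, checks the digest,
     * and only then renames the temp file to the target path.
     */
    static void download(InputStream source, Path target, byte[] expectedSha1)
            throws IOException, NoSuchAlgorithmException {
        Path dir = target.toAbsolutePath().getParent();
        Path tmp = Files.createTempFile(dir, "blob-", ".tmp");
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            try (InputStream in = new DigestInputStream(source, sha1)) {
                // The temp file already exists, so it has to be replaced.
                Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
            }
            if (!Arrays.equals(sha1.digest(), expectedSha1)) {
                throw new IOException("Checksum mismatch for " + target);
            }
            // The target path only becomes visible once the content is validated.
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            // No-op after a successful move; removes the partial file on failure.
            Files.deleteIfExists(tmp);
        }
    }
}
```

Creating the temp file in the same directory as the target keeps the rename on one file system, which is what makes an atomic move possible in the first place.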





[jira] [Updated] (FLINK-5908) Blob Cache can (rarely) get corrupted on failed blob downloads

2017-06-15 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-5908:
---
Affects Version/s: 1.4.0
   1.3.0

> Blob Cache can (rarely) get corrupted on failed blob downloads
> --
>
> Key: FLINK-5908
> URL: https://issues.apache.org/jira/browse/FLINK-5908
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Network
>Affects Versions: 1.2.0, 1.3.0, 1.4.0
>Reporter: Stephan Ewen
>
> The Blob Cache downloads files directly to the target file location.
> While it tries to clean up failed attempts, there is a chance that this
> cleanup does not complete.
> In that case, we have a corrupt file at the target location. The blob cache
> then assumes that it already has the file cached and future requests
> do not attempt to re-download the file.
> The fix would be to download to a temp file name, validate the integrity, and 
> rename to the target file path when the validation succeeds.
> The validation for "content addressable" could even include validating the 
> checksum hash.





[jira] [Updated] (FLINK-5908) Blob Cache can (rarely) get corrupted on failed blob downloads

2017-06-15 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-5908:
---
Component/s: Network

> Blob Cache can (rarely) get corrupted on failed blob downloads
> --
>
> Key: FLINK-5908
> URL: https://issues.apache.org/jira/browse/FLINK-5908
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Network
>Affects Versions: 1.2.0, 1.3.0, 1.4.0
>Reporter: Stephan Ewen
>
> The Blob Cache downloads files directly to the target file location.
> While it tries to clean up failed attempts, there is a chance that this
> cleanup does not complete.
> In that case, we have a corrupt file at the target location. The blob cache
> then assumes that it already has the file cached and future requests
> do not attempt to re-download the file.
> The fix would be to download to a temp file name, validate the integrity, and 
> rename to the target file path when the validation succeeds.
> The validation for "content addressable" could even include validating the 
> checksum hash.





[jira] [Commented] (FLINK-5908) Blob Cache can (rarely) get corrupted on failed blob downloads

2017-06-15 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050456#comment-16050456
 ] 

Nico Kruber commented on FLINK-5908:


let's solve this during the re-write of the BLOB store with FLIP-19

> Blob Cache can (rarely) get corrupted on failed blob downloads
> --
>
> Key: FLINK-5908
> URL: https://issues.apache.org/jira/browse/FLINK-5908
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, Network
>Affects Versions: 1.2.0, 1.3.0, 1.4.0
>Reporter: Stephan Ewen
>
> The Blob Cache downloads files directly to the target file location.
> While it tries to clean up failed attempts, there is a chance that this
> cleanup does not complete.
> In that case, we have a corrupt file at the target location. The blob cache
> then assumes that it already has the file cached and future requests
> do not attempt to re-download the file.
> The fix would be to download to a temp file name, validate the integrity, and 
> rename to the target file path when the validation succeeds.
> The validation for "content addressable" could even include validating the 
> checksum hash.





[jira] [Created] (FLINK-6930) Selecting window start / end on row-based Tumble/Slide window causes NPE

2017-06-15 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-6930:


 Summary: Selecting window start / end on row-based Tumble/Slide 
window causes NPE
 Key: FLINK-6930
 URL: https://issues.apache.org/jira/browse/FLINK-6930
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.3.0, 1.4.0
Reporter: Fabian Hueske


Selecting the start and end properties of a row-based window causes a 
NullPointerException.
The following program:

{code}
val windowedTable = table
  .window(Tumble over 2.rows on 'proctime as 'w)
  .groupBy('w, 'string)
  .select('string as 'n, 'int.count as 'cnt, 'w.start as 's, 'w.end as 'e)
{code}

causes 

{code}
Caused by: java.lang.NullPointerException
    at org.apache.calcite.runtime.SqlFunctions.toLong(SqlFunctions.java:1556)
    at org.apache.calcite.runtime.SqlFunctions.toLong(SqlFunctions.java:1551)
    at DataStreamCalcRule$40.processElement(Unknown Source)
    at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:67)
    at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:890)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:868)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:75)
    at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:37)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:599)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:456)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
{code}

We should validate that the start and end window properties are not accessed if 
the window is defined on row-counts.
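
A minimal sketch of such a validation step is shown below, assuming a simplified model in which a window is either time-based or row-count-based and selected window properties are given by name. `WindowPropertyValidation`, `WindowKind` and `validate` are hypothetical and do not correspond to classes in the Table API.

```java
import java.util.List;

final class WindowPropertyValidation {

    /** How the window size is defined (hypothetical model, not a Flink type). */
    enum WindowKind { TIME_INTERVAL, ROW_COUNT }

    /** Rejects start/end properties for row-count windows, which have no time bounds. */
    static void validate(WindowKind kind, List<String> selectedProperties) {
        if (kind != WindowKind.ROW_COUNT) {
            return;
        }
        for (String property : selectedProperties) {
            if (property.equals("start") || property.equals("end")) {
                throw new IllegalArgumentException(
                    "Window property '" + property + "' is not available on a row-count window.");
            }
        }
    }
}
```

Failing at validation time would turn the runtime NullPointerException into an error message that names the offending property.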





[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122201344
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

@fhueske thanks for reviewing. So if the Travis build passes, I will merge the
code?




[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050474#comment-16050474
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122201344
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

@fhueske thanks for reviewing. So if the Travis build passes, I will merge the
code?


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. In debug mode, the
> ArrayIndexOutOfBoundsException appears to be caused by the input type being a
> Pojo type rather than Row.
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
>     public static void main(String[] args) throws Exception {
>         List data = new ArrayList();
>         data.add(new Content(1L, "Hi"));
>         data.add(new Content(2L, "Hallo"));
>         data.add(new Content(3L, "Hello"));
>         data.add(new Content(4L, "Hello"));
>         data.add(new Content(7L, "Hello"));
>         data.add(new Content(8L, "Hello world"));
>         data.add(new Content(16L, "Hello world"));
>         final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         DataStream stream = env.fromCollection(data);
>         DataStream stream2 = stream.assignTimestampsAndWatermarks(
>             new BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
>                 /**
>                  *
>                  */
>                 private static final long serialVersionUID = 410512296011057717L;
>                 @Override
>                 public long extractTimestamp(Content element) {
>                     return element.getRecordTime();
>                 }
>             });
>         final StreamTableEnvironment tableEnv =
>             TableEnvironment.getTableEnvironment(env);
>         Table table = tableEnv.fromDataStream(stream2,
>             "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
>         Table windowTable = table
>             .window(Tumble.over("1.hours").on("rowtime").as("w"))
>             .groupBy("w, urlKey")
>             .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum ");
>         //table.printSchema();
>         TableSink windowSink = new CsvTableSink(
>             "/Users/mark/Documents/specific-website-code.csv", ",", 1,
>             WriteMode.OVERWRITE);
>         windowTable.writeToSink(windowSink);
>         // tableEnv.toDataStream(windowTable, Row.class).print();
>         env.execute();
>     }
>     public static class Content implements Serializable {
>         /**
>          *
>          */
>         private static final long serialVersionUID = 1429246948772430441L;
>         private String urlKey;
>         private long recordTime;
>         // private String recordTimeStr;
>         private long httpGetMessageCount;
>         private long httpPostMessageCount;
>         private long uplink;
>         private long downlink;
>         private long statusCode;
>         private long statusCodeCount;
>         public Content() {
>             super();
>         }
>         public Content(lo

[GitHub] flink issue #4120: [FLINK-6836] [tests] Fix YARNSessionCapacitySchedulerITCa...

2017-06-15 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4120
  
+1 to merge.




[jira] [Commented] (FLINK-6836) Failing YARNSessionCapacitySchedulerITCase.testTaskManagerFailure

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050495#comment-16050495
 ] 

ASF GitHub Bot commented on FLINK-6836:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4120
  
+1 to merge.


> Failing YARNSessionCapacitySchedulerITCase.testTaskManagerFailure
> -
>
> Key: FLINK-6836
> URL: https://issues.apache.org/jira/browse/FLINK-6836
> Project: Flink
>  Issue Type: Bug
>  Components: Tests, YARN
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The master is currently unstable. The 
> {{YARNSessionCapacitySchedulerITCase.testTaskManagerFailure}} fails with 
> Hadoop version {{2.6.5}}, {{2.7.3}} and {{2.8.0}}.
> See this build [1] for example.
> [1] https://travis-ci.org/apache/flink/builds/238720589





[jira] [Created] (FLINK-6931) Support custom compression formats for checkpoints (+Upgrade/Compatibility)

2017-06-15 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-6931:
-

 Summary: Support custom compression formats for checkpoints 
(+Upgrade/Compatibility)
 Key: FLINK-6931
 URL: https://issues.apache.org/jira/browse/FLINK-6931
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Reporter: Stefan Richter


With FLINK-6773, we introduced optional snappy compression for keyed state in 
full checkpoints and savepoints. We should offer users a way to register their 
own compression formats with the {{ExecutionConfig}}. For this, we should also 
have a compatibility story, very similar to what 
{{TypeSerializerConfigSnapshot}} does for type serializers.





[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122205799
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Thanks @wuchong, I checked it on my side. Nice.
+1

Best,
SunJincheng




[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050502#comment-16050502
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122205799
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Thanks @wuchong, I checked it on my side. Nice.
+1

Best,
SunJincheng


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. In debug mode, the
> ArrayIndexOutOfBoundsException appears to be caused by the input type being a
> Pojo type rather than Row.
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
>     public static void main(String[] args) throws Exception {
>         List data = new ArrayList();
>         data.add(new Content(1L, "Hi"));
>         data.add(new Content(2L, "Hallo"));
>         data.add(new Content(3L, "Hello"));
>         data.add(new Content(4L, "Hello"));
>         data.add(new Content(7L, "Hello"));
>         data.add(new Content(8L, "Hello world"));
>         data.add(new Content(16L, "Hello world"));
>         final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         DataStream stream = env.fromCollection(data);
>         DataStream stream2 = stream.assignTimestampsAndWatermarks(
>             new BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
>                 /**
>                  *
>                  */
>                 private static final long serialVersionUID = 410512296011057717L;
>                 @Override
>                 public long extractTimestamp(Content element) {
>                     return element.getRecordTime();
>                 }
>             });
>         final StreamTableEnvironment tableEnv =
>             TableEnvironment.getTableEnvironment(env);
>         Table table = tableEnv.fromDataStream(stream2,
>             "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
>         Table windowTable = table
>             .window(Tumble.over("1.hours").on("rowtime").as("w"))
>             .groupBy("w, urlKey")
>             .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum ");
>         //table.printSchema();
>         TableSink windowSink = new CsvTableSink(
>             "/Users/mark/Documents/specific-website-code.csv", ",", 1,
>             WriteMode.OVERWRITE);
>         windowTable.writeToSink(windowSink);
>         // tableEnv.toDataStream(windowTable, Row.class).print();
>         env.execute();
>     }
>     public static class Content implements Serializable {
>         /**
>          *
>          */
>         private static final long serialVersionUID = 1429246948772430441L;
>         private String urlKey;
>         private long recordTime;
>         // private String recordTimeStr;
>         private long httpGetMessageCount;
>         private long httpPostMessageCount;
>         private long uplink;
>         private long downlink;
>         private long statusCode;
>         private long statusCodeCount;
>         public Content() {
>             super();
>         }
>         public Cont

[GitHub] flink pull request #4130: [FLINK-6773] [checkpoint] Introduce compression (s...

2017-06-15 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/4130

[FLINK-6773] [checkpoint] Introduce compression (snappy) for keyed st…

This PR introduces optional snappy compression for the keyed state in full
checkpoints and savepoints. This feature can be activated through a flag in
{{ExecutionConfig}}.

For the future, we can also support user-defined compression schemes, which
will also require an upgrade and compatibility feature, as described in
FLINK-6931.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink compressedKeyGroups

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4130.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4130








[jira] [Commented] (FLINK-6773) Use compression (e.g. snappy) for full check/savepoints

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050513#comment-16050513
 ] 

ASF GitHub Bot commented on FLINK-6773:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4130
  
CC @tillrohrmann 


> Use compression (e.g. snappy) for full check/savepoints
> ---
>
> Key: FLINK-6773
> URL: https://issues.apache.org/jira/browse/FLINK-6773
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> We could use compression (e.g. snappy stream compression) to decrease the 
> size of our full checkpoints and savepoints. From some initial experiments, I 
> think there is great potential to achieve compression rates around 30-50%. 
> Given those numbers, I think this is very low hanging fruit to implement.
> One point to consider in the implementation is that compression blocks should 
> respect key-groups, i.e. typically it should make sense to compress per 
> key-group.
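
To illustrate what "compress per key-group" could mean in practice, the sketch below compresses each key-group's bytes as an independent block so that a single key-group can be restored without reading any other block. This is only an assumption-laden example: `KeyGroupCompression` and its methods are hypothetical, and the JDK's `DeflaterOutputStream` stands in for snappy, which is not part of the JDK.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

final class KeyGroupCompression {

    /** Compresses each key-group's bytes as its own independent block. */
    static List<byte[]> compressPerKeyGroup(List<byte[]> keyGroups) throws IOException {
        List<byte[]> blocks = new ArrayList<>();
        for (byte[] keyGroup : keyGroups) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            // Closing the stream finishes the block, so no state leaks into the next one.
            try (DeflaterOutputStream out = new DeflaterOutputStream(buffer)) {
                out.write(keyGroup);
            }
            blocks.add(buffer.toByteArray());
        }
        return blocks;
    }

    /** Restores a single key-group without touching any other block. */
    static byte[] decompress(byte[] block) throws IOException {
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(block))) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                result.write(chunk, 0, n);
            }
        }
        return result.toByteArray();
    }
}
```

Keeping block boundaries aligned with key-groups means that rescaling, which reassigns whole key-groups, never has to split a compressed block.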





[jira] [Commented] (FLINK-6773) Use compression (e.g. snappy) for full check/savepoints

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050508#comment-16050508
 ] 

ASF GitHub Bot commented on FLINK-6773:
---

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/4130

[FLINK-6773] [checkpoint] Introduce compression (snappy) for keyed st…

This PR introduces optional snappy compression for the keyed state in full
checkpoints and savepoints. This feature can be activated through a flag in
{{ExecutionConfig}}.

For the future, we can also support user-defined compression schemes, which
will also require an upgrade and compatibility feature, as described in
FLINK-6931.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink compressedKeyGroups

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4130.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4130






> Use compression (e.g. snappy) for full check/savepoints
> ---
>
> Key: FLINK-6773
> URL: https://issues.apache.org/jira/browse/FLINK-6773
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> We could use compression (e.g. snappy stream compression) to decrease the 
> size of our full checkpoints and savepoints. From some initial experiments, I 
> think there is great potential to achieve compression rates around 30-50%. 
> Given those numbers, I think this is very low hanging fruit to implement.
> One point to consider in the implementation is that compression blocks should 
> respect key-groups, i.e. typically it should make sense to compress per 
> key-group.





[GitHub] flink issue #4130: [FLINK-6773] [checkpoint] Introduce compression (snappy) ...

2017-06-15 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4130
  
CC @tillrohrmann 




[jira] [Commented] (FLINK-6886) Fix Timestamp field can not be selected in event time case when toDataStream[T], `T` not a `Row` Type.

2017-06-15 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050535#comment-16050535
 ] 

Fabian Hueske commented on FLINK-6886:
--

Maybe there's another way to fix this problem. I played around a bit and found 
the following:

The following Table API query is executed correctly:

{code}
val table = stream.toTable(tEnv, 'l, 'i, 'n, 'proctime.proctime)

val windowedTable = table
  .window(Tumble over 2.seconds on 'proctime as 'w)
  .groupBy('w, 'n)
  .select('n, 'i.count as 'cnt, 'w.start as 's, 'w.end as 'e)
val results = windowedTable.toAppendStream[MP](queryConfig)

// POJO

class MP(var s: Timestamp, var e: Timestamp, var cnt: Long, var n: String) {
  def this() { this(null, null, 0, null) }
  override def toString: String = s"$n,${s.toString},${e.toString},$cnt"
}
{code}

whereas the equivalent SQL query fails with the reported exception ("The field 
types of physical and logical row types do not match")

{code}
val sqlTable = tEnv.sql(
  s"""SELECT TUMBLE_START(proctime, INTERVAL '2' SECOND) AS s,
|  TUMBLE_END(proctime, INTERVAL '2' SECOND) AS e,
|  n,
|  COUNT(i) as cnt
|FROM $table
|GROUP BY n, TUMBLE(proctime, INTERVAL '2' SECOND)
|
  """.stripMargin)

val results = sqlTable.toAppendStream[MP](queryConfig)
{code}

The plans of both queries look similar, but the SQL plan seems to lack the 
correct final projection:

{code}
// Table API plan
== Abstract Syntax Tree ==
LogicalProject(n=[$0], cnt=[AS($1, 'cnt')], s=[AS($2, 's')], e=[AS($3, 'e')])
  LogicalWindowAggregate(group=[{0}], TMP_0=[COUNT($1)])
    LogicalProject(n=[$2], i=[$1], proctime=[$3])
      LogicalTableScan(table=[[_DataStreamTable_0]])

== Optimized Logical Plan ==
DataStreamCalc(select=[n, TMP_0 AS cnt, TMP_1 AS s, TMP_2 AS e])
  DataStreamGroupWindowAggregate(groupBy=[n], window=[TumblingGroupWindow('w, 'proctime, 2000.millis)], select=[n, COUNT(i) AS TMP_0, start('w) AS TMP_1, end('w) AS TMP_2])
    DataStreamCalc(select=[n, i, proctime])
      DataStreamScan(table=[[_DataStreamTable_0]])

// SQL plans
== Abstract Syntax Tree ==
LogicalProject(s=[TUMBLE_START($1)], e=[TUMBLE_END($1)], n=[$0], cnt=[$2])
  LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
    LogicalProject(n=[$2], $f1=[TUMBLE($3, 2000)], i=[$1])
      LogicalTableScan(table=[[UnnamedTable$3]])

== Optimized Logical Plan ==
DataStreamCalc(select=[w$start, w$end, n, cnt])
  DataStreamGroupWindowAggregate(groupBy=[n], window=[TumblingGroupWindow('w$, 'proctime, 2000.millis)], select=[n, COUNT(i) AS cnt, start('w$) AS w$start, end('w$) AS w$end])
    DataStreamCalc(select=[n, proctime, i])
      DataStreamScan(table=[[_DataStreamTable_0]])
{code}

So this doesn't seem to be a fundamental issue with the time attributes or 
window properties, but rather an issue with the SQL optimization.

What do you think [~sunjincheng121] and [~jark]?
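
To illustrate what a "correct final projection" would have to do here, the following is a small sketch that maps output fields to a target field order by name rather than by position. It is purely illustrative and not Flink code: the class and method names are hypothetical, and the target order (cnt, e, n, s) assumes that the POJO's fields are matched in name-sorted order, which is an assumption and not something stated in the plans above.

```java
import java.util.Arrays;
import java.util.List;

public class ProjectionByNameSketch {

    /**
     * For each requested field name, returns the index of that field in the
     * physical row. Hypothetical helper, not part of any Flink API.
     */
    public static int[] projectionByName(List<String> physicalFields, List<String> requestedFields) {
        int[] mapping = new int[requestedFields.size()];
        for (int i = 0; i < mapping.length; i++) {
            int index = physicalFields.indexOf(requestedFields.get(i));
            if (index < 0) {
                throw new IllegalArgumentException("Unknown field: " + requestedFields.get(i));
            }
            mapping[i] = index;
        }
        return mapping;
    }

    public static void main(String[] args) {
        // Field order produced by the SQL query above.
        List<String> sqlOutput = Arrays.asList("s", "e", "n", "cnt");
        // Assumed field order of the target POJO (name-sorted).
        List<String> pojoOrder = Arrays.asList("cnt", "e", "n", "s");

        System.out.println(Arrays.toString(projectionByName(sqlOutput, pojoOrder)));
    }
}
```

A purely positional comparison of the two orders would pair a Timestamp field with a Long field, which is the kind of mismatch the reported exception message describes.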

> Fix Timestamp field can not be selected in event time case when  
> toDataStream[T], `T` not a `Row` Type.
> ---
>
> Key: FLINK-6886
> URL: https://issues.apache.org/jira/browse/FLINK-6886
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently, for event-time windows (group/over), a query that contains a 
> `Timestamp` field in the `SELECT` clause throws an exception when it is 
> converted with toDataStream[T] and `T` is not a `Row` type (e.g. a 
> `PojoType`). This JIRA will fix that bug. For example:
> Group window in SQL:
> {code}
> SELECT name, max(num) as myMax, TUMBLE_START(rowtime, INTERVAL '5' SECOND) as 
> winStart,TUMBLE_END(rowtime, INTERVAL '5' SECOND) as winEnd FROM T1 GROUP BY 
> name, TUMBLE(rowtime, INTERVAL '5' SECOND)
> {code}
> Exception thrown:
> {code}
> org.apache.flink.table.api.TableException: The field types of physical and 
> logical row types do not match.This is a bug and should not happen. Please 
> file an issue.
>   at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
>   at 
> org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:721)
>   at 
> org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:247)
>   at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:647)
> {code}
> In fact, once this exception is resolved, further exceptions are thrown. 
> The root cause is a bug in the 
> {{TableEnvironment#generateRowConverterFunction}} method, which this JIRA 
> will fix.





[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...

2017-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122212537
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Yes, @wuchong please merge :-)




[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail

2017-06-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050537#comment-16050537
 ] 

ASF GitHub Bot commented on FLINK-6896:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4111#discussion_r122212537
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -871,12 +871,24 @@ class CodeGenerator(
   returnType: TypeInformation[_ <: Any],
   resultFieldNames: Seq[String])
 : GeneratedExpression = {
-val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
-  yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+  for (i <- 0 until input1Mapping.length)
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+} else {
+  for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+yield generateInputAccess(input1, input1Term, i, input1Mapping)
+}
 
 val input2AccessExprs = input2 match {
-  case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
-yield generateInputAccess(ti, input2Term, i, input2Mapping)
+  case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Yes, @wuchong please merge :-)


> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Mark You
>Assignee: sunjincheng
> Attachments: debug.png
>
>
> The following example fails at the sink. Judging from a debugging session, 
> is the ArrayIndexOutOfBoundsException caused by the input type being a POJO 
> type rather than Row?
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
> public static void main(String[] args) throws Exception {
> List data = new ArrayList();
> data.add(new Content(1L, "Hi"));
> data.add(new Content(2L, "Hallo"));
> data.add(new Content(3L, "Hello"));
> data.add(new Content(4L, "Hello"));
> data.add(new Content(7L, "Hello"));
> data.add(new Content(8L, "Hello world"));
> data.add(new Content(16L, "Hello world"));
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> DataStream stream = env.fromCollection(data);
> DataStream stream2 = stream.assignTimestampsAndWatermarks(
> new 
> BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) {
> /**
>  * 
>  */
> private static final long serialVersionUID = 
> 410512296011057717L;
> @Override
> public long extractTimestamp(Content element) {
> return element.getRecordTime();
> }
> });
> final StreamTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env);
> Table table = tableEnv.fromDataStream(stream2, 
> "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
> Table windowTable = 
> table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, 
> urlKey")
> 
> .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum
>  ");
> //table.printSchema();
> TableSink windowSink = new 
> CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
> WriteMode.OVERWRITE);
> windowTable.writeToSink(windowSink);
> // tableEnv.toDataStream(windowTable, Row.class).print();
> env.execute();
> }
> public static class Content implements Serializable {
> /**
>  * 
>  */
> private static final long serialVersionUID = 1429246948772430441L;
> private String urlKey;
> private long recordTime;
> // private String recordTimeStr;
> private long httpGetMessageCount;
> private long httpPostMessageCount;
> private long uplink;
> private long downlink;
> private long statusCode;
> private long statusCodeCount;
> public Content() {
> super();
> }
> public Content(long recordTime, String urlKey) {
> super();

[jira] [Commented] (FLINK-4641) Support branching CEP patterns

2017-06-15 Thread Alexander Chermenin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050551#comment-16050551
 ] 

Alexander Chermenin commented on FLINK-4641:


Hi [~dian.fu]! Yes of course, you're welcome! Unfortunately I don't have enough 
time to do it.

> Support branching CEP patterns 
> ---
>
> Key: FLINK-4641
> URL: https://issues.apache.org/jira/browse/FLINK-4641
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: Till Rohrmann
>Assignee: Alexander Chermenin
>
> We should add support for branching CEP patterns to the Pattern API. 
> {code}
> |--> B --|
> ||
> A -- --> D
> ||
> |--> C --|
> {code}
> This feature will require changes to the {{Pattern}} class and the 
> {{NFACompiler}}.





[jira] [Updated] (FLINK-6932) Update the inaccessible Dataflow Model paper link

2017-06-15 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-6932:

Description:  I tried to access the Dataflow Model paper link on 
[https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html],
 but it returns a 404 error.  (was:  I tried to access the 
[#Dataflow Model paper] link on 
[https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html],
 but it returns a 404 error.)

> Update the inaccessible Dataflow Model paper link
> -
>
> Key: FLINK-6932
> URL: https://issues.apache.org/jira/browse/FLINK-6932
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: mingleizhang
>Assignee: mingleizhang
>  Labels: None
>
>  I tried to access the Dataflow Model paper link on 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html],
>  but it returns a 404 error.





[jira] [Created] (FLINK-6932) Update the inaccessible Dataflow Model paper link

2017-06-15 Thread mingleizhang (JIRA)
mingleizhang created FLINK-6932:
---

 Summary: Update the inaccessible Dataflow Model paper link
 Key: FLINK-6932
 URL: https://issues.apache.org/jira/browse/FLINK-6932
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: mingleizhang
Assignee: mingleizhang


 I tried to access the [#Dataflow Model paper] link on 
[https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html],
 but it returns a 404 error.





[GitHub] flink pull request #4078: [FLINK-6693] [table] Support DATE_FORMAT function ...

2017-06-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4078#discussion_r122255414
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/DateTimeFunctions.java
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime;
+
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.DateTimeFormatterBuilder;
+
+/**
+ * Built-in scalar functions for date time related operations.
+ */
+public class DateTimeFunctions {
+   private static final int PIVOT_YEAR = 2020;
+
+   private static final ThreadLocalCache DATETIME_FORMATTER_CACHE =
+       new ThreadLocalCache(100) {
+           @Override
+           protected DateTimeFormatter getNewInstance(String format) {
+               return createDateTimeFormatter(format);
+           }
+       };
+
+   public static String dateFormat(long ts, String formatString) {
+       DateTimeFormatter formatter = DATETIME_FORMATTER_CACHE.get(formatString);
--- End diff --

Would it make sense to add a shortcut during code-gen if the pattern is a 
string literal (this should be the most common case) to avoid the lookup?
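
A rough sketch of what such a shortcut could look like, under several assumptions: it uses java.time instead of Joda-Time so that it runs on the JDK alone, it uses java.time pattern syntax rather than the format specifiers of the DATE_FORMAT function, and all names are hypothetical. A plain per-thread map stands in for the ThreadLocalCache from the diff.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

public class DateFormatShortcutSketch {

    // Simplified stand-in for the per-thread formatter cache in the diff.
    private static final ThreadLocal<Map<String, DateTimeFormatter>> FORMATTER_CACHE =
        ThreadLocal.withInitial(HashMap::new);

    /** Builds a formatter for the given pattern, fixed to UTC for this sketch. */
    public static DateTimeFormatter createFormatter(String pattern) {
        return DateTimeFormatter.ofPattern(pattern).withZone(ZoneOffset.UTC);
    }

    /** General path: the pattern is only known at runtime, so every call does a cache lookup. */
    public static String dateFormat(long ts, String pattern) {
        DateTimeFormatter formatter =
            FORMATTER_CACHE.get().computeIfAbsent(pattern, DateFormatShortcutSketch::createFormatter);
        return formatter.format(Instant.ofEpochMilli(ts));
    }

    /** Shortcut path: the caller already holds the formatter, so no lookup is needed. */
    public static String dateFormat(long ts, DateTimeFormatter formatter) {
        return formatter.format(Instant.ofEpochMilli(ts));
    }

    // What generated code could keep as a member when the pattern is a string literal.
    private static final DateTimeFormatter LITERAL_FORMATTER = createFormatter("yyyy-MM-dd");

    public static void main(String[] args) {
        long ts = 86_400_000L; // one day after the epoch, in milliseconds
        System.out.println(dateFormat(ts, "yyyy-MM-dd"));      // lookup on every call
        System.out.println(dateFormat(ts, LITERAL_FORMATTER)); // formatter resolved once
    }
}
```

In generated code, the formatter for a literal pattern would be created once when the function is initialized, and the per-record call would then use the second overload.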



