[jira] [Commented] (FLINK-3086) ExpressionParser does not support concatenation of suffix operations

2016-04-05 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227709#comment-15227709
 ] 

ramkrishna.s.vasudevan commented on FLINK-3086:
---

This also affects the abs() operator when used with an int literal; for example, 
1.abs() does not work.
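
For reference, the two expressions from this report that the string-based 
parser rejects (the {{table}} variable stands for any Table; the equivalent 
chained suffix calls reportedly work through the Scala implicit expression API):
{code}
// suffix operations cannot be concatenated in the string-based ExpressionParser
table.select("field.cast(STRING).substring(2)")
// abs() applied to an int literal is also reported to fail
table.select("1.abs()")
{code}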

> ExpressionParser does not support concatenation of suffix operations
> 
>
> Key: FLINK-3086
> URL: https://issues.apache.org/jira/browse/FLINK-3086
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Timo Walther
>
> The ExpressionParser of the Table API does not support concatenation of 
> suffix operations. e.g. 
> {code}table.select("field.cast(STRING).substring(2)"){code} throws  an 
> exception.
> {code}
> org.apache.flink.api.table.ExpressionException: Could not parse expression: 
> string matching regex `\z' expected but `.' found
>   at 
> org.apache.flink.api.table.parser.ExpressionParser$.parseExpressionList(ExpressionParser.scala:224)
> {code}
> However, the Scala implicit Table Expression API supports this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-1934) Add approximative k-nearest-neighbours (kNN) algorithm to machine learning library

2016-04-05 Thread Daniel Blazevski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227632#comment-15227632
 ] 

Daniel Blazevski edited comment on FLINK-1934 at 4/6/16 3:07 AM:
-

[~chiwanpark] [~till.rohrmann]

I have a Flink version -- still a bit preliminary -- of the approximate kNN up 
and running.  The exact kNN using a quadtree performs quite badly in 
moderate-to-high spatial dimensions (e.g. with 20,000 test and training points 
in 6D the quadtree is worse), but no worries, I took care of this and the exact 
version decides whether or not to use the quadtree.  

https://github.com/danielblazevski/flink/blob/FLINK-1934/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/nn/zknn.scala

A preliminary test shows good scaling as the number of test + training points 
is increased.  

8,000 points in 6D (i.e. 8,000 test points and 8,000 training points):
Elapsed time approx =   : 2s
Elapsed time exact =   : 27s

64,000 in 6D:  
Elapsed time approx =   : 6s
(didn't run the exact version, we know it's O(N^2))

I will have to clean things up, add edge cases, etc which may slow down the 
run-time a bit, but will definitely not increase the complexity of the 
algorithm with respect to the number of test/training points.

This still uses a cross product, which I was hoping to avoid, but I'm not sure 
that's possible.  Any thoughts?  Basically, the idea is to hash the test/train 
set to 1D (I use the z-value hash based on [1]). 
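
For anyone following along, here is a minimal, self-contained Scala sketch of 
the bit-interleaving idea behind the z-value (Morton) hash -- this is not the 
code in zknn.scala, and it assumes the coordinates have already been converted 
to non-negative ints with a fixed number of bits per dimension:
{code}
object ZValueSketch {
  // Interleave the bits of the integer coordinates (z-order / Morton code).
  // `bits` is the fixed maximum number of bits used per coordinate.
  def zValue(coords: Array[Int], bits: Int = 16): BigInt = {
    var z = BigInt(0)
    for (b <- (bits - 1) to 0 by -1; d <- coords.indices) {
      z = (z << 1) | BigInt((coords(d) >> b) & 1)
    }
    z
  }

  def main(args: Array[String]): Unit = {
    // nearby points in 2D map to nearby keys on the 1D z-curve
    println(zValue(Array(3, 5)))  // 27
    println(zValue(Array(4, 5)))  // 49
  }
}
{code}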

I still have not implemented the ideas in [1] in full.  The full solution is 
quite complex.  They do a bunch of load balancing that I'm still learning 
about, and I'm not quite sure of the payoff.  One option could be that I clean 
up and optimize what I have now, since it's already performing well, and we 
open a new issue to do all the steps in [1].  

There are still many things to clean up, but any cleaning/edge cases will not 
add computational complexity with respect to the number of test points.  
E.g., I currently convert the coordinates to integers and ignore the decimal 
part, so there are lots of collisions in the z-value hash; normalizing the data 
and using a fixed maximum number of bits to compute the z-value is needed, but 
will definitely not increase the complexity with respect to adding more 
test/training points (this is described towards the end of [3])

Any thoughts?


was (Author: danielblazevski):
[~chiwanpark] [~till.rohrmann]

I have a Flink version -- still a bit preliminary -- of the approximate kNN up 
and running.  The exact kNN using a quadtree performs quite badly in 
moderate-to-high spatial dimensions (e.g. with 20,000 test and training points 
in 6D the quadtree is worse), but no worries, I took care of this and the exact 
version decides whether or not to use the quadtree.  

https://github.com/danielblazevski/flink/blob/FLINK-1934/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/nn/zknn.scala

A preliminary test shows good scaling as the number of test + training points 
is increased.  

8,000 points in 6D (i.e. 8,000 test points and 8,000 training points):
Elapsed time approx =   : 2s
Elapsed time exact =   : 27s

64,000 in 6D:  
Elapsed time approx =   : 6s
(didn't run the exact version, we know it's O(N^2))

I will have to clean things up, add edge cases, etc which may slow down the 
run-time a bit, but will definitely not increase the complexity of the 
algorithm with respect to the number of test/training points.

This still uses a cross product, which I was hoping to avoid, but I'm not sure 
that's possible.  Any thoughts?  Basically, the idea is to hash the test/train 
set to 1D (I use the z-value hash based on [1]). 

I still have not implemented the ideas in [1] in full.  The full solution is 
quite complex.  They do a bunch of load balancing that I'm still learning 
about, and I'm not quite sure of the payoff.  One option could be that I clean 
up and optimize what I have now, since it's already performing well, and we 
open a new issue to do all the steps in [1].  

There are still many things to clean up, but any cleaning/edge cases will not 
add computational complexity with respect to the number of test points.  
E.g., I currently convert the coordinates to integers and ignore the decimal 
part, so there are lots of collisions in the z-value hash; normalizing the data 
and using a fixed maximum number of bits to compute the z-value is needed 
(this is described towards the end of [3])

Any thoughts?

> Add approximative k-nearest-neighbours (kNN) algorithm to machine learning 
> library
> --
>
> Key: FLINK-1934
> URL: https://issues.apache.org/jira/browse/FLINK-1934
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Daniel 

[jira] [Comment Edited] (FLINK-1934) Add approximative k-nearest-neighbours (kNN) algorithm to machine learning library

2016-04-05 Thread Daniel Blazevski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227632#comment-15227632
 ] 

Daniel Blazevski edited comment on FLINK-1934 at 4/6/16 3:05 AM:
-

[~chiwanpark] [~till.rohrmann]

I have a Flink version -- still a bit preliminary -- of the approximate kNN up 
and running.  The exact kNN using a quadtree performs quite badly in 
moderate-to-high spatial dimensions (e.g. with 20,000 test and training points 
in 6D the quadtree is worse), but no worries, I took care of this and the exact 
version decides whether or not to use the quadtree.  

https://github.com/danielblazevski/flink/blob/FLINK-1934/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/nn/zknn.scala

A preliminary test shows good scaling as the number of test + training points 
is increased.  

8,000 points in 6D (i.e. 8,000 test points and 8,000 training points):
Elapsed time approx =   : 2s
Elapsed time exact =   : 27s

64,000 in 6D:  
Elapsed time approx =   : 6s
(didn't run the exact version, we know it's O(N^2))

I will have to clean things up, add edge cases, etc which may slow down the 
run-time a bit, but will definitely not increase the complexity of the 
algorithm with respect to the number of test/training points.

This still uses a cross product, which I was hoping to avoid, but I'm not sure 
that's possible.  Any thoughts?  Basically, the idea is to hash the test/train 
set to 1D (I use the z-value hash based on [1]). 

I still have not implemented the ideas in [1] in full.  The full solution is 
quite complex.  They do a bunch of load balancing that I'm still learning 
about, and I'm not quite sure of the payoff.  One option could be that I clean 
up and optimize what I have now, since it's already performing well, and we 
open a new issue to do all the steps in [1].  

There are still many things to clean up, but any cleaning/edge cases will not 
add computational complexity with respect to the number of test points.  
E.g., I currently convert the coordinates to integers and ignore the decimal 
part, so there are lots of collisions in the z-value hash; normalizing the data 
and using a fixed maximum number of bits to compute the z-value is needed 
(this is described towards the end of [3])

Any thoughts?


was (Author: danielblazevski):
[~chiwanpark] [~till.rohrmann]

I have a Flink version -- still a bit preliminary -- of the approximate kNN up 
and running.  The exact kNN using a quadtree performs quite badly in 
moderate-to-high spatial dimensions (e.g. with 20,000 test and training points 
in 6D the quadtree is worse), but no worries, I took care of this and the exact 
version decides whether or not to use the quadtree.  

https://github.com/danielblazevski/flink/blob/FLINK-1934/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/nn/zknn.scala

A preliminary test shows good scaling as the number of test + training points 
is increased.  

8,000 points (i.e. 8,000 test points and 8,000 training points):
Elapsed time approx =   : 2s
Elapsed time exact =   : 27s

64,000:  
Elapsed time approx =   : 6s
(didn't run the exact version, we know it's O(N^2))

I will have to clean things up, add edge cases, etc which may slow down the 
run-time a bit, but will definitely not increase the complexity of the 
algorithm with respect to the number of test/training points.

This still uses a cross product, which I was hoping to avoid, but I'm not sure 
that's possible.  Any thoughts?  Basically, the idea is to hash the test/train 
set to 1D (I use the z-value hash based on [1]). 

I still have not implemented the ideas in [1] in full.  The full solution is 
quite complex.  They do a bunch of load balancing that I'm still learning 
about, and I'm not quite sure of the payoff.  One option could be that I clean 
up and optimize what I have now, since it's already performing well, and we 
open a new issue to do all the steps in [1].  

There are still many things to clean up, but any cleaning/edge cases will not 
add computational complexity with respect to the number of test points.  
E.g., I currently convert the coordinates to integers and ignore the decimal 
part, so there are lots of collisions in the z-value hash; normalizing the data 
and using a fixed maximum number of bits to compute the z-value is needed 
(this is described towards the end of [3])

Any thoughts?

> Add approximative k-nearest-neighbours (kNN) algorithm to machine learning 
> library
> --
>
> Key: FLINK-1934
> URL: https://issues.apache.org/jira/browse/FLINK-1934
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Daniel Blazevski
>  Labels: ML
>
> kNN is still a widely used algorithm for classification and regression. 
> 

[jira] [Commented] (FLINK-1934) Add approximative k-nearest-neighbours (kNN) algorithm to machine learning library

2016-04-05 Thread Daniel Blazevski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227632#comment-15227632
 ] 

Daniel Blazevski commented on FLINK-1934:
-

[~chiwanpark] [~till.rohrmann]

I have a Flink version -- still a bit preliminary -- of the approximate kNN up 
and running.  The exact kNN using a quadtree performs quite badly in 
moderate-to-high spatial dimensions (e.g. with 20,000 test and training points 
in 6D the quadtree is worse), but no worries, I took care of this and the exact 
version decides whether or not to use the quadtree.  

https://github.com/danielblazevski/flink/blob/FLINK-1934/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/nn/zknn.scala

A preliminary test shows good scaling as the number of test + training points 
is increased.  

8,000 points (i.e. 8,000 test points and 8,000 training points):
Elapsed time approx =   : 2s
Elapsed time exact =   : 27s

64,000:  
Elapsed time approx =   : 6s
(didn't run the exact version, we know it's O(N^2))

I will have to clean things up, add edge cases, etc which may slow down the 
run-time a bit, but will definitely not increase the complexity of the 
algorithm with respect to the number of test/training points.

This still uses a cross product, which I was hoping to avoid, but I'm not sure 
that's possible.  Any thoughts?  Basically, the idea is to hash the test/train 
set to 1D (I use the z-value hash based on [1]). 

I still have not implemented the ideas in [1] in full.  The full solution is 
quite complex.  They do a bunch of load balancing that I'm still learning 
about, and I'm not quite sure of the payoff.  One option could be that I clean 
up and optimize what I have now, since it's already performing well, and we 
open a new issue to do all the steps in [1].  

There are still many things to clean up, but any cleaning/edge cases will not 
add computational complexity with respect to the number of test points.  
E.g., I currently convert the coordinates to integers and ignore the decimal 
part, so there are lots of collisions in the z-value hash; normalizing the data 
and using a fixed maximum number of bits to compute the z-value is needed 
(this is described towards the end of [3])

Any thoughts?

> Add approximative k-nearest-neighbours (kNN) algorithm to machine learning 
> library
> --
>
> Key: FLINK-1934
> URL: https://issues.apache.org/jira/browse/FLINK-1934
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Daniel Blazevski
>  Labels: ML
>
> kNN is still a widely used algorithm for classification and regression. 
> However, due to the computational costs of an exact implementation, it does 
> not scale well to large amounts of data. Therefore, it is worthwhile to also 
> add an approximative kNN implementation as proposed in [1,2].  Reference [3] 
> is cited a few times in [1], and gives necessary background on the z-value 
> approach.
> Resources:
> [1] https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf
> [2] http://www.computer.org/csdl/proceedings/wacv/2007/2794/00/27940028.pdf
> [3] http://cs.sjtu.edu.cn/~yaobin/papers/icde10_knn.pdf



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat

2016-04-05 Thread Tian, Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227573#comment-15227573
 ] 

Tian, Li commented on FLINK-3655:
-

Thanks, I will do the path list first and use "readFile(FileInputFormat 
inputFormat, String... filePaths)". 
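
As a rough illustration of the comma-separated part of the request (the helper 
name and behaviour here are hypothetical, not the final API):
{code}
object PathSpecSketch {
  // Hypothetical helper: split a comma-separated path spec into individual paths.
  def splitPaths(spec: String): Seq[String] =
    spec.split(',').map(_.trim).filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    // example spec from the issue description
    println(splitPaths("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*"))
  }
}
{code}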

> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat
> -
>
> Key: FLINK-3655
> URL: https://issues.apache.org/jira/browse/FLINK-3655
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Gna Phetsarath
>Priority: Minor
>  Labels: starter
>
> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat so that a DataSource will process the directories 
> sequentially.
>
> env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")
> in Scala
>env.readFile(paths: Seq[String])
> or 
>   env.readFile(path: String, otherPaths: String*)
> Wildcard support would be a bonus.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat

2016-04-05 Thread Tian, Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227545#comment-15227545
 ] 

Tian, Li commented on FLINK-3655:
-

Will support wildcards

> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat
> -
>
> Key: FLINK-3655
> URL: https://issues.apache.org/jira/browse/FLINK-3655
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Gna Phetsarath
>Priority: Minor
>  Labels: starter
>
> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat so that a DataSource will process the directories 
> sequentially.
>
> env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")
> in Scala
>env.readFile(paths: Seq[String])
> or 
>   env.readFile(path: String, otherPaths: String*)
> Wildcard support would be a bonus.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3664) Create a method to easily Summarize a DataSet

2016-04-05 Thread Todd Lisonbee (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227311#comment-15227311
 ] 

Todd Lisonbee commented on FLINK-3664:
--

I didn't have Travis CI set up with my GitHub account, so I added another commit 
to kick off a build.

> Create a method to easily Summarize a DataSet
> -
>
> Key: FLINK-3664
> URL: https://issues.apache.org/jira/browse/FLINK-3664
> Project: Flink
>  Issue Type: Improvement
>Reporter: Todd Lisonbee
> Attachments: DataSet-Summary-Design-March2016-v1.txt
>
>
> Here is an example:
> {code}
> /**
>  * Summarize a DataSet of Tuples by collecting single pass statistics for all 
> columns
>  */
> public Tuple summarize()
> Dataset> input = // [...]
> Tuple3 summary 
> = input.summarize()
> summary.getField(0).stddev()
> summary.getField(1).maxStringLength()
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3664) Create a method to easily Summarize a DataSet

2016-04-05 Thread Todd Lisonbee (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227214#comment-15227214
 ] 

Todd Lisonbee commented on FLINK-3664:
--

Pull request is ready.  Please let me know if you'd like to see any other 
changes before merging.  Thanks.

> Create a method to easily Summarize a DataSet
> -
>
> Key: FLINK-3664
> URL: https://issues.apache.org/jira/browse/FLINK-3664
> Project: Flink
>  Issue Type: Improvement
>Reporter: Todd Lisonbee
> Attachments: DataSet-Summary-Design-March2016-v1.txt
>
>
> Here is an example:
> {code}
> /**
>  * Summarize a DataSet of Tuples by collecting single pass statistics for all 
> columns
>  */
> public Tuple summarize()
> Dataset> input = // [...]
> Tuple3 summary 
> = input.summarize()
> summary.getField(0).stddev()
> summary.getField(1).maxStringLength()
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3664) Create a method to easily Summarize a DataSet

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227168#comment-15227168
 ] 

ASF GitHub Bot commented on FLINK-3664:
---

GitHub user tlisonbee opened a pull request:

https://github.com/apache/flink/pull/1855

[FLINK-3664] Create method to easily summarize a DataSet of Tuples

Adding a summarize() method in DataSetUtils that will supply a number of 
single-pass statistics for DataSets of Tuples.

Summary statistics depend on the type being summarized:

- Numeric types (Integer, IntValue, Float, Double, etc): min, max, mean, 
variance, standard deviation, NaN count, Infinity count, totalCount, etc.
- String, StringValue: minLength, maxLength, meanLength, emptyCount, 
totalCount
- Boolean, BooleanValue: trueCount, falseCount, totalCount.

Example usage:
`Dataset> input = // [...]`
`Tuple3 
summary = DataSetUtils.summarize(input)`

`summary.f0.getStandardDeviation()`
`summary.f1.getMaxLength()`

Uses the Kahan summation algorithm to avoid numeric instability.  The 
algorithm is described in: "Scalable and Numerically Stable Descriptive 
Statistics in SystemML", Tian et al, International Conference on Data 
Engineering 2012.
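
For context, a minimal sketch of compensated (Kahan) summation -- not the code 
in the pull request, just the idea described in the paper:
{code}
object KahanSketch {
  // Compensated (Kahan) summation: carry a running error term so that small
  // addends are not lost when accumulated into a large running sum.
  def kahanSum(values: Iterable[Double]): Double = {
    var sum = 0.0
    var compensation = 0.0
    for (v <- values) {
      val adjusted = v - compensation
      val next = sum + adjusted               // low-order bits of `adjusted` may be lost here
      compensation = (next - sum) - adjusted  // recover the lost part for the next iteration
      sum = next
    }
    sum
  }

  def main(args: Array[String]): Unit = {
    val values = Array.fill(10000000)(0.1)
    println(kahanSum(values)) // compensated sum, very close to 1.0E6
    println(values.sum)       // naive left-to-right sum accumulates more rounding error
  }
}
{code}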

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tlisonbee/flink FLINK-3664

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1855.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1855


commit 65f54df532829994a8be240a27b9138d01a186b5
Author: Todd Lisonbee 
Date:   2016-04-05T05:51:12Z

[FLINK-3664] Create DataSetUtils method to easily summarize a DataSet of 
Tuples




> Create a method to easily Summarize a DataSet
> -
>
> Key: FLINK-3664
> URL: https://issues.apache.org/jira/browse/FLINK-3664
> Project: Flink
>  Issue Type: Improvement
>Reporter: Todd Lisonbee
> Attachments: DataSet-Summary-Design-March2016-v1.txt
>
>
> Here is an example:
> {code}
> /**
>  * Summarize a DataSet of Tuples by collecting single pass statistics for all 
> columns
>  */
> public Tuple summarize()
> Dataset> input = // [...]
> Tuple3 summary 
> = input.summarize()
> summary.getField(0).stddev()
> summary.getField(1).maxStringLength()
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3664] Create method to easily summarize...

2016-04-05 Thread tlisonbee
GitHub user tlisonbee opened a pull request:

https://github.com/apache/flink/pull/1855

[FLINK-3664] Create method to easily summarize a DataSet of Tuples

Adding a summarize() method in DataSetUtils that will supply a number of 
single-pass statistics for DataSets of Tuples.

Summary statistics depend on the type being summarized:

- Numeric types (Integer, IntValue, Float, Double, etc): min, max, mean, 
variance, standard deviation, NaN count, Infinity count, totalCount, etc.
- String, StringValue: minLength, maxLength, meanLength, emptyCount, 
totalCount
- Boolean, BooleanValue: trueCount, falseCount, totalCount.

Example usage:
`Dataset> input = // [...]`
`Tuple3 
summary = DataSetUtils.summarize(input)`

`summary.f0.getStandardDeviation()`
`summary.f1.getMaxLength()`

Uses the Kahan summation algorithm to avoid numeric instability.  The 
algorithm is described in: "Scalable and Numerically Stable Descriptive 
Statistics in SystemML", Tian et al, International Conference on Data 
Engineering 2012.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tlisonbee/flink FLINK-3664

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1855.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1855


commit 65f54df532829994a8be240a27b9138d01a186b5
Author: Todd Lisonbee 
Date:   2016-04-05T05:51:12Z

[FLINK-3664] Create DataSetUtils method to easily summarize a DataSet of 
Tuples




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3688) ClassCastException in StreamRecordSerializer when WindowOperator.trigger() is called and TimeCharacteristic = ProcessingTime

2016-04-05 Thread Konstantin Knauf (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226941#comment-15226941
 ] 

Konstantin Knauf commented on FLINK-3688:
-

I could open the PR (https://travis-ci.org/knaufk/flink/jobs/120960435), but the 
build fails immediately with

java.io.FileNotFoundException: build-target/lib/flink-dist-*.jar (No such file 
or directory)
at java.util.zip.ZipFile.open(Native Method)
at java.util.zip.ZipFile.<init>(ZipFile.java:220)
at java.util.zip.ZipFile.<init>(ZipFile.java:150)
at java.util.zip.ZipFile.<init>(ZipFile.java:121)
at sun.tools.jar.Main.list(Main.java:1060)
at sun.tools.jar.Main.run(Main.java:291)
at sun.tools.jar.Main.main(Main.java:1233)
find: `./flink-yarn-tests/target/flink-yarn-tests*': No such file or directory

Not sure what to do about it; master fails with the same error on my Travis...

> ClassCastException in StreamRecordSerializer when WindowOperator.trigger() is 
> called and TimeCharacteristic = ProcessingTime
> 
>
> Key: FLINK-3688
> URL: https://issues.apache.org/jira/browse/FLINK-3688
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Critical
>
> Hi,
> when using {{TimeCharacteristics.ProcessingTime}} a ClassCastException is 
> thrown in {{StreamRecordSerializer}} when 
> {{WindowOperator.processWatermark()}} is called from 
> {{WindowOperator.trigger()}}, i.e. whenever a ProcessingTimeTimer is 
> triggered. 
> The problem seems to be that {{processWatermark()}} is also called in 
> {{trigger()}}, when time characteristic is ProcessingTime, but in 
> {{RecordWriterOutput}} {{enableWatermarkMultiplexing}} is {{false}} and the 
> {{TypeSerializer}} is a {{StreamRecordSerializer}}, which ultimately leads to 
> the ClassCastException. Do you agree?
> If this is indeed a bug, there are several possible solutions.
> # Only calling {{processWatermark()}} in {{trigger()}}, when 
> TimeCharacteristic is EventTime
> # Not calling {{processWatermark()}} in {{trigger()}} at all, instead wait 
> for the next watermark to trigger the EventTimeTimers with a timestamp behind 
> the current watermark. This is, of course, a trade off. 
> # Using {{MultiplexingStreamRecordSerializer}} all the time, but I have no 
> idea what the side effect of this change would be. I assume there is a reason 
> for existence of the StreamRecordSerializer ;)
> StackTrace: 
> {quote}
> TimerException\{java.lang.RuntimeException: 
> org.apache.flink.streaming.api.watermark.Watermark cannot be cast to 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord\}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:716)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.RuntimeException: 
> org.apache.flink.streaming.api.watermark.Watermark cannot be cast to 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:370)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:293)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:323)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:710)
>   ... 7 more
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.streaming.api.watermark.Watermark cannot be cast to 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
>   at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>   at 
> 

[jira] [Commented] (FLINK-3696) Some Union tests fail for TableConfigMode.EFFICIENT

2016-04-05 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226745#comment-15226745
 ] 

Vasia Kalavri commented on FLINK-3696:
--

The problem here is that the TypeConverter creates a TupleTypeInfo for 
efficient mode when there is no expected physical type.
Thus, if the other union input is a Scala tuple, the union operator 
complains that the input types are different.
Any suggestions on how to solve this nicely, [~twalthr]? Thanks!

> Some Union tests fail for TableConfigMode.EFFICIENT
> ---
>
> Key: FLINK-3696
> URL: https://issues.apache.org/jira/browse/FLINK-3696
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>
> e.g. testUnionWithFilter gives the following exception:
> {code}
> org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of 
> different types. Input1=scala.Tuple3(_1: Integer, _2: Long, _3: String), 
> input2=Java Tuple3
>   at 
> org.apache.flink.api.java.operators.UnionOperator.(UnionOperator.java:47)
>   at org.apache.flink.api.java.DataSet.union(DataSet.java:1208)
>   at 
> org.apache.flink.api.table.plan.nodes.dataset.DataSetUnion.translateToPlan(DataSetUnion.scala:81)
>   at 
> org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:95)
>   at 
> org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:91)
>   at 
> org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:51)
>   at 
> org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:43)
>   at 
> org.apache.flink.api.scala.table.test.UnionITCase.testUnionWithFilter(UnionITCase.scala:77)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>   at org.junit.runners.Suite.runChild(Suite.java:127)
>   at org.junit.runners.Suite.runChild(Suite.java:26)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>   at org.junit.runners.Suite.runChild(Suite.java:127)
>   at org.junit.runners.Suite.runChild(Suite.java:26)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
>   at 
> 

[jira] [Commented] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat

2016-04-05 Thread Gna Phetsarath (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226676#comment-15226676
 ] 

Gna Phetsarath commented on FLINK-3655:
---

Will you be doing wildcards as well, or should we put that in another ticket?


> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat
> -
>
> Key: FLINK-3655
> URL: https://issues.apache.org/jira/browse/FLINK-3655
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Gna Phetsarath
>Priority: Minor
>  Labels: starter
>
> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat so that a DataSource will process the directories 
> sequentially.
>
> env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")
> in Scala
>env.readFile(paths: Seq[String])
> or 
>   env.readFile(path: String, otherPaths: String*)
> Wildcard support would be a bonus.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3697) keyBy() with nested POJO computes invalid field position indexes

2016-04-05 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger reassigned FLINK-3697:
-

Assignee: Robert Metzger

> keyBy() with nested POJO computes invalid field position indexes
> 
>
> Key: FLINK-3697
> URL: https://issues.apache.org/jira/browse/FLINK-3697
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.0
> Environment: MacOS X 10.10
>Reporter: Ron Crocker
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: pojo
>
> Using named keys in keyBy() for nested POJO types results in failure. The 
> indexes for named key fields are used inconsistently with nested POJO types. 
> In particular, {{PojoTypeInfo.getFlatFields()}} returns the field's position 
> after (apparently) flattening the structure but is referenced in the 
> unflattened version of the POJO type by {{PojoTypeInfo.getTypeAt()}}.
> In the example below, getFlatFields() returns positions 0, 1, and 14. These 
> positions appear correct in the flattened structure of the Data class. 
> However, in {{KeySelector getSelectorForKeys(Keys keys, 
> TypeInformation typeInfo, ExecutionConfig executionConfig)}}, a call to 
> {{compositeType.getTypeAt(logicalKeyPositions[i])}} for the third key results 
> {{PojoTypeInfo.getTypeAt()}} declaring it out of range, as it compares the 
> length of the directly named fields of the object vs the length of flattened 
> version of that type.
> Concrete Example:
> Consider this graph:
> {code}
> DataStream dataStream = see.addSource(new 
> FlinkKafkaConsumer08<>(timesliceConstants.topic, new DataDeserialzer(), 
> kafkaConsumerProperties));
> dataStream
>   .flatMap(new DataMapper())
>   .keyBy("aaa", "abc", "wxyz")
> {code}
> {{DataDeserialzer}} returns a "NativeDataFormat" object; {{DataMapper}} takes 
> this NativeDataFormat object and extracts individual Data objects: {code}
> public class Data {
> public int aaa;
> public int abc;
> public long wxyz;
> public int t1;
> public int t2;
> public Policy policy;
> public Stats stats;
> public Data() {}
> {code}
> A {{Policy}} object is an instance of this class:
> {code}
> public class Policy {
> public short a;
> public short b;
> public boolean c;
> public boolean d;
> public Policy() {}
> }
> {code}
> A {{Stats}} object is an instance of this class:
> {code}
> public class Stats {
> public long count;
> public float a;
> public float b;
> public float c;
> public float d;
> public float e;
> public Stats() {}
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226613#comment-15226613
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58572834
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
 ---
@@ -0,0 +1,43 @@

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+cluster_name: 'Test Cluster'
+commitlog_sync: 'periodic'
+commitlog_sync_period_in_ms: 1
+commitlog_segment_size_in_mb: 16
+partitioner: 'org.apache.cassandra.dht.RandomPartitioner'
+endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch'
+commitlog_directory: $PATH\commit'
+data_file_directories:
+- $PATH\data'
+saved_caches_directory: $PATH\cache'
--- End diff --

I wonder why the path works on Unix platforms. Only Windows uses `\` as 
a separator.


> Add a connector for streaming data into Cassandra
> -
>
> Key: FLINK-3311
> URL: https://issues.apache.org/jira/browse/FLINK-3311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Andrea Sella
>
> We had users in the past asking for a Flink+Cassandra integration.
> It seems that there is a well-developed java client for connecting into 
> Cassandra: https://github.com/datastax/java-driver (ASL 2.0)
> There are also tutorials out there on how to start a local cassandra instance 
> (for the tests): 
> http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html
> For the data types, I think we should support TupleX types, and map standard 
> java types to the respective cassandra types.
> In addition, it seems that there is a object mapper from datastax to store 
> POJOs in Cassandra (there are annotations for defining the primary key and 
> types)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-04-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58572834
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml
 ---
@@ -0,0 +1,43 @@

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+cluster_name: 'Test Cluster'
+commitlog_sync: 'periodic'
+commitlog_sync_period_in_ms: 1
+commitlog_segment_size_in_mb: 16
+partitioner: 'org.apache.cassandra.dht.RandomPartitioner'
+endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch'
+commitlog_directory: $PATH\commit'
+data_file_directories:
+- $PATH\data'
+saved_caches_directory: $PATH\cache'
--- End diff --

I wonder why the path works on Unix platforms. Only Windows uses `\` as 
a separator.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3492) Allow users to define a min pause between checkpoints

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226604#comment-15226604
 ] 

ASF GitHub Bot commented on FLINK-3492:
---

Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1794#issuecomment-205887495
  
reworked test to be more reliable.


> Allow users to define a min pause between checkpoints
> -
>
> Key: FLINK-3492
> URL: https://issues.apache.org/jira/browse/FLINK-3492
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> FLINK-3051 already introduced a field in the {{CheckpointConfig}} to specify 
> a min pause between checkpoints.
> In high-load situations (big state), jobs might spend their entire time 
> creating snapshots, not processing data. With a min pause between 
> checkpoints, users can guarantee that there is a certain time-span the system 
> can use for doing some actual data processing.
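
A minimal sketch of how the pause would be configured, assuming the 
{{CheckpointConfig}} setter introduced with FLINK-3051 (treat the method name 
as an assumption, not a reference):
{code}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object MinPauseSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)  // checkpoint every 5 s
    // assumed setter: leave at least 2 s of processing time between checkpoints
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(2000)
  }
}
{code}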



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3492] Configurable interval between Che...

2016-04-05 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1794#issuecomment-205887495
  
reworked test to be more reliable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3637] Refactor rolling sink writer

2016-04-05 Thread dalegaard
Github user dalegaard commented on the pull request:

https://github.com/apache/flink/pull/1826#issuecomment-205886932
  
@aljoscha okay, I'll fix that and rebase onto master. Naming things is one 
of the only two hard problems in computer science after all :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3637) Change RollingSink Writer interface to allow wider range of outputs

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226598#comment-15226598
 ] 

ASF GitHub Bot commented on FLINK-3637:
---

Github user dalegaard commented on the pull request:

https://github.com/apache/flink/pull/1826#issuecomment-205886932
  
@aljoscha okay, I'll fix that and rebase onto master. Naming things is one 
of the only two hard problems in computer science after all :)


> Change RollingSink Writer interface to allow wider range of outputs
> ---
>
> Key: FLINK-3637
> URL: https://issues.apache.org/jira/browse/FLINK-3637
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Lasse Dalegaard
>Assignee: Lasse Dalegaard
>  Labels: features
>
> Currently the RollingSink Writer interface only works with 
> FSDataOutputStreams, which precludes it from being used with some existing 
> libraries like Apache ORC and Parquet.
> To fix this, a new Writer interface can be created, which receives FileSystem 
> and Path objects, instead of FSDataOutputStream.
> To ensure exactly-once semantics, the Writer interface must also be extended 
> so that the current write-offset can be retrieved at checkpointing time. For 
> formats like ORC this requires a footer to be written, before the offset is 
> returned. Checkpointing already calls flush on the writer, but either flush 
> needs to return the current length of the output file, or alternatively a new 
> method has to be added for this.
> The existing Writer interface can be recreated with a wrapper on top of the 
> new Writer interface. The existing code that manages the FSDataOutputStream 
> can then be moved into this new wrapper.
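
A hedged sketch of what the proposed interface could look like (names are 
illustrative, not the final API): the writer receives the FileSystem and Path 
instead of an FSDataOutputStream, and flush() reports the current valid offset 
so checkpointing can record it:
{code}
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative only; the real RollingSink Writer interface may differ.
trait RollingWriter[T] {
  def open(fs: FileSystem, path: Path): Unit  // writer manages its own output
  def write(element: T): Unit
  def flush(): Long                           // current length of valid data, taken at checkpoint time
  def close(): Unit
}
{code}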



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2143] Added ReduceFunctionWithInverse

2016-04-05 Thread ggevay
Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/856#issuecomment-205884988
  
I think that this unfortunately can't be adapted to the new windowing 
system, so it can be closed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2143) Add an overload to reduceWindow which takes the inverse of the reduceFunction as a second parameter

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226583#comment-15226583
 ] 

ASF GitHub Bot commented on FLINK-2143:
---

Github user ggevay closed the pull request at:

https://github.com/apache/flink/pull/856


> Add an overload to reduceWindow which takes the inverse of the reduceFunction 
> as a second parameter
> ---
>
> Key: FLINK-2143
> URL: https://issues.apache.org/jira/browse/FLINK-2143
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>
> If the inverse of the reduceFunction is also available (for example 
> subtraction when summing numbers), then a PreReducer can maintain the 
> aggregate in O(1) memory and O(1) time for evict, store, and emitWindow.
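
A minimal sketch of what having the inverse makes possible, assuming sum with 
subtraction as the inverse (hypothetical names, not the PreReducer API):
{code}
object InverseReduceSketch {
  // Running aggregate over a sliding window: store applies the reduce function,
  // evict applies its inverse, so all three operations are O(1).
  final class SlidingSum {
    private var sum = 0L
    def store(v: Long): Unit = { sum += v }
    def evict(v: Long): Unit = { sum -= v }
    def emitWindow(): Long = sum
  }

  def main(args: Array[String]): Unit = {
    val window = new SlidingSum
    Seq(3L, 5L, 7L).foreach(window.store)
    window.evict(3L)             // oldest element leaves the window
    println(window.emitWindow()) // 12
  }
}
{code}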



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226596#comment-15226596
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58571353
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
 ---
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra.example;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import java.util.UUID;
+
+public class CassandraTupleWriteAheadSinkExample {
+   public static void main(String[] args) throws Exception {
+
+   class MySource implements SourceFunction>, Checkpointed {
--- End diff --

I would move the class out of the main method


> Add a connector for streaming data into Cassandra
> -
>
> Key: FLINK-3311
> URL: https://issues.apache.org/jira/browse/FLINK-3311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Andrea Sella
>
> We had users in the past asking for a Flink+Cassandra integration.
> It seems that there is a well-developed java client for connecting into 
> Cassandra: https://github.com/datastax/java-driver (ASL 2.0)
> There are also tutorials out there on how to start a local cassandra instance 
> (for the tests): 
> http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html
> For the data types, I think we should support TupleX types, and map standard 
> java types to the respective cassandra types.
> In addition, it seems that there is a object mapper from datastax to store 
> POJOs in Cassandra (there are annotations for defining the primary key and 
> types)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-04-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58571353
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
 ---
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra.example;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import java.util.UUID;
+
+public class CassandraTupleWriteAheadSinkExample {
+   public static void main(String[] args) throws Exception {
+
+   class MySource implements SourceFunction>, Checkpointed {
--- End diff --

I would move the class out of the main method


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3637] Refactor rolling sink writer

2016-04-05 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1826#issuecomment-205885519
  
The changes look good. One thing I would like to have changed is to rename 
`SimpleWriterBase` to `StreamWriterBase` or `StreamWriter` to reflect the 
fact that it is used for stream-based writers.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2143] Added ReduceFunctionWithInverse

2016-04-05 Thread ggevay
Github user ggevay closed the pull request at:

https://github.com/apache/flink/pull/856


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2143) Add an overload to reduceWindow which takes the inverse of the reduceFunction as a second parameter

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226582#comment-15226582
 ] 

ASF GitHub Bot commented on FLINK-2143:
---

Github user ggevay commented on the pull request:

https://github.com/apache/flink/pull/856#issuecomment-205884988
  
I think that this unfortunately can't be adapted to the new windowing 
system, so it can be closed.


> Add an overload to reduceWindow which takes the inverse of the reduceFunction 
> as a second parameter
> ---
>
> Key: FLINK-2143
> URL: https://issues.apache.org/jira/browse/FLINK-2143
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>
> If the inverse of the reduceFunction is also available (for example 
> subtraction when summing numbers), then a PreReducer can maintain the 
> aggregate in O(1) memory and O(1) time for evict, store, and emitWindow.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2143) Add an overload to reduceWindow which takes the inverse of the reduceFunction as a second parameter

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226575#comment-15226575
 ] 

ASF GitHub Bot commented on FLINK-2143:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/856#issuecomment-205883786
  
@ggevay The windowing system changed a while back. Is this still valid or 
can it be closed?


> Add an overload to reduceWindow which takes the inverse of the reduceFunction 
> as a second parameter
> ---
>
> Key: FLINK-2143
> URL: https://issues.apache.org/jira/browse/FLINK-2143
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>
> If the inverse of the reduceFunction is also available (for example 
> subtraction when summing numbers), then a PreReducer can maintain the 
> aggregate in O(1) memory and O(1) time for evict, store, and emitWindow.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2143] Added ReduceFunctionWithInverse

2016-04-05 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/856#issuecomment-205883786
  
@ggevay The windowing system changed a while back. Is this still valid or 
can it be closed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3637) Change RollingSink Writer interface to allow wider range of outputs

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226571#comment-15226571
 ] 

ASF GitHub Bot commented on FLINK-3637:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1826#issuecomment-205883214
  
hi @dalegaard this must have slipped my mind. I'll review it tomorrow and 
merge if possible.


> Change RollingSink Writer interface to allow wider range of outputs
> ---
>
> Key: FLINK-3637
> URL: https://issues.apache.org/jira/browse/FLINK-3637
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Lasse Dalegaard
>Assignee: Lasse Dalegaard
>  Labels: features
>
> Currently the RollingSink Writer interface only works with 
> FSDataOutputStreams, which precludes it from being used with some existing 
> libraries like Apache ORC and Parquet.
> To fix this, a new Writer interface can be created, which receives FileSystem 
> and Path objects, instead of FSDataOutputStream.
> To ensure exactly-once semantics, the Writer interface must also be extended 
> so that the current write-offset can be retrieved at checkpointing time. For 
> formats like ORC this requires a footer to be written, before the offset is 
> returned. Checkpointing already calls flush on the writer, but either flush 
> needs to return the current length of the output file, or alternatively a new 
> method has to be added for this.
> The existing Writer interface can be recreated with a wrapper on top of the 
> new Writer interface. The existing code that manages the FSDataOutputStream 
> can then be moved into this new wrapper.
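A rough sketch of what such an interface could look like (the interface name,
method names and signatures are illustrative only; Hadoop's FileSystem/Path types
are assumed, as in the current RollingSink):

{code}
// Illustrative sketch only, not the final API.
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public interface PartFileWriter<T> {

	/** Open a new part file; the writer manages its own output stream. */
	void open(FileSystem fs, Path path) throws IOException;

	/** Write a single element to the current part file. */
	void write(T element) throws IOException;

	/**
	 * Flush buffered data (for ORC-like formats this may include writing a
	 * footer) and return the current valid length of the file, so it can be
	 * recorded at checkpoint time.
	 */
	long flush() throws IOException;

	/** Close the current part file. */
	void close() throws IOException;
}
{code}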



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3637] Refactor rolling sink writer

2016-04-05 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1826#issuecomment-205883214
  
hi @dalegaard this must have slipped my mind. I'll review it tomorrow and 
merge if possible.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2166) Add fromCsvFile() to TableEnvironment

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226567#comment-15226567
 ] 

ASF GitHub Bot commented on FLINK-2166:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/939#issuecomment-205882687
  
@twalthr @fhueske @vasia Is this still valid with the recent changes in the 
Table API?


> Add fromCsvFile() to TableEnvironment
> -
>
> Key: FLINK-2166
> URL: https://issues.apache.org/jira/browse/FLINK-2166
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Priority: Minor
>  Labels: starter
>
> Add a {{fromCsvFile()}} method to the {{TableEnvironment}} to read a 
> {{Table}} from a CSV file.
> The implementation should reuse Flink's CsvInputFormat.
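A hypothetical sketch of what such a method could look like (the name follows the
issue title; the parameters and body are illustrative and the method does not exist
yet):

{code}
// Hypothetical fragment only; Table here refers to the Table API's Table type.
public Table fromCsvFile(String path, String fieldDelimiter, String... fieldNames) {
	// 1. create a CsvInputFormat for 'path' with the given delimiter
	// 2. derive the row type from 'fieldNames' (or accept explicit types)
	// 3. read the file into a DataSet and register/return it as a Table
	throw new UnsupportedOperationException("sketch only");
}
{code}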



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-2166. Add fromCsvFile() method to TableE...

2016-04-05 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/939#issuecomment-205882687
  
@twalthr @fhueske @vasia Is this still valid with the recent changes in the 
Table API?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2828) Add interfaces for Table API input formats

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226564#comment-15226564
 ] 

ASF GitHub Bot commented on FLINK-2828:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1237#issuecomment-205881346
  
@twalthr is this still valid with the changes in the Table API or can it be 
closed?


> Add interfaces for Table API input formats
> --
>
> Key: FLINK-2828
> URL: https://issues.apache.org/jira/browse/FLINK-2828
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> In order to support input formats for the Table API, interfaces are 
> necessary. I propose two types of TableSources:
> - AdaptiveTableSources can adapt their output to the requirements of the 
> plan. Although the output schema stays the same, the TableSource can react to 
> field resolution and/or predicates internally and can return adapted 
> DataSet/DataStream versions in the "translate" step.
> - StaticTableSources are an easy way to provide the Table API with additional 
> input formats without much implementation effort (e.g. for fromCsvFile())
> TableSources need to be deeply integrated into the Table API.
> The TableEnvironment requires a newly introduced AbstractExecutionEnvironment 
> (common super class of all ExecutionEnvironments for DataSets and 
> DataStreams).
> Here's what a TableSource can see from more complicated queries:
> {code}
> getTableJava(tableSource1)
>   .filter("a===5 || a===6")
>   .select("a as a4, b as b4, c as c4")
>   .filter("b4===7")
>   .join(getTableJava(tableSource2))
>   .where("a===a4 && c==='Test' && c4==='Test2'")
> // Result predicates for tableSource1:
> //  List("a===5 || a===6", "b===7", "c==='Test2'")
> // Result predicates for tableSource2:
> //  List("c==='Test'")
> // Result resolved fields for tableSource1 (true = filtering, 
> false=selection):
> //  Set(("a", true), ("a", false), ("b", true), ("b", false), ("c", false), 
> ("c", true))
> // Result resolved fields for tableSource2 (true = filtering, 
> false=selection):
> //  Set(("a", true), ("c", true))
> {code}
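For illustration, one possible shape of the AdaptiveTableSource / StaticTableSource
split described above (interface and method names are made up for this sketch, not
the proposed API):

{code}
// Illustrative sketch only.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

interface StaticTableSource {
	/** Field names of the fixed output schema. */
	String[] getFieldNames();

	/** Produce the data for the table in the "translate" step. */
	DataSet<?> translate(ExecutionEnvironment env);
}

interface AdaptiveTableSource extends StaticTableSource {
	/** Called during planning when a field is resolved (selection or filtering). */
	void notifyResolvedField(String fieldName, boolean usedAsFilter);

	/** Called during planning with a predicate the source may evaluate itself. */
	void notifyPredicate(String predicateExpression);
}
{code}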



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226563#comment-15226563
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58568963
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+import java.util.UUID;
+
+/**
+ * This class wraps different Cassandra sink implementations to provide a 
common interface for all of them.
+ *
+ * @param <IN> input type
+ */
+public class CassandraSink<IN> {
+   private static final String jobID = 
UUID.randomUUID().toString().replace("-", "_");
--- End diff --

The problem is that there will be two `jobID`s which can lead to confusion.


> Add a connector for streaming data into Cassandra
> -
>
> Key: FLINK-3311
> URL: https://issues.apache.org/jira/browse/FLINK-3311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Andrea Sella
>
> We had users in the past asking for a Flink+Cassandra integration.
> It seems that there is a well-developed java client for connecting into 
> Cassandra: https://github.com/datastax/java-driver (ASL 2.0)
> There are also tutorials out there on how to start a local cassandra instance 
> (for the tests): 
> http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html
> For the data types, I think we should support TupleX types, and map standard 
> java types to the respective cassandra types.
> In addition, it seems that there is an object mapper from datastax to store 
> POJOs in Cassandra (there are annotations for defining the primary key and 
> types)
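For the object-mapper idea, a small example of the annotations provided by the
DataStax mapping module (keyspace, table and field names here are made up):

{code}
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;

// Example POJO; the mapper needs a no-arg constructor and accessors.
@Table(keyspace = "example", name = "users")
public class User {

	@PartitionKey
	private String id;

	@Column(name = "name")
	private String name;

	public User() {}

	public String getId() { return id; }
	public void setId(String id) { this.id = id; }

	public String getName() { return name; }
	public void setName(String name) { this.name = name; }
}
{code}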



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2828] [table] Add interfaces for Table ...

2016-04-05 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1237#issuecomment-205881346
  
@twalthr is this still valid with the changes in the Table API or can it be 
closed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-04-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58568963
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+import java.util.UUID;
+
+/**
+ * This class wraps different Cassandra sink implementations to provide a 
common interface for all of them.
+ *
+ * @param <IN> input type
+ */
+public class CassandraSink<IN> {
+   private static final String jobID = 
UUID.randomUUID().toString().replace("-", "_");
--- End diff --

The problem is that there will be two `jobID`s which can lead to confusion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3283) Failed Kafka 0.9 test on duplicate message

2016-04-05 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226559#comment-15226559
 ] 

Robert Metzger commented on FLINK-3283:
---

This commit improves the stability of the Kafka tests: 
http://git-wip-us.apache.org/repos/asf/flink/commit/02a3dfde

> Failed Kafka 0.9 test on duplicate message
> --
>
> Key: FLINK-3283
> URL: https://issues.apache.org/jira/browse/FLINK-3283
> Project: Flink
>  Issue Type: Test
>Reporter: Ufuk Celebi
>Assignee: Robert Metzger
>
> On a branch with unrelated changes 
> {{Kafka09ITCase.testMultipleSourcesOnePartition:82->KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest}}
>  failed.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/104582020/log.txt
> {code}
> Caused by: java.lang.Exception: Received a duplicate: 1712
>   at 
> org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink.invoke(ValidatingExactlyOnceSink.java:53)
>   at 
> org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink.invoke(ValidatingExactlyOnceSink.java:30)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:37)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:232)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3283) Failed Kafka 0.9 test on duplicate message

2016-04-05 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger reassigned FLINK-3283:
-

Assignee: Robert Metzger

> Failed Kafka 0.9 test on duplicate message
> --
>
> Key: FLINK-3283
> URL: https://issues.apache.org/jira/browse/FLINK-3283
> Project: Flink
>  Issue Type: Test
>Reporter: Ufuk Celebi
>Assignee: Robert Metzger
>
> On a branch with unrelated changes 
> {{Kafka09ITCase.testMultipleSourcesOnePartition:82->KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest}}
>  failed.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/104582020/log.txt
> {code}
> Caused by: java.lang.Exception: Received a duplicate: 1712
>   at 
> org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink.invoke(ValidatingExactlyOnceSink.java:53)
>   at 
> org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink.invoke(ValidatingExactlyOnceSink.java:30)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:37)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:232)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2732) Add access to the TaskManagers' log file and out file in the web dashboard.

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226554#comment-15226554
 ] 

ASF GitHub Bot commented on FLINK-2732:
---

Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1790#issuecomment-205877524
  
Rebased.


> Add access to the TaskManagers' log file and out file in the web dashboard.
> ---
>
> Key: FLINK-2732
> URL: https://issues.apache.org/jira/browse/FLINK-2732
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
> Fix For: 1.0.0
>
>
> Add access to the TaskManagers' log file and out file in the web dashboard.
> This needs additions on the server side, as the log files need to be 
> transferred to the JobManager via the blob server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2732] Display TM logs in Dashboard

2016-04-05 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1790#issuecomment-205877524
  
Rebased.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3697] Properly access type information ...

2016-04-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1851#discussion_r58564424
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java
 ---
@@ -182,11 +182,14 @@ public R set(R record, F fieldValue) {
@SuppressWarnings("unchecked")
CompositeType cType = (CompositeType) type;
 
+   if(field.contains(".")) {
+   throw new IllegalArgumentException("The Pojo 
field accessor currently doesn't support nested POJOs");
--- End diff --

Thank you for reviewing my pull request. I've opened FLINK-3702 for that.

See also here: 
https://github.com/apache/flink/pull/1851/files#diff-e5e091a2c1e4bf850ef93e2010fe4c81R132


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1337) Create an Amazon EMR Bootstrap Action

2016-04-05 Thread Timur Fayruzov (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226522#comment-15226522
 ] 

Timur Fayruzov commented on FLINK-1337:
---

It's an old issue, but if there is still interest I can contribute. I'm working 
on an EMR cluster bootstrap for Flink right now.

> Create an Amazon EMR Bootstrap Action
> -
>
> Key: FLINK-1337
> URL: https://issues.apache.org/jira/browse/FLINK-1337
> Project: Flink
>  Issue Type: New Feature
>  Components: other
>Reporter: Stephan Ewen
>Priority: Minor
>
> EMR offers bootstrap actions that prepare the cluster by installing 
> additional components, etc.
> We can offer a Flink bootstrap action that downloads, unpacks, and configures 
> Flink. It may optionally install libraries that we like to use (such as 
> Python, BLAS/JBLAS, ...)
> http://blogs.aws.amazon.com/bigdata/post/TxO6EHTHQALSIB/Getting-Started-with-Amazon-EMR-Bootstrap-Actions



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3697) keyBy() with nested POJO computes invalid field position indexes

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226513#comment-15226513
 ] 

ASF GitHub Bot commented on FLINK-3697:
---

Github user RonCrocker commented on the pull request:

https://github.com/apache/flink/pull/1851#issuecomment-205868281
  
:+1:



> keyBy() with nested POJO computes invalid field position indexes
> 
>
> Key: FLINK-3697
> URL: https://issues.apache.org/jira/browse/FLINK-3697
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.0
> Environment: MacOS X 10.10
>Reporter: Ron Crocker
>Priority: Critical
>  Labels: pojo
>
> Using named keys in keyBy() for nested POJO types results in failure. The 
> indexes for named key fields are used inconsistently with nested POJO types. 
> In particular, {{PojoTypeInfo.getFlatFields()}} returns the field's position 
> after (apparently) flattening the structure but is referenced in the 
> unflattened version of the POJO type by {{PojoTypeInfo.getTypeAt()}}.
> In the example below, getFlatFields() returns positions 0, 1, and 14. These 
> positions appear correct in the flattened structure of the Data class. 
> However, in {{KeySelector getSelectorForKeys(Keys keys, 
> TypeInformation typeInfo, ExecutionConfig executionConfig)}}, a call to 
> {{compositeType.getTypeAt(logicalKeyPositions[i])}} for the third key results in 
> {{PojoTypeInfo.getTypeAt()}} declaring it out of range, as it compares the 
> length of the directly named fields of the object vs the length of flattened 
> version of that type.
> Concrete Example:
> Consider this graph:
> {code}
> DataStream dataStream = see.addSource(new 
> FlinkKafkaConsumer08<>(timesliceConstants.topic, new DataDeserialzer(), 
> kafkaConsumerProperties));
> dataStream
>   .flatMap(new DataMapper())
>   .keyBy("aaa", "abc", "wxyz")
> {code}
> {{DataDeserialzer}} returns a "NativeDataFormat" object; {{DataMapper}} takes 
> this NativeDataFormat object and extracts individual Data objects: {code}
> public class Data {
> public int aaa;
> public int abc;
> public long wxyz;
> public int t1;
> public int t2;
> public Policy policy;
> public Stats stats;
> public Data() {}
> {code}
> A {{Policy}} object is an instance of this class:
> {code}
> public class Policy {
> public short a;
> public short b;
> public boolean c;
> public boolean d;
> public Policy() {}
> }
> {code}
> A {{Stats}} object is an instance of this class:
> {code}
> public class Stats {
> public long count;
> public float a;
> public float b;
> public float c;
> public float d;
> public float e;
> public Stats() {}
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3697] Properly access type information ...

2016-04-05 Thread RonCrocker
Github user RonCrocker commented on the pull request:

https://github.com/apache/flink/pull/1851#issuecomment-205868281
  
:+1:



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3697) keyBy() with nested POJO computes invalid field position indexes

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226511#comment-15226511
 ] 

ASF GitHub Bot commented on FLINK-3697:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1851#discussion_r58564424
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java
 ---
@@ -182,11 +182,14 @@ public R set(R record, F fieldValue) {
@SuppressWarnings("unchecked")
CompositeType cType = (CompositeType) type;
 
+   if(field.contains(".")) {
+   throw new IllegalArgumentException("The Pojo 
field accessor currently doesn't support nested POJOs");
--- End diff --

Thank you for reviewing my pull request. I've opened FLINK-3702 for that.

See also here: 
https://github.com/apache/flink/pull/1851/files#diff-e5e091a2c1e4bf850ef93e2010fe4c81R132


> keyBy() with nested POJO computes invalid field position indexes
> 
>
> Key: FLINK-3697
> URL: https://issues.apache.org/jira/browse/FLINK-3697
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.0
> Environment: MacOS X 10.10
>Reporter: Ron Crocker
>Priority: Critical
>  Labels: pojo
>
> Using named keys in keyBy() for nested POJO types results in failure. The 
> indexes for named key fields are used inconsistently with nested POJO types. 
> In particular, {{PojoTypeInfo.getFlatFields()}} returns the field's position 
> after (apparently) flattening the structure but is referenced in the 
> unflattened version of the POJO type by {{PojoTypeInfo.getTypeAt()}}.
> In the example below, getFlatFields() returns positions 0, 1, and 14. These 
> positions appear correct in the flattened structure of the Data class. 
> However, in {{KeySelector getSelectorForKeys(Keys keys, 
> TypeInformation typeInfo, ExecutionConfig executionConfig)}}, a call to 
> {{compositeType.getTypeAt(logicalKeyPositions[i])}} for the third key results in 
> {{PojoTypeInfo.getTypeAt()}} declaring it out of range, as it compares the 
> length of the directly named fields of the object vs the length of flattened 
> version of that type.
> Concrete Example:
> Consider this graph:
> {code}
> DataStream dataStream = see.addSource(new 
> FlinkKafkaConsumer08<>(timesliceConstants.topic, new DataDeserialzer(), 
> kafkaConsumerProperties));
> dataStream
>   .flatMap(new DataMapper())
>   .keyBy("aaa", "abc", "wxyz")
> {code}
> {{DataDeserialzer}} returns a "NativeDataFormat" object; {{DataMapper}} takes 
> this NativeDataFormat object and extracts individual Data objects: {code}
> public class Data {
> public int aaa;
> public int abc;
> public long wxyz;
> public int t1;
> public int t2;
> public Policy policy;
> public Stats stats;
> public Data() {}
> {code}
> A {{Policy}} object is an instance of this class:
> {code}
> public class Policy {
> public short a;
> public short b;
> public boolean c;
> public boolean d;
> public Policy() {}
> }
> {code}
> A {{Stats}} object is an instance of this class:
> {code}
> public class Stats {
> public long count;
> public float a;
> public float b;
> public float c;
> public float d;
> public float e;
> public Stats() {}
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3697) keyBy() with nested POJO computes invalid field position indexes

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226507#comment-15226507
 ] 

ASF GitHub Bot commented on FLINK-3697:
---

Github user RonCrocker commented on a diff in the pull request:

https://github.com/apache/flink/pull/1851#discussion_r58564095
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java
 ---
@@ -182,11 +182,14 @@ public R set(R record, F fieldValue) {
@SuppressWarnings("unchecked")
CompositeType cType = (CompositeType) type;
 
+   if(field.contains(".")) {
+   throw new IllegalArgumentException("The Pojo 
field accessor currently doesn't support nested POJOs");
--- End diff --

Note that the problem reported in 
[FLINK-3697](https://issues.apache.org/jira/browse/FLINK-3697) was not related 
to accessing a nested field. Perhaps there should be a subsequent JIRA ticket 
to support nested POJO accessors.


> keyBy() with nested POJO computes invalid field position indexes
> 
>
> Key: FLINK-3697
> URL: https://issues.apache.org/jira/browse/FLINK-3697
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.0
> Environment: MacOS X 10.10
>Reporter: Ron Crocker
>Priority: Critical
>  Labels: pojo
>
> Using named keys in keyBy() for nested POJO types results in failure. The 
> indexes for named key fields are used inconsistently with nested POJO types. 
> In particular, {{PojoTypeInfo.getFlatFields()}} returns the field's position 
> after (apparently) flattening the structure but is referenced in the 
> unflattened version of the POJO type by {{PojoTypeInfo.getTypeAt()}}.
> In the example below, getFlatFields() returns positions 0, 1, and 14. These 
> positions appear correct in the flattened structure of the Data class. 
> However, in {{KeySelector getSelectorForKeys(Keys keys, 
> TypeInformation typeInfo, ExecutionConfig executionConfig)}}, a call to 
> {{compositeType.getTypeAt(logicalKeyPositions[i])}} for the third key results in 
> {{PojoTypeInfo.getTypeAt()}} declaring it out of range, as it compares the 
> length of the directly named fields of the object vs the length of flattened 
> version of that type.
> Concrete Example:
> Consider this graph:
> {code}
> DataStream dataStream = see.addSource(new 
> FlinkKafkaConsumer08<>(timesliceConstants.topic, new DataDeserialzer(), 
> kafkaConsumerProperties));
> dataStream
>   .flatMap(new DataMapper())
>   .keyBy("aaa", "abc", "wxyz")
> {code}
> {{DataDeserialzer}} returns a "NativeDataFormat" object; {{DataMapper}} takes 
> this NativeDataFormat object and extracts individual Data objects: {code}
> public class Data {
> public int aaa;
> public int abc;
> public long wxyz;
> public int t1;
> public int t2;
> public Policy policy;
> public Stats stats;
> public Data() {}
> {code}
> A {{Policy}} object is an instance of this class:
> {code}
> public class Policy {
> public short a;
> public short b;
> public boolean c;
> public boolean d;
> public Policy() {}
> }
> {code}
> A {{Stats}} object is an instance of this class:
> {code}
> public class Stats {
> public long count;
> public float a;
> public float b;
> public float c;
> public float d;
> public float e;
> public Stats() {}
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3697] Properly access type information ...

2016-04-05 Thread RonCrocker
Github user RonCrocker commented on a diff in the pull request:

https://github.com/apache/flink/pull/1851#discussion_r58564095
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java
 ---
@@ -182,11 +182,14 @@ public R set(R record, F fieldValue) {
@SuppressWarnings("unchecked")
CompositeType cType = (CompositeType) type;
 
+   if(field.contains(".")) {
+   throw new IllegalArgumentException("The Pojo 
field accessor currently doesn't support nested POJOs");
--- End diff --

Note that the problem reported in 
[FLINK-3697](https://issues.apache.org/jira/browse/FLINK-3697) was not related 
to accessing a nested field. Perhaps there should be a subsequent JIRA ticket 
to support nested POJO accessors.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3697) keyBy() with nested POJO computes invalid field position indexes

2016-04-05 Thread Ron Crocker (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226504#comment-15226504
 ] 

Ron Crocker commented on FLINK-3697:


[~rmetzger] This code is fine to use for your test case. 

The key aspect to include is a key field that is lexicographically "greater than" a 
field that holds a nested POJO. This is what causes the index of the key field 
(in the flattened representation of the POJO) to be beyond the natural 
(unflattened) fields of the outer POJO.

I would have provided a shorter example but I didn't have the time to do so.
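To make that concrete for the Data/Policy/Stats classes quoted below, here is a
plausible flattened ordering (assuming fields are sorted alphabetically at each
nesting level, which reproduces the reported flat positions 0, 1 and 14):

{code}
//  0 aaa
//  1 abc
//  2 policy.a    3 policy.b    4 policy.c     5 policy.d
//  6 stats.a     7 stats.b     8 stats.c      9 stats.count
// 10 stats.d    11 stats.e    12 t1          13 t2          14 wxyz
//
// Data itself has only 7 direct fields, so looking up index 14 on the
// unflattened type fails the range check in PojoTypeInfo.getTypeAt().
{code}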

> keyBy() with nested POJO computes invalid field position indexes
> 
>
> Key: FLINK-3697
> URL: https://issues.apache.org/jira/browse/FLINK-3697
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.0
> Environment: MacOS X 10.10
>Reporter: Ron Crocker
>Priority: Critical
>  Labels: pojo
>
> Using named keys in keyBy() for nested POJO types results in failure. The 
> indexes for named key fields are used inconsistently with nested POJO types. 
> In particular, {{PojoTypeInfo.getFlatFields()}} returns the field's position 
> after (apparently) flattening the structure but is referenced in the 
> unflattened version of the POJO type by {{PojoTypeInfo.getTypeAt()}}.
> In the example below, getFlatFields() returns positions 0, 1, and 14. These 
> positions appear correct in the flattened structure of the Data class. 
> However, in {{KeySelector getSelectorForKeys(Keys keys, 
> TypeInformation typeInfo, ExecutionConfig executionConfig)}}, a call to 
> {{compositeType.getTypeAt(logicalKeyPositions[i])}} for the third key results in 
> {{PojoTypeInfo.getTypeAt()}} declaring it out of range, as it compares the 
> length of the directly named fields of the object vs the length of flattened 
> version of that type.
> Concrete Example:
> Consider this graph:
> {code}
> DataStream dataStream = see.addSource(new 
> FlinkKafkaConsumer08<>(timesliceConstants.topic, new DataDeserialzer(), 
> kafkaConsumerProperties));
> dataStream
>   .flatMap(new DataMapper())
>   .keyBy("aaa", "abc", "wxyz")
> {code}
> {{DataDeserialzer}} returns a "NativeDataFormat" object; {{DataMapper}} takes 
> this NativeDataFormat object and extracts individual Data objects: {code}
> public class Data {
> public int aaa;
> public int abc;
> public long wxyz;
> public int t1;
> public int t2;
> public Policy policy;
> public Stats stats;
> public Data() {}
> {code}
> A {{Policy}} object is an instance of this class:
> {code}
> public class Policy {
> public short a;
> public short b;
> public boolean c;
> public boolean d;
> public Policy() {}
> }
> {code}
> A {{Stats}} object is an instance of this class:
> {code}
> public class Stats {
> public long count;
> public float a;
> public float b;
> public float c;
> public float d;
> public float e;
> public Stats() {}
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-04-05 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58562758
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+import java.util.UUID;
+
+/**
+ * This class wraps different Cassandra sink implementations to provide a 
common interface for all of them.
+ *
+ * @param <IN> input type
+ */
+public class CassandraSink<IN> {
+   private static final String jobID = 
UUID.randomUUID().toString().replace("-", "_");
--- End diff --

why should it be named sinkID when it is supposed to be a job-specific ID?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226491#comment-15226491
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58562758
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+import java.util.UUID;
+
+/**
+ * This class wraps different Cassandra sink implementations to provide a 
common interface for all of them.
+ *
+ * @param <IN> input type
+ */
+public class CassandraSink<IN> {
+   private static final String jobID = 
UUID.randomUUID().toString().replace("-", "_");
--- End diff --

why should it be named sinkID when it is supposed to be a job-specific ID?


> Add a connector for streaming data into Cassandra
> -
>
> Key: FLINK-3311
> URL: https://issues.apache.org/jira/browse/FLINK-3311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Andrea Sella
>
> We had users in the past asking for a Flink+Cassandra integration.
> It seems that there is a well-developed java client for connecting into 
> Cassandra: https://github.com/datastax/java-driver (ASL 2.0)
> There are also tutorials out there on how to start a local cassandra instance 
> (for the tests): 
> http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html
> For the data types, I think we should support TupleX types, and map standard 
> java types to the respective cassandra types.
> In addition, it seems that there is an object mapper from datastax to store 
> POJOs in Cassandra (there are annotations for defining the primary key and 
> types)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3706) YARNSessionCapacitySchedulerITCase.testNonexistingQueue unstable

2016-04-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-3706:

Attachment: log.txt

> YARNSessionCapacitySchedulerITCase.testNonexistingQueue unstable
> 
>
> Key: FLINK-3706
> URL: https://issues.apache.org/jira/browse/FLINK-3706
> Project: Flink
>  Issue Type: Bug
>Reporter: Aljoscha Krettek
>  Labels: test-stability
> Attachments: log.txt
>
>
> I encountered a failed test on travis.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3706) YARNSessionCapacitySchedulerITCase.testNonexistingQueue unstable

2016-04-05 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-3706:
---

 Summary: YARNSessionCapacitySchedulerITCase.testNonexistingQueue 
unstable
 Key: FLINK-3706
 URL: https://issues.apache.org/jira/browse/FLINK-3706
 Project: Flink
  Issue Type: Bug
Reporter: Aljoscha Krettek
 Attachments: log.txt

I encountered a failed test on travis.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-04-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58562221
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import java.util.ArrayList;
+
+public class BatchExample {
+   private static final String INSERT_QUERY = "INSERT INTO test.batches 
(number, strings) VALUES (?,?);";
+   private static final String SELECT_QUERY = "SELECT number, strings FROM 
test.batches;";
+
+   /*
+*  table script: "CREATE TABLE test.batches (number int, strings 
text, PRIMARY KEY(number, strings));"
+*/
--- End diff --

I would remove this comment and move the query to the class javadocs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226486#comment-15226486
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58562221
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import java.util.ArrayList;
+
+public class BatchExample {
+   private static final String INSERT_QUERY = "INSERT INTO test.batches 
(number, strings) VALUES (?,?);";
+   private static final String SELECT_QUERY = "SELECT number, strings FROM 
test.batches;";
+
+   /*
+*  table script: "CREATE TABLE test.batches (number int, strings 
text, PRIMARY KEY(number, strings));"
+*/
--- End diff --

I would remove this comment and move the query to the class javadocs.


> Add a connector for streaming data into Cassandra
> -
>
> Key: FLINK-3311
> URL: https://issues.apache.org/jira/browse/FLINK-3311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Andrea Sella
>
> We had users in the past asking for a Flink+Cassandra integration.
> It seems that there is a well-developed java client for connecting into 
> Cassandra: https://github.com/datastax/java-driver (ASL 2.0)
> There are also tutorials out there on how to start a local cassandra instance 
> (for the tests): 
> http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html
> For the data types, I think we should support TupleX types, and map standard 
> java types to the respective cassandra types.
> In addition, it seems that there is an object mapper from datastax to store 
> POJOs in Cassandra (there are annotations for defining the primary key and 
> types)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226485#comment-15226485
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58562121
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import java.util.ArrayList;
+
+public class BatchExample {
--- End diff --

Javadocs


> Add a connector for streaming data into Cassandra
> -
>
> Key: FLINK-3311
> URL: https://issues.apache.org/jira/browse/FLINK-3311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Andrea Sella
>
> We had users in the past asking for a Flink+Cassandra integration.
> It seems that there is a well-developed java client for connecting into 
> Cassandra: https://github.com/datastax/java-driver (ASL 2.0)
> There are also tutorials out there on how to start a local cassandra instance 
> (for the tests): 
> http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html
> For the data types, I think we should support TupleX types, and map standard 
> java types to the respective cassandra types.
> In addition, it seems that there is a object mapper from datastax to store 
> POJOs in Cassandra (there are annotations for defining the primary key and 
> types)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-04-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58562019
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
 ---
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+
+import java.io.Serializable;
+
+public abstract class ClusterBuilder implements Serializable {
--- End diff --

Since this is a user-facing class, I would add some javadocs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226484#comment-15226484
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58562019
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java
 ---
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+
+import java.io.Serializable;
+
+public abstract class ClusterBuilder implements Serializable {
--- End diff --

Since this is a user-facing class, I would add some Javadocs.


> Add a connector for streaming data into Cassandra
> -
>
> Key: FLINK-3311
> URL: https://issues.apache.org/jira/browse/FLINK-3311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Andrea Sella
>
> We had users in the past asking for a Flink+Cassandra integration.
> It seems that there is a well-developed java client for connecting into 
> Cassandra: https://github.com/datastax/java-driver (ASL 2.0)
> There are also tutorials out there on how to start a local cassandra instance 
> (for the tests): 
> http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html
> For the data types, I think we should support TupleX types, and map standard 
> java types to the respective cassandra types.
> In addition, it seems that there is an object mapper from datastax to store 
> POJOs in Cassandra (there are annotations for defining the primary key and 
> types)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-04-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58562121
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import java.util.ArrayList;
+
+public class BatchExample {
--- End diff --

Javadocs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226483#comment-15226483
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58561933
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
 ---
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericAtLeastOnceSink;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
stores incoming records within a 
+ * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only 
commits them to cassandra
+ * if a checkpoint is completed.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraTupleWriteAheadSink extends 
GenericAtLeastOnceSink {
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   private final String insertQuery;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+   private transient FutureCallback callback;
+
+   private ClusterBuilder builder;
+
+   private int updatesSent = 0;
+   private AtomicInteger updatesConfirmed = new AtomicInteger(0);
+
+   private transient Object[] fields;
+
+   protected CassandraTupleWriteAheadSink(String insertQuery, 
TypeSerializer serializer, ClusterBuilder builder, String jobID, 
CheckpointCommitter committer) throws Exception {
+   super(committer, serializer, jobID);
+   this.insertQuery = insertQuery;
+   this.builder = builder;
+   ClosureCleaner.clean(builder, true);
+   }
+
+   public void open() throws Exception {
+   super.open();
+   if (!getRuntimeContext().isCheckpointingEnabled()) {
+   throw new IllegalStateException("The write-ahead log 
requires checkpointing to be enabled.");
+   }
+   this.callback = new FutureCallback() {
+   @Override
+   public void onSuccess(ResultSet resultSet) {
+   updatesConfirmed.incrementAndGet();
+   }
+
+   @Override
+   public void onFailure(Throwable throwable) {
+   exception = throwable;
+   }
+   };
+   cluster = builder.getCluster();
+   session = cluster.connect();
+   preparedStatement = session.prepare(insertQuery);
+
+   fields = new Object[((TupleSerializer) 
serializer).getArity()];
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   try {
+   session.close();
+   } catch (Exception e) {
+   

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-04-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58561933
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
 ---
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericAtLeastOnceSink;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
stores incoming records within a 
+ * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only 
commits them to cassandra
+ * if a checkpoint is completed.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraTupleWriteAheadSink extends 
GenericAtLeastOnceSink {
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   private final String insertQuery;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+   private transient FutureCallback callback;
+
+   private ClusterBuilder builder;
+
+   private int updatesSent = 0;
+   private AtomicInteger updatesConfirmed = new AtomicInteger(0);
+
+   private transient Object[] fields;
+
+   protected CassandraTupleWriteAheadSink(String insertQuery, 
TypeSerializer serializer, ClusterBuilder builder, String jobID, 
CheckpointCommitter committer) throws Exception {
+   super(committer, serializer, jobID);
+   this.insertQuery = insertQuery;
+   this.builder = builder;
+   ClosureCleaner.clean(builder, true);
+   }
+
+   public void open() throws Exception {
+   super.open();
+   if (!getRuntimeContext().isCheckpointingEnabled()) {
+   throw new IllegalStateException("The write-ahead log 
requires checkpointing to be enabled.");
+   }
+   this.callback = new FutureCallback() {
+   @Override
+   public void onSuccess(ResultSet resultSet) {
+   updatesConfirmed.incrementAndGet();
+   }
+
+   @Override
+   public void onFailure(Throwable throwable) {
+   exception = throwable;
+   }
+   };
+   cluster = builder.getCluster();
+   session = cluster.connect();
+   preparedStatement = session.prepare(insertQuery);
+
+   fields = new Object[((TupleSerializer) 
serializer).getArity()];
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   try {
+   session.close();
+   } catch (Exception e) {
+   LOG.error("Error while closing session.", e);
+   }
+   try {
+   cluster.close();
+   } catch (Exception e) {
+   LOG.error("Error while closing cluster.", e);
+   }

[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226480#comment-15226480
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58561402
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
 ---
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericAtLeastOnceSink;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
stores incoming records within a 
+ * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only 
commits them to cassandra
+ * if a checkpoint is completed.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraTupleWriteAheadSink extends 
GenericAtLeastOnceSink {
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   private final String insertQuery;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+   private transient FutureCallback callback;
+
+   private ClusterBuilder builder;
+
+   private int updatesSent = 0;
+   private AtomicInteger updatesConfirmed = new AtomicInteger(0);
+
+   private transient Object[] fields;
+
+   protected CassandraTupleWriteAheadSink(String insertQuery, 
TypeSerializer serializer, ClusterBuilder builder, String jobID, 
CheckpointCommitter committer) throws Exception {
+   super(committer, serializer, jobID);
+   this.insertQuery = insertQuery;
+   this.builder = builder;
+   ClosureCleaner.clean(builder, true);
+   }
+
+   public void open() throws Exception {
+   super.open();
+   if (!getRuntimeContext().isCheckpointingEnabled()) {
+   throw new IllegalStateException("The write-ahead log 
requires checkpointing to be enabled.");
+   }
+   this.callback = new FutureCallback() {
+   @Override
+   public void onSuccess(ResultSet resultSet) {
+   updatesConfirmed.incrementAndGet();
+   }
+
+   @Override
+   public void onFailure(Throwable throwable) {
+   exception = throwable;
--- End diff --

Error logging here as well
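
A minimal sketch of what that could look like, assuming the {{LOG}} field that {{close()}} already uses is also visible from this callback:

{code}
@Override
public void onFailure(Throwable throwable) {
    // Surface the failure immediately; otherwise it only becomes visible
    // once the stored exception is checked later (if at all).
    LOG.error("Error while sending value to Cassandra.", throwable);
    exception = throwable;
}
{code}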


> Add a connector for streaming data into Cassandra
> -
>
> Key: FLINK-3311
> URL: https://issues.apache.org/jira/browse/FLINK-3311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Andrea Sella
>
> We had users in the past asking for a Flink+Cassandra integration.
> It seems that there 

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-04-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58561402
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
 ---
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericAtLeastOnceSink;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink 
stores incoming records within a 
+ * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only 
commits them to cassandra
+ * if a checkpoint is completed.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class CassandraTupleWriteAheadSink extends 
GenericAtLeastOnceSink {
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   private final String insertQuery;
+   private transient PreparedStatement preparedStatement;
+
+   private transient Throwable exception = null;
+   private transient FutureCallback callback;
+
+   private ClusterBuilder builder;
+
+   private int updatesSent = 0;
+   private AtomicInteger updatesConfirmed = new AtomicInteger(0);
+
+   private transient Object[] fields;
+
+   protected CassandraTupleWriteAheadSink(String insertQuery, 
TypeSerializer serializer, ClusterBuilder builder, String jobID, 
CheckpointCommitter committer) throws Exception {
+   super(committer, serializer, jobID);
+   this.insertQuery = insertQuery;
+   this.builder = builder;
+   ClosureCleaner.clean(builder, true);
+   }
+
+   public void open() throws Exception {
+   super.open();
+   if (!getRuntimeContext().isCheckpointingEnabled()) {
+   throw new IllegalStateException("The write-ahead log 
requires checkpointing to be enabled.");
+   }
+   this.callback = new FutureCallback() {
+   @Override
+   public void onSuccess(ResultSet resultSet) {
+   updatesConfirmed.incrementAndGet();
+   }
+
+   @Override
+   public void onFailure(Throwable throwable) {
+   exception = throwable;
--- End diff --

Error logging here as well


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat

2016-04-05 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226474#comment-15226474
 ] 

Maximilian Michels edited comment on FLINK-3655 at 4/5/16 3:37 PM:
---

Sounds good. It is important to maintain backwards compatibility. 

I'm not sure about the "comma-separated Path string". File names may contain 
commas. So we might skip that for now and do the path list first.

I think we could also use {{readFile(FileInputFormat inputFormat, String.. 
filePaths)}} which will return the filePath as a {{String[] filepaths}} array. 


was (Author: mxm):
Sounds good. It is important to maintain backwards compatibility. 

I'm not sure about the "comma-separated Path string". File names may contain 
commas. So we might skip that for now and do the path list first.

I think we could also use {{readFile(FileInputFormat inputFormat, String.. 
filePaths}} which will return the filePath as a {{String[] filepaths}} array. 

> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat
> -
>
> Key: FLINK-3655
> URL: https://issues.apache.org/jira/browse/FLINK-3655
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Gna Phetsarath
>Priority: Minor
>  Labels: starter
>
> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat so that a DataSource will process the directories 
> sequentially.
>
> env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")
> in Scala
>env.readFile(paths: Seq[String])
> or 
>   env.readFile(path: String, otherPaths: String*)
> Wildcard support would be a bonus.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226477#comment-15226477
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58561096
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * CassandraSinkBase is the common abstract class of {@link 
CassandraPojoSink} and {@link CassandraTupleSink}.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public abstract class CassandraSinkBase extends 
RichSinkFunction {
+   protected static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBase.class);
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   protected transient Throwable exception = null;
+   protected transient FutureCallback callback;
+
+   private final ClusterBuilder builder;
+
+   protected CassandraSinkBase(ClusterBuilder builder) {
+   this.builder = builder;
+   ClosureCleaner.clean(builder, true);
+   }
+
+   @Override
+   public void open(Configuration configuration) {
+   this.callback = new FutureCallback() {
+   @Override
+   public void onSuccess(V ignored) {
+   }
+
+   @Override
+   public void onFailure(Throwable t) {
+   exception = t;
--- End diff --

Can you log the exception as well? Maybe invoke() is never called after the 
failure and nobody knows what's going on.


> Add a connector for streaming data into Cassandra
> -
>
> Key: FLINK-3311
> URL: https://issues.apache.org/jira/browse/FLINK-3311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Andrea Sella
>
> We had users in the past asking for a Flink+Cassandra integration.
> It seems that there is a well-developed java client for connecting into 
> Cassandra: https://github.com/datastax/java-driver (ASL 2.0)
> There are also tutorials out there on how to start a local cassandra instance 
> (for the tests): 
> http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html
> For the data types, I think we should support TupleX types, and map standard 
> java types to the respective cassandra types.
> In addition, it seems that there is an object mapper from datastax to store 
> POJOs in Cassandra (there are annotations for defining the primary key and 
> types)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat

2016-04-05 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226474#comment-15226474
 ] 

Maximilian Michels commented on FLINK-3655:
---

Sounds good. It is important to maintain backwards compatibility. 

I'm not sure about the "comma-separated Path string". File names may contain 
commas. So we might skip that for now and do the path list first.

I think we could also use {{readFile(FileInputFormat inputFormat, String... 
filePaths)}} which will return the filePath as a {{String[] filepaths}} array. 
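
A rough sketch of such an overload; the method placement on the execution environment and the {{setFilePaths}} call on the input format are hypothetical, only meant to show how the varargs arrive as a {{String[]}}:

{code}
// Hypothetical varargs overload; names and delegation are illustrative,
// not the final API under discussion in this ticket.
public <X> DataSource<X> readFile(FileInputFormat<X> inputFormat, String... filePaths) {
    if (filePaths == null || filePaths.length == 0) {
        throw new IllegalArgumentException("At least one file path must be given.");
    }
    // the varargs parameter is simply a String[] here
    inputFormat.setFilePaths(filePaths);   // assumed setter, does not exist yet
    return createInput(inputFormat);
}
{code}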

> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat
> -
>
> Key: FLINK-3655
> URL: https://issues.apache.org/jira/browse/FLINK-3655
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Gna Phetsarath
>Priority: Minor
>  Labels: starter
>
> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat so that a DataSource will process the directories 
> sequentially.
>
> env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")
> in Scala
>env.readFile(paths: Seq[String])
> or 
>   env.readFile(path: String, otherPaths: String*)
> Wildcard support would be a bonus.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-04-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58561096
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * CassandraSinkBase is the common abstract class of {@link 
CassandraPojoSink} and {@link CassandraTupleSink}.
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public abstract class CassandraSinkBase extends 
RichSinkFunction {
+   protected static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBase.class);
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   protected transient Throwable exception = null;
+   protected transient FutureCallback callback;
+
+   private final ClusterBuilder builder;
+
+   protected CassandraSinkBase(ClusterBuilder builder) {
+   this.builder = builder;
+   ClosureCleaner.clean(builder, true);
+   }
+
+   @Override
+   public void open(Configuration configuration) {
+   this.callback = new FutureCallback() {
+   @Override
+   public void onSuccess(V ignored) {
+   }
+
+   @Override
+   public void onFailure(Throwable t) {
+   exception = t;
--- End diff --

Can you log the exception as well? Maybe invoke() is never called after the 
failure and nobody knows what's going on.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3705) Provide explanation for Hadoop dependencies and how to configure it

2016-04-05 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-3705:
--

 Summary: Provide explanation for Hadoop dependencies and how to 
configure it
 Key: FLINK-3705
 URL: https://issues.apache.org/jira/browse/FLINK-3705
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Ufuk Celebi
Priority: Minor


I think it will be valuable to add a dedicated page explaining how Flink 
interacts with Hadoop and why it has the dependency. Furthermore, it will be 
beneficial to give an overview of how to configure it, because there are 
multiple ways to do it (in Flink's config file, environment variables) and 
multiple scenarios where you do it (standalone, YARN).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3614) Remove Non-Keyed Window Operator

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226444#comment-15226444
 ] 

ASF GitHub Bot commented on FLINK-3614:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/1805


> Remove Non-Keyed Window Operator
> 
>
> Key: FLINK-3614
> URL: https://issues.apache.org/jira/browse/FLINK-3614
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> I propose to remove the special Non-Keyed Window Operator and implement 
> non-parallel windows by using the standard WindowOperator with a dummy 
> KeySelector.
> Maintaining everything for two WindowOperators is a huge burden. The 
> implementation is completely separate by now. For example, the Non-Keyed 
> window operator does not use the StateBackend for state, i.e. cannot use 
> RocksDB. Also, with upcoming changes (Merging/Session windows, aligned 
> windows) this will only increase the maintenance burden.
> Also, the fast AlignedProcessingTimeWindows operators only support the 
> Parallel/Keyed case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3174) Add merging WindowAssigner

2016-04-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek resolved FLINK-3174.
-
   Resolution: Fixed
Fix Version/s: (was: 1.0.0)
   1.1.0

Done in 
https://github.com/apache/flink/commit/6cd8ceb10c841827cf89b74ecf5a0495a6933d53

> Add merging WindowAssigner
> --
>
> Key: FLINK-3174
> URL: https://issues.apache.org/jira/browse/FLINK-3174
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.1.0
>
>
> We should add the possibility for WindowAssigners to merge windows. This will 
> enable Session windowing support, similar to how Google Cloud Dataflow 
> supports them.
> For session windows, each element would initially be assigned to its own 
> window. When triggering we check the windows and see if any can be merged. 
> This way, elements with overlapping session windows merge into one session.
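
With this merged for 1.1.0, a minimal usage sketch could look like the following; {{input}} is an assumed {{DataStream<Tuple2<String, Integer>>}} with timestamps/watermarks already assigned, the 30 minute gap is arbitrary, and {{EventTimeSessionWindows}} is the assigner name as it ended up in 1.1:

{code}
DataStream<Tuple2<String, Integer>> sessionCounts = input
    .keyBy(0)
    // elements further than 30 minutes apart end up in separate sessions;
    // overlapping per-element windows are merged by the assigner
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .sum(1);
{code}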



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226447#comment-15226447
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58557862
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+import java.util.UUID;
+
+/**
+ * This class wraps different Cassandra sink implementations to provide a 
common interface for all of them.
+ *
+ * @param  input type
+ */
+public class CassandraSink {
+   private static final String jobID = 
UUID.randomUUID().toString().replace("-", "_");
+   private final boolean useDataStreamSink;
+   private DataStreamSink sink1;
+   private SingleOutputStreamOperator sink2;
+
+   private CassandraSink(DataStreamSink sink) {
+   sink1 = sink;
+   useDataStreamSink = true;
+   }
+
+   private CassandraSink(SingleOutputStreamOperator sink) {
+   sink2 = sink;
+   useDataStreamSink = false;
+   }
+
+   private SinkTransformation getSinkTransformation() {
+   return sink1.getTransformation();
+   }
+
+   private StreamTransformation getStreamTransformation() {
+   return sink2.getTransformation();
+   }
+
+   /**
+* Sets the name of this sink. This name is
+* used by the visualization and logging during runtime.
+*
+* @return The named sink.
+*/
+   public CassandraSink name(String name) {
+   if (useDataStreamSink) {
+   getSinkTransformation().setName(name);
+   } else {
+   getStreamTransformation().setName(name);
+   }
+   return this;
+   }
+
+   /**
+* Sets an ID for this operator.
+* 
+* The specified ID is used to assign the same operator ID across job
+* submissions (for example when starting a job from a savepoint).
+* 
+* Important: this ID needs to be unique per
+* transformation and job. Otherwise, job submission will fail.
+*
+* @param uid The unique user-specified ID of this transformation.
+* @return The operator with the specified ID.
+*/
+   public CassandraSink uid(String uid) {
+   if (useDataStreamSink) {
+   getSinkTransformation().setUid(uid);
+   } else {
+   getStreamTransformation().setUid(uid);
+   }
+   return this;
+   }
+
+   /**
+* Sets the parallelism for this sink. The degree must be higher than 
zero.
+*
+* @param parallelism The parallelism for this sink.
+* @return The sink with set parallelism.
+*/
+   public CassandraSink setParallelism(int parallelism) {
+   if (useDataStreamSink) {
+  

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-04-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58557862
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+import java.util.UUID;
+
+/**
+ * This class wraps different Cassandra sink implementations to provide a 
common interface for all of them.
+ *
+ * @param  input type
+ */
+public class CassandraSink {
+   private static final String jobID = 
UUID.randomUUID().toString().replace("-", "_");
+   private final boolean useDataStreamSink;
+   private DataStreamSink sink1;
+   private SingleOutputStreamOperator sink2;
+
+   private CassandraSink(DataStreamSink sink) {
+   sink1 = sink;
+   useDataStreamSink = true;
+   }
+
+   private CassandraSink(SingleOutputStreamOperator sink) {
+   sink2 = sink;
+   useDataStreamSink = false;
+   }
+
+   private SinkTransformation getSinkTransformation() {
+   return sink1.getTransformation();
+   }
+
+   private StreamTransformation getStreamTransformation() {
+   return sink2.getTransformation();
+   }
+
+   /**
+* Sets the name of this sink. This name is
+* used by the visualization and logging during runtime.
+*
+* @return The named sink.
+*/
+   public CassandraSink name(String name) {
+   if (useDataStreamSink) {
+   getSinkTransformation().setName(name);
+   } else {
+   getStreamTransformation().setName(name);
+   }
+   return this;
+   }
+
+   /**
+* Sets an ID for this operator.
+* 
+* The specified ID is used to assign the same operator ID across job
+* submissions (for example when starting a job from a savepoint).
+* 
+* Important: this ID needs to be unique per
+* transformation and job. Otherwise, job submission will fail.
+*
+* @param uid The unique user-specified ID of this transformation.
+* @return The operator with the specified ID.
+*/
+   public CassandraSink uid(String uid) {
+   if (useDataStreamSink) {
+   getSinkTransformation().setUid(uid);
+   } else {
+   getStreamTransformation().setUid(uid);
+   }
+   return this;
+   }
+
+   /**
+* Sets the parallelism for this sink. The degree must be higher than 
zero.
+*
+* @param parallelism The parallelism for this sink.
+* @return The sink with set parallelism.
+*/
+   public CassandraSink setParallelism(int parallelism) {
+   if (useDataStreamSink) {
+   getSinkTransformation().setParallelism(parallelism);
+   } else {
+   getStreamTransformation().setParallelism(parallelism);
+   }
+   return this;
+   }
+
  

[jira] [Resolved] (FLINK-3614) Remove Non-Keyed Window Operator

2016-04-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek resolved FLINK-3614.
-
   Resolution: Fixed
Fix Version/s: 1.1.0

Done in 
https://github.com/apache/flink/commit/505512dbe461b9840dde6197c71dbb90b49c0495

> Remove Non-Keyed Window Operator
> 
>
> Key: FLINK-3614
> URL: https://issues.apache.org/jira/browse/FLINK-3614
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.1.0
>
>
> I propose to remove the special Non-Keyed Window Operator and implement 
> non-parallel windows by using the standard WindowOperator with a dummy 
> KeySelector.
> Maintaining everything for two WindowOperators is a huge burden. The 
> implementation is completely separate by now. For example, the Non-Keyed 
> window operator does not use the StateBackend for state, i.e. cannot use 
> RocksDB. Also, with upcoming changes (Merging/Session windows, aligned 
> windows) this will only increase the maintenance burden.
> Also, the fast AlignedProcessingTimeWindows operators only support the 
> Parallel/Keyed case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3614] Remove Non-Keyed Window Operator

2016-04-05 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/1805


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3174) Add merging WindowAssigner

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226441#comment-15226441
 ] 

ASF GitHub Bot commented on FLINK-3174:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/1802


> Add merging WindowAssigner
> --
>
> Key: FLINK-3174
> URL: https://issues.apache.org/jira/browse/FLINK-3174
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.0.0
>
>
> We should add the possibility for WindowAssigners to merge windows. This will 
> enable Session windowing support, similar to how Google Cloud Dataflow 
> supports them.
> For session windows, each element would initially be assigned to its own 
> window. When triggering we check the windows and see if any can be merged. 
> This way, elements with overlapping session windows merge into one session.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3614) Remove Non-Keyed Window Operator

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226443#comment-15226443
 ] 

ASF GitHub Bot commented on FLINK-3614:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1805#issuecomment-205855000
  
Manually merged.


> Remove Non-Keyed Window Operator
> 
>
> Key: FLINK-3614
> URL: https://issues.apache.org/jira/browse/FLINK-3614
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> I propose to remove the special Non-Keyed Window Operator and implement 
> non-parallel windows by using the standard WindowOperator with a dummy 
> KeySelector.
> Maintaining everything for two WindowOperators is a huge burden. The 
> implementation is completely separate by now. For example, the Non-Keyed 
> window operator does not use the StateBackend for state, i.e. cannot use 
> RocksDB. Also, with upcoming changes (Merging/Session windows, aligned 
> windows) this will only increase the maintenance burden.
> Also, the fast AlignedProcessingTimeWindows operators only support the 
> Parallel/Keyed case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3614] Remove Non-Keyed Window Operator

2016-04-05 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1805#issuecomment-205855000
  
Manually merged.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3174] Add MergingWindowAssigner and Ses...

2016-04-05 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/1802


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226433#comment-15226433
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58556585
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+import java.util.UUID;
+
+/**
+ * This class wraps different Cassandra sink implementations to provide a 
common interface for all of them.
+ *
+ * @param  input type
+ */
+public class CassandraSink {
+   private static final String jobID = 
UUID.randomUUID().toString().replace("-", "_");
+   private final boolean useDataStreamSink;
+   private DataStreamSink sink1;
+   private SingleOutputStreamOperator sink2;
+
+   private CassandraSink(DataStreamSink sink) {
+   sink1 = sink;
+   useDataStreamSink = true;
+   }
+
+   private CassandraSink(SingleOutputStreamOperator sink) {
+   sink2 = sink;
+   useDataStreamSink = false;
+   }
+
+   private SinkTransformation getSinkTransformation() {
+   return sink1.getTransformation();
+   }
+
+   private StreamTransformation getStreamTransformation() {
+   return sink2.getTransformation();
+   }
+
+   /**
+* Sets the name of this sink. This name is
+* used by the visualization and logging during runtime.
+*
+* @return The named sink.
+*/
+   public CassandraSink name(String name) {
+   if (useDataStreamSink) {
+   getSinkTransformation().setName(name);
+   } else {
+   getStreamTransformation().setName(name);
+   }
+   return this;
+   }
+
+   /**
+* Sets an ID for this operator.
+* 
+* The specified ID is used to assign the same operator ID across job
+* submissions (for example when starting a job from a savepoint).
+* 
+* Important: this ID needs to be unique per
+* transformation and job. Otherwise, job submission will fail.
+*
+* @param uid The unique user-specified ID of this transformation.
+* @return The operator with the specified ID.
+*/
+   public CassandraSink uid(String uid) {
+   if (useDataStreamSink) {
+   getSinkTransformation().setUid(uid);
+   } else {
+   getStreamTransformation().setUid(uid);
+   }
+   return this;
+   }
+
+   /**
+* Sets the parallelism for this sink. The degree must be higher than 
zero.
+*
+* @param parallelism The parallelism for this sink.
+* @return The sink with set parallelism.
+*/
+   public CassandraSink setParallelism(int parallelism) {
+   if (useDataStreamSink) {
+  

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-04-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58556585
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+import java.util.UUID;
+
+/**
+ * This class wraps different Cassandra sink implementations to provide a 
common interface for all of them.
+ *
+ * @param  input type
+ */
+public class CassandraSink {
+   private static final String jobID = 
UUID.randomUUID().toString().replace("-", "_");
+   private final boolean useDataStreamSink;
+   private DataStreamSink sink1;
+   private SingleOutputStreamOperator sink2;
+
+   private CassandraSink(DataStreamSink sink) {
+   sink1 = sink;
+   useDataStreamSink = true;
+   }
+
+   private CassandraSink(SingleOutputStreamOperator sink) {
+   sink2 = sink;
+   useDataStreamSink = false;
+   }
+
+   private SinkTransformation getSinkTransformation() {
+   return sink1.getTransformation();
+   }
+
+   private StreamTransformation getStreamTransformation() {
+   return sink2.getTransformation();
+   }
+
+   /**
+* Sets the name of this sink. This name is
+* used by the visualization and logging during runtime.
+*
+* @return The named sink.
+*/
+   public CassandraSink name(String name) {
+   if (useDataStreamSink) {
+   getSinkTransformation().setName(name);
+   } else {
+   getStreamTransformation().setName(name);
+   }
+   return this;
+   }
+
+   /**
+* Sets an ID for this operator.
+* 
+* The specified ID is used to assign the same operator ID across job
+* submissions (for example when starting a job from a savepoint).
+* 
+* Important: this ID needs to be unique per
+* transformation and job. Otherwise, job submission will fail.
+*
+* @param uid The unique user-specified ID of this transformation.
+* @return The operator with the specified ID.
+*/
+   public CassandraSink<IN> uid(String uid) {
+   if (useDataStreamSink) {
+   getSinkTransformation().setUid(uid);
+   } else {
+   getStreamTransformation().setUid(uid);
+   }
+   return this;
+   }
+
+   /**
+* Sets the parallelism for this sink. The degree must be higher than 
zero.
+*
+* @param parallelism The parallelism for this sink.
+* @return The sink with set parallelism.
+*/
+   public CassandraSink<IN> setParallelism(int parallelism) {
+   if (useDataStreamSink) {
+   getSinkTransformation().setParallelism(parallelism);
+   } else {
+   getStreamTransformation().setParallelism(parallelism);
+   }
+   return this;
+   }
+
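For illustration (not part of the diff above): a brief usage sketch of the fluent methods shown here. The stream type, variable names and the createCassandraSink helper are made up; only name(), uid() and setParallelism() are taken from the diff.

{code}
// hypothetical helper standing in for however the sink instance is created
CassandraSink<Tuple2<String, Integer>> sink = createCassandraSink(stream);
sink.name("cassandra-sink")
	.uid("cassandra-sink-1")
	.setParallelism(4);
{code}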
  

[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226428#comment-15226428
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58556061
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+import java.util.UUID;
+
+/**
+ * This class wraps different Cassandra sink implementations to provide a 
common interface for all of them.
+ *
+ * @param <IN> input type
+ */
+public class CassandraSink<IN> {
+   private static final String jobID = 
UUID.randomUUID().toString().replace("-", "_");
--- End diff --

`jobID` is used within Flink a lot and has a different meaning there. Maybe 
it makes sense to rename this to "sink id".

Why is this field static? This can lead to problems when a user is using 
two Cassandra sinks.
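A minimal sketch of the suggested alternative, assuming the id simply becomes a per-instance field (hypothetical, not taken from the PR):

{code}
// one random id per sink instance instead of one per class,
// so two Cassandra sinks in the same job cannot collide
private final String sinkId = UUID.randomUUID().toString().replace("-", "_");
{code}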


> Add a connector for streaming data into Cassandra
> -
>
> Key: FLINK-3311
> URL: https://issues.apache.org/jira/browse/FLINK-3311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Andrea Sella
>
> We had users in the past asking for a Flink+Cassandra integration.
> It seems that there is a well-developed java client for connecting into 
> Cassandra: https://github.com/datastax/java-driver (ASL 2.0)
> There are also tutorials out there on how to start a local cassandra instance 
> (for the tests): 
> http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html
> For the data types, I think we should support TupleX types, and map standard 
> java types to the respective cassandra types.
> In addition, it seems that there is an object mapper from datastax to store 
> POJOs in Cassandra (there are annotations for defining the primary key and 
> types)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-04-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58556061
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+import java.util.UUID;
+
+/**
+ * This class wraps different Cassandra sink implementations to provide a 
common interface for all of them.
+ *
+ * @param <IN> input type
+ */
+public class CassandraSink<IN> {
+   private static final String jobID = 
UUID.randomUUID().toString().replace("-", "_");
--- End diff --

`jobID` is used within Flink a lot and has a different meaning there. Maybe 
it makes sense to rename this to "sink id".

Why is this field static? This can lead to problems when a user is using 
two Cassandra sinks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-04-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58548774
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 ---
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+/**
+ * CheckpointCommitter that saves information about completed checkpoints 
within a separate table in a cassandra
+ * database.
+ * 
+ * Entries are in the form |operator_id | subtask_id | 
last_completed_checkpoint|
+ */
+public class CassandraCommitter extends CheckpointCommitter {
+   private ClusterBuilder builder;
+   private transient Cluster cluster;
+   private transient Session session;
+
+   private static final String KEYSPACE = "flink_auxiliary";
+   private String TABLE = "checkpoints_";
--- End diff --

By convention, (non-static) fields are lowercase.
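A sketch of the field following that convention (hypothetical naming, not part of the PR):

{code}
// non-static, non-final field in lower camel case; setJobId() appends the job id
private String tableName = "checkpoints_";
{code}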


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-04-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58548996
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 ---
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+/**
+ * CheckpointCommitter that saves information about completed checkpoints 
within a separate table in a cassandra
+ * database.
+ * 
+ * Entries are in the form |operator_id | subtask_id | 
last_completed_checkpoint|
+ */
+public class CassandraCommitter extends CheckpointCommitter {
+   private ClusterBuilder builder;
+   private transient Cluster cluster;
+   private transient Session session;
+
+   private static final String KEYSPACE = "flink_auxiliary";
+   private String TABLE = "checkpoints_";
+
+   private transient PreparedStatement deleteStatement;
+   private transient PreparedStatement updateStatement;
+   private transient PreparedStatement selectStatement;
+
+   public CassandraCommitter(ClusterBuilder builder) {
+   this.builder = builder;
+   ClosureCleaner.clean(builder, true);
+   }
+
+   /**
+* Internally used to set the job ID after instantiation.
+*
+* @param id
+* @throws Exception
+*/
+   public void setJobId(String id) throws Exception {
+   super.setJobId(id);
+   TABLE += id;
+   }
+
+   /**
+* Generates the necessary tables to store information.
+*
+* @return
+* @throws Exception
+*/
+   @Override
+   public void createResource() throws Exception {
+   cluster = builder.getCluster();
+   session = cluster.connect();
+
+   session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s 
with replication={'class':'SimpleStrategy', 'replication_factor':3};", 
KEYSPACE));
+   session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s 
(sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, 
sub_id));", KEYSPACE, TABLE));
--- End diff --

I think it would be better to allow users to pass a custom keyspace.
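A hedged sketch of what a user-supplied keyspace could look like; the constructor signature and field names are assumptions, not part of this PR:

{code}
private static final String DEFAULT_KEYSPACE = "flink_auxiliary";
private final String keyspace;

public CassandraCommitter(ClusterBuilder builder, String keyspace) {
	this.builder = builder;
	this.keyspace = keyspace != null ? keyspace : DEFAULT_KEYSPACE;
	ClosureCleaner.clean(builder, true);
}
{code}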


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226375#comment-15226375
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58548996
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 ---
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+/**
+ * CheckpointCommitter that saves information about completed checkpoints 
within a separate table in a cassandra
+ * database.
+ * 
+ * Entries are in the form |operator_id | subtask_id | 
last_completed_checkpoint|
+ */
+public class CassandraCommitter extends CheckpointCommitter {
+   private ClusterBuilder builder;
+   private transient Cluster cluster;
+   private transient Session session;
+
+   private static final String KEYSPACE = "flink_auxiliary";
+   private String TABLE = "checkpoints_";
+
+   private transient PreparedStatement deleteStatement;
+   private transient PreparedStatement updateStatement;
+   private transient PreparedStatement selectStatement;
+
+   public CassandraCommitter(ClusterBuilder builder) {
+   this.builder = builder;
+   ClosureCleaner.clean(builder, true);
+   }
+
+   /**
+* Internally used to set the job ID after instantiation.
+*
+* @param id
+* @throws Exception
+*/
+   public void setJobId(String id) throws Exception {
+   super.setJobId(id);
+   TABLE += id;
+   }
+
+   /**
+* Generates the necessary tables to store information.
+*
+* @return
+* @throws Exception
+*/
+   @Override
+   public void createResource() throws Exception {
+   cluster = builder.getCluster();
+   session = cluster.connect();
+
+   session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s 
with replication={'class':'SimpleStrategy', 'replication_factor':3};", 
KEYSPACE));
+   session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s 
(sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, 
sub_id));", KEYSPACE, TABLE));
--- End diff --

I think it would be better to allow users to pass a custom keyspace.


> Add a connector for streaming data into Cassandra
> -
>
> Key: FLINK-3311
> URL: https://issues.apache.org/jira/browse/FLINK-3311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Andrea Sella
>
> We had users in the past asking for a Flink+Cassandra integration.
> It seems that there is a well-developed java client for connecting into 
> Cassandra: https://github.com/datastax/java-driver (ASL 2.0)
> There are also tutorials out there on how to start a local cassandra instance 
> (for the tests): 
> http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html
> For the data types, I think we should support TupleX types, and map standard 
> java types to the respective cassandra types.
> In addition, it seems that there is an object mapper from datastax to store 
> POJOs in Cassandra (there are annotations for defining the primary key and 
> types)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226373#comment-15226373
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58548774
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 ---
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+/**
+ * CheckpointCommitter that saves information about completed checkpoints 
within a separate table in a cassandra
+ * database.
+ * 
+ * Entries are in the form |operator_id | subtask_id | 
last_completed_checkpoint|
+ */
+public class CassandraCommitter extends CheckpointCommitter {
+   private ClusterBuilder builder;
+   private transient Cluster cluster;
+   private transient Session session;
+
+   private static final String KEYSPACE = "flink_auxiliary";
+   private String TABLE = "checkpoints_";
--- End diff --

By convention, (non-static) fields are lowercase.


> Add a connector for streaming data into Cassandra
> -
>
> Key: FLINK-3311
> URL: https://issues.apache.org/jira/browse/FLINK-3311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Andrea Sella
>
> We had users in the past asking for a Flink+Cassandra integration.
> It seems that there is a well-developed java client for connecting into 
> Cassandra: https://github.com/datastax/java-driver (ASL 2.0)
> There are also tutorials out there on how to start a local cassandra instance 
> (for the tests): 
> http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html
> For the data types, I think we should support TupleX types, and map standard 
> java types to the respective cassandra types.
> In addition, it seems that there is an object mapper from datastax to store 
> POJOs in Cassandra (there are annotations for defining the primary key and 
> types)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-04-05 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58548581
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.batch.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * OutputFormat to write {@link org.apache.flink.api.java.tuple.Tuple} 
into Apache Cassandra.
+ *
+ * @param <OUT> type of Tuple
+ */
+public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraOutputFormat.class);
+
+   private final String insertQuery;
+   private final ClusterBuilder builder;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement prepared;
+   private transient FutureCallback<ResultSet> callback;
+   private transient Throwable exception = null;
+
+   public CassandraOutputFormat(String insertQuery, ClusterBuilder 
builder) {
+   if (Strings.isNullOrEmpty(insertQuery)) {
+   throw new IllegalArgumentException("insertQuery cannot 
be null or empty");
+   }
+   if (builder == null) {
+   throw new IllegalArgumentException("Builder cannot be 
null.");
+   }
+   this.insertQuery = insertQuery;
+   this.builder = builder;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   this.cluster = builder.getCluster();
+   }
+
+   /**
+* Opens a Session to Cassandra and initializes the prepared statement.
+*
+* @param taskNumber The number of the parallel instance.
+* @throws IOException Thrown, if the output could not be opened due to 
an
+* I/O problem.
+*/
+   @Override
+   public void open(int taskNumber, int numTasks) throws IOException {
+   this.session = cluster.connect();
+   this.prepared = session.prepare(insertQuery);
+   this.callback = new FutureCallback<ResultSet>() {
+   @Override
+   public void onSuccess(ResultSet ignored) {
+   }
+
+   @Override
+   public void onFailure(Throwable t) {
+   exception = t;
+   }
+   };
+   }
+
+   @Override
+   public void writeRecord(OUT record) throws IOException {
+   if (exception != null) {
+   throw new IOException("write record failed", exception);
+   }
+
+   Object[] fields = new Object[record.getArity()];
+   for (int i = 0; i < record.getArity(); i++) {
+   fields[i] = record.getField(i);
+   }
+   ResultSetFuture result = 
session.executeAsync(prepared.bind(fields));
+   Futures.addCallback(result, callback);
+   }
+
+   /**
+* Closes all resources used.
+*/
+   @Override
+   public void close() throws IOException {
+   try {
 

[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226369#comment-15226369
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r58548581
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.batch.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * OutputFormat to write {@link org.apache.flink.api.java.tuple.Tuple} 
into Apache Cassandra.
+ *
+ * @param <OUT> type of Tuple
+ */
+public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
+   private static final Logger LOG = 
LoggerFactory.getLogger(CassandraOutputFormat.class);
+
+   private final String insertQuery;
+   private final ClusterBuilder builder;
+
+   private transient Cluster cluster;
+   private transient Session session;
+   private transient PreparedStatement prepared;
+   private transient FutureCallback<ResultSet> callback;
+   private transient Throwable exception = null;
+
+   public CassandraOutputFormat(String insertQuery, ClusterBuilder 
builder) {
+   if (Strings.isNullOrEmpty(insertQuery)) {
+   throw new IllegalArgumentException("insertQuery cannot 
be null or empty");
+   }
+   if (builder == null) {
+   throw new IllegalArgumentException("Builder cannot be 
null.");
+   }
+   this.insertQuery = insertQuery;
+   this.builder = builder;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   this.cluster = builder.getCluster();
+   }
+
+   /**
+* Opens a Session to Cassandra and initializes the prepared statement.
+*
+* @param taskNumber The number of the parallel instance.
+* @throws IOException Thrown, if the output could not be opened due to 
an
+* I/O problem.
+*/
+   @Override
+   public void open(int taskNumber, int numTasks) throws IOException {
+   this.session = cluster.connect();
+   this.prepared = session.prepare(insertQuery);
+   this.callback = new FutureCallback<ResultSet>() {
+   @Override
+   public void onSuccess(ResultSet ignored) {
+   }
+
+   @Override
+   public void onFailure(Throwable t) {
+   exception = t;
+   }
+   };
+   }
+
+   @Override
+   public void writeRecord(OUT record) throws IOException {
+   if (exception != null) {
+   throw new IOException("write record failed", exception);
+   }
+
+   Object[] fields = new Object[record.getArity()];
+   for (int i = 0; i < record.getArity(); i++) {
+   fields[i] = record.getField(i);
+   }
+   ResultSetFuture result = 

[jira] [Created] (FLINK-3704) JobManagerHAProcessFailureBatchRecoveryITCase.testJobManagerProcessFailure unstable

2016-04-05 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-3704:
-

 Summary: 
JobManagerHAProcessFailureBatchRecoveryITCase.testJobManagerProcessFailure 
unstable
 Key: FLINK-3704
 URL: https://issues.apache.org/jira/browse/FLINK-3704
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Reporter: Robert Metzger


https://s3.amazonaws.com/archive.travis-ci.org/jobs/120882840/log.txt

{code}
testJobManagerProcessFailure[1](org.apache.flink.test.recovery.JobManagerHAProcessFailureBatchRecoveryITCase)
  Time elapsed: 9.302 sec  <<< ERROR!
java.io.IOException: Actor at akka.tcp://flink@127.0.0.1:55591/user/jobmanager 
not reachable. Please make sure that the actor is running and its port is 
reachable.
at 
org.apache.flink.runtime.akka.AkkaUtils$.getActorRef(AkkaUtils.scala:384)
at org.apache.flink.runtime.akka.AkkaUtils.getActorRef(AkkaUtils.scala)
at 
org.apache.flink.test.recovery.JobManagerHAProcessFailureBatchRecoveryITCase.testJobManagerProcessFailure(JobManagerHAProcessFailureBatchRecoveryITCase.java:290)
Caused by: akka.actor.ActorNotFound: Actor not found for: 
ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:55591/), 
Path(/user/jobmanager)]
at 
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
at 
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at 
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at 
akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
at 
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
at 
akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
at 
akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
at 
akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
at akka.actor.ActorCell.terminate(ActorCell.scala:369)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3398) Flink Kafka consumer should support auto-commit opt-outs

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226294#comment-15226294
 ] 

ASF GitHub Bot commented on FLINK-3398:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1690#issuecomment-205819299
  
I think the functionality is very desirable. In many cases, you really 
don't want Flink to write something to Zookeeper for some random GroupId. This 
floods ZooKeeper with garbage that helps no one.

I think that this is actually a kind of nice solution. The 
`auto.commit.enable` in Flink means not necessarily periodically, but "on 
checkpoint or periodically".
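
A hedged sketch of the opt-out from the user's side, using the standard Kafka property names; how the PR wires this internally is in the linked diff, not reproduced here. The environment variable `env` and the topic name are assumptions.

{code}
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "my-consumer-group");
// opt out of committing offsets to ZooKeeper (Kafka 0.8 name; 0.9 uses enable.auto.commit)
props.setProperty("auto.commit.enable", "false");

DataStream<String> stream = env.addSource(
	new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props));
{code}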


> Flink Kafka consumer should support auto-commit opt-outs
> 
>
> Key: FLINK-3398
> URL: https://issues.apache.org/jira/browse/FLINK-3398
> Project: Flink
>  Issue Type: Bug
>Reporter: Shikhar Bhushan
>
> Currently the Kafka source will commit consumer offsets to Zookeeper, either 
> upon a checkpoint if checkpointing is enabled, or otherwise periodically based 
> on {{auto.commit.interval.ms}}.
> It should be possible to opt-out of committing consumer offsets to Zookeeper. 
> Kafka has this config as {{auto.commit.enable}} (0.8) and 
> {{enable.auto.commit}} (0.9).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-3398: Allow for opting-out from Kafka of...

2016-04-05 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1690#issuecomment-205819299
  
I think the functionality is very desirable. In many cases, you really 
don't want Flink to write something to Zookeeper for some random GroupId. This 
floods ZooKeeper with garbage that helps no one.

I think that this is actually a kind of nice solution. The 
`auto.commit.enable` in Flink means not necessarily periodically, but "on 
checkpoint or periodically".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3541) Clean up workaround in FlinkKafkaConsumer09

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226271#comment-15226271
 ] 

ASF GitHub Bot commented on FLINK-3541:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1846#issuecomment-205811304
  
Yes, we can drop the retry loop.
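
For context, a hedged sketch of the simplified partition lookup once the KAFKA-2880 workaround is dropped; variable names are made up and the surrounding error handling is only illustrative:

{code}
// with Kafka 0.9.0.1 a single call suffices; no retry loop around the spurious NPE
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(kafkaProperties)) {
	List<PartitionInfo> partitions = consumer.partitionsFor(topic);
	if (partitions == null || partitions.isEmpty()) {
		throw new RuntimeException("Unable to retrieve partitions for topic " + topic);
	}
}
{code}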


> Clean up workaround in FlinkKafkaConsumer09 
> 
>
> Key: FLINK-3541
> URL: https://issues.apache.org/jira/browse/FLINK-3541
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Priority: Minor
>
> In the current {{FlinkKafkaConsumer09}} implementation, we repeatedly start a 
> new {{KafkaConsumer}} if the method {{KafkaConsumer.partitionsFor}} throws an 
> NPE. This is due to a bug in Kafka version 0.9.0.0. See 
> https://issues.apache.org/jira/browse/KAFKA-2880. The code can be found in 
> the constructor of {{FlinkKafkaConsumer09.java:208}}.
> However, the problem is marked as fixed for version 0.9.0.1, which we also 
> use for the flink-connector-kafka. Therefore, we should be able to get rid of 
> the workaround.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3541] [Kafka Connector] Clean up workar...

2016-04-05 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1846#issuecomment-205811304
  
Yes, we can drop the retry loop.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3700] [core] Add 'Preconditions' utilit...

2016-04-05 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1853#issuecomment-205811166
  
+1 for merging :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3700) Replace Guava Preconditions class with Flink Preconditions

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226270#comment-15226270
 ] 

ASF GitHub Bot commented on FLINK-3700:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1853#issuecomment-205811166
  
+1 for merging :-)


> Replace Guava Preconditions class with Flink Preconditions
> --
>
> Key: FLINK-3700
> URL: https://issues.apache.org/jira/browse/FLINK-3700
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>
> In order to reduce the dependency on Guava (which has caused us quite a bit of 
> pain in the past with its version conflicts), I suggest adding a Flink 
> {{Preconditions}} class.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3700) Replace Guava Preconditions class with Flink Preconditions

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226250#comment-15226250
 ] 

ASF GitHub Bot commented on FLINK-3700:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1854

[FLINK-3700] [core] Remove Guava as a dependency from "flink-core"

**Note: This builds on top of pull request  #1853**

Almost all Guava functionality used within `flink-core` has corresponding 
utils in Flink's codebase, or the JDK library.

This replaces the Guava code as follows
  - Preconditions calls by Flink's Preconditions class
  - Collection utils by simple Java Collection calls
  - Iterators by Flink's Union Iterator
  - Murmur Hasher calls by Flink's `MathUtil.murmurHash()`
  - Files by simple util methods around `java.nio.Files`
  - InetAddresses IPv6 encoding code has been adapted into Flink's NetUtils 
(with attribution comments)

Some util classes were moved from `flink-runtime` to `flink-core`.
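
Illustrative only (not taken from the PR): the flavor of the collection-utility replacement listed above.

{code}
// before (Guava):
//   List<String> fields = Lists.newArrayList("a", "b", "c");
// after (plain JDK):
List<String> fields = new ArrayList<>(Arrays.asList("a", "b", "c"));
{code}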


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink guava_free_core

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1854.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1854


commit 535e83a297d39315d9d4b36672444b44f409c81e
Author: Stephan Ewen 
Date:   2016-04-05T10:37:33Z

[FLINK-3700] [build] Add 'findbugs' (javax.annotation) annotations as a 
core dependency.

commit eac0813d9f7298211bf52aec61ce346a133436f8
Author: Stephan Ewen 
Date:   2016-04-05T11:23:14Z

[FLINK-3700] [core] Add 'Preconditions' utility class.

commit 052db9286ad8aa8df3e122c300361dbc384a0190
Author: Stephan Ewen 
Date:   2016-04-05T13:18:32Z

[FLINK-3700] [core] Removes Guava Dependency from flink-core

Almost all Guava functionality used within flink-core has corresponding
utils in Flink's codebase, or the JDK library.

This replaces the Guava code as follows
  - Preconditions calls by Flink's Preconditions class
  - Collection utils by simple Java Collection calls
  - Iterators by Flink's Union Iterator
  - Files by simple util methods around java.nio.Files
  - InetAddresses IPv6 encoding code has been adapted into Flink's NetUtils 
(with attribution comments)

Some util classes were moved from flink-runtime to flink-core.




> Replace Guava Preconditions class with Flink Preconditions
> --
>
> Key: FLINK-3700
> URL: https://issues.apache.org/jira/browse/FLINK-3700
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>
> In order to reduce the dependency on Guava (which has caused us quite a bit of 
> pain in the past with its version conflicts), I suggest adding a Flink 
> {{Preconditions}} class.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3700] [core] Remove Guava as a dependen...

2016-04-05 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1854

[FLINK-3700] [core] Remove Guava as a dependency from "flink-core"

**Note: This builds on top of pull request  #1853**

Almost all Guava functionality used within `flink-core` has corresponding 
utils in Flink's codebase, or the JDK library.

This replaces the Guava code as follows
  - Preconditions calls by Flink's Preconditions class
  - Collection utils by simple Java Collection calls
  - Iterators by Flink's Union Iterator
  - Murmur Hasher calls by Flink's `MathUtil.murmurHash()`
  - Files by simple util methods around `java.nio.Files`
  - InetAddresses IPv6 encoding code has been adapted into Flink's NetUtils 
(with attribution comments)

Some util classes were moved from `flink-runtime` to `flink-core`.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink guava_free_core

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1854.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1854


commit 535e83a297d39315d9d4b36672444b44f409c81e
Author: Stephan Ewen 
Date:   2016-04-05T10:37:33Z

[FLINK-3700] [build] Add 'findbugs' (javax.annotation) annotations as a 
core dependency.

commit eac0813d9f7298211bf52aec61ce346a133436f8
Author: Stephan Ewen 
Date:   2016-04-05T11:23:14Z

[FLINK-3700] [core] Add 'Preconditions' utility class.

commit 052db9286ad8aa8df3e122c300361dbc384a0190
Author: Stephan Ewen 
Date:   2016-04-05T13:18:32Z

[FLINK-3700] [core] Removes Guava Dependency from flink-core

Almost all Guava functionality used within flink-core has corresponding
utils in Flink's codebase, or the JDK library.

This replaces the Guava code as follows
  - Preconditions calls by Flink's Preconditions class
  - Collection utils by simple Java Collection calls
  - Iterators by Flink's Union Iterator
  - Files by simple util methods around java.nio.Files
  - InetAddresses IPv6 encoding code has been adapted into Flink's NetUtils 
(with attribution comments)

Some util classes were moved from flink-runtime to flink-core.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3700) Replace Guava Preconditions class with Flink Preconditions

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226246#comment-15226246
 ] 

ASF GitHub Bot commented on FLINK-3700:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1853

[FLINK-3700] [core] Add 'Preconditions' utility class.

The functionality that Flink uses from Guava is super simple and limited.
We get a big dependency that has caused a lot of pain in the past, simply 
to get access to some simple utility methods. This has caused us quite a bit of 
pain in the past, because of Guava version conflicts and the necessary 
dependency shading.

In order to reduce the dependency on Guava, this adds a simple Flink 
Preconditions class.

While referencing well established libraries is a good idea for standalone 
apps, it is a problem for frameworks like Flink, if the dependencies are not 
well-behaved with respect to version compatibility. Guava is not well behaved 
in that sense.
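
A minimal sketch of what using such a class looks like, assuming Guava-style static methods; the exact method set is in the PR's diff, not reproduced here, and the Job class is made up.

{code}
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

public class Job {
	private final String name;

	public Job(String name, int parallelism) {
		this.name = checkNotNull(name, "name must not be null");
		checkArgument(parallelism > 0, "parallelism must be positive");
	}
}
{code}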


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
flink_preconditions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1853.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1853






> Replace Guava Preconditions class with Flink Preconditions
> --
>
> Key: FLINK-3700
> URL: https://issues.apache.org/jira/browse/FLINK-3700
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>
> In order to reduce the dependency on Guava (which has caused us quite a bit of 
> pain in the past with its version conflicts), I suggest adding a Flink 
> {{Preconditions}} class.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3700] [core] Add 'Preconditions' utilit...

2016-04-05 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1853

[FLINK-3700] [core] Add 'Preconditions' utility class.

The functionality that Flink uses from Guava is super simple and limited.
We get a big dependency that has caused a lot of pain in the past, simply 
to get access to some simple utility methods. This has caused us quite a bit of 
pain in the past, because of Guava version conflicts and the necessary 
dependency shading.

In order to reduce the dependency on Guava, this adds a simple Flink 
Preconditions class.

While referencing well established libraries is a good idea for standalone 
apps, it is a problem for frameworks like Flink, if the dependencies are not 
well-behaved with respect to version compatibility. Guava is not well behaved 
in that sense.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
flink_preconditions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1853.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1853






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3689) JobManager blocks cluster shutdown when not connected to ResourceManager

2016-04-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226229#comment-15226229
 ] 

ASF GitHub Bot commented on FLINK-3689:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1852

[FLINK-3689] do not shutdown test ActorSystem

Instead of shutting down the ActorSystem created in the test, we simply send a
message upon executing the shutdown method of the JobManager, TaskManager, and
ResourceManager. This ensures we can check for shutdown code execution without
interfering with the test.
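
A hedged sketch of the pattern (class and message names are made up; the actual change is in the PR): the shutdown method notifies an optional test listener instead of stopping the shared ActorSystem.

{code}
// hypothetical illustration, not the PR's code
class ShutdownNotifyingComponent {
	private final akka.actor.ActorRef testListener; // e.g. a TestProbe ref; may be null outside tests

	ShutdownNotifyingComponent(akka.actor.ActorRef testListener) {
		this.testListener = testListener;
	}

	void shutdown() {
		// release the component's own resources here, but do not stop the ActorSystem
		if (testListener != null) {
			testListener.tell("shutdown-executed", akka.actor.ActorRef.noSender());
		}
	}
}
{code}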

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-3689

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1852.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1852


commit 39e9bb7be3a000cf7e47c4f6dd4856d36832480e
Author: Maximilian Michels 
Date:   2016-04-05T12:35:47Z

[FLINK-3689] do not shutdown test ActorSystem

Instead of shutting down the ActorSystem created in the test, we simply send a
message upon executing the shutdown method of the JobManager, TaskManager, and
ResourceManager. This ensures we can check for shutdown code execution without
interfering with the test.




> JobManager blocks cluster shutdown when not connected to ResourceManager
> 
>
> Key: FLINK-3689
> URL: https://issues.apache.org/jira/browse/FLINK-3689
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

