[jira] [Commented] (FLINK-3086) ExpressionParser does not support concatenation of suffix operations
[ https://issues.apache.org/jira/browse/FLINK-3086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227709#comment-15227709 ] ramkrishna.s.vasudevan commented on FLINK-3086: --- This also affects the abs() operator when applied to an int literal; e.g. 1.abs() does not work. > ExpressionParser does not support concatenation of suffix operations > > > Key: FLINK-3086 > URL: https://issues.apache.org/jira/browse/FLINK-3086 > Project: Flink > Issue Type: Bug > Components: Table API >Reporter: Timo Walther > > The ExpressionParser of the Table API does not support concatenation of > suffix operations, e.g. > {code}table.select("field.cast(STRING).substring(2)"){code} throws an > exception: > {code} > org.apache.flink.api.table.ExpressionException: Could not parse expression: > string matching regex `\z' expected but `.' found > at > org.apache.flink.api.table.parser.ExpressionParser$.parseExpressionList(ExpressionParser.scala:224) > {code} > However, the Scala implicit Table Expression API supports this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
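The failure mode ("end of input expected but `.' found") is the classic missing repetition in a grammar: the parser accepts one suffix call and then expects end of input instead of looping. A minimal illustrative sketch of tokenizing a chained suffix expression by repetition — not Flink's actual parser; the class, method, and regex here are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SuffixChain {
    // Splits "field.cast(STRING).substring(2)" into its base field and each
    // suffix call, looping over suffixes instead of stopping after the first.
    static List<String> parse(String expr) {
        List<String> parts = new ArrayList<>();
        // A segment is a name optionally followed by a parenthesized argument list.
        Matcher m = Pattern.compile("[^.()]+(\\([^)]*\\))?").matcher(expr);
        while (m.find()) {
            parts.add(m.group());
        }
        return parts;
    }
}
```

The fix in a combinator-style parser is analogous: make the suffix production repeatable (`rep`) rather than optional-once.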
[jira] [Comment Edited] (FLINK-1934) Add approximative k-nearest-neighbours (kNN) algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227632#comment-15227632 ] Daniel Blazevski edited comment on FLINK-1934 at 4/6/16 3:07 AM: - [~chiwanpark] [~till.rohrmann] I have a Flink version -- still a bit preliminary -- of the approximate kNN up and running. The exact kNN using a quadtree performs quite badly in moderate-to-high spatial dimension (e.g. with 20,000 test and training points in 6D the quadtree is slower, but no worries -- I took care of this and the exact version decides whether or not to use the quadtree). https://github.com/danielblazevski/flink/blob/FLINK-1934/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/nn/zknn.scala A preliminary test shows good scaling as the number of test and training points is increased. 8,000 points in 6D (i.e. 8,000 test points and 8,000 training points): Elapsed time approx = 2s Elapsed time exact = 27s 64,000 in 6D: Elapsed time approx = 6s (didn't run the exact version; we know it's O(N^2)). I will have to clean things up, add edge cases, etc., which may slow down the run-time a bit, but will definitely not increase the complexity of the algorithm with respect to the number of test/training points. This still uses a cross product, which I was hoping to avoid, but I'm not sure that's possible. Any thoughts? Basically the idea is to hash the test/train set to 1D (I use the z-value hash based on [1]). I still have not implemented the ideas in [1] in full. The full solution is quite complex. They do a bunch of load balancing that I'm still learning about, and I'm not quite sure of the payoff. One option could be that I clean up and optimize what I have now, since it's already performing well, and we open a new issue to do all the steps in [1]. There are still many things to clean up, but any cleaning/edge cases will not add computational complexity with respect to the number of test points. E.g. I now convert the coordinates to integers and ignore the decimal part, so there are lots of collisions in the z-value hash; normalizing the data and adding a fixed max number of bits to compute the z-value is needed, but will definitely not increase the complexity with respect to adding more test/training points (this is described towards the end of [3]). Any thoughts? was (Author: danielblazevski): [~chiwanpark] [~till.rohrmann] I have a Flink version -- still a bit preliminary -- of the approximate kNN up and running. The exact kNN using a quadtree performs quite badly in moderate-to-high spatial dimension (e.g. with 20,000 test and training points in 6D the quadtree is slower, but no worries -- I took care of this and the exact version decides whether or not to use the quadtree). https://github.com/danielblazevski/flink/blob/FLINK-1934/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/nn/zknn.scala A preliminary test shows good scaling as the number of test and training points is increased. 8,000 points in 6D (i.e. 8,000 test points and 8,000 training points): Elapsed time approx = 2s Elapsed time exact = 27s 64,000 in 6D: Elapsed time approx = 6s (didn't run the exact version; we know it's O(N^2)). I will have to clean things up, add edge cases, etc., which may slow down the run-time a bit, but will definitely not increase the complexity of the algorithm with respect to the number of test/training points. This still uses a cross product, which I was hoping to avoid, but I'm not sure that's possible. Any thoughts? Basically the idea is to hash the test/train set to 1D (I use the z-value hash based on [1]). I still have not implemented the ideas in [1] in full. The full solution is quite complex. They do a bunch of load balancing that I'm still learning about, and I'm not quite sure of the payoff. One option could be that I clean up and optimize what I have now, since it's already performing well, and we open a new issue to do all the steps in [1]. There are still many things to clean up, but any cleaning/edge cases will not add computational complexity with respect to the number of test points. E.g. I now convert the coordinates to integers and ignore the decimal part, so there are lots of collisions in the z-value hash; normalizing the data and adding a fixed max number of bits to compute the z-value (this is described towards the end of [3]). Any thoughts? > Add approximative k-nearest-neighbours (kNN) algorithm to machine learning > library > -- > > Key: FLINK-1934 > URL: https://issues.apache.org/jira/browse/FLINK-1934 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Till Rohrmann >Assignee: Daniel
[jira] [Commented] (FLINK-1934) Add approximative k-nearest-neighbours (kNN) algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227632#comment-15227632 ] Daniel Blazevski commented on FLINK-1934: - [~chiwanpark] [~till.rohrmann] I have a Flink version -- still a bit preliminary -- of the approximate kNN up and running. The exact kNN using a quadtree performs quite badly in moderate-to-high spatial dimension (e.g. with 20,000 test and training points in 6D the quadtree is slower, but no worries -- I took care of this and the exact version decides whether or not to use the quadtree). https://github.com/danielblazevski/flink/blob/FLINK-1934/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/nn/zknn.scala A preliminary test shows good scaling as the number of test and training points is increased. 8,000 points (i.e. 8,000 test points and 8,000 training points): Elapsed time approx = 2s Elapsed time exact = 27s 64,000: Elapsed time approx = 6s (didn't run the exact version; we know it's O(N^2)). I will have to clean things up, add edge cases, etc., which may slow down the run-time a bit, but will definitely not increase the complexity of the algorithm with respect to the number of test/training points. This still uses a cross product, which I was hoping to avoid, but I'm not sure that's possible. Any thoughts? Basically the idea is to hash the test/train set to 1D (I use the z-value hash based on [1]). I still have not implemented the ideas in [1] in full. The full solution is quite complex. They do a bunch of load balancing that I'm still learning about, and I'm not quite sure of the payoff. One option could be that I clean up and optimize what I have now, since it's already performing well, and we open a new issue to do all the steps in [1]. There are still many things to clean up, but any cleaning/edge cases will not add computational complexity with respect to the number of test points. E.g. I now convert the coordinates to integers and ignore the decimal part, so there are lots of collisions in the z-value hash; normalizing the data and adding a fixed max number of bits to compute the z-value (this is described towards the end of [3]). Any thoughts? > Add approximative k-nearest-neighbours (kNN) algorithm to machine learning > library > -- > > Key: FLINK-1934 > URL: https://issues.apache.org/jira/browse/FLINK-1934 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Till Rohrmann >Assignee: Daniel Blazevski > Labels: ML > > kNN is still a widely used algorithm for classification and regression. > However, due to the computational costs of an exact implementation, it does > not scale well to large amounts of data. Therefore, it is worthwhile to also > add an approximative kNN implementation as proposed in [1,2]. Reference [3] > is cited a few times in [1], and gives necessary background on the z-value > approach. > Resources: > [1] https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf > [2] http://www.computer.org/csdl/proceedings/wacv/2007/2794/00/27940028.pdf > [3] http://cs.sjtu.edu.cn/~yaobin/papers/icde10_knn.pdf
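The z-value hashing described in the comment maps multi-dimensional integer coordinates to one dimension by bit interleaving (a Morton code), so points that are close in space tend to stay close in the 1D order. A minimal sketch under the assumptions of non-negative integer coordinates and a fixed bit budget per dimension (the class and method names are illustrative, not from the linked zknn.scala):

```java
public class ZValue {
    // Interleaves the low `bits` bits of each coordinate into a single long:
    // bit b of coordinate i lands at position b * dims + i of the z-value.
    static long interleave(int[] coords, int bits) {
        int dims = coords.length;
        long z = 0L;
        for (int b = 0; b < bits; b++) {
            for (int i = 0; i < dims; i++) {
                long bit = (coords[i] >> b) & 1L;
                z |= bit << (b * dims + i);
            }
        }
        return z;
    }
}
```

The approximate kNN then sorts all points by z-value and, for each test point, inspects only a window of neighbors in that 1D order, replacing the O(N^2) all-pairs comparison with a sort plus local scans.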
[jira] [Commented] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat
[ https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227573#comment-15227573 ] Tian, Li commented on FLINK-3655: - Thanks, I will do the path list first and use "readFile(FileInputFormat inputFormat, String... filePaths)". > Allow comma-separated or multiple directories to be specified for > FileInputFormat > - > > Key: FLINK-3655 > URL: https://issues.apache.org/jira/browse/FLINK-3655 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Gna Phetsarath >Priority: Minor > Labels: starter > > Allow comma-separated or multiple directories to be specified for > FileInputFormat so that a DataSource will process the directories > sequentially. > > env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*") > in Scala >env.readFile(paths: Seq[String]) > or > env.readFile(path: String, otherPaths: String*) > Wildcard support would be a bonus.
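The proposed API accepts either a varargs list of paths or a single comma-separated string; the latter reduces to splitting and trimming before handing each path to the input format. A sketch of that normalization step (the helper is hypothetical, not Flink's API):

```java
import java.util.ArrayList;
import java.util.List;

public class PathSpec {
    // Splits a comma-separated path spec into individual, trimmed paths,
    // dropping empty entries; wildcard segments pass through untouched.
    static List<String> split(String spec) {
        List<String> paths = new ArrayList<>();
        for (String part : spec.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                paths.add(trimmed);
            }
        }
        return paths;
    }
}
```

Each resulting path would then be resolved (including any glob expansion) exactly as a single-path readFile() call is today.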
[jira] [Commented] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat
[ https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227545#comment-15227545 ] Tian, Li commented on FLINK-3655: - Will support wildcards > Allow comma-separated or multiple directories to be specified for > FileInputFormat > - > > Key: FLINK-3655 > URL: https://issues.apache.org/jira/browse/FLINK-3655 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Gna Phetsarath >Priority: Minor > Labels: starter > > Allow comma-separated or multiple directories to be specified for > FileInputFormat so that a DataSource will process the directories > sequentially. > > env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*") > in Scala >env.readFile(paths: Seq[String]) > or > env.readFile(path: String, otherPaths: String*) > Wildcard support would be a bonus.
[jira] [Commented] (FLINK-3664) Create a method to easily Summarize a DataSet
[ https://issues.apache.org/jira/browse/FLINK-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227311#comment-15227311 ] Todd Lisonbee commented on FLINK-3664: -- I didn't have Travis CI set up with my GitHub account, so I added another commit so it would kick off a build. > Create a method to easily Summarize a DataSet > - > > Key: FLINK-3664 > URL: https://issues.apache.org/jira/browse/FLINK-3664 > Project: Flink > Issue Type: Improvement >Reporter: Todd Lisonbee > Attachments: DataSet-Summary-Design-March2016-v1.txt > > > Here is an example: > {code} > /** > * Summarize a DataSet of Tuples by collecting single pass statistics for all > columns > */ > public Tuple summarize() > Dataset> input = // [...] > Tuple3 summary > = input.summarize() > summary.getField(0).stddev() > summary.getField(1).maxStringLength() > {code}
[jira] [Commented] (FLINK-3664) Create a method to easily Summarize a DataSet
[ https://issues.apache.org/jira/browse/FLINK-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227214#comment-15227214 ] Todd Lisonbee commented on FLINK-3664: -- Pull request is ready. Please let me know if you'd like to see any other changes before merging. Thanks. > Create a method to easily Summarize a DataSet > - > > Key: FLINK-3664 > URL: https://issues.apache.org/jira/browse/FLINK-3664 > Project: Flink > Issue Type: Improvement >Reporter: Todd Lisonbee > Attachments: DataSet-Summary-Design-March2016-v1.txt > > > Here is an example: > {code} > /** > * Summarize a DataSet of Tuples by collecting single pass statistics for all > columns > */ > public Tuple summarize() > Dataset> input = // [...] > Tuple3 summary > = input.summarize() > summary.getField(0).stddev() > summary.getField(1).maxStringLength() > {code}
[jira] [Commented] (FLINK-3664) Create a method to easily Summarize a DataSet
[ https://issues.apache.org/jira/browse/FLINK-3664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15227168#comment-15227168 ] ASF GitHub Bot commented on FLINK-3664: --- GitHub user tlisonbee opened a pull request: https://github.com/apache/flink/pull/1855 [FLINK-3664] Create method to easily summarize a DataSet of Tuples Adding summarize() method in DataSetUtils that will supply a number of single pass statistics for DataSets of Tuples. Summary statistics depend on the type being summarized: - Numeric types (Integer, IntValue, Float, Double, etc): min, max, mean, variance, standard deviation, NaN count, Infinity count, totalCount, etc. - String, StringValue: minLength, maxLength, meanLength, emptyCount, totalCount - Boolean, BooleanValue: trueCount, falseCount, totalCount. Example usage: `Dataset> input = // [...]` `Tuple3 summary = DataSetUtils.summarize(input)` `summary.f0.getStandardDeviation()` `summary.f1.getMaxLength()` Uses the Kahan summation algorithm to avoid numeric instability. The algorithm is described in: "Scalable and Numerically Stable Descriptive Statistics in SystemML", Tian et al, International Conference on Data Engineering 2012. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/tlisonbee/flink FLINK-3664 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1855.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1855 commit 65f54df532829994a8be240a27b9138d01a186b5 Author: Todd Lisonbee Date: 2016-04-05T05:51:12Z [FLINK-3664] Create DataSetUtils method to easily summarize a DataSet of Tuples > Create a method to easily Summarize a DataSet > - > > Key: FLINK-3664 > URL: https://issues.apache.org/jira/browse/FLINK-3664 > Project: Flink > Issue Type: Improvement >Reporter: Todd Lisonbee > Attachments: DataSet-Summary-Design-March2016-v1.txt > > > Here is an example: > {code} > /** > * Summarize a DataSet of Tuples by collecting single pass statistics for all > columns > */ > public Tuple summarize() > Dataset > input = // [...] > Tuple3 summary > = input.summarize() > summary.getField(0).stddev() > summary.getField(1).maxStringLength() > {code}
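The PR cites Kahan (compensated) summation for numerical stability of the single-pass statistics. Its core is a running compensation term that recovers the low-order bits lost when a small value is added to a large running sum; a minimal sketch of the algorithm (not the PR's actual code):

```java
public class KahanSum {
    // Compensated summation: `c` carries the rounding error of each addition
    // so it can be folded back into subsequent terms.
    static double sum(double[] values) {
        double sum = 0.0;
        double c = 0.0;
        for (double v : values) {
            double y = v - c;   // apply the previous correction
            double t = sum + y; // low-order bits of y may be lost here...
            c = (t - sum) - y;  // ...but are recovered into c
            sum = t;
        }
        return sum;
    }
}
```

This is what makes a single streaming pass over each tuple column viable for mean/variance without accumulating rounding error.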
[jira] [Commented] (FLINK-3688) ClassCastException in StreamRecordSerializer when WindowOperator.trigger() is called and TimeCharacteristic = ProcessingTime
[ https://issues.apache.org/jira/browse/FLINK-3688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226941#comment-15226941 ] Konstantin Knauf commented on FLINK-3688: - I could open the PR (https://travis-ci.org/knaufk/flink/jobs/120960435), but the build fails immediately with java.io.FileNotFoundException: build-target/lib/flink-dist-*.jar (No such file or directory) at java.util.zip.ZipFile.open(Native Method) at java.util.zip.ZipFile.<init>(ZipFile.java:220) at java.util.zip.ZipFile.<init>(ZipFile.java:150) at java.util.zip.ZipFile.<init>(ZipFile.java:121) at sun.tools.jar.Main.list(Main.java:1060) at sun.tools.jar.Main.run(Main.java:291) at sun.tools.jar.Main.main(Main.java:1233) find: `./flink-yarn-tests/target/flink-yarn-tests*': No such file or directory Not sure what to do about it; master fails with the same error on my Travis... > ClassCastException in StreamRecordSerializer when WindowOperator.trigger() is > called and TimeCharacteristic = ProcessingTime > > > Key: FLINK-3688 > URL: https://issues.apache.org/jira/browse/FLINK-3688 > Project: Flink > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Konstantin Knauf >Assignee: Konstantin Knauf >Priority: Critical > > Hi, > when using {{TimeCharacteristics.ProcessingTime}} a ClassCastException is > thrown in {{StreamRecordSerializer}} when > {{WindowOperator.processWatermark()}} is called from > {{WindowOperator.trigger()}}, i.e. whenever a ProcessingTimeTimer is > triggered. > The problem seems to be that {{processWatermark()}} is also called in > {{trigger()}}, when time characteristic is ProcessingTime, but in > {{RecordWriterOutput}} {{enableWatermarkMultiplexing}} is {{false}} and the > {{TypeSerializer}} is a {{StreamRecordSerializer}}, which ultimately leads to > the ClassCastException. Do you agree? > If this is indeed a bug, there are several possible solutions.
> # Only calling {{processWatermark()}} in {{trigger()}}, when > TimeCharacteristic is EventTime > # Not calling {{processWatermark()}} in {{trigger()}} at all, instead wait > for the next watermark to trigger the EventTimeTimers with a timestamp behind > the current watermark. This is, of course, a trade off. > # Using {{MultiplexingStreamRecordSerializer}} all the time, but I have no > idea what the side effect of this change would be. I assume there is a reason > for existence of the StreamRecordSerializer ;) > StackTrace: > {quote} > TimerException\{java.lang.RuntimeException: > org.apache.flink.streaming.api.watermark.Watermark cannot be cast to > org.apache.flink.streaming.runtime.streamrecord.StreamRecord\} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:716) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) > Caused by: java.lang.RuntimeException: > org.apache.flink.streaming.api.watermark.Watermark cannot be cast to > org.apache.flink.streaming.runtime.streamrecord.StreamRecord > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:370) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:293) > at > 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:323) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:710) > ... 7 more > Caused by: java.lang.ClassCastException: > org.apache.flink.streaming.api.watermark.Watermark cannot be cast to > org.apache.flink.streaming.runtime.streamrecord.StreamRecord > at > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42) > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) > at >
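The root cause in the stack trace is a serializer that assumes every stream element is a record; when a processing-time trigger routes a Watermark through it, the downcast fails. A stripped-down illustration of that failure mode (toy classes, not Flink's actual type hierarchy):

```java
public class CastDemo {
    static class StreamElement {}
    static class StreamRecord extends StreamElement {}
    static class Watermark extends StreamElement {}

    // Mirrors the failing assumption in StreamRecordSerializer.serialize:
    // every element is downcast to a record unconditionally.
    static StreamRecord serialize(StreamElement element) {
        return (StreamRecord) element; // ClassCastException for a Watermark
    }
}
```

The multiplexing variant discussed in option 3 avoids this by tagging each element (record vs. watermark) on the wire instead of casting, at the cost of one extra byte per element.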
[jira] [Commented] (FLINK-3696) Some Union tests fail for TableConfigMode.EFFICIENT
[ https://issues.apache.org/jira/browse/FLINK-3696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226745#comment-15226745 ] Vasia Kalavri commented on FLINK-3696: -- The problem here is that the TypeConverter creates a TupleTypeInfo for efficient mode when there is no expected physical type. Thus, if the other union input is a scala tuple then the union operator complains that the input types are different. Any suggestions how to nicely solve this [~twalthr]? Thanks! > Some Union tests fail for TableConfigMode.EFFICIENT > --- > > Key: FLINK-3696 > URL: https://issues.apache.org/jira/browse/FLINK-3696 > Project: Flink > Issue Type: Bug > Components: Table API >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri > > e.g. testUnionWithFilter gives the following exception: > {code} > org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of > different types. Input1=scala.Tuple3(_1: Integer, _2: Long, _3: String), > input2=Java Tuple3> at > org.apache.flink.api.java.operators.UnionOperator.(UnionOperator.java:47) > at org.apache.flink.api.java.DataSet.union(DataSet.java:1208) > at > org.apache.flink.api.table.plan.nodes.dataset.DataSetUnion.translateToPlan(DataSetUnion.scala:81) > at > org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:95) > at > org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:91) > at > org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:51) > at > org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:43) > at > org.apache.flink.api.scala.table.test.UnionITCase.testUnionWithFilter(UnionITCase.scala:77) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at > 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) > at org.junit.runners.ParentRunner.run(ParentRunner.java:309) > at org.junit.runners.Suite.runChild(Suite.java:127) > at org.junit.runners.Suite.runChild(Suite.java:26) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.run(ParentRunner.java:309) > at org.junit.runners.Suite.runChild(Suite.java:127) > at org.junit.runners.Suite.runChild(Suite.java:26) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) > at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) > at org.junit.runners.ParentRunner.run(ParentRunner.java:309) > at org.junit.runner.JUnitCore.run(JUnitCore.java:160) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78) > at >
[jira] [Commented] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat
[ https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226676#comment-15226676 ] Gna Phetsarath commented on FLINK-3655: --- Will you be doing wildcards as well, or should we put that in another ticket? > Allow comma-separated or multiple directories to be specified for > FileInputFormat > - > > Key: FLINK-3655 > URL: https://issues.apache.org/jira/browse/FLINK-3655 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Gna Phetsarath >Priority: Minor > Labels: starter > > Allow comma-separated or multiple directories to be specified for > FileInputFormat so that a DataSource will process the directories > sequentially. > > env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*") > in Scala >env.readFile(paths: Seq[String]) > or > env.readFile(path: String, otherPaths: String*) > Wildcard support would be a bonus.
[jira] [Assigned] (FLINK-3697) keyBy() with nested POJO computes invalid field position indexes
[ https://issues.apache.org/jira/browse/FLINK-3697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-3697: - Assignee: Robert Metzger > keyBy() with nested POJO computes invalid field position indexes > > > Key: FLINK-3697 > URL: https://issues.apache.org/jira/browse/FLINK-3697 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.0.0 > Environment: MacOS X 10.10 >Reporter: Ron Crocker >Assignee: Robert Metzger >Priority: Critical > Labels: pojo > > Using named keys in keyBy() for nested POJO types results in failure. The > indexes for named key fields are used inconsistently with nested POJO types. > In particular, {{PojoTypeInfo.getFlatFields()}} returns the field's position > after (apparently) flattening the structure, but it is referenced in the > unflattened version of the POJO type by {{PojoTypeInfo.getTypeAt()}}. > In the example below, getFlatFields() returns positions 0, 1, and 14. These > positions appear correct in the flattened structure of the Data class. > However, in {{KeySelectorUtil.getSelectorForKeys(Keys keys, > TypeInformation typeInfo, ExecutionConfig executionConfig)}}, a call to > {{compositeType.getTypeAt(logicalKeyPositions[i])}} for the third key results > in {{PojoTypeInfo.getTypeAt()}} declaring it out of range, as it compares the > number of directly named fields of the object against the length of the flattened > version of that type.
> Concrete Example: > Consider this graph: > {code} > DataStream dataStream = see.addSource(new > FlinkKafkaConsumer08<>(timesliceConstants.topic, new DataDeserialzer(), > kafkaConsumerProperties)); > dataStream > .flatMap(new DataMapper()) > .keyBy("aaa", "abc", "wxyz") > {code} > {{DataDeserialzer}} returns a "NativeDataFormat" object; {{DataMapper}} takes > this NativeDataFormat object and extracts individual Data objects: {code} > public class Data { > public int aaa; > public int abc; > public long wxyz; > public int t1; > public int t2; > public Policy policy; > public Stats stats; > public Data() {} > {code} > A {{Policy}} object is an instance of this class: > {code} > public class Policy { > public short a; > public short b; > public boolean c; > public boolean d; > public Policy() {} > } > {code} > A {{Stats}} object is an instance of this class: > {code} > public class Stats { > public long count; > public float a; > public float b; > public float c; > public float d; > public float e; > public Stats() {} > } > {code}
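The reported positions 0, 1, and 14 follow from flattening the POJO with its fields in sorted order, as a toy model shows (this is an illustration of the mismatch, not Flink's actual code):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the bug: flat key positions are computed against the
// *flattened* field list, but the range check in getTypeAt() compares
// against the number of *direct* fields. Field groups below are the
// direct fields of Data, sorted alphabetically as PojoTypeInfo sorts them.
public class FlattenSketch {
    static final String[][] DATA = {
            {"aaa"}, {"abc"},
            {"policy.a", "policy.b", "policy.c", "policy.d"},
            {"stats.count", "stats.a", "stats.b", "stats.c", "stats.d", "stats.e"},
            {"t1"}, {"t2"}, {"wxyz"}
    };

    static List<String> flatten() {
        List<String> flat = new ArrayList<>();
        for (String[] group : DATA) {
            for (String f : group) flat.add(f);
        }
        return flat;
    }

    public static void main(String[] args) {
        List<String> flat = flatten();
        // "wxyz" resolves to flat position 14 ...
        System.out.println(flat.indexOf("wxyz")); // 14
        // ... but Data has only 7 direct fields, so a check against the
        // direct-field count wrongly rejects flat position 14 as out of range.
        System.out.println(DATA.length); // 7
    }
}
```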
[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra
[ https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226613#comment-15226613 ] ASF GitHub Bot commented on FLINK-3311: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58572834 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml --- @@ -0,0 +1,43 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cluster_name: 'Test Cluster' +commitlog_sync: 'periodic' +commitlog_sync_period_in_ms: 1 +commitlog_segment_size_in_mb: 16 +partitioner: 'org.apache.cassandra.dht.RandomPartitioner' +endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch' +commitlog_directory: $PATH\commit' +data_file_directories: +- $PATH\data' +saved_caches_directory: $PATH\cache' --- End diff -- I wonder why the path works on unix platforms. Only Windows uses `\` as a separator.
> Add a connector for streaming data into Cassandra > - > > Key: FLINK-3311 > URL: https://issues.apache.org/jira/browse/FLINK-3311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Andrea Sella > > We had users in the past asking for a Flink+Cassandra integration. > It seems that there is a well-developed java client for connecting to > Cassandra: https://github.com/datastax/java-driver (ASL 2.0) > There are also tutorials out there on how to start a local cassandra instance > (for the tests): > http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html > For the data types, I think we should support TupleX types, and map standard > java types to the respective cassandra types. > In addition, it seems that there is an object mapper from datastax to store > POJOs in Cassandra (there are annotations for defining the primary key and > types)
[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58572834 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml --- @@ -0,0 +1,43 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cluster_name: 'Test Cluster' +commitlog_sync: 'periodic' +commitlog_sync_period_in_ms: 1 +commitlog_segment_size_in_mb: 16 +partitioner: 'org.apache.cassandra.dht.RandomPartitioner' +endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch' +commitlog_directory: $PATH\commit' +data_file_directories: +- $PATH\data' +saved_caches_directory: $PATH\cache' --- End diff -- I wonder why the path works on unix platforms. Only Windows uses `\` as a separator. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
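The reason the backslash "works" on Unix is that `\` there is just an ordinary filename character, not a separator, so `path\commit` stays a single path component. A small sketch demonstrating this with `java.io.File` (behavior shown is for a Unix platform; on Windows the same string splits at the backslash):

```java
import java.io.File;

// On Unix, '\' is not a path separator -- it is a legal character inside
// a filename. So "$PATH\commit" simply names a directory whose name
// literally contains a backslash, which is why the test config still runs.
public class BackslashDemo {
    public static void main(String[] args) {
        File f = new File("/tmp/path\\commit");
        // On a Unix platform the last component keeps the backslash:
        System.out.println(f.getName());   // path\commit
        System.out.println(f.getParent()); // /tmp
    }
}
```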
[jira] [Commented] (FLINK-3492) Allow users to define a min pause between checkpoints
[ https://issues.apache.org/jira/browse/FLINK-3492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226604#comment-15226604 ] ASF GitHub Bot commented on FLINK-3492: --- Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1794#issuecomment-205887495 reworked test to be more reliable. > Allow users to define a min pause between checkpoints > - > > Key: FLINK-3492 > URL: https://issues.apache.org/jira/browse/FLINK-3492 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > FLINK-3051 already introduced a field in the {{CheckpointConfig}} to specify > a min pause between checkpoints. > In high-load situations (big state), jobs might spend their entire time > creating snapshots, not processing data. With a min pause between > checkpoints, users can guarantee that there is a certain time-span the system > can use for doing some actual data processing.
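The scheduling rule the ticket asks for can be stated in one line: the next checkpoint may not start before the interval has elapsed since the last trigger, nor before the min pause has elapsed since the last completion. A toy illustration (not Flink's actual CheckpointCoordinator, just the arithmetic):

```java
// Toy sketch of a min-pause-aware trigger time (names are illustrative).
// With a slow snapshot, the min pause pushes the next checkpoint back,
// guaranteeing a processing window between checkpoints.
public class MinPauseSketch {
    static long nextTriggerTime(long lastTrigger, long interval,
                                long lastCompletion, long minPause) {
        return Math.max(lastTrigger + interval, lastCompletion + minPause);
    }

    public static void main(String[] args) {
        // Checkpoint triggered at t=0 with a 10s interval, snapshot was slow
        // and completed only at t=9s; a 5s min pause defers the next trigger
        // to t=14s instead of t=10s, leaving time for actual data processing.
        System.out.println(nextTriggerTime(0, 10_000, 9_000, 5_000)); // 14000
    }
}
```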
[GitHub] flink pull request: [FLINK-3492] Configurable interval between Che...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1794#issuecomment-205887495 reworked test to be more reliable.
[GitHub] flink pull request: [FLINK-3637] Refactor rolling sink writer
Github user dalegaard commented on the pull request: https://github.com/apache/flink/pull/1826#issuecomment-205886932 @aljoscha okay, I'll fix that and rebase onto master. Naming things is one of the only two hard problems in computer science after all :)
[jira] [Commented] (FLINK-3637) Change RollingSink Writer interface to allow wider range of outputs
[ https://issues.apache.org/jira/browse/FLINK-3637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226598#comment-15226598 ] ASF GitHub Bot commented on FLINK-3637: --- Github user dalegaard commented on the pull request: https://github.com/apache/flink/pull/1826#issuecomment-205886932 @aljoscha okay, I'll fix that and rebase onto master. Naming things is one of the only two hard problems in computer science after all :) > Change RollingSink Writer interface to allow wider range of outputs > --- > > Key: FLINK-3637 > URL: https://issues.apache.org/jira/browse/FLINK-3637 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Lasse Dalegaard >Assignee: Lasse Dalegaard > Labels: features > > Currently the RollingSink Writer interface only works with > FSDataOutputStreams, which precludes it from being used with some existing > libraries like Apache ORC and Parquet. > To fix this, a new Writer interface can be created, which receives FileSystem > and Path objects, instead of FSDataOutputStream. > To ensure exactly-once semantics, the Writer interface must also be extended > so that the current write-offset can be retrieved at checkpointing time. For > formats like ORC this requires a footer to be written, before the offset is > returned. Checkpointing already calls flush on the writer, but either flush > needs to return the current length of the output file, or alternatively a new > method has to be added for this. > The existing Writer interface can be recreated with a wrapper on top of the > new Writer interface. The existing code that manages the FSDataOutputStream > can then be moved into this new wrapper.
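The "flush returns the current length" variant described in the ticket can be sketched with an in-memory stand-in (interface and class names here are illustrative, not the final Flink API): the sink records the returned offset at checkpoint time so it can truncate back to it on recovery.

```java
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Sketch of the proposed Writer change: flush() returns the current valid
// length of the output, which the sink stores with each checkpoint.
public class OffsetWriterSketch {
    interface Writer {
        void write(String record) throws IOException;
        long flush() throws IOException; // returns current valid offset
    }

    static class InMemoryWriter implements Writer {
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();
        public void write(String record) {
            byte[] bytes = (record + "\n").getBytes(StandardCharsets.UTF_8);
            out.write(bytes, 0, bytes.length);
        }
        public long flush() {
            // For ORC/Parquet this is where a footer would be finalized
            // before reporting the offset.
            return out.size();
        }
    }

    public static void main(String[] args) {
        InMemoryWriter w = new InMemoryWriter();
        w.write("a");
        w.write("bc");
        System.out.println(w.flush()); // 5 bytes: "a\n" + "bc\n"
    }
}
```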
[GitHub] flink pull request: [FLINK-2143] Added ReduceFunctionWithInverse
Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/856#issuecomment-205884988 I think that this unfortunately can't be adapted to the new windowing system, so it can be closed.
[jira] [Commented] (FLINK-2143) Add an overload to reduceWindow which takes the inverse of the reduceFunction as a second parameter
[ https://issues.apache.org/jira/browse/FLINK-2143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226583#comment-15226583 ] ASF GitHub Bot commented on FLINK-2143: --- Github user ggevay closed the pull request at: https://github.com/apache/flink/pull/856 > Add an overload to reduceWindow which takes the inverse of the reduceFunction > as a second parameter > --- > > Key: FLINK-2143 > URL: https://issues.apache.org/jira/browse/FLINK-2143 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Reporter: Gabor Gevay >Assignee: Gabor Gevay > > If the inverse of the reduceFunction is also available (for example > subtraction when summing numbers), then a PreReducer can maintain the > aggregate in O(1) memory and O(1) time for evict, store, and emitWindow.
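The O(1) idea from the ticket description is easy to see with a running sum: applying the inverse function (subtraction) on evict avoids re-reducing the whole window. A minimal sketch, not tied to any Flink interface:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sliding-window aggregate maintained in O(1) per store/evict, using the
// inverse of the reduce function (here: subtraction for sum).
public class InverseReduceSketch {
    private final Deque<Long> window = new ArrayDeque<>();
    private long sum = 0;

    void store(long v) { window.addLast(v); sum += v; } // reduce
    void evict()       { sum -= window.removeFirst(); } // inverse reduce
    long emitWindow()  { return sum; }

    public static void main(String[] args) {
        InverseReduceSketch s = new InverseReduceSketch();
        s.store(3); s.store(4); s.store(5);
        System.out.println(s.emitWindow()); // 12
        s.evict(); // 3 leaves the window
        System.out.println(s.emitWindow()); // 9
    }
}
```

Without the inverse, each `emitWindow` would have to re-reduce all buffered elements, i.e. O(window size) instead of O(1).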
[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra
[ https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226596#comment-15226596 ] ASF GitHub Bot commented on FLINK-3311: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58571353 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java --- @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.cassandra.example; + +import com.datastax.driver.core.Cluster; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.cassandra.CassandraSink; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import java.util.UUID; + +public class CassandraTupleWriteAheadSinkExample { + public static void main(String[] args) throws Exception { + + class MySource implements SourceFunction>, Checkpointed { --- End diff -- I would move the class out of the main method > Add a connector for streaming data into Cassandra > - > > Key: FLINK-3311 > URL: https://issues.apache.org/jira/browse/FLINK-3311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Andrea Sella > > We had users in the past asking for a Flink+Cassandra integration. > It seems that there is a well-developed java client for connecting into > Cassandra: https://github.com/datastax/java-driver (ASL 2.0) > There are also tutorials out there on how to start a local cassandra instance > (for the tests): > http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html > For the data types, I think we should support TupleX types, and map standard > java types to the respective cassandra types. > In addition, it seems that there is a object mapper from datastax to store > POJOs in Cassandra (there are annotations for defining the primary key and > types) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
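The review suggestion ("move the class out of the main method") amounts to turning the local class into a nested or top-level class, so it can be referenced, tested, and serialized independently. A minimal sketch of the refactor with a stand-in interface (names are hypothetical; Flink's `SourceFunction` is omitted to keep this self-contained):

```java
// Sketch of the suggested refactor: MySource hoisted out of main() as a
// static nested class instead of a local class inside the method body.
public class RefactorSketch {
    interface Source { String next(); }

    static class MySource implements Source {
        private int i = 0;
        public String next() { return "record-" + (i++); }
    }

    public static void main(String[] args) {
        Source src = new MySource();
        System.out.println(src.next()); // record-0
    }
}
```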
[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58571353 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java --- @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.cassandra.example; + +import com.datastax.driver.core.Cluster; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.cassandra.CassandraSink; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import java.util.UUID; + +public class CassandraTupleWriteAheadSinkExample { + public static void main(String[] args) throws Exception { + + class MySource implements SourceFunction>, Checkpointed { --- End diff -- I would move the class out of the main method --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3637] Refactor rolling sink writer
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1826#issuecomment-205885519 The changes look good. One thing I would like to have changed is to rename `SimpleWriterBase` to `StreamWriterBase` or `StreamWriter` to reflect the fact that it is used for stream-based writers.
[GitHub] flink pull request: [FLINK-2143] Added ReduceFunctionWithInverse
Github user ggevay closed the pull request at: https://github.com/apache/flink/pull/856
[jira] [Commented] (FLINK-2143) Add an overload to reduceWindow which takes the inverse of the reduceFunction as a second parameter
[ https://issues.apache.org/jira/browse/FLINK-2143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226582#comment-15226582 ] ASF GitHub Bot commented on FLINK-2143: --- Github user ggevay commented on the pull request: https://github.com/apache/flink/pull/856#issuecomment-205884988 I think that this unfortunately can't be adapted to the new windowing system, so it can be closed. > Add an overload to reduceWindow which takes the inverse of the reduceFunction > as a second parameter > --- > > Key: FLINK-2143 > URL: https://issues.apache.org/jira/browse/FLINK-2143 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Reporter: Gabor Gevay >Assignee: Gabor Gevay > > If the inverse of the reduceFunction is also available (for example > subtraction when summing numbers), then a PreReducer can maintain the > aggregate in O(1) memory and O(1) time for evict, store, and emitWindow.
[jira] [Commented] (FLINK-2143) Add an overload to reduceWindow which takes the inverse of the reduceFunction as a second parameter
[ https://issues.apache.org/jira/browse/FLINK-2143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226575#comment-15226575 ] ASF GitHub Bot commented on FLINK-2143: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/856#issuecomment-205883786 @ggevay The windowing system changed a while back. Is this still valid or can it be closed? > Add an overload to reduceWindow which takes the inverse of the reduceFunction > as a second parameter > --- > > Key: FLINK-2143 > URL: https://issues.apache.org/jira/browse/FLINK-2143 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Reporter: Gabor Gevay >Assignee: Gabor Gevay > > If the inverse of the reduceFunction is also available (for example > subtraction when summing numbers), then a PreReducer can maintain the > aggregate in O(1) memory and O(1) time for evict, store, and emitWindow.
[GitHub] flink pull request: [FLINK-2143] Added ReduceFunctionWithInverse
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/856#issuecomment-205883786 @ggevay The windowing system changed a while back. Is this still valid or can it be closed?
[jira] [Commented] (FLINK-3637) Change RollingSink Writer interface to allow wider range of outputs
[ https://issues.apache.org/jira/browse/FLINK-3637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226571#comment-15226571 ] ASF GitHub Bot commented on FLINK-3637: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1826#issuecomment-205883214 hi @dalegaard this must have slipped my mind. I'll review it tomorrow and merge if possible. > Change RollingSink Writer interface to allow wider range of outputs > --- > > Key: FLINK-3637 > URL: https://issues.apache.org/jira/browse/FLINK-3637 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Lasse Dalegaard >Assignee: Lasse Dalegaard > Labels: features > > Currently the RollingSink Writer interface only works with > FSDataOutputStreams, which precludes it from being used with some existing > libraries like Apache ORC and Parquet. > To fix this, a new Writer interface can be created, which receives FileSystem > and Path objects, instead of FSDataOutputStream. > To ensure exactly-once semantics, the Writer interface must also be extended > so that the current write-offset can be retrieved at checkpointing time. For > formats like ORC this requires a footer to be written, before the offset is > returned. Checkpointing already calls flush on the writer, but either flush > needs to return the current length of the output file, or alternatively a new > method has to be added for this. > The existing Writer interface can be recreated with a wrapper on top of the > new Writer interface. The existing code that manages the FSDataOutputStream > can then be moved into this new wrapper.
[GitHub] flink pull request: [FLINK-3637] Refactor rolling sink writer
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1826#issuecomment-205883214 hi @dalegaard this must have slipped my mind. I'll review it tomorrow and merge if possible.
[jira] [Commented] (FLINK-2166) Add fromCsvFile() to TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-2166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226567#comment-15226567 ] ASF GitHub Bot commented on FLINK-2166: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/939#issuecomment-205882687 @twalthr @fhueske @vasia Is this still valid with the recent changes in the Table API? > Add fromCsvFile() to TableEnvironment > - > > Key: FLINK-2166 > URL: https://issues.apache.org/jira/browse/FLINK-2166 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 0.9 >Reporter: Fabian Hueske >Priority: Minor > Labels: starter > > Add a {{fromCsvFile()}} method to the {{TableEnvironment}} to read a > {{Table}} from a CSV file. > The implementation should reuse Flink's CsvInputFormat.
[GitHub] flink pull request: FLINK-2166. Add fromCsvFile() method to TableE...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/939#issuecomment-205882687 @twalthr @fhueske @vasia Is this still valid with the recent changes in the Table API?
[jira] [Commented] (FLINK-2828) Add interfaces for Table API input formats
[ https://issues.apache.org/jira/browse/FLINK-2828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226564#comment-15226564 ] ASF GitHub Bot commented on FLINK-2828: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1237#issuecomment-205881346 @twalthr is this still valid with the changes in the Table API or can it be closed? > Add interfaces for Table API input formats > -- > > Key: FLINK-2828 > URL: https://issues.apache.org/jira/browse/FLINK-2828 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Timo Walther > > In order to support input formats for the Table API, interfaces are > necessary. I propose two types of TableSources: > - AdaptiveTableSources can adapt their output to the requirements of the > plan. Although the output schema stays the same, the TableSource can react on > field resolution and/or predicates internally and can return adapted > DataSet/DataStream versions in the "translate" step. > - StaticTableSources are an easy way to provide the Table API with additional > input formats without much implementation effort (e.g. for fromCsvFile()) > TableSources need to be deeply integrated into the Table API. > The TableEnvironment requires a newly introduced AbstractExecutionEnvironment > (common super class of all ExecutionEnvironments for DataSets and > DataStreams). 
> Here's what a TableSource can see from more complicated queries: > {code} > getTableJava(tableSource1) > .filter("a===5 || a===6") > .select("a as a4, b as b4, c as c4") > .filter("b4===7") > .join(getTableJava(tableSource2)) > .where("a===a4 && c==='Test' && c4==='Test2'") > // Result predicates for tableSource1: > // List("a===5 || a===6", "b===7", "c==='Test2'") > // Result predicates for tableSource2: > // List("c==='Test'") > // Result resolved fields for tableSource1 (true = filtering, > false=selection): > // Set(("a", true), ("a", false), ("b", true), ("b", false), ("c", false), > ("c", true)) > // Result resolved fields for tableSource2 (true = filtering, > false=selection): > // Set(("a", true), ("c", true)) > {code}
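What an adaptive source does with those pushed-down predicates can be modeled in a few lines (all names here are illustrative, not the proposed interfaces): the output schema stays the same, but the source pre-filters rows before handing them to the rest of the plan.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Toy model of predicate push-down into a table source: the scan applies
// the conjunction of all pushed predicates while emitting unchanged rows.
public class PushdownSketch {
    static class Row {
        final int a; final int b; final String c;
        Row(int a, int b, String c) { this.a = a; this.b = b; this.c = c; }
    }

    static List<Row> scanWithPredicates(List<Row> rows, List<Predicate<Row>> pushed) {
        List<Row> out = new ArrayList<>();
        for (Row r : rows) {
            boolean keep = true;
            for (Predicate<Row> p : pushed) keep &= p.test(r);
            if (keep) out.add(r);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row(5, 7, "Test2"), new Row(6, 8, "Test2"), new Row(5, 7, "x"));
        // Predicates corresponding to: (a===5 || a===6), b===7, c==='Test2'
        List<Predicate<Row>> pushed = Arrays.asList(
                r -> r.a == 5 || r.a == 6, r -> r.b == 7, r -> "Test2".equals(r.c));
        System.out.println(scanWithPredicates(rows, pushed).size()); // 1
    }
}
```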
[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra
[ https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226563#comment-15226563 ] ASF GitHub Bot commented on FLINK-3311: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58568963 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.transformations.SinkTransformation; +import org.apache.flink.streaming.api.transformations.StreamTransformation; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; + +import java.util.UUID; + +/** + * This class wraps different Cassandra sink implementations to provide a common interface for all of them. + * + * @param input type + */ +public class CassandraSink { + private static final String jobID = UUID.randomUUID().toString().replace("-", "_"); --- End diff -- The problem is that there will be two `jobID`s which can lead to confusions. > Add a connector for streaming data into Cassandra > - > > Key: FLINK-3311 > URL: https://issues.apache.org/jira/browse/FLINK-3311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Andrea Sella > > We had users in the past asking for a Flink+Cassandra integration. 
> It seems that there is a well-developed java client for connecting to > Cassandra: https://github.com/datastax/java-driver (ASL 2.0) > There are also tutorials out there on how to start a local cassandra instance > (for the tests): > http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html > For the data types, I think we should support TupleX types, and map standard > java types to the respective cassandra types. > In addition, it seems that there is an object mapper from datastax to store > POJOs in Cassandra (there are annotations for defining the primary key and > types) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
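For context on the review comment above, the ID scheme being discussed can be reproduced in isolation. This is a minimal sketch, not the connector code itself: a random UUID with dashes replaced by underscores, held in a static field, so every sink instance loaded by the same JVM/classloader shares the one value, which is why a second real job ID in play can cause the confusion the reviewer notes.

```java
import java.util.UUID;

// Minimal reproduction (sketch, not the CassandraSink class itself) of the
// ID scheme under review: a random UUID with dashes replaced by underscores.
// Because the field is static, all instances in one JVM share a single value.
public class IdSketch {
    static final String ID = UUID.randomUUID().toString().replace("-", "_");

    public static void main(String[] args) {
        // Canonical UUID form is 8-4-4-4-12, so after the replace the
        // string is still 36 chars with 4 underscores and no dashes.
        System.out.println(ID);
    }
}
```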
[GitHub] flink pull request: [FLINK-2828] [table] Add interfaces for Table ...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1237#issuecomment-205881346 @twalthr is this still valid with the changes in the Table API or can it be closed? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3283) Failed Kafka 0.9 test on duplicate message
[ https://issues.apache.org/jira/browse/FLINK-3283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226559#comment-15226559 ] Robert Metzger commented on FLINK-3283: --- This commit improves the stability of the Kafka tests: http://git-wip-us.apache.org/repos/asf/flink/commit/02a3dfde > Failed Kafka 0.9 test on duplicate message > -- > > Key: FLINK-3283 > URL: https://issues.apache.org/jira/browse/FLINK-3283 > Project: Flink > Issue Type: Test >Reporter: Ufuk Celebi >Assignee: Robert Metzger > > On a branch with unrelated changes > {{Kafka09ITCase.testMultipleSourcesOnePartition:82->KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest}} > failed. > https://s3.amazonaws.com/archive.travis-ci.org/jobs/104582020/log.txt > {code} > Caused by: java.lang.Exception: Received a duplicate: 1712 > at > org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink.invoke(ValidatingExactlyOnceSink.java:53) > at > org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink.invoke(ValidatingExactlyOnceSink.java:30) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:37) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:232) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > {code}
[jira] [Assigned] (FLINK-3283) Failed Kafka 0.9 test on duplicate message
[ https://issues.apache.org/jira/browse/FLINK-3283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-3283: - Assignee: Robert Metzger > Failed Kafka 0.9 test on duplicate message > -- > > Key: FLINK-3283 > URL: https://issues.apache.org/jira/browse/FLINK-3283 > Project: Flink > Issue Type: Test >Reporter: Ufuk Celebi >Assignee: Robert Metzger > > On a branch with unrelated changes > {{Kafka09ITCase.testMultipleSourcesOnePartition:82->KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest}} > failed. > https://s3.amazonaws.com/archive.travis-ci.org/jobs/104582020/log.txt > {code} > Caused by: java.lang.Exception: Received a duplicate: 1712 > at > org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink.invoke(ValidatingExactlyOnceSink.java:53) > at > org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink.invoke(ValidatingExactlyOnceSink.java:30) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:37) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:232) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > {code}
[jira] [Commented] (FLINK-2732) Add access to the TaskManagers' log file and out file in the web dashboard.
[ https://issues.apache.org/jira/browse/FLINK-2732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226554#comment-15226554 ] ASF GitHub Bot commented on FLINK-2732: --- Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1790#issuecomment-205877524 Rebased. > Add access to the TaskManagers' log file and out file in the web dashboard. > --- > > Key: FLINK-2732 > URL: https://issues.apache.org/jira/browse/FLINK-2732 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Chesnay Schepler > Fix For: 1.0.0 > > > Add access to the TaskManagers' log file and out file in the web dashboard. > This needs addition on the server side, as the log files need to be > transferred to the JobManager via the blob server.
[GitHub] flink pull request: [FLINK-3697] Properly access type information ...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1851#discussion_r58564424 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java --- @@ -182,11 +182,14 @@ public R set(R record, F fieldValue) { @SuppressWarnings("unchecked") CompositeType cType = (CompositeType) type; + if(field.contains(".")) { + throw new IllegalArgumentException("The Pojo field accessor currently doesn't support nested POJOs"); --- End diff -- Thank you for reviewing my pull request. I've opened FLINK-3702 for that. See also here: https://github.com/apache/flink/pull/1851/files#diff-e5e091a2c1e4bf850ef93e2010fe4c81R132
[jira] [Commented] (FLINK-1337) Create an Amazon EMR Bootstrap Action
[ https://issues.apache.org/jira/browse/FLINK-1337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226522#comment-15226522 ] Timur Fayruzov commented on FLINK-1337: --- It's an old issue, but if there is still interest I can contribute. I'm working out EMR cluster bootstrap for Flink right now. > Create an Amazon EMR Bootstrap Action > - > > Key: FLINK-1337 > URL: https://issues.apache.org/jira/browse/FLINK-1337 > Project: Flink > Issue Type: New Feature > Components: other >Reporter: Stephan Ewen >Priority: Minor > > EMR offers bootstrap actions that prepare the cluster by installing > additional components, etc.. > We can offer a Flink bootstrap action that downloads, unpacks, and configures > Flink. It may optionally install libraries that we like to use (such as > Python, BLAS/JBLAS, ...) > http://blogs.aws.amazon.com/bigdata/post/TxO6EHTHQALSIB/Getting-Started-with-Amazon-EMR-Bootstrap-Actions
[jira] [Commented] (FLINK-3697) keyBy() with nested POJO computes invalid field position indexes
[ https://issues.apache.org/jira/browse/FLINK-3697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226513#comment-15226513 ] ASF GitHub Bot commented on FLINK-3697: --- Github user RonCrocker commented on the pull request: https://github.com/apache/flink/pull/1851#issuecomment-205868281 :+1: > keyBy() with nested POJO computes invalid field position indexes > > > Key: FLINK-3697 > URL: https://issues.apache.org/jira/browse/FLINK-3697 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.0.0 > Environment: MacOS X 10.10 >Reporter: Ron Crocker >Priority: Critical > Labels: pojo > > Using named keys in keyBy() for nested POJO types results in failure. The > indexes for named key fields are used inconsistently with nested POJO types. > In particular, {{PojoTypeInfo.getFlatFields()}} returns the field's position > after (apparently) flattening the structure but is referenced in the > unflattened version of the POJO type by {{PojoTypeInfo.getTypeAt()}}. > In the example below, getFlatFields() returns positions 0, 1, and 14. These > positions appear correct in the flattened structure of the Data class. > However, in {{KeySelectorgetSelectorForKeys(Keys keys, > TypeInformation typeInfo, ExecutionConfig executionConfig)}}, a call to > {{compositeType.getTypeAt(logicalKeyPositions[i])}} for the third key results in > {{PojoTypeInfo.getTypeAt()}} declaring it out of range, as it compares the > length of the directly named fields of the object vs the length of the flattened > version of that type.
> Concrete Example: > Consider this graph: > {code} > DataStream dataStream = see.addSource(new > FlinkKafkaConsumer08<>(timesliceConstants.topic, new DataDeserialzer(), > kafkaConsumerProperties)); > dataStream > .flatMap(new DataMapper()) > .keyBy("aaa", "abc", "wxyz") > {code} > {{DataDeserialzer}} returns a "NativeDataFormat" object; {{DataMapper}} takes > this NativeDataFormat object and extracts individual Data objects: {code} > public class Data { > public int aaa; > public int abc; > public long wxyz; > public int t1; > public int t2; > public Policy policy; > public Stats stats; > public Data() {} > {code} > A {{Policy}} object is an instance of this class: > {code} > public class Policy { > public short a; > public short b; > public boolean c; > public boolean d; > public Policy() {} > } > {code} > A {{Stats}} object is an instance of this class: > {code} > public class Stats { > public long count; > public float a; > public float b; > public float c; > public float d; > public float e; > public Stats() {} > } > {code}
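The positions 0, 1, and 14 reported in the issue can be reproduced with a small standalone sketch. This is not Flink code; the alphabetical field ordering and the per-field flat widths are assumptions inferred from the reported indexes, but they illustrate why a key that sorts after a nested POJO gets a flat index beyond the outer class's direct field count.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Standalone sketch (not Flink's implementation) of how flattening the
// Data POJO from the example yields flat positions 0, 1 and 14 for the
// keys "aaa", "abc" and "wxyz", while Data itself has only 7 direct fields.
public class FlatFieldSketch {

    // Direct fields of Data in alphabetical order (the order assumed to
    // match PojoTypeInfo), each with the number of flat leaf fields it
    // contributes after recursively flattening nested POJOs.
    static int flatPosition(String key) {
        Map<String, Integer> flatWidths = new LinkedHashMap<>();
        flatWidths.put("aaa", 1);
        flatWidths.put("abc", 1);
        flatWidths.put("policy", 4); // Policy: a, b, c, d
        flatWidths.put("stats", 6);  // Stats: count, a, b, c, d, e
        flatWidths.put("t1", 1);
        flatWidths.put("t2", 1);
        flatWidths.put("wxyz", 1);

        int pos = 0;
        for (Map.Entry<String, Integer> field : flatWidths.entrySet()) {
            if (field.getKey().equals(key)) {
                return pos; // flat index of the first leaf of this field
            }
            pos += field.getValue();
        }
        throw new IllegalArgumentException("unknown field: " + key);
    }

    public static void main(String[] args) {
        System.out.println(flatPosition("aaa"));  // 0
        System.out.println(flatPosition("abc"));  // 1
        System.out.println(flatPosition("wxyz")); // 14
    }
}
```

Because "wxyz" sorts after the nested policy and stats fields, its flat index (14) exceeds Data's 7 direct fields, which is exactly the out-of-range condition the quoted getTypeAt() check trips over.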
[jira] [Commented] (FLINK-3697) keyBy() with nested POJO computes invalid field position indexes
[ https://issues.apache.org/jira/browse/FLINK-3697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226507#comment-15226507 ] ASF GitHub Bot commented on FLINK-3697: --- Github user RonCrocker commented on a diff in the pull request: https://github.com/apache/flink/pull/1851#discussion_r58564095 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/FieldAccessor.java --- @@ -182,11 +182,14 @@ public R set(R record, F fieldValue) { @SuppressWarnings("unchecked") CompositeType cType = (CompositeType) type; + if(field.contains(".")) { + throw new IllegalArgumentException("The Pojo field accessor currently doesn't support nested POJOs"); --- End diff -- Note that the problem reported in [FLINK-3697](https://issues.apache.org/jira/browse/FLINK-3697) was not related to accessing a nested field. Perhaps there should be a subsequent JIRA ticket to support nested POJO accessors. > keyBy() with nested POJO computes invalid field position indexes > > > Key: FLINK-3697 > URL: https://issues.apache.org/jira/browse/FLINK-3697 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.0.0 > Environment: MacOS X 10.10 >Reporter: Ron Crocker >Priority: Critical > Labels: pojo
[jira] [Commented] (FLINK-3697) keyBy() with nested POJO computes invalid field position indexes
[ https://issues.apache.org/jira/browse/FLINK-3697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226504#comment-15226504 ] Ron Crocker commented on FLINK-3697: [~rmetzger] This code is fine to use for your test case. The key aspect to include is that a key field is lexicographically "greater than" a field that holds a nested POJO. This is what causes the index of the key field (in the flattened representation of the POJO) to be beyond the natural (unflattened) fields of the outer POJO. I would have provided a shorter example but I didn't have the time to do so. > keyBy() with nested POJO computes invalid field position indexes > > > Key: FLINK-3697 > URL: https://issues.apache.org/jira/browse/FLINK-3697 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.0.0 > Environment: MacOS X 10.10 >Reporter: Ron Crocker >Priority: Critical > Labels: pojo
[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58562758 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.transformations.SinkTransformation; +import org.apache.flink.streaming.api.transformations.StreamTransformation; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; + +import java.util.UUID; + +/** + * This class wraps different Cassandra sink implementations to provide a common interface for all of them. + * + * @param input type + */ +public class CassandraSink { + private static final String jobID = UUID.randomUUID().toString().replace("-", "_"); --- End diff -- why should it be named sinkID when it is supposed to be a job-specific ID?
[jira] [Updated] (FLINK-3706) YARNSessionCapacitySchedulerITCase.testNonexistingQueue unstable
[ https://issues.apache.org/jira/browse/FLINK-3706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-3706: Attachment: log.txt > YARNSessionCapacitySchedulerITCase.testNonexistingQueue unstable > > > Key: FLINK-3706 > URL: https://issues.apache.org/jira/browse/FLINK-3706 > Project: Flink > Issue Type: Bug >Reporter: Aljoscha Krettek > Labels: test-stability > Attachments: log.txt > > > I encountered a failed test on travis.
[jira] [Created] (FLINK-3706) YARNSessionCapacitySchedulerITCase.testNonexistingQueue unstable
Aljoscha Krettek created FLINK-3706: --- Summary: YARNSessionCapacitySchedulerITCase.testNonexistingQueue unstable Key: FLINK-3706 URL: https://issues.apache.org/jira/browse/FLINK-3706 Project: Flink Issue Type: Bug Reporter: Aljoscha Krettek Attachments: log.txt I encountered a failed test on travis.
[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58562221 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.batch.connectors.cassandra.example; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Cluster.Builder; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat; +import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import java.util.ArrayList; + +public class BatchExample { + private static final String INSERT_QUERY = "INSERT INTO test.batches (number, strings) VALUES (?,?);"; + private static final String SELECT_QUERY = "SELECT number, strings FROM test.batches;"; + + /* +* table script: "CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));" +*/ --- End diff -- I would remove this comment and move the query to the class javadocs. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra
[ https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226486#comment-15226486 ] ASF GitHub Bot commented on FLINK-3311: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58562221 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.batch.connectors.cassandra.example; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Cluster.Builder; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat; +import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import java.util.ArrayList; + +public class BatchExample { + private static final String INSERT_QUERY = "INSERT INTO test.batches (number, strings) VALUES (?,?);"; + private static final String SELECT_QUERY = "SELECT number, strings FROM test.batches;"; + + /* +* table script: "CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));" +*/ --- End diff -- I would remove this comment and move the query to the class javadocs. > Add a connector for streaming data into Cassandra > - > > Key: FLINK-3311 > URL: https://issues.apache.org/jira/browse/FLINK-3311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Andrea Sella > > We had users in the past asking for a Flink+Cassandra integration. > It seems that there is a well-developed java client for connecting into > Cassandra: https://github.com/datastax/java-driver (ASL 2.0) > There are also tutorials out there on how to start a local cassandra instance > (for the tests): > http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html > For the data types, I think we should support TupleX types, and map standard > java types to the respective cassandra types. 
> In addition, it seems that there is an object mapper from datastax to store > POJOs in Cassandra (there are annotations for defining the primary key and > types)
[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra
[ https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226485#comment-15226485 ] ASF GitHub Bot commented on FLINK-3311: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58562121 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.batch.connectors.cassandra.example; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Cluster.Builder; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat; +import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import java.util.ArrayList; + +public class BatchExample { --- End diff -- Javadocs > Add a connector for streaming data into Cassandra > - > > Key: FLINK-3311 > URL: https://issues.apache.org/jira/browse/FLINK-3311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Andrea Sella > > We had users in the past asking for a Flink+Cassandra integration. > It seems that there is a well-developed java client for connecting into > Cassandra: https://github.com/datastax/java-driver (ASL 2.0) > There are also tutorials out there on how to start a local cassandra instance > (for the tests): > http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html > For the data types, I think we should support TupleX types, and map standard > java types to the respective cassandra types. > In addition, it seems that there is a object mapper from datastax to store > POJOs in Cassandra (there are annotations for defining the primary key and > types) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58562019 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java --- @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; + +import java.io.Serializable; + +public abstract class ClusterBuilder implements Serializable { --- End diff -- Since this is a user facing class, I would add some javadocs --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra
[ https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226484#comment-15226484 ] ASF GitHub Bot commented on FLINK-3311: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58562019 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/ClusterBuilder.java --- @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; + +import java.io.Serializable; + +public abstract class ClusterBuilder implements Serializable { --- End diff -- Since this is a user facing class, I would add some javadocs > Add a connector for streaming data into Cassandra > - > > Key: FLINK-3311 > URL: https://issues.apache.org/jira/browse/FLINK-3311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Andrea Sella > > We had users in the past asking for a Flink+Cassandra integration. 
> It seems that there is a well-developed java client for connecting into > Cassandra: https://github.com/datastax/java-driver (ASL 2.0) > There are also tutorials out there on how to start a local cassandra instance > (for the tests): > http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html > For the data types, I think we should support TupleX types, and map standard > java types to the respective cassandra types. > In addition, it seems that there is an object mapper from datastax to store > POJOs in Cassandra (there are annotations for defining the primary key and > types)
[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58562121 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchExample.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.batch.connectors.cassandra.example; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Cluster.Builder; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat; +import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import java.util.ArrayList; + +public class BatchExample { --- End diff -- Javadocs --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra
[ https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226483#comment-15226483 ] ASF GitHub Bot commented on FLINK-3311: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58561933 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java --- @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.runtime.operators.GenericAtLeastOnceSink; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Sink that emits its input elements into a Cassandra database. This sink stores incoming records within a + * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to cassandra + * if a checkpoint is completed. 
+ * + * @param Type of the elements emitted by this sink + */ +public class CassandraTupleWriteAheadSink extends GenericAtLeastOnceSink { + protected transient Cluster cluster; + protected transient Session session; + + private final String insertQuery; + private transient PreparedStatement preparedStatement; + + private transient Throwable exception = null; + private transient FutureCallback callback; + + private ClusterBuilder builder; + + private int updatesSent = 0; + private AtomicInteger updatesConfirmed = new AtomicInteger(0); + + private transient Object[] fields; + + protected CassandraTupleWriteAheadSink(String insertQuery, TypeSerializer serializer, ClusterBuilder builder, String jobID, CheckpointCommitter committer) throws Exception { + super(committer, serializer, jobID); + this.insertQuery = insertQuery; + this.builder = builder; + ClosureCleaner.clean(builder, true); + } + + public void open() throws Exception { + super.open(); + if (!getRuntimeContext().isCheckpointingEnabled()) { + throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled."); + } + this.callback = new FutureCallback() { + @Override + public void onSuccess(ResultSet resultSet) { + updatesConfirmed.incrementAndGet(); + } + + @Override + public void onFailure(Throwable throwable) { + exception = throwable; + } + }; + cluster = builder.getCluster(); + session = cluster.connect(); + preparedStatement = session.prepare(insertQuery); + + fields = new Object[((TupleSerializer) serializer).getArity()]; + } + + @Override + public void close() throws Exception { + super.close(); + try { + session.close(); + } catch (Exception e) { +
[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58561933 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java --- @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.runtime.operators.GenericAtLeastOnceSink; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Sink that emits its input elements into a Cassandra database. This sink stores incoming records within a + * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to cassandra + * if a checkpoint is completed. 
+ * + * @param Type of the elements emitted by this sink + */ +public class CassandraTupleWriteAheadSink extends GenericAtLeastOnceSink { + protected transient Cluster cluster; + protected transient Session session; + + private final String insertQuery; + private transient PreparedStatement preparedStatement; + + private transient Throwable exception = null; + private transient FutureCallback callback; + + private ClusterBuilder builder; + + private int updatesSent = 0; + private AtomicInteger updatesConfirmed = new AtomicInteger(0); + + private transient Object[] fields; + + protected CassandraTupleWriteAheadSink(String insertQuery, TypeSerializer serializer, ClusterBuilder builder, String jobID, CheckpointCommitter committer) throws Exception { + super(committer, serializer, jobID); + this.insertQuery = insertQuery; + this.builder = builder; + ClosureCleaner.clean(builder, true); + } + + public void open() throws Exception { + super.open(); + if (!getRuntimeContext().isCheckpointingEnabled()) { + throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled."); + } + this.callback = new FutureCallback() { + @Override + public void onSuccess(ResultSet resultSet) { + updatesConfirmed.incrementAndGet(); + } + + @Override + public void onFailure(Throwable throwable) { + exception = throwable; + } + }; + cluster = builder.getCluster(); + session = cluster.connect(); + preparedStatement = session.prepare(insertQuery); + + fields = new Object[((TupleSerializer) serializer).getArity()]; + } + + @Override + public void close() throws Exception { + super.close(); + try { + session.close(); + } catch (Exception e) { + LOG.error("Error while closing session.", e); + } + try { + cluster.close(); + } catch (Exception e) { + LOG.error("Error while closing cluster.", e); + }
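The write-ahead sink above tracks in-flight writes with a plain `updatesSent` counter and an `AtomicInteger updatesConfirmed` bumped from a Guava `FutureCallback`, storing any failure in a `Throwable` field. A minimal, Cassandra-free sketch of that bookkeeping pattern: `CompletableFuture` stands in for the Datastax driver's `ResultSetFuture` plus `Futures.addCallback`, and the class and method names are illustrative, not the connector's.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class AheadSinkSketch {
    private int updatesSent = 0;
    private final AtomicInteger updatesConfirmed = new AtomicInteger(0);
    private volatile Throwable exception = null;

    // Stand-in for session.executeAsync(boundStatement): every write here
    // "succeeds" immediately; the real sink attaches its callback to the
    // driver's asynchronous ResultSetFuture instead.
    void sendValue(String record) {
        updatesSent++;
        CompletableFuture.completedFuture(record).whenComplete((result, t) -> {
            if (t == null) {
                updatesConfirmed.incrementAndGet();
            } else {
                exception = t; // surfaced before the checkpoint is acknowledged
            }
        });
    }

    // The sink only treats a checkpoint's records as committed once every
    // sent update has been confirmed and no write failed (at-least-once:
    // unconfirmed records would be replayed from the write-ahead log).
    boolean allConfirmed() {
        return exception == null && updatesConfirmed.get() == updatesSent;
    }

    public static void main(String[] args) {
        AheadSinkSketch sink = new AheadSinkSketch();
        sink.sendValue("a");
        sink.sendValue("b");
        System.out.println(sink.allConfirmed());
    }
}
```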
[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra
[ https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226480#comment-15226480 ] ASF GitHub Bot commented on FLINK-3311: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58561402 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java --- @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.runtime.operators.GenericAtLeastOnceSink; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Sink that emits its input elements into a Cassandra database. This sink stores incoming records within a + * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to cassandra + * if a checkpoint is completed. 
+ * + * @param Type of the elements emitted by this sink + */ +public class CassandraTupleWriteAheadSink extends GenericAtLeastOnceSink { + protected transient Cluster cluster; + protected transient Session session; + + private final String insertQuery; + private transient PreparedStatement preparedStatement; + + private transient Throwable exception = null; + private transient FutureCallback callback; + + private ClusterBuilder builder; + + private int updatesSent = 0; + private AtomicInteger updatesConfirmed = new AtomicInteger(0); + + private transient Object[] fields; + + protected CassandraTupleWriteAheadSink(String insertQuery, TypeSerializer serializer, ClusterBuilder builder, String jobID, CheckpointCommitter committer) throws Exception { + super(committer, serializer, jobID); + this.insertQuery = insertQuery; + this.builder = builder; + ClosureCleaner.clean(builder, true); + } + + public void open() throws Exception { + super.open(); + if (!getRuntimeContext().isCheckpointingEnabled()) { + throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled."); + } + this.callback = new FutureCallback() { + @Override + public void onSuccess(ResultSet resultSet) { + updatesConfirmed.incrementAndGet(); + } + + @Override + public void onFailure(Throwable throwable) { + exception = throwable; --- End diff -- Error logging here as well > Add a connector for streaming data into Cassandra > - > > Key: FLINK-3311 > URL: https://issues.apache.org/jira/browse/FLINK-3311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Andrea Sella > > We had users in the past asking for a Flink+Cassandra integration. > It seems that there
[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58561402 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java --- @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.runtime.operators.GenericAtLeastOnceSink; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Sink that emits its input elements into a Cassandra database. This sink stores incoming records within a + * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to cassandra + * if a checkpoint is completed. 
+ * + * @param Type of the elements emitted by this sink + */ +public class CassandraTupleWriteAheadSink extends GenericAtLeastOnceSink { + protected transient Cluster cluster; + protected transient Session session; + + private final String insertQuery; + private transient PreparedStatement preparedStatement; + + private transient Throwable exception = null; + private transient FutureCallback callback; + + private ClusterBuilder builder; + + private int updatesSent = 0; + private AtomicInteger updatesConfirmed = new AtomicInteger(0); + + private transient Object[] fields; + + protected CassandraTupleWriteAheadSink(String insertQuery, TypeSerializer serializer, ClusterBuilder builder, String jobID, CheckpointCommitter committer) throws Exception { + super(committer, serializer, jobID); + this.insertQuery = insertQuery; + this.builder = builder; + ClosureCleaner.clean(builder, true); + } + + public void open() throws Exception { + super.open(); + if (!getRuntimeContext().isCheckpointingEnabled()) { + throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled."); + } + this.callback = new FutureCallback() { + @Override + public void onSuccess(ResultSet resultSet) { + updatesConfirmed.incrementAndGet(); + } + + @Override + public void onFailure(Throwable throwable) { + exception = throwable; --- End diff -- Error logging here as well --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
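The reviewer asks for error logging in `onFailure` rather than only stashing the throwable. A sketch of the suggested shape, under assumptions: `java.util.logging` is used here instead of the SLF4J logger the connector actually declares, and the class is a hypothetical stand-in for the real callback.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class FailureLoggingSketch {
    private static final Logger LOG =
            Logger.getLogger(FailureLoggingSketch.class.getName());

    volatile Throwable exception = null;

    // Log the failure immediately so it is visible in the TaskManager logs,
    // in addition to storing it for the later rethrow when the sink checks
    // whether all writes succeeded.
    void onFailure(Throwable throwable) {
        LOG.log(Level.SEVERE, "Error while sending value.", throwable);
        exception = throwable;
    }

    public static void main(String[] args) {
        FailureLoggingSketch callback = new FailureLoggingSketch();
        callback.onFailure(new RuntimeException("write failed"));
        System.out.println(callback.exception.getMessage()); // prints "write failed"
    }
}
```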
[jira] [Comment Edited] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat
[ https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226474#comment-15226474 ] Maximilian Michels edited comment on FLINK-3655 at 4/5/16 3:37 PM: --- Sounds good. It is important to maintain backwards compatibility. I'm not sure about the "comma-separated Path string". File names may contain commas. So we might skip that for now and do the path list first. I think we could also use {{readFile(FileInputFormat inputFormat, String... filePaths)}}, which will receive the paths as a {{String[] filePaths}} array. was (Author: mxm): Sounds good. It is important to maintain backwards compatibility. I'm not sure about the "comma-separated Path string". File names may contain commas. So we might skip that for now and do the path list first. I think we could also use {{readFile(FileInputFormat inputFormat, String.. filePaths}} which will return the filePath as a {{String[] filepaths}} array. > Allow comma-separated or multiple directories to be specified for > FileInputFormat > - > > Key: FLINK-3655 > URL: https://issues.apache.org/jira/browse/FLINK-3655 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Gna Phetsarath >Priority: Minor > Labels: starter > > Allow comma-separated or multiple directories to be specified for > FileInputFormat so that a DataSource will process the directories > sequentially. > > env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*") > in Scala >env.readFile(paths: Seq[String]) > or > env.readFile(path: String, otherPaths: String*) > Wildcard support would be a bonus. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
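The varargs signature suggested above sidesteps comma parsing entirely: the compiler delivers the paths to the method body as a `String[]` with one element per path, so commas inside file names stay unambiguous. A stand-alone sketch of the idea; `readFile` and its `inputFormat` parameter are placeholders for the proposed `ExecutionEnvironment` overload, not an existing Flink method.

```java
import java.util.Arrays;

/**
 * Illustrative varargs path API: the caller writes readFile(fmt, "a", "b"),
 * and the method body sees a String[] -- no comma-splitting is needed.
 */
public class MultiPathSource {
    /** Stand-in for the proposed readFile(FileInputFormat, String...) overload. */
    public static String[] readFile(String inputFormat, String... filePaths) {
        // varargs arrive as a plain array; each element is one complete path
        return Arrays.copyOf(filePaths, filePaths.length);
    }

    public static void main(String[] args) {
        // a comma inside a path survives intact, unlike with a comma-separated string
        String[] paths = readFile("csv", "/data/2016/01/01", "/data/with,comma/02");
        System.out.println(Arrays.toString(paths));
    }
}
```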
[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra
[ https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226477#comment-15226477 ] ASF GitHub Bot commented on FLINK-3311: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58561096
--- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}.
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
+	protected static final Logger LOG = LoggerFactory.getLogger(CassandraSinkBase.class);
+	protected transient Cluster cluster;
+	protected transient Session session;
+
+	protected transient Throwable exception = null;
+	protected transient FutureCallback<V> callback;
+
+	private final ClusterBuilder builder;
+
+	protected CassandraSinkBase(ClusterBuilder builder) {
+		this.builder = builder;
+		ClosureCleaner.clean(builder, true);
+	}
+
+	@Override
+	public void open(Configuration configuration) {
+		this.callback = new FutureCallback<V>() {
+			@Override
+			public void onSuccess(V ignored) {
+			}
+
+			@Override
+			public void onFailure(Throwable t) {
+				exception = t;
--- End diff --

Can you log the exception as well. Maybe invoke() is never called after the failure and nobody knows what's going on

> Add a connector for streaming data into Cassandra > - > > Key: FLINK-3311 > URL: https://issues.apache.org/jira/browse/FLINK-3311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Andrea Sella > > We had users in the past asking for a Flink+Cassandra integration. > It seems that there is a well-developed java client for connecting into > Cassandra: https://github.com/datastax/java-driver (ASL 2.0) > There are also tutorials out there on how to start a local cassandra instance > (for the tests): > http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html > For the data types, I think we should support TupleX types, and map standard > java types to the respective cassandra types. > In addition, it seems that there is a object mapper from datastax to store > POJOs in Cassandra (there are annotations for defining the primary key and > types) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
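The reviewer's point is that a failure captured only inside an async callback stays invisible unless it is logged immediately, because `invoke()` may never run again to rethrow the stored exception. A minimal, Flink-free sketch of log-and-store error handling; the names are illustrative, and `java.util.logging` stands in here for the SLF4J logger the class actually declares.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Illustrative async-callback error handling: the failure is logged right away
 * (visible even if no further record arrives) AND stored so the next invoke()
 * can rethrow it and fail the task.
 */
public class LoggingCallback {
    private static final Logger LOG = Logger.getLogger(LoggingCallback.class.getName());
    private volatile Throwable exception;

    /** Stand-in for FutureCallback.onFailure. */
    public void onFailure(Throwable t) {
        LOG.log(Level.SEVERE, "Error while sending value to Cassandra", t); // log immediately
        exception = t;                                                      // keep for rethrow
    }

    /** Stand-in for the sink's invoke(): surfaces any earlier async failure. */
    public void invoke(String value) throws Exception {
        if (exception != null) {
            throw new Exception("Error while sending value", exception);
        }
        // ... would send the value asynchronously here ...
    }

    public Throwable storedException() {
        return exception;
    }
}
```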
[jira] [Created] (FLINK-3705) Provide explanation for Hadoop dependencies and how to configure it
Ufuk Celebi created FLINK-3705: -- Summary: Provide explanation for Hadoop dependencies and how to configure it Key: FLINK-3705 URL: https://issues.apache.org/jira/browse/FLINK-3705 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Ufuk Celebi Priority: Minor I think it will be valuable to add a dedicated page explaining how Flink interacts with Hadoop and why it has the dependency. Furthermore, it will be beneficial to give an overview of how to configure it, because there are multiple ways to do it (in Flink's config file, environment variables) and multiple scenarios where you do it (standalone, YARN). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3614) Remove Non-Keyed Window Operator
[ https://issues.apache.org/jira/browse/FLINK-3614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226444#comment-15226444 ] ASF GitHub Bot commented on FLINK-3614: --- Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/1805 > Remove Non-Keyed Window Operator > > > Key: FLINK-3614 > URL: https://issues.apache.org/jira/browse/FLINK-3614 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > I propose to remove the special Non-Keyed Window Operator and implement > non-parallel windows by using the standard WindowOperator with a dummy > KeySelector. > Maintaining everything for two WindowOperators is a huge burden. The > implementation is completely separate by now. For example, the Non-Keyed > window operator does not use the StateBackend for state, i.e. cannot use > RocksDB. Also, with upcoming changes (Merging/Session windows, aligned > windows) this will only increase the maintenance burden. > Also, the fast AlignedProcessingTimeWindows operators only support the > Parallel/Keyed case. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
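The dummy-KeySelector idea above works because a constant key hashes every element to the same partition, so a single instance of the keyed window operator sees the whole stream. A Flink-free sketch of the trick; the KeySelector is modeled as a plain `Function`, and `partitionFor` is a simplified stand-in for Flink's key-based partition assignment.

```java
import java.util.function.Function;

/**
 * Illustrative "dummy key" trick: mapping every element to the same constant
 * key routes the whole stream to one keyed partition, so a keyed window
 * operator behaves like a non-parallel (global) one.
 */
public class DummyKeySelector {
    /** Stand-in for Flink's KeySelector<T, KEY>: always returns key 0. */
    public static <T> Function<T, Integer> constantKey() {
        return element -> 0;
    }

    /** Simplified partition assignment: hash of the key modulo parallelism. */
    public static <T> int partitionFor(T element, Function<T, Integer> keySelector, int parallelism) {
        return Math.floorMod(keySelector.apply(element).hashCode(), parallelism);
    }
}
```

The cost of the trick is that parallelism is effectively one for that operator, which is exactly the semantics non-keyed (global) windows had anyway.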
[jira] [Resolved] (FLINK-3174) Add merging WindowAssigner
[ https://issues.apache.org/jira/browse/FLINK-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek resolved FLINK-3174. - Resolution: Fixed Fix Version/s: (was: 1.0.0) 1.1.0 Done in https://github.com/apache/flink/commit/6cd8ceb10c841827cf89b74ecf5a0495a6933d53 > Add merging WindowAssigner > -- > > Key: FLINK-3174 > URL: https://issues.apache.org/jira/browse/FLINK-3174 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.1.0 > > > We should add the possibility for WindowAssigners to merge windows. This will > enable Session windowing support, similar to how Google Cloud Dataflow > supports it. > For session windows, each element would initially be assigned to its own > window. When triggering we check the windows and see if any can be merged. > This way, elements with overlapping session windows merge into one session. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
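The merging scheme in the issue description, one window per element merged on overlap, is essentially interval merging. A Flink-free sketch of it, assuming half-open per-element windows [t, t + gap); the class and method names are illustrative, not Flink's MergingWindowAssigner API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * Illustrative session-window merging: each timestamp first gets its own
 * window [t, t + gap); windows whose intervals overlap are merged, so a
 * burst of nearby elements collapses into one session.
 */
public class SessionMerger {
    /** Returns the merged sessions as {start, end} pairs, sorted by start. */
    public static List<long[]> sessions(long[] timestamps, long gap) {
        List<long[]> windows = new ArrayList<>();
        for (long t : timestamps) {
            windows.add(new long[]{t, t + gap}); // one window per element
        }
        windows.sort(Comparator.comparingLong(w -> w[0]));

        List<long[]> merged = new ArrayList<>();
        for (long[] w : windows) {
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && w[0] < last[1]) {
                last[1] = Math.max(last[1], w[1]); // overlap: extend the session
            } else {
                merged.add(w);                     // gap exceeded: new session
            }
        }
        return merged;
    }
}
```

For timestamps {1, 3, 4, 20} with gap 5, the first three per-element windows overlap and collapse into one session while 20 starts a new one, mirroring how elements with overlapping session windows "merge into one session" above.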
[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra
[ https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226447#comment-15226447 ] ASF GitHub Bot commented on FLINK-3311: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58557862
--- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+import java.util.UUID;
+
+/**
+ * This class wraps different Cassandra sink implementations to provide a common interface for all of them.
+ *
+ * @param <IN> input type
+ */
+public class CassandraSink<IN> {
+	private static final String jobID = UUID.randomUUID().toString().replace("-", "_");
+	private final boolean useDataStreamSink;
+	private DataStreamSink<IN> sink1;
+	private SingleOutputStreamOperator<IN> sink2;
+
+	private CassandraSink(DataStreamSink<IN> sink) {
+		sink1 = sink;
+		useDataStreamSink = true;
+	}
+
+	private CassandraSink(SingleOutputStreamOperator<IN> sink) {
+		sink2 = sink;
+		useDataStreamSink = false;
+	}
+
+	private SinkTransformation<IN> getSinkTransformation() {
+		return sink1.getTransformation();
+	}
+
+	private StreamTransformation<IN> getStreamTransformation() {
+		return sink2.getTransformation();
+	}
+
+	/**
+	 * Sets the name of this sink. This name is
+	 * used by the visualization and logging during runtime.
+	 *
+	 * @return The named sink.
+	 */
+	public CassandraSink<IN> name(String name) {
+		if (useDataStreamSink) {
+			getSinkTransformation().setName(name);
+		} else {
+			getStreamTransformation().setName(name);
+		}
+		return this;
+	}
+
+	/**
+	 * Sets an ID for this operator.
+	 *
+	 * The specified ID is used to assign the same operator ID across job
+	 * submissions (for example when starting a job from a savepoint).
+	 *
+	 * Important: this ID needs to be unique per
+	 * transformation and job. Otherwise, job submission will fail.
+	 *
+	 * @param uid The unique user-specified ID of this transformation.
+	 * @return The operator with the specified ID.
+	 */
+	public CassandraSink<IN> uid(String uid) {
+		if (useDataStreamSink) {
+			getSinkTransformation().setUid(uid);
+		} else {
+			getStreamTransformation().setUid(uid);
+		}
+		return this;
+	}
+
+	/**
+	 * Sets the parallelism for this sink. The degree must be higher than zero.
+	 *
+	 * @param parallelism The parallelism for this sink.
+	 * @return The sink with set parallelism.
+	 */
+	public CassandraSink<IN> setParallelism(int parallelism) {
+		if (useDataStreamSink) {
+			getSinkTransformation().setParallelism(parallelism);
+		} else {
+			getStreamTransformation().setParallelism(parallelism);
+		}
+		return this;
+	}
[jira] [Resolved] (FLINK-3614) Remove Non-Keyed Window Operator
[ https://issues.apache.org/jira/browse/FLINK-3614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek resolved FLINK-3614. - Resolution: Fixed Fix Version/s: 1.1.0 Done in https://github.com/apache/flink/commit/505512dbe461b9840dde6197c71dbb90b49c0495 > Remove Non-Keyed Window Operator > > > Key: FLINK-3614 > URL: https://issues.apache.org/jira/browse/FLINK-3614 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.1.0 > > > I propose to remove the special Non-Keyed Window Operator and implement > non-parallel windows by using the standard WindowOperator with a dummy > KeySelector. > Maintaining everything for two WindowOperators is a huge burden. The > implementation is completely separate by now. For example, the Non-Keyed > window operator does not use the StateBackend for state, i.e. cannot use > RocksDB. Also, with upcoming changes (Merging/Session windows, aligned > windows) this will only increase the maintenance burden. > Also, the fast AlignedProcessingTimeWindows operators only support the > Parallel/Keyed case. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3174) Add merging WindowAssigner
[ https://issues.apache.org/jira/browse/FLINK-3174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226441#comment-15226441 ] ASF GitHub Bot commented on FLINK-3174: --- Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/1802 > Add merging WindowAssigner > -- > > Key: FLINK-3174 > URL: https://issues.apache.org/jira/browse/FLINK-3174 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.0.0 > > > We should add the possibility for WindowAssigners to merge windows. This will > enable Session windowing support, similar to how Google Cloud Dataflow > supports it. > For session windows, each element would initially be assigned to its own > window. When triggering we check the windows and see if any can be merged. > This way, elements with overlapping session windows merge into one session. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3614) Remove Non-Keyed Window Operator
[ https://issues.apache.org/jira/browse/FLINK-3614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226443#comment-15226443 ] ASF GitHub Bot commented on FLINK-3614: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1805#issuecomment-205855000 Manually merged. > Remove Non-Keyed Window Operator > > > Key: FLINK-3614 > URL: https://issues.apache.org/jira/browse/FLINK-3614 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > I propose to remove the special Non-Keyed Window Operator and implement > non-parallel windows by using the standard WindowOperator with a dummy > KeySelector. > Maintaining everything for two WindowOperators is a huge burden. The > implementation is completely separate by now. For example, the Non-Keyed > window operator does not use the StateBackend for state, i.e. cannot use > RocksDB. Also, with upcoming changes (Merging/Session windows, aligned > windows) this will only increase the maintenance burden. > Also, the fast AlignedProcessingTimeWindows operators only support the > Parallel/Keyed case. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra
[ https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226433#comment-15226433 ] ASF GitHub Bot commented on FLINK-3311: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58556585
--- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+import java.util.UUID;
+
+/**
+ * This class wraps different Cassandra sink implementations to provide a common interface for all of them.
+ *
+ * @param <IN> input type
+ */
+public class CassandraSink<IN> {
+	private static final String jobID = UUID.randomUUID().toString().replace("-", "_");
+	private final boolean useDataStreamSink;
+	private DataStreamSink<IN> sink1;
+	private SingleOutputStreamOperator<IN> sink2;
+
+	private CassandraSink(DataStreamSink<IN> sink) {
+		sink1 = sink;
+		useDataStreamSink = true;
+	}
+
+	private CassandraSink(SingleOutputStreamOperator<IN> sink) {
+		sink2 = sink;
+		useDataStreamSink = false;
+	}
+
+	private SinkTransformation<IN> getSinkTransformation() {
+		return sink1.getTransformation();
+	}
+
+	private StreamTransformation<IN> getStreamTransformation() {
+		return sink2.getTransformation();
+	}
+
+	/**
+	 * Sets the name of this sink. This name is
+	 * used by the visualization and logging during runtime.
+	 *
+	 * @return The named sink.
+	 */
+	public CassandraSink<IN> name(String name) {
+		if (useDataStreamSink) {
+			getSinkTransformation().setName(name);
+		} else {
+			getStreamTransformation().setName(name);
+		}
+		return this;
+	}
+
+	/**
+	 * Sets an ID for this operator.
+	 *
+	 * The specified ID is used to assign the same operator ID across job
+	 * submissions (for example when starting a job from a savepoint).
+	 *
+	 * Important: this ID needs to be unique per
+	 * transformation and job. Otherwise, job submission will fail.
+	 *
+	 * @param uid The unique user-specified ID of this transformation.
+	 * @return The operator with the specified ID.
+	 */
+	public CassandraSink<IN> uid(String uid) {
+		if (useDataStreamSink) {
+			getSinkTransformation().setUid(uid);
+		} else {
+			getStreamTransformation().setUid(uid);
+		}
+		return this;
+	}
+
+	/**
+	 * Sets the parallelism for this sink. The degree must be higher than zero.
+	 *
+	 * @param parallelism The parallelism for this sink.
+	 * @return The sink with set parallelism.
+	 */
+	public CassandraSink<IN> setParallelism(int parallelism) {
+		if (useDataStreamSink) {
+			getSinkTransformation().setParallelism(parallelism);
+		} else {
+			getStreamTransformation().setParallelism(parallelism);
+		}
+		return this;
+	}
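The wrapper pattern in the diff above forwards each fluent setter (name, uid, setParallelism) to whichever underlying transformation the sink wraps and returns `this` so calls can be chained. A Flink-free sketch of that dispatch; every name below is illustrative, not the connector's actual API.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative fluent wrapper: one facade forwards settings to whichever of
 * two underlying "transformations" is in use, mirroring how CassandraSink
 * dispatches on its useDataStreamSink flag.
 */
public class FluentSinkWrapper {
    private final boolean useDataStreamSink;
    private final Map<String, Object> settings = new HashMap<>();

    private FluentSinkWrapper(boolean useDataStreamSink) {
        this.useDataStreamSink = useDataStreamSink;
    }

    public static FluentSinkWrapper wrapping(boolean dataStreamSink) {
        return new FluentSinkWrapper(dataStreamSink);
    }

    public FluentSinkWrapper name(String name) {
        settings.put("name", name); // would forward to the wrapped transformation
        return this;                // returning this enables chaining
    }

    public FluentSinkWrapper uid(String uid) {
        settings.put("uid", uid);
        return this;
    }

    public FluentSinkWrapper setParallelism(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be higher than zero");
        }
        settings.put("parallelism", parallelism);
        return this;
    }

    public Object get(String key) {
        return settings.get(key);
    }
}
```

Usage then mirrors the real sink's builder style: `FluentSinkWrapper.wrapping(true).name("cassandra-sink").uid("sink-1").setParallelism(2)`.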
[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58556585 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.transformations.SinkTransformation; +import org.apache.flink.streaming.api.transformations.StreamTransformation; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; + +import java.util.UUID; + +/** + * This class wraps different Cassandra sink implementations to provide a common interface for all of them. + * + * @param input type + */ +public class CassandraSink { + private static final String jobID = UUID.randomUUID().toString().replace("-", "_"); + private final boolean useDataStreamSink; + private DataStreamSink sink1; + private SingleOutputStreamOperator sink2; + + private CassandraSink(DataStreamSink sink) { + sink1 = sink; + useDataStreamSink = true; + } + + private CassandraSink(SingleOutputStreamOperator sink) { + sink2 = sink; + useDataStreamSink = false; + } + + private SinkTransformation getSinkTransformation() { + return sink1.getTransformation(); + } + + private StreamTransformation getStreamTransformation() { + return sink2.getTransformation(); + } + + /** +* Sets the name of this sink. This name is +* used by the visualization and logging during runtime. +* +* @return The named sink. 
+*/ + public CassandraSink name(String name) { + if (useDataStreamSink) { + getSinkTransformation().setName(name); + } else { + getStreamTransformation().setName(name); + } + return this; + } + + /** +* Sets an ID for this operator. +* +* The specified ID is used to assign the same operator ID across job +* submissions (for example when starting a job from a savepoint). +* +* Important: this ID needs to be unique per +* transformation and job. Otherwise, job submission will fail. +* +* @param uid The unique user-specified ID of this transformation. +* @return The operator with the specified ID. +*/ + public CassandraSink uid(String uid) { + if (useDataStreamSink) { + getSinkTransformation().setUid(uid); + } else { + getStreamTransformation().setUid(uid); + } + return this; + } + + /** +* Sets the parallelism for this sink. The degree must be higher than zero. +* +* @param parallelism The parallelism for this sink. +* @return The sink with set parallelism. +*/ + public CassandraSink setParallelism(int parallelism) { + if (useDataStreamSink) { + getSinkTransformation().setParallelism(parallelism); + } else { + getStreamTransformation().setParallelism(parallelism); + } + return this; + } +
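The fluent `name`, `uid`, and `setParallelism` methods in the diff above all share one dispatch shape: check `useDataStreamSink`, forward to the matching transformation, and return `this` so calls can be chained. A minimal self-contained sketch of that pattern follows; all names here are hypothetical stand-ins and nothing depends on Flink itself.

```java
// Hypothetical stand-in for CassandraSink's dual-wrapper fluent API.
// Each setter forwards to one of two backing "transformations" depending on
// the useDataStreamSink flag, then returns this so calls can be chained.
public class FluentSinkSketch {
    static class Transformation {       // stand-in for Sink/StreamTransformation
        String name;
        int parallelism = 1;
    }

    private final boolean useDataStreamSink;
    private final Transformation sinkTransformation = new Transformation();
    private final Transformation streamTransformation = new Transformation();

    public FluentSinkSketch(boolean useDataStreamSink) {
        this.useDataStreamSink = useDataStreamSink;
    }

    private Transformation active() {
        // the same either/or dispatch each CassandraSink setter performs
        return useDataStreamSink ? sinkTransformation : streamTransformation;
    }

    public FluentSinkSketch name(String name) {
        active().name = name;
        return this;                    // returning this enables chaining
    }

    public FluentSinkSketch setParallelism(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("The parallelism must be higher than zero.");
        }
        active().parallelism = parallelism;
        return this;
    }

    public String getName() { return active().name; }
    public int getParallelism() { return active().parallelism; }
}
```

Usage mirrors the wrapped sinks: `new FluentSinkSketch(true).name("cassandra-sink").setParallelism(4)`.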
[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra
[ https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226428#comment-15226428 ] ASF GitHub Bot commented on FLINK-3311: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58556061 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.transformations.SinkTransformation; +import org.apache.flink.streaming.api.transformations.StreamTransformation; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; + +import java.util.UUID; + +/** + * This class wraps different Cassandra sink implementations to provide a common interface for all of them. + * + * @param input type + */ +public class CassandraSink { + private static final String jobID = UUID.randomUUID().toString().replace("-", "_"); --- End diff -- `jobID` is used within Flink a lot and has a different meaning there. Maybe it makes sense to rename this to "sink id". Why is this field static? This can lead to problems when a user is using two Cassandra sinks. > Add a connector for streaming data into Cassandra > - > > Key: FLINK-3311 > URL: https://issues.apache.org/jira/browse/FLINK-3311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Andrea Sella > > We had users in the past asking for a Flink+Cassandra integration. 
> It seems that there is a well-developed java client for connecting into > Cassandra: https://github.com/datastax/java-driver (ASL 2.0) > There are also tutorials out there on how to start a local cassandra instance > (for the tests): > http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html > For the data types, I think we should support TupleX types, and map standard > java types to the respective cassandra types. > In addition, it seems that there is an object mapper from datastax to store > POJOs in Cassandra (there are annotations for defining the primary key and > types) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
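The issue description proposes supporting TupleX types and letting the driver map standard Java types to CQL types. Concretely, that amounts to flattening a tuple's positional fields into the bind parameters of a prepared `INSERT` (the connector's `writeRecord` later does exactly this via `prepared.bind(fields)`). A hedged, dependency-free sketch, with a hypothetical `Tuple3` stand-in for Flink's tuple classes:

```java
// Sketch: flattening a TupleX-style record into bind parameters for a
// prepared INSERT. Tuple3 here is a hypothetical stand-in for Flink's tuples.
public class TupleBindSketch {
    // fixed-arity record with positional field access, like Flink's TupleX
    public static class Tuple3 {
        public final Object f0, f1, f2;

        public Tuple3(Object f0, Object f1, Object f2) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
        }

        public int getArity() { return 3; }

        public Object getField(int i) {
            switch (i) {
                case 0: return f0;
                case 1: return f1;
                default: return f2;
            }
        }
    }

    // copy each positional field into an Object[]; a CQL driver would then
    // map the standard Java types (String, Integer, Long, ...) to CQL types
    public static Object[] toBindParams(Tuple3 record) {
        Object[] fields = new Object[record.getArity()];
        for (int i = 0; i < record.getArity(); i++) {
            fields[i] = record.getField(i);
        }
        return fields;
    }
}
```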
[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58548774 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java --- @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; + +/** + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra + * database. 
+ * + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint| + */ +public class CassandraCommitter extends CheckpointCommitter { + private ClusterBuilder builder; + private transient Cluster cluster; + private transient Session session; + + private static final String KEYSPACE = "flink_auxiliary"; + private String TABLE = "checkpoints_"; --- End diff -- by convention, (non static) fields are lowercase --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58548996 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java --- @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; + +/** + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra + * database. 
+ * + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint| + */ +public class CassandraCommitter extends CheckpointCommitter { + private ClusterBuilder builder; + private transient Cluster cluster; + private transient Session session; + + private static final String KEYSPACE = "flink_auxiliary"; + private String TABLE = "checkpoints_"; + + private transient PreparedStatement deleteStatement; + private transient PreparedStatement updateStatement; + private transient PreparedStatement selectStatement; + + public CassandraCommitter(ClusterBuilder builder) { + this.builder = builder; + ClosureCleaner.clean(builder, true); + } + + /** +* Internally used to set the job ID after instantiation. +* +* @param id +* @throws Exception +*/ + public void setJobId(String id) throws Exception { + super.setJobId(id); + TABLE += id; + } + + /** +* Generates the necessary tables to store information. +* +* @return +* @throws Exception +*/ + @Override + public void createResource() throws Exception { + cluster = builder.getCluster(); + session = cluster.connect(); + + session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", KEYSPACE)); + session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", KEYSPACE, TABLE)); --- End diff -- I think it would be better to allow users passing a custom keyspace. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
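The review comment above suggests letting users pass a custom keyspace instead of the hard-coded `flink_auxiliary`. One way to do that is to keep the committer's DDL strings parameterized on a caller-supplied keyspace, as in this sketch (the helper class and method names are hypothetical; the CQL mirrors the statements in the diff):

```java
// Hypothetical helper: builds the committer's DDL with a caller-supplied
// keyspace rather than the hard-coded "flink_auxiliary" constant.
public class CommitterCqlSketch {
    public static String createKeyspaceCql(String keyspace) {
        return String.format(
            "CREATE KEYSPACE IF NOT EXISTS %s with replication="
            + "{'class':'SimpleStrategy', 'replication_factor':3};", keyspace);
    }

    public static String createTableCql(String keyspace, String table) {
        return String.format(
            "CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, "
            + "checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));",
            keyspace, table);
    }
}
```

A constructor overload taking the keyspace (defaulting to `flink_auxiliary` when omitted) would preserve the current behavior while addressing the review comment.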
[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r58548581 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java --- @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.batch.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.google.common.base.Strings; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import org.apache.flink.api.common.io.RichOutputFormat; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * OutputFormat to write {@link org.apache.flink.api.java.tuple.Tuple} into Apache Cassandra. 
+ * + * @param type of Tuple + */ +public class CassandraOutputFormat extends RichOutputFormat { + private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class); + + private final String insertQuery; + private final ClusterBuilder builder; + + private transient Cluster cluster; + private transient Session session; + private transient PreparedStatement prepared; + private transient FutureCallback callback; + private transient Throwable exception = null; + + public CassandraOutputFormat(String insertQuery, ClusterBuilder builder) { + if (Strings.isNullOrEmpty(insertQuery)) { + throw new IllegalArgumentException("insertQuery cannot be null or empty"); + } + if (builder == null) { + throw new IllegalArgumentException("Builder cannot be null."); + } + this.insertQuery = insertQuery; + this.builder = builder; + } + + @Override + public void configure(Configuration parameters) { + this.cluster = builder.getCluster(); + } + + /** +* Opens a Session to Cassandra and initializes the prepared statement. +* +* @param taskNumber The number of the parallel instance. +* @throws IOException Thrown, if the output could not be opened due to an +* I/O problem. +*/ + @Override + public void open(int taskNumber, int numTasks) throws IOException { + this.session = cluster.connect(); + this.prepared = session.prepare(insertQuery); + this.callback = new FutureCallback() { + @Override + public void onSuccess(ResultSet ignored) { + } + + @Override + public void onFailure(Throwable t) { + exception = t; + } + }; + } + + @Override + public void writeRecord(OUT record) throws IOException { + if (exception != null) { + throw new IOException("write record failed", exception); + } + + Object[] fields = new Object[record.getArity()]; + for (int i = 0; i < record.getArity(); i++) { + fields[i] = record.getField(i); + } + ResultSetFuture result = session.executeAsync(prepared.bind(fields)); + Futures.addCallback(result, callback); + } + + /** +* Closes all resources used. 
+*/ + @Override + public void close() throws IOException { + try {
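The `CassandraOutputFormat` diff above uses a deferred error-propagation pattern: each `writeRecord` issues an asynchronous insert, a callback records any failure in a shared field, and the *next* call to `writeRecord` (or `close`) surfaces it as an `IOException`. A self-contained sketch of that pattern, using `CompletableFuture` in place of the Guava `Futures`/`ResultSetFuture` machinery (the fake async write and the `failThisWrite` flag are purely illustrative):

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

// Sketch of the OutputFormat's error-propagation pattern: async writes record
// a failure in a shared field, and the next write surfaces it. The simulated
// async write stands in for session.executeAsync(prepared.bind(fields)).
public class AsyncWriteSketch {
    private volatile Throwable exception = null;

    public void writeRecord(Object record, boolean failThisWrite) throws IOException {
        if (exception != null) {
            // a previous async write failed; surface it now, as writeRecord does
            throw new IOException("write record failed", exception);
        }
        CompletableFuture
            .runAsync(() -> {
                if (failThisWrite) {
                    throw new RuntimeException("simulated Cassandra failure");
                }
            })
            // like the FutureCallback: stash the failure, don't throw here
            .handle((v, t) -> {
                if (t != null) {
                    exception = t;
                }
                return null;
            })
            .join(); // join only to make the demo deterministic; real code stays async
    }
}
```

The upshot of the design: the write path never blocks on Cassandra, at the cost of reporting a failure one record late.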
[jira] [Created] (FLINK-3704) JobManagerHAProcessFailureBatchRecoveryITCase.testJobManagerProcessFailure unstable
Robert Metzger created FLINK-3704:
-
Summary: JobManagerHAProcessFailureBatchRecoveryITCase.testJobManagerProcessFailure unstable
Key: FLINK-3704
URL: https://issues.apache.org/jira/browse/FLINK-3704
Project: Flink
Issue Type: Bug
Components: JobManager
Reporter: Robert Metzger

https://s3.amazonaws.com/archive.travis-ci.org/jobs/120882840/log.txt

{code}
testJobManagerProcessFailure[1](org.apache.flink.test.recovery.JobManagerHAProcessFailureBatchRecoveryITCase) Time elapsed: 9.302 sec <<< ERROR!
java.io.IOException: Actor at akka.tcp://flink@127.0.0.1:55591/user/jobmanager not reachable. Please make sure that the actor is running and its port is reachable.
	at org.apache.flink.runtime.akka.AkkaUtils$.getActorRef(AkkaUtils.scala:384)
	at org.apache.flink.runtime.akka.AkkaUtils.getActorRef(AkkaUtils.scala)
	at org.apache.flink.test.recovery.JobManagerHAProcessFailureBatchRecoveryITCase.testJobManagerProcessFailure(JobManagerHAProcessFailureBatchRecoveryITCase.java:290)
Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:55591/), Path(/user/jobmanager)]
	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
	at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
	at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
	at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
	at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
	at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
	at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
	at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
	at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
	at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
	at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
	at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
	at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
	at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
	at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
	at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
	at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
	at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
	at akka.actor.ActorCell.terminate(ActorCell.scala:369)
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3398) Flink Kafka consumer should support auto-commit opt-outs
[ https://issues.apache.org/jira/browse/FLINK-3398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226294#comment-15226294 ]
ASF GitHub Bot commented on FLINK-3398:
---
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1690#issuecomment-205819299
I think the functionality is very desirable. In many cases, you really don't want Flink to write something to ZooKeeper for some random GroupId. This floods ZooKeeper with garbage that helps no one. I think that this is actually a kind of nice solution. The `auto.commit.enable` in Flink means not necessarily periodically, but "on checkpoint or periodically".
> Flink Kafka consumer should support auto-commit opt-outs
>
> Key: FLINK-3398
> URL: https://issues.apache.org/jira/browse/FLINK-3398
> Project: Flink
> Issue Type: Bug
> Reporter: Shikhar Bhushan
>
> Currently the Kafka source will commit consumer offsets to ZooKeeper, either upon a checkpoint if checkpointing is enabled, otherwise periodically based on {{auto.commit.interval.ms}}.
> It should be possible to opt out of committing consumer offsets to ZooKeeper. Kafka has this config as {{auto.commit.enable}} (0.8) and {{enable.auto.commit}} (0.9).
[GitHub] flink pull request: FLINK-3398: Allow for opting-out from Kafka of...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1690#issuecomment-205819299
I think the functionality is very desirable. In many cases, you really don't want Flink to write something to ZooKeeper for some random GroupId. This floods ZooKeeper with garbage that helps no one. I think that this is actually a kind of nice solution. The `auto.commit.enable` in Flink means not necessarily periodically, but "on checkpoint or periodically".
---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
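For readers following along, the opt-out discussed above boils down to plain consumer configuration. The config keys (`auto.commit.enable` for Kafka 0.8, `enable.auto.commit` for 0.9) come from the issue text; how Flink's consumer ultimately honors them is exactly what PR #1690 proposes, so the class and method names below are invented for illustration only.

```java
import java.util.Properties;

public class KafkaOffsetOptOutSketch {

    // Sketch only: builds the properties a user would hand to a
    // FlinkKafkaConsumer08/09 to opt out of offset committing.
    // The keys are taken from FLINK-3398; everything else here is
    // a hypothetical wrapper for the example.
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example-group");
        // Kafka 0.8 name for the opt-out (offsets would go to ZooKeeper)
        props.setProperty("auto.commit.enable", "false");
        // Kafka 0.9 name for the same setting
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        // In real code these props would be passed to the consumer constructor,
        // e.g. new FlinkKafkaConsumer09<>(topic, schema, consumerProperties())
        System.out.println(consumerProperties().getProperty("enable.auto.commit"));
    }
}
```

Setting both keys is harmless: each Kafka version only reads its own name, so one `Properties` object can serve either connector.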
[jira] [Commented] (FLINK-3541) Clean up workaround in FlinkKafkaConsumer09
[ https://issues.apache.org/jira/browse/FLINK-3541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226271#comment-15226271 ]
ASF GitHub Bot commented on FLINK-3541:
---
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1846#issuecomment-205811304
Yes, we can drop the retry loop.
> Clean up workaround in FlinkKafkaConsumer09
>
> Key: FLINK-3541
> URL: https://issues.apache.org/jira/browse/FLINK-3541
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.0.0
> Reporter: Till Rohrmann
> Priority: Minor
>
> In the current {{FlinkKafkaConsumer09}} implementation, we repeatedly start a new {{KafkaConsumer}} if the method {{KafkaConsumer.partitionsFor}} throws an NPE. This is due to a bug in Kafka version 0.9.0.0. See https://issues.apache.org/jira/browse/KAFKA-2880. The code can be found in the constructor of {{FlinkKafkaConsumer09.java:208}}.
> However, the problem is marked as fixed for version 0.9.0.1, which we also use for the flink-connector-kafka. Therefore, we should be able to get rid of the workaround.
[GitHub] flink pull request: [FLINK-3541] [Kafka Connector] Clean up workar...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1846#issuecomment-205811304
Yes, we can drop the retry loop.
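The retry loop being dropped here guarded against KAFKA-2880, where `partitionsFor` could throw an NPE on a 0.9.0.0 broker. The pattern is generic, so a hedged sketch is shown below with invented names; Flink's actual code in the `FlinkKafkaConsumer09` constructor differs in detail.

```java
import java.util.function.Supplier;

public class NpeRetry {

    // Sketch of the kind of workaround FLINK-3541 removes: retry a call
    // that may spuriously throw NPE due to a broker bug (KAFKA-2880,
    // fixed in Kafka 0.9.0.1). All names here are illustrative.
    static <T> T retryOnNpe(Supplier<T> call, int maxAttempts) {
        NullPointerException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return call.get();
            } catch (NullPointerException npe) {
                last = npe; // broker metadata not ready yet; try again
            }
        }
        throw last; // give up after maxAttempts
    }

    public static void main(String[] args) {
        final int[] attempts = {0};
        // Simulate a call that fails twice and then succeeds
        String result = retryOnNpe(() -> {
            if (++attempts[0] < 3) {
                throw new NullPointerException("KAFKA-2880");
            }
            return "partitions";
        }, 5);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

With the fix in 0.9.0.1, the wrapped call can simply be invoked directly, which is why the loop can go.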
[GitHub] flink pull request: [FLINK-3700] [core] Add 'Preconditions' utilit...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1853#issuecomment-205811166
+1 for merging :-)
[jira] [Commented] (FLINK-3700) Replace Guava Preconditions class with Flink Preconditions
[ https://issues.apache.org/jira/browse/FLINK-3700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226270#comment-15226270 ]
ASF GitHub Bot commented on FLINK-3700:
---
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1853#issuecomment-205811166
+1 for merging :-)
> Replace Guava Preconditions class with Flink Preconditions
> --
>
> Key: FLINK-3700
> URL: https://issues.apache.org/jira/browse/FLINK-3700
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Affects Versions: 1.0.0
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
>
> In order to reduce the dependency on Guava (which has caused us quite a bit of pain in the past with its version conflicts), I suggest to add a Flink {{Preconditions}} class.
[jira] [Commented] (FLINK-3700) Replace Guava Preconditions class with Flink Preconditions
[ https://issues.apache.org/jira/browse/FLINK-3700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226250#comment-15226250 ]
ASF GitHub Bot commented on FLINK-3700:
---
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/1854
[FLINK-3700] [core] Remove Guava as a dependency from "flink-core"
**Note: This builds on top of pull request #1853**
Almost all Guava functionality used within `flink-core` has corresponding utils in Flink's codebase, or the JDK library. This replaces the Guava code as follows:
- Preconditions calls by Flink's Preconditions class
- Collection utils by simple Java Collection calls
- Iterators by Flink's Union Iterator
- Murmur Hasher calls by Flink's `MathUtil.murmurHash()`
- Files by simple util methods around `java.nio.Files`
- InetAddresses IPv6 encoding code has been adapted into Flink's NetUtils (with attribution comments)
Some util classes were moved from `flink-runtime` to `flink-core`.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StephanEwen/incubator-flink guava_free_core
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1854.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1854
commit 535e83a297d39315d9d4b36672444b44f409c81e
Author: Stephan Ewen Date: 2016-04-05T10:37:33Z
[FLINK-3700] [build] Add 'findbugs' (javax.annotation) annotations as a core dependency.
commit eac0813d9f7298211bf52aec61ce346a133436f8
Author: Stephan Ewen Date: 2016-04-05T11:23:14Z
[FLINK-3700] [core] Add 'Preconditions' utility class.
commit 052db9286ad8aa8df3e122c300361dbc384a0190
Author: Stephan Ewen Date: 2016-04-05T13:18:32Z
[FLINK-3700] [core] Removes Guava Dependency from flink-core
Almost all Guava functionality used within flink-core has corresponding utils in Flink's codebase, or the JDK library.
This replaces the Guava code as follows:
- Preconditions calls by Flink's Preconditions class
- Collection utils by simple Java Collection calls
- Iterators by Flink's Union Iterator
- Files by simple util methods around java.nio.Files
- InetAddresses IPv6 encoding code has been adapted into Flink's NetUtils (with attribution comments)
Some util classes were moved from flink-runtime to flink-core.
> Replace Guava Preconditions class with Flink Preconditions
> --
>
> Key: FLINK-3700
> URL: https://issues.apache.org/jira/browse/FLINK-3700
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Affects Versions: 1.0.0
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
>
> In order to reduce the dependency on Guava (which has caused us quite a bit of pain in the past with its version conflicts), I suggest to add a Flink {{Preconditions}} class.
[GitHub] flink pull request: [FLINK-3700] [core] Remove Guava as a dependen...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/1854
[FLINK-3700] [core] Remove Guava as a dependency from "flink-core"
**Note: This builds on top of pull request #1853**
Almost all Guava functionality used within `flink-core` has corresponding utils in Flink's codebase, or the JDK library. This replaces the Guava code as follows:
- Preconditions calls by Flink's Preconditions class
- Collection utils by simple Java Collection calls
- Iterators by Flink's Union Iterator
- Murmur Hasher calls by Flink's `MathUtil.murmurHash()`
- Files by simple util methods around `java.nio.Files`
- InetAddresses IPv6 encoding code has been adapted into Flink's NetUtils (with attribution comments)
Some util classes were moved from `flink-runtime` to `flink-core`.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StephanEwen/incubator-flink guava_free_core
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1854.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1854
commit 535e83a297d39315d9d4b36672444b44f409c81e
Author: Stephan Ewen Date: 2016-04-05T10:37:33Z
[FLINK-3700] [build] Add 'findbugs' (javax.annotation) annotations as a core dependency.
commit eac0813d9f7298211bf52aec61ce346a133436f8
Author: Stephan Ewen Date: 2016-04-05T11:23:14Z
[FLINK-3700] [core] Add 'Preconditions' utility class.
commit 052db9286ad8aa8df3e122c300361dbc384a0190
Author: Stephan Ewen Date: 2016-04-05T13:18:32Z
[FLINK-3700] [core] Removes Guava Dependency from flink-core
Almost all Guava functionality used within flink-core has corresponding utils in Flink's codebase, or the JDK library.
This replaces the Guava code as follows:
- Preconditions calls by Flink's Preconditions class
- Collection utils by simple Java Collection calls
- Iterators by Flink's Union Iterator
- Files by simple util methods around java.nio.Files
- InetAddresses IPv6 encoding code has been adapted into Flink's NetUtils (with attribution comments)
Some util classes were moved from flink-runtime to flink-core.
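Two of the replacements above need no Flink code at all, since the JDK already covers them. A hedged sketch of what such call-site rewrites look like (the Guava originals are shown only in comments; the temp-file name is arbitrary):

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GuavaFreeUtils {
    public static void main(String[] args) throws Exception {
        // Guava: Lists.newArrayList("a", "b")  ->  plain JDK collections
        List<String> list = new ArrayList<>(Arrays.asList("a", "b"));
        list.add("c");

        // Guava: Files.toString(file, Charsets.UTF_8)  ->  java.nio.file.Files
        Path tmp = Files.createTempFile("flink", ".txt");
        Files.write(tmp, "hello".getBytes(StandardCharsets.UTF_8));
        String content = new String(Files.readAllBytes(tmp), StandardCharsets.UTF_8);
        Files.deleteIfExists(tmp);

        System.out.println(list + " / " + content); // prints "[a, b, c] / hello"
    }
}
```

The remaining items (Union Iterator, `MathUtil.murmurHash()`, NetUtils) point at Flink-internal utilities, which is where the moved classes from `flink-runtime` come in.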
[jira] [Commented] (FLINK-3700) Replace Guava Preconditions class with Flink Preconditions
[ https://issues.apache.org/jira/browse/FLINK-3700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226246#comment-15226246 ]
ASF GitHub Bot commented on FLINK-3700:
---
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/1853
[FLINK-3700] [core] Add 'Preconditions' utility class.
The functionality that Flink uses from Guava is super simple and limited. We get a big dependency that has caused a lot of pain in the past, simply to get access to some simple utility methods, because of Guava version conflicts and the necessary dependency shading.
In order to reduce the dependency on Guava, this adds a simple Flink Preconditions class. While referencing well-established libraries is a good idea for standalone apps, it is a problem for frameworks like Flink if the dependencies are not well-behaved with respect to version compatibility. Guava is not well behaved in that sense.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StephanEwen/incubator-flink flink_preconditions
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1853.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1853
> Replace Guava Preconditions class with Flink Preconditions
> --
>
> Key: FLINK-3700
> URL: https://issues.apache.org/jira/browse/FLINK-3700
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Affects Versions: 1.0.0
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
>
> In order to reduce the dependency on Guava (which has caused us quite a bit of pain in the past with its version conflicts), I suggest to add a Flink {{Preconditions}} class.
[GitHub] flink pull request: [FLINK-3700] [core] Add 'Preconditions' utilit...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/1853
[FLINK-3700] [core] Add 'Preconditions' utility class.
The functionality that Flink uses from Guava is super simple and limited. We get a big dependency that has caused a lot of pain in the past, simply to get access to some simple utility methods, because of Guava version conflicts and the necessary dependency shading.
In order to reduce the dependency on Guava, this adds a simple Flink Preconditions class. While referencing well-established libraries is a good idea for standalone apps, it is a problem for frameworks like Flink if the dependencies are not well-behaved with respect to version compatibility. Guava is not well behaved in that sense.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StephanEwen/incubator-flink flink_preconditions
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1853.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1853
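The "super simple and limited" subset of Guava in question fits in a handful of lines. A hedged sketch of such a utility class (the real one added by this PR lives in flink-core and may differ in method set and messages):

```java
public final class Preconditions {

    // Sketch of the minimal Guava-style precondition checks that
    // FLINK-3700 re-implements to drop the Guava dependency.
    public static <T> T checkNotNull(T reference, String message) {
        if (reference == null) {
            throw new NullPointerException(message);
        }
        return reference; // returning the value allows inline use in assignments
    }

    public static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Utility class: no instantiation
    private Preconditions() {}

    public static void main(String[] args) {
        String s = checkNotNull("value", "reference must not be null");
        checkArgument(s.length() > 0, "string must be non-empty");
        System.out.println("checks passed: " + s); // prints "checks passed: value"
    }
}
```

Because `checkNotNull` returns its argument, call sites like `this.name = checkNotNull(name, "name")` stay one-liners, which is the main ergonomic point of the Guava original.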
[jira] [Commented] (FLINK-3689) JobManager blocks cluster shutdown when not connected to ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-3689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226229#comment-15226229 ]
ASF GitHub Bot commented on FLINK-3689:
---
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/1852
[FLINK-3689] do not shutdown test ActorSystem
Instead of shutting down the ActorSystem created in the test, we simply send a message upon executing the shutdown method of the JobManager, TaskManager, and ResourceManager. This ensures we can check for shutdown code execution without interfering with the test.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mxm/flink FLINK-3689
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1852.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1852
commit 39e9bb7be3a000cf7e47c4f6dd4856d36832480e
Author: Maximilian Michels Date: 2016-04-05T12:35:47Z
[FLINK-3689] do not shutdown test ActorSystem
Instead of shutting down the ActorSystem created in the test, we simply send a message upon executing the shutdown method of the JobManager, TaskManager, and ResourceManager. This ensures we can check for shutdown code execution without interfering with the test.
> JobManager blocks cluster shutdown when not connected to ResourceManager
>
> Key: FLINK-3689
> URL: https://issues.apache.org/jira/browse/FLINK-3689
> Project: Flink
> Issue Type: Bug
> Components: JobManager, ResourceManager
> Affects Versions: 1.1.0
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Minor
> Fix For: 1.1.0
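The test pattern described in this PR (signal that shutdown code ran instead of actually tearing down the shared ActorSystem) is illustrated below with a plain-Java sketch. The Akka messaging itself is omitted; a latch stands in for the "send a message" step, and all class and method names are invented for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ShutdownSignalSketch {

    // Hypothetical stand-in for a JobManager/TaskManager/ResourceManager
    // under test: its shutdown method only reports that it was invoked,
    // rather than destroying resources the test still needs.
    static class Component {
        private final CountDownLatch shutdownSignal;

        Component(CountDownLatch shutdownSignal) {
            this.shutdownSignal = shutdownSignal;
        }

        void shutdown() {
            // Real teardown deliberately skipped in the test variant;
            // just signal that the shutdown code path executed.
            shutdownSignal.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch signal = new CountDownLatch(1);
        Component jobManager = new Component(signal);

        jobManager.shutdown();

        // The test asserts the signal arrived instead of observing a dead system
        boolean observed = signal.await(1, TimeUnit.SECONDS);
        System.out.println("shutdown observed: " + observed); // prints "shutdown observed: true"
    }
}
```

The point of the indirection is isolation: the shared ActorSystem stays alive for subsequent assertions, while the test still verifies the shutdown path ran.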