[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...

2018-05-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5964


---


2018-05-17 Thread ctamisier
Github user ctamisier commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r189056714
  
--- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -429,6 +429,26 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
Assert.assertEquals(20, rs.all().size());
}
 
+   @Test
+   public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception {
--- End diff --

Resolved. It was used in CassandraPojoSinkBuilder.createSink().


---


2018-05-16 Thread ctamisier
Github user ctamisier commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188727153
  
--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -381,6 +393,9 @@ protected void sanityCheck() {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must not be null or empty.");
}
+   if (keyspace != null && keyspace.length() != 0) {
--- End diff --

Oh yes! So I will change it for CassandraTupleSinkBuilder,
CassandraRowSinkBuilder and CassandraScalaProductSinkBuilder.


---


2018-05-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188722661
  
--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -381,6 +393,9 @@ protected void sanityCheck() {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must not be null or empty.");
}
+   if (keyspace != null && keyspace.length() != 0) {
--- End diff --

We eagerly check the query for empty strings because an empty query doesn't
make sense and will definitely fail later on.

The check here is to prevent non-POJO sinks from being configured with
POJO-specific options. An empty keyspace _should_ throw an exception in this
case, as the method isn't supposed to be used at all.
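Here is a minimal sketch of the distinction zentol draws. It assumes a heavily simplified builder hierarchy. `SinkBuilderSketch`, `TupleSinkBuilderSketch` and `PojoSinkBuilderSketch` are hypothetical stand-ins for the real `CassandraSinkBuilder` subclasses, which also carry the input stream, type information and cluster builder. The behaviour of the POJO variant is an assumption. The query check rejects empty strings because an empty query can never work. The non-POJO keyspace check only tests for null, so `setDefaultKeyspace("")` is rejected as well.

```java
// Hypothetical, simplified stand-in for CassandraSinkBuilder: only the two
// fields involved in the sanity checks are modelled.
abstract class SinkBuilderSketch {
    protected String query;
    protected String keyspace;

    SinkBuilderSketch setQuery(String query) {
        this.query = query;
        return this;
    }

    SinkBuilderSketch setDefaultKeyspace(String keyspace) {
        this.keyspace = keyspace;
        return this;
    }

    abstract void sanityCheck();
}

// Stand-in for the tuple/row/Scala-product builders.
class TupleSinkBuilderSketch extends SinkBuilderSketch {
    @Override
    void sanityCheck() {
        // An empty query can never work, so emptiness is rejected eagerly.
        if (query == null || query.length() == 0) {
            throw new IllegalArgumentException("Query must not be null or empty.");
        }
        // The default keyspace is a POJO-only option: calling the setter at all
        // is the error, so a non-null check suffices and "" is rejected too.
        if (keyspace != null) {
            throw new IllegalArgumentException(
                    "Specifying a default keyspace is only allowed when using a Pojo-Stream as input.");
        }
    }
}

// Stand-in for the POJO builder (assumed behaviour): the table comes from the
// POJO's mapping annotations, so a query is rejected and a keyspace is allowed.
class PojoSinkBuilderSketch extends SinkBuilderSketch {
    @Override
    void sanityCheck() {
        if (query != null) {
            throw new IllegalArgumentException(
                    "Specifying a query is not allowed when using a Pojo-Stream as input.");
        }
    }
}
```

For example, `new TupleSinkBuilderSketch().setQuery(q).setDefaultKeyspace("").sanityCheck()` throws. The same chain without the keyspace setter passes.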


---


2018-05-16 Thread ctamisier
Github user ctamisier commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188717750
  
--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -381,6 +393,9 @@ protected void sanityCheck() {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must not be null or empty.");
}
+   if (keyspace != null && keyspace.length() != 0) {
--- End diff --

There is a check on query.length() just above; shouldn't we have the same
for the keyspace?
(.setDefaultKeyspace("") will raise an exception.)


---


2018-05-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188545721
  
--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -258,6 +259,17 @@ public CassandraSinkBuilder(DataStream input, TypeInformation typeInfo,
return this;
}
 
+   /**
+* Sets the keyspace to be used.
+*
+* @param keyspace keyspace to use
+* @return this builder
+*/
+   public CassandraSinkBuilder setKeyspace(String keyspace) {
--- End diff --

rename to `setDefaultKeyspace`


---


2018-05-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188545817
  
--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -410,6 +425,9 @@ protected void sanityCheck() {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must not be null or empty.");
}
+   if (keyspace != null && keyspace.length() != 0) {
+   throw new IllegalArgumentException("Specifying a keyspace is only allowed when using a Pojo-Stream as input.");
--- End diff --

same as above


---


2018-05-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188546432
  
--- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -429,6 +429,25 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
Assert.assertEquals(20, rs.all().size());
}
 
+   @Test
+   public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception {
+   session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test2"));
--- End diff --

let's use a more descriptive table name, to avoid conflicts in the future.


---


2018-05-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188545761
  
--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -381,6 +393,9 @@ protected void sanityCheck() {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must not be null or empty.");
}
+   if (keyspace != null && keyspace.length() != 0) {
+   throw new IllegalArgumentException("Specifying a keyspace is only allowed when using a Pojo-Stream as input.");
--- End diff --

"default keyspace"


---


2018-05-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188546038
  
--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -470,6 +488,9 @@ protected void sanityCheck() {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must not be null or empty.");
}
+   if (keyspace != null && keyspace.length() != 0) {
+   throw new IllegalArgumentException("Specifying a keyspace is only allowed when using a Pojo-Stream as input.");
--- End diff --

same as above


---


2018-05-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188545921
  
--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -381,6 +393,9 @@ protected void sanityCheck() {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must not be null or empty.");
}
+   if (keyspace != null && keyspace.length() != 0) {
--- End diff --

we only have to check for non-null.


---


2018-05-08 Thread ctamisier
Github user ctamisier commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r186714037
  
--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---
@@ -85,7 +85,7 @@ public void onFailure(Throwable t) {
}
};
this.cluster = builder.getCluster();
-   this.session = cluster.connect();
+   this.session = cluster.connect(configuration.getString("keyspace", null));
--- End diff --

indeed!


---


2018-05-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r186700920
  
--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---
@@ -85,7 +85,7 @@ public void onFailure(Throwable t) {
}
};
this.cluster = builder.getCluster();
-   this.session = cluster.connect();
+   this.session = cluster.connect(configuration.getString("keyspace", null));
--- End diff --

A cleaner approach would be to add a `protected Session
createSession(Cluster cluster)` method. The default implementation returns
`cluster.connect()`, and the POJO sink overrides it to return `cluster.connect(keyspace)`.
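Here is a sketch of the template-method refactoring zentol suggests, under stated assumptions. `SessionStub`, `ClusterStub`, `FakeCluster` and the `*Sketch` classes are hypothetical stand-ins, so the example compiles without the Cassandra driver or Flink on the classpath. The base class calls an overridable `createSession(...)`, and only the POJO sink overrides it to select a keyspace. The keyspace is passed through the constructor and kept as a field. This follows the point quoted in the PR description that the `Configuration` handed to `open()` cannot carry it in streaming jobs.

```java
// Hypothetical stand-ins for the driver's Session and Cluster, so the sketch
// compiles without the Cassandra driver; loggedKeyspace() is null when the
// session was opened without selecting a keyspace.
interface SessionStub {
    String loggedKeyspace();
}

interface ClusterStub {
    SessionStub connect();

    SessionStub connect(String keyspace);
}

// Stand-in for CassandraSinkBase: open() obtains its session through an
// overridable factory method instead of calling cluster.connect() inline.
abstract class SinkBaseSketch {
    protected SessionStub session;

    void open(ClusterStub cluster) {
        this.session = createSession(cluster);
    }

    // Default for tuple/row sinks: connect without selecting a keyspace.
    protected SessionStub createSession(ClusterStub cluster) {
        return cluster.connect();
    }
}

class TupleSinkSketch extends SinkBaseSketch {
}

// Stand-in for CassandraPojoSink: the keyspace arrives through the
// constructor and is kept as a field, not read from a Configuration.
class PojoSinkSketch extends SinkBaseSketch {
    private final String keyspace;

    PojoSinkSketch(String keyspace) {
        this.keyspace = keyspace;
    }

    @Override
    protected SessionStub createSession(ClusterStub cluster) {
        return keyspace == null ? cluster.connect() : cluster.connect(keyspace);
    }
}

// In-memory fake that only remembers which keyspace, if any, was selected.
class FakeCluster implements ClusterStub {
    @Override
    public SessionStub connect() {
        return () -> null;
    }

    @Override
    public SessionStub connect(String keyspace) {
        return () -> keyspace;
    }
}
```

With this shape the base sink needs no knowledge of keyspaces, and the string lookup `configuration.getString("keyspace", null)` is no longer needed.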


---


2018-05-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r186673807
  
--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java ---
@@ -50,18 +51,31 @@
 * @param clazz Class instance
 */
public CassandraPojoSink(Class clazz, ClusterBuilder builder) {
-   this(clazz, builder, null);
+   this(clazz, builder, null, null);
}
 
public CassandraPojoSink(Class clazz, ClusterBuilder builder, @Nullable MapperOptions options) {
+   this(clazz, builder, options, null);
+   }
+
+   public CassandraPojoSink(Class clazz, ClusterBuilder builder, String keyspace) {
+   this(clazz, builder, null, keyspace);
+   }
+
+   public CassandraPojoSink(Class clazz, ClusterBuilder builder, @Nullable MapperOptions options, String keyspace) {
super(builder);
this.clazz = clazz;
this.options = options;
+   this.keyspace = keyspace;
}
 
@Override
public void open(Configuration configuration) {
+   if (keyspace != null && !keyspace.isEmpty()) {
+   configuration.setString("keyspace", keyspace);
--- End diff --

this has no effect as the configuration is never used anywhere, and thus 
can be removed.


---


2018-05-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r186674717
  
--- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -429,6 +429,26 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
Assert.assertEquals(20, rs.all().size());
}
 
+   @Test
+   public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception {
--- End diff --

I don't understand how this test can succeed. We never set the keyspace
anywhere, and the POJOs don't contain it either.


---


2018-05-08 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r186673889
  
--- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -429,6 +429,26 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
Assert.assertEquals(20, rs.all().size());
}
 
+   @Test
+   public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception {
+   session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test2"));
+
+   CassandraPojoSink sink = new CassandraPojoSink<>(PojoNoAnnotatedKeyspace.class, builder);
+
+   Configuration configuration = new Configuration();
+   configuration.setString("keyspace", "flink");
--- End diff --

this has no effect and can be removed


---


2018-05-07 Thread ctamisier
GitHub user ctamisier opened a pull request:

https://github.com/apache/flink/pull/5964

[FLINK-8655] [Cassandra Connector] add keyspace in cassandra sink builder

## What is the purpose of the change
This PR is an alternative to https://github.com/apache/flink/pull/5538.

## Brief change log
A discussion started on https://github.com/apache/flink/pull/5538 that I'm
quoting here:
> What about using the Configuration that is provided in
RichFunction.open(Configuration parameters) for the
CassandraSinkBase.open(Configuration configuration) {...} implementation?
> 
> I saw in old docs that Configuration can be used in the open(...) method 
but today (1.4+) it might not be a good practice anymore.
> 
> What about adding a keyspace attribute to CassandraPojoSink and
CassandraSinkBuilder (throwing an exception when not using a
CassandraPojoSinkBuilder, for the moment),
> then creating a new Configuration() with this keyspace in CassandraPojoSink,
> and finally calling cluster.connect(keyspace)?
> 
> I've done this here if you can have a look.
> I've updated CassandraConnectorITCase with a new test.
> I would like to run CassandraPojoSinkExample.main() to cover the
CassandraSink.addSink() mechanism, but it doesn't work for me (even on the Flink
master branch).
> 
> Can this be a candidate for a PR? I am new to Flink, so it might break
Flink's good-practice principles...
> Let me know!

The reply from zentol (Chesnay) (**I will check this point**):

> We would have to pass the keyspace via the constructor as the 
Configuration approach doesn't work for streaming.
> 
> Generally speaking it isn't a problem to set the keyspace when creating 
the connection. But I would like to know what happens if a POJO comes along 
that explicitly sets the keyspace; is it ignored, respected or will it cause an 
exception?
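
The open question about POJOs that declare their own keyspace can be framed as a choice between three policies. The sketch below is purely illustrative. `KeyspacePolicy` and `KeyspaceResolver.resolve` are hypothetical names. The sketch does not settle which behaviour the Cassandra object mapper actually shows when an annotated keyspace meets a session opened with `cluster.connect(keyspace)`; that is exactly what still has to be checked.

```java
// Hypothetical: the three behaviours raised in review for a POJO whose
// mapping annotation names a keyspace while the sink also has a default one.
enum KeyspacePolicy {
    IGNORE_ANNOTATION,
    RESPECT_ANNOTATION,
    FAIL_ON_CONFLICT
}

final class KeyspaceResolver {
    private KeyspaceResolver() {
    }

    // annotated: keyspace from the POJO annotation (null or "" if absent).
    // sinkDefault: default keyspace configured on the sink (null if none).
    // Returns the keyspace a write would target; null means none is known.
    static String resolve(String annotated, String sinkDefault, KeyspacePolicy policy) {
        if (annotated == null || annotated.isEmpty()) {
            // Unannotated POJO: only the sink's default can apply.
            return sinkDefault;
        }
        switch (policy) {
            case IGNORE_ANNOTATION:
                // The sink's default wins; the annotation is only a fallback.
                return sinkDefault != null ? sinkDefault : annotated;
            case RESPECT_ANNOTATION:
                // An explicit annotation overrides the sink's default.
                return annotated;
            case FAIL_ON_CONFLICT:
                if (sinkDefault != null && !sinkDefault.equals(annotated)) {
                    throw new IllegalStateException("POJO keyspace '" + annotated
                            + "' conflicts with default keyspace '" + sinkDefault + "'");
                }
                return annotated;
            default:
                throw new AssertionError("unknown policy: " + policy);
        }
    }
}
```

Whichever policy turns out to match reality, the unannotated case (the one covered by `testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink()`) resolves to the sink's default keyspace in all three.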

## Verifying this change

This change added tests and can be verified as follows:
- `testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink()` in
`CassandraConnectorITCase.java`
- CassandraPojoSinkExample.main() still needs to be run to cover the
CassandraSink.addSink() mechanism, but it doesn't work for me (even on the Flink
master branch; **I will investigate**)

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **don't know**
  - The runtime per-record code paths (performance sensitive): **don't 
know**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **yes**
  - If yes, how is the feature documented? **JavaDocs**

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ctamisier/flink pojo-keyspace

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5964.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5964


commit 1ed3518a5f80c5e6bbbf70ab0b3c3d20b60f2e2f
Author: Clément Tamisier 
Date:   2018-05-06T15:04:18Z

[FLINK-8655] add keyspace in cassandra sink builder




---