[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5964 ---
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user ctamisier commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r189056714

--- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -429,6 +429,26 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
 		Assert.assertEquals(20, rs.all().size());
 	}
+	@Test
+	public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception {
--- End diff --

Resolved. It was used in CassandraPojoSinkBuilder.createSink().

---
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user ctamisier commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188727153

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -381,6 +393,9 @@ protected void sanityCheck() {
 			if (query == null || query.length() == 0) {
 				throw new IllegalArgumentException("Query must not be null or empty.");
 			}
+			if (keyspace != null && keyspace.length() != 0) {
--- End diff --

Oh yes! So I will change it for CassandraTupleSinkBuilder, CassandraRowSinkBuilder and CassandraScalaProductSinkBuilder.

---
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188722661

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -381,6 +393,9 @@ protected void sanityCheck() {
 			if (query == null || query.length() == 0) {
 				throw new IllegalArgumentException("Query must not be null or empty.");
 			}
+			if (keyspace != null && keyspace.length() != 0) {
--- End diff --

We eagerly check for empty strings for the query because an empty query doesn't make sense and would definitely fail later on. The check here is to prevent non-POJO sinks from being configured with POJO-specific options. An empty keyspace _should_ throw an exception in this case, as the method isn't supposed to be used at all.

---
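The distinction drawn in the comment above (an eager empty-string check for the query, a plain null check for the POJO-only keyspace) can be sketched as follows. `TupleSinkBuilderSketch` is a hypothetical, simplified class that only mirrors the shape of the `sanityCheck()` hunk under review; it is not Flink's actual builder.

```java
// Hypothetical, simplified stand-in for a non-POJO sink builder
// (e.g. the tuple builder); not Flink's actual CassandraSink code.
class TupleSinkBuilderSketch {
    private String query;
    private String keyspace; // POJO-only option; must stay unset here

    TupleSinkBuilderSketch setQuery(String query) {
        this.query = query;
        return this;
    }

    TupleSinkBuilderSketch setDefaultKeyspace(String keyspace) {
        this.keyspace = keyspace;
        return this;
    }

    void sanityCheck() {
        // An empty query can never be executed, so reject null and "" eagerly.
        if (query == null || query.length() == 0) {
            throw new IllegalArgumentException("Query must not be null or empty.");
        }
        // Calling setDefaultKeyspace at all is a misconfiguration for a non-POJO
        // sink, so any non-null value, including "", is rejected.
        if (keyspace != null) {
            throw new IllegalArgumentException(
                "Specifying a default keyspace is only allowed when using a Pojo-Stream as input.");
        }
    }
}
```

With this check, `setDefaultKeyspace("")` on a non-POJO builder fails in `sanityCheck()`, which is the intended outcome: the call itself signals a misuse, regardless of the value passed.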
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user ctamisier commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188717750

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -381,6 +393,9 @@ protected void sanityCheck() {
 			if (query == null || query.length() == 0) {
 				throw new IllegalArgumentException("Query must not be null or empty.");
 			}
+			if (keyspace != null && keyspace.length() != 0) {
--- End diff --

There is a check on query.length() just above; shouldn't we have the same for the keyspace? (With only a non-null check, .setDefaultKeyspace("") will raise an exception.)

---
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188545721

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -258,6 +259,17 @@ public CassandraSinkBuilder(DataStream input, TypeInformation typeInfo,
 			return this;
 		}
+		/**
+		 * Sets the keyspace to be used.
+		 *
+		 * @param keyspace keyspace to use
+		 * @return this builder
+		 */
+		public CassandraSinkBuilder setKeyspace(String keyspace) {
--- End diff --

Rename to `setDefaultKeyspace`.

---
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188545817

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -410,6 +425,9 @@ protected void sanityCheck() {
 			if (query == null || query.length() == 0) {
 				throw new IllegalArgumentException("Query must not be null or empty.");
 			}
+			if (keyspace != null && keyspace.length() != 0) {
+				throw new IllegalArgumentException("Specifying a keyspace is only allowed when using a Pojo-Stream as input.");
--- End diff --

Same as above.

---
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188546432

--- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -429,6 +429,25 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
 		Assert.assertEquals(20, rs.all().size());
 	}
+	@Test
+	public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception {
+		session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test2"));
--- End diff --

Let's use a more descriptive table name to avoid conflicts in the future.

---
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188545761

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -381,6 +393,9 @@ protected void sanityCheck() {
 			if (query == null || query.length() == 0) {
 				throw new IllegalArgumentException("Query must not be null or empty.");
 			}
+			if (keyspace != null && keyspace.length() != 0) {
+				throw new IllegalArgumentException("Specifying a keyspace is only allowed when using a Pojo-Stream as input.");
--- End diff --

"default keyspace"

---
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188546038

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -470,6 +488,9 @@ protected void sanityCheck() {
 			if (query == null || query.length() == 0) {
 				throw new IllegalArgumentException("Query must not be null or empty.");
 			}
+			if (keyspace != null && keyspace.length() != 0) {
+				throw new IllegalArgumentException("Specifying a keyspace is only allowed when using a Pojo-Stream as input.");
--- End diff --

Same as above.

---
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188545921

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---
@@ -381,6 +393,9 @@ protected void sanityCheck() {
 			if (query == null || query.length() == 0) {
 				throw new IllegalArgumentException("Query must not be null or empty.");
 			}
+			if (keyspace != null && keyspace.length() != 0) {
--- End diff --

We only have to check for non-null.

---
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user ctamisier commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r186714037

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---
@@ -85,7 +85,7 @@ public void onFailure(Throwable t) {
 			}
 		};
 		this.cluster = builder.getCluster();
-		this.session = cluster.connect();
+		this.session = cluster.connect(configuration.getString("keyspace", null));
--- End diff --

Indeed!

---
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r186700920

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---
@@ -85,7 +85,7 @@ public void onFailure(Throwable t) {
 			}
 		};
 		this.cluster = builder.getCluster();
-		this.session = cluster.connect();
+		this.session = cluster.connect(configuration.getString("keyspace", null));
--- End diff --

A cleaner approach would be to add a `protected Session createSession(Cluster cluster)` method. The default implementation returns `cluster.connect()`, and the POJO sink returns `cluster.connect(keyspace)`.

---
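The `createSession` hook suggested above is a template method: the base sink owns the connection flow and the POJO sink only overrides how the session is created. The sketch below assumes hypothetical `ClusterLike`/`SessionLike` interfaces standing in for the DataStax driver's `Cluster` and `Session`, so it compiles on its own; the keyspace reaches `PojoSinkSketch` through its constructor rather than through a `Configuration`.

```java
// Hypothetical stand-ins for the DataStax driver's Session and Cluster, so
// that this sketch compiles without the driver on the classpath.
interface SessionLike {
    // Keyspace the session was connected with, or null if none was given.
    String loggedKeyspace();
}

interface ClusterLike {
    SessionLike connect();                // plays the role of Cluster#connect()
    SessionLike connect(String keyspace); // plays the role of Cluster#connect(String)
}

abstract class SinkBaseSketch {
    protected SessionLike session;

    void open(ClusterLike cluster) {
        // The session is created through an overridable factory method
        // instead of reading a "keyspace" entry from a Configuration.
        this.session = createSession(cluster);
    }

    // Default behaviour: no keyspace, so queries must be fully qualified.
    protected SessionLike createSession(ClusterLike cluster) {
        return cluster.connect();
    }
}

class PojoSinkSketch extends SinkBaseSketch {
    private final String keyspace; // supplied via the constructor

    PojoSinkSketch(String keyspace) {
        this.keyspace = keyspace;
    }

    @Override
    protected SessionLike createSession(ClusterLike cluster) {
        // Null guard added by this sketch: without a keyspace, behave like the base class.
        return keyspace == null ? cluster.connect() : cluster.connect(keyspace);
    }
}

// In-memory fake used only to show which connect overload is called.
class FakeCluster implements ClusterLike {
    @Override
    public SessionLike connect() {
        return () -> null;
    }

    @Override
    public SessionLike connect(String keyspace) {
        return () -> keyspace;
    }
}
```

Only the factory method differs between the two sinks, so the shared `open()` logic stays in one place. The null guard in `PojoSinkSketch` is an addition of this sketch, not part of the review suggestion.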
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r186673807

--- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java ---
@@ -50,18 +51,31 @@
 	 * @param clazz Class instance
 	 */
 	public CassandraPojoSink(Class clazz, ClusterBuilder builder) {
-		this(clazz, builder, null);
+		this(clazz, builder, null, null);
 	}
 	public CassandraPojoSink(Class clazz, ClusterBuilder builder, @Nullable MapperOptions options) {
+		this(clazz, builder, options, null);
+	}
+
+	public CassandraPojoSink(Class clazz, ClusterBuilder builder, String keyspace) {
+		this(clazz, builder, null, keyspace);
+	}
+
+	public CassandraPojoSink(Class clazz, ClusterBuilder builder, @Nullable MapperOptions options, String keyspace) {
 		super(builder);
 		this.clazz = clazz;
 		this.options = options;
+		this.keyspace = keyspace;
 	}
 	@Override
 	public void open(Configuration configuration) {
+		if (keyspace != null && !keyspace.isEmpty()) {
+			configuration.setString("keyspace", keyspace);
--- End diff --

This has no effect, as the configuration is never used anywhere, and can thus be removed.

---
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r186674717

--- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -429,6 +429,26 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
 		Assert.assertEquals(20, rs.all().size());
 	}
+	@Test
+	public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception {
--- End diff --

I don't understand how this test can succeed. We never set the keyspace anywhere, and the POJOs don't contain it either.

---
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r186673889

--- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
@@ -429,6 +429,26 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
 		Assert.assertEquals(20, rs.all().size());
 	}
+	@Test
+	public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() throws Exception {
+		session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test2"));
+
+		CassandraPojoSink sink = new CassandraPojoSink<>(PojoNoAnnotatedKeyspace.class, builder);
+
+		Configuration configuration = new Configuration();
+		configuration.setString("keyspace", "flink");
--- End diff --

This has no effect and can be removed.

---
[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...
GitHub user ctamisier opened a pull request:

https://github.com/apache/flink/pull/5964

[FLINK-8655] [Cassandra Connector] add keyspace in cassandra sink builder

## What is the purpose of the change

This PR is an alternative to https://github.com/apache/flink/pull/5538.

## Brief change log

A discussion started on https://github.com/apache/flink/pull/5538, which I'm quoting here:

> What about using the Configuration that is provided in RichFunction.open(Configuration parameters) for the CassandraSinkBase.open(Configuration configuration) {...} implementation?
>
> I saw in old docs that Configuration can be used in the open(...) method, but today (1.4+) it might not be good practice anymore.
>
> What about adding a keyspace attribute in CassandraPojoSink and CassandraSinkBuilder (throwing an exception when not using a CassandraPojoSinkBuilder, for the moment)?
> Then create a new Configuration() with this keyspace in CassandraPojoSink,
> and finally do a cluster.connect(keyspace);
>
> I've done this here if you can have a look.
> I've updated CassandraConnectorITCase with a new test.
> I would like to run CassandraPojoSinkExample.main() to cover the CassandraSink.addSink() mechanism, but it doesn't work for me (even on the Flink master branch).
>
> Can this be a candidate for a PR? I am new to Flink, so it might break Flink's good-practice principles...
> Let me know!

The reply from zentol (Chesnay) (**I will check this point**):

> We would have to pass the keyspace via the constructor, as the Configuration approach doesn't work for streaming.
>
> Generally speaking, it isn't a problem to set the keyspace when creating the connection. But I would like to know what happens if a POJO comes along that explicitly sets the keyspace; is it ignored, respected, or will it cause an exception?
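zentol's question amounts to a precedence rule between a keyspace set on the POJO and the keyspace the session was connected with. The sketch below encodes one plausible answer, assuming (not verified in this thread) that the DataStax object mapper prefers a non-empty `@Table(keyspace = ...)` value and otherwise falls back to the session's keyspace. `KeyspaceResolution.resolveKeyspace` is a hypothetical helper, not a driver API; the real behaviour should be confirmed against the driver before relying on it.

```java
// Hypothetical helper modelling an ASSUMED precedence rule; this is not the
// DataStax driver's code, and its behaviour should be checked against the driver.
final class KeyspaceResolution {
    private KeyspaceResolution() {
    }

    static String resolveKeyspace(String annotatedKeyspace, String sessionKeyspace) {
        // Assumption: an explicit, non-empty keyspace on the POJO's @Table annotation wins.
        if (annotatedKeyspace != null && !annotatedKeyspace.isEmpty()) {
            return annotatedKeyspace;
        }
        // Otherwise fall back to the keyspace the session was connected with,
        // e.g. via cluster.connect(keyspace).
        if (sessionKeyspace != null) {
            return sessionKeyspace;
        }
        // Neither source provides a keyspace, so an unqualified table cannot be resolved.
        throw new IllegalStateException(
            "No keyspace: set it on the @Table annotation or connect with a default keyspace.");
    }
}
```

Under this assumption, a POJO that explicitly sets its keyspace would be respected, and the sink's default keyspace would only apply to POJOs without one, which is the case `testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink()` targets.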
## Verifying this change

This change added tests and can be verified as follows:

- `testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink()` in `CassandraConnectorITCase.java`
- Need to run CassandraPojoSinkExample.main() to cover the CassandraSink.addSink() mechanism, but it doesn't work for me (even on the Flink master branch; **I will investigate**)

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **don't know**
- The runtime per-record code paths (performance sensitive): **don't know**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **yes**
- If yes, how is the feature documented? **JavaDocs**

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ctamisier/flink pojo-keyspace

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5964.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5964

commit 1ed3518a5f80c5e6bbbf70ab0b3c3d20b60f2e2f
Author: Clément Tamisier
Date:   2018-05-06T15:04:18Z

    [FLINK-8655] add keyspace in cassandra sink builder

---