[GitHub] flink issue #5964: [FLINK-8655] [Cassandra Connector] add keyspace in cassan...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5964 The example not working isn't related to this PR, so I'll merge it as is. But I'll take a look at what the problem is, and either fix it or open a JIRA. ---
Github user ctamisier commented on the issue: https://github.com/apache/flink/pull/5964 Ok, great! I'd like to mention that `CassandraPojoSinkExample.main()` doesn't seem to work: `source.getType()` is computed as a `GenericType` for the stream created by `env.fromCollection(messages);`. So when `CassandraSink.addSink(source)` is called, `java.lang.IllegalArgumentException: No support for the type of the given DataStream: GenericType` is thrown, because the type matches neither `PojoTypeInfo` (`typeInfo instanceof PojoTypeInfo`) nor any of the other candidates, although a `PojoTypeInfo` is what we would expect according to the `CassandraPojoSinkExample` javadocs. Maybe we can fix this as well in this PR? (`CassandraPojoSinkExample.main()` seems to be a concrete running example of the `CassandraPojoSinkBuilder` that this PR is modifying.) ---
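The thread does not say why the example's type ends up as a `GenericType`. As a diagnostic aid, here is a minimal, Flink-free sketch of the POJO rules described in the Flink documentation (public class, public no-argument constructor, every non-static, non-transient field either public or reachable through a getter and setter). The class names `PojoRuleCheck`, `GoodMessage` and `BadMessage` are hypothetical, and the check is a simplification, not Flink's actual type extraction.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Simplified model of the documented POJO rules; this is NOT Flink's TypeExtractor.
// It ignores inherited fields and boolean "isX" getters.
class PojoRuleCheck {

    // Hypothetical sample that satisfies the rules.
    public static class GoodMessage {
        private String body;
        public GoodMessage() {}
        public GoodMessage(String body) { this.body = body; }
        public String getBody() { return body; }
        public void setBody(String body) { this.body = body; }
    }

    // Hypothetical sample that lacks a public no-argument constructor.
    public static class BadMessage {
        private String body;
        public BadMessage(String body) { this.body = body; }
        public String getBody() { return body; }
        public void setBody(String body) { this.body = body; }
    }

    static boolean looksLikePojo(Class<?> type) {
        // The class itself must be public.
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        // Non-static inner classes are not allowed.
        if (type.isMemberClass() && !Modifier.isStatic(type.getModifiers())) {
            return false;
        }
        // A public no-argument constructor is required.
        try {
            type.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Every relevant field must be public and non-final, or have a getter and setter.
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue;
            }
            if (Modifier.isPublic(mods) && !Modifier.isFinal(mods)) {
                continue;
            }
            if (!hasAccessors(type, field)) {
                return false;
            }
        }
        return true;
    }

    static boolean hasAccessors(Class<?> type, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            type.getMethod("get" + suffix);
            type.getMethod("set" + suffix, field.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("GoodMessage: " + looksLikePojo(GoodMessage.class));
        System.out.println("BadMessage: " + looksLikePojo(BadMessage.class));
    }
}
```

Running a check like this against the example's message class could narrow down which rule, if any, it violates before digging into the connector itself.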
Github user ctamisier commented on the issue: https://github.com/apache/flink/pull/5964 Okay, I've pushed the changes. ---
Github user ctamisier commented on the issue: https://github.com/apache/flink/pull/5964 I pushed the changes that rename the method to `setDefaultKeyspace()` and update the exception message. The field is still `keyspace`; we don't rename it to `defaultKeyspace`, right? ---
Github user ctamisier commented on the issue: https://github.com/apache/flink/pull/5964 Hi, is there something else we can do for this PR/issue ? ---
Github user ctamisier commented on the issue: https://github.com/apache/flink/pull/5964 I pushed the changes and refactored them to use the `sanityCheck` method to check the `keyspace` depending on the builder in use (as is already done with `query`). ---
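A minimal sketch of the per-builder check described in that comment, assuming a shape where each builder overrides `sanityCheck` and rejects the options that do not apply to its input type. The classes `SinkBuilder`, `PojoSinkBuilder` and `TupleSinkBuilder`, the `build()` and `describe()` helpers, and the exception messages are hypothetical stand-ins, not the connector's actual code.

```java
// Hypothetical stand-ins for the connector's builders; names and messages are illustrative only.
class SanityCheckSketch {

    abstract static class SinkBuilder {
        protected String query;
        protected String keyspace;

        SinkBuilder setQuery(String query) {
            this.query = query;
            return this;
        }

        SinkBuilder setDefaultKeyspace(String keyspace) {
            this.keyspace = keyspace;
            return this;
        }

        // Each concrete builder validates the options that matter for its input type.
        protected abstract void sanityCheck();

        protected abstract String describe();

        // Validation runs once, right before the sink would be created.
        String build() {
            sanityCheck();
            return describe();
        }
    }

    // POJO input: the mapper generates the statements, so a query is rejected.
    static class PojoSinkBuilder extends SinkBuilder {
        @Override
        protected void sanityCheck() {
            if (query != null) {
                throw new IllegalArgumentException("A query is not allowed for POJO input.");
            }
        }

        @Override
        protected String describe() {
            return "pojo sink, default keyspace=" + keyspace;
        }
    }

    // Tuple input: a query is required, and a default keyspace is rejected.
    static class TupleSinkBuilder extends SinkBuilder {
        @Override
        protected void sanityCheck() {
            if (query == null || query.isEmpty()) {
                throw new IllegalArgumentException("A query is required for tuple input.");
            }
            if (keyspace != null) {
                throw new IllegalArgumentException("A default keyspace is only allowed for POJO input.");
            }
        }

        @Override
        protected String describe() {
            return "tuple sink, query=" + query;
        }
    }

    public static void main(String[] args) {
        System.out.println(new PojoSinkBuilder().setDefaultKeyspace("ks").build());
    }
}
```

Keeping both checks in the same overridden method means an invalid combination fails at build time with a single, builder-specific message.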
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5964 Ah, so that's where it is used... ---
Github user ctamisier commented on the issue: https://github.com/apache/flink/pull/5964

> The keyspace field in the sink isn't used anywhere

It is used in `CassandraPojoSinkBuilder.createSink()`; the keyspace is an argument of `new CassandraPojoSink<>(...);`

```
@Override
public CassandraSink createSink() throws Exception {
    return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder, mapperOptions, keyspace)).name("Cassandra Sink"));
}
```

> and the configuration operations don't have any effect either

The keyspace is set in the configuration under the `"keyspace"` key:

```
if (keyspace != null && !keyspace.isEmpty()) {
    configuration.setString("keyspace", keyspace);
}
```

It is then used to connect to Cassandra with this keyspace: `cluster.connect(configuration.getString("keyspace", null));`

Am I missing something?

PS: Instead of using the configuration object, maybe we can just use the `session` that was just created to execute `session.executeAsync("USE " + keyspace);`, like in `Cluster.connectAsync(final String keyspace)`?

---
Github user ctamisier commented on the issue: https://github.com/apache/flink/pull/5964

@zentol, @StephanEwen, @Bekreth

> We would have to pass the keyspace via the constructor as the Configuration approach doesn't work for streaming.

I'm not sure I understand: the new unit test (`testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink`) is in the streaming package (`org.apache.flink.streaming.connectors.cassandra`) and it seems to work. Am I missing something about the general Flink setup?

> Generally speaking it isn't a problem to set the keyspace when creating the connection. But I would like to know what happens if a POJO comes along that explicitly sets the keyspace; is it ignored, respected or will it cause an exception?

It is respected: the keyspace is taken from the `@Table` annotation. See the `AnnotationParser.parseEntity` method, which uses `ksName` when creating the `EntityMapper`. This seems to be the default behavior of the Cassandra driver; I'm not sure we can do anything about it here.

---
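To make the precedence described above concrete, here is a small, driver-free model: a keyspace declared on the entity's annotation wins, and the connection's default keyspace is only a fallback. The nested `@Table` annotation, the sample classes and `resolveKeyspace` are hypothetical stand-ins written for this sketch; the failure when neither keyspace is available is also an assumption of the model. It does not reproduce the driver's `AnnotationParser` code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Simplified model of keyspace precedence; all names here are illustrative assumptions.
class KeyspaceResolutionSketch {

    // Stand-in for the mapping annotation: keyspace is optional, table name is required.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Table {
        String keyspace() default "";
        String name();
    }

    // Hypothetical POJO that names its keyspace explicitly.
    @Table(keyspace = "explicit_ks", name = "messages")
    static class AnnotatedKeyspacePojo {}

    // Hypothetical POJO that relies on the connection's default keyspace.
    @Table(name = "messages")
    static class NoKeyspacePojo {}

    static String resolveKeyspace(Class<?> entity, String sessionKeyspace) {
        Table table = entity.getAnnotation(Table.class);
        if (table == null) {
            throw new IllegalArgumentException(entity.getName() + " is not annotated with @Table");
        }
        // An explicit keyspace on the annotation takes precedence.
        if (!table.keyspace().isEmpty()) {
            return table.keyspace();
        }
        // Otherwise fall back to the keyspace the session was connected with.
        if (sessionKeyspace == null || sessionKeyspace.isEmpty()) {
            throw new IllegalArgumentException("No keyspace on @Table and no default keyspace set");
        }
        return sessionKeyspace;
    }

    public static void main(String[] args) {
        System.out.println(resolveKeyspace(AnnotatedKeyspacePojo.class, "default_ks"));
        System.out.println(resolveKeyspace(NoKeyspacePojo.class, "default_ks"));
    }
}
```

Under this model, setting a default keyspace on the builder never overrides a POJO that declares its own, which matches the "respected" answer given in the reply.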