[GitHub] flink issue #5964: [FLINK-8655] [Cassandra Connector] add keyspace in cassan...

2018-05-23 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5964
  
That the example doesn't work isn't related to this PR, so I'll merge it as 
is.
But I'll take a look at what the problem is, and either fix it or open a JIRA.


---


[GitHub] flink issue #5964: [FLINK-8655] [Cassandra Connector] add keyspace in cassan...

2018-05-22 Thread ctamisier
Github user ctamisier commented on the issue:

https://github.com/apache/flink/pull/5964
  
OK, great! I'd like to mention that `CassandraPojoSinkExample.main()` 
doesn't seem to work:
`source.getType()` is computed as a `GenericType` for the stream created by 
`env.fromCollection(messages);`.

So when `CassandraSink.addSink(source)` is called, 
`java.lang.IllegalArgumentException: No support for the type of the given 
DataStream: GenericType` 
is thrown, because the type matches neither `PojoTypeInfo` (`typeInfo 
instanceof PojoTypeInfo`) nor any of the other candidates. According to the 
`CassandraPojoSinkExample` javadocs, I guess it should be a `PojoTypeInfo`.

Maybe we can fix this as well in this PR? (`CassandraPojoSinkExample.main()` 
seems to be a concrete running example of the `CassandraPojoSinkBuilder` that 
this PR is modifying.)
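
For illustration, the dispatch described above can be sketched in plain Java. The classes below are hypothetical stand-ins, not Flink's real `TypeInformation` hierarchy, and the tuple branch is only an assumption about what the "other candidates" include:

```java
/** Stand-in sketch of the type dispatch; NOT Flink's actual classes. */
final class SinkTypeDispatchSketch {

    /** Hypothetical stand-ins for the type information of a DataStream. */
    interface TypeInfo {}

    static final class PojoTypeInfo implements TypeInfo {}

    static final class TupleTypeInfo implements TypeInfo {}

    static final class GenericTypeInfo implements TypeInfo {
        @Override
        public String toString() {
            return "GenericType";
        }
    }

    /** Picks a builder name by type info and rejects anything unrecognised. */
    static String chooseBuilder(TypeInfo typeInfo) {
        if (typeInfo instanceof PojoTypeInfo) {
            return "CassandraPojoSinkBuilder";
        }
        if (typeInfo instanceof TupleTypeInfo) {
            // Assumed second candidate; the comment above does not list them.
            return "CassandraTupleSinkBuilder";
        }
        // A generic type falls through every branch, as reported above.
        throw new IllegalArgumentException(
            "No support for the type of the given DataStream: " + typeInfo);
    }
}
```

If the element class is analysed as a generic type rather than a POJO, no branch matches and the call fails before any sink is created, which is consistent with the exception quoted above.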


---


[GitHub] flink issue #5964: [FLINK-8655] [Cassandra Connector] add keyspace in cassan...

2018-05-17 Thread ctamisier
Github user ctamisier commented on the issue:

https://github.com/apache/flink/pull/5964
  
Okay, I've pushed the changes.


---


[GitHub] flink issue #5964: [FLINK-8655] [Cassandra Connector] add keyspace in cassan...

2018-05-16 Thread ctamisier
Github user ctamisier commented on the issue:

https://github.com/apache/flink/pull/5964
  
I pushed the changes that rename the method to `setDefaultKeyspace()` and 
update the exception message. The field is still `keyspace`; we don't rename it 
to `defaultKeyspace`, right?


---


[GitHub] flink issue #5964: [FLINK-8655] [Cassandra Connector] add keyspace in cassan...

2018-05-14 Thread ctamisier
Github user ctamisier commented on the issue:

https://github.com/apache/flink/pull/5964
  
Hi, is there anything else we can do for this PR/issue?


---


[GitHub] flink issue #5964: [FLINK-8655] [Cassandra Connector] add keyspace in cassan...

2018-05-08 Thread ctamisier
Github user ctamisier commented on the issue:

https://github.com/apache/flink/pull/5964
  
I pushed the changes and refactored to use the `sanityCheck` method to check 
the `keyspace` depending on the builder in use (as is already done with 
`query`).
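
A rough sketch of what such per-builder validation could look like, in plain Java. The class names, the exact rules, and the exception messages here are illustrative assumptions, not the code in the PR:

```java
/** Sketch of per-builder validation; names and messages are illustrative only. */
final class BuilderSanityCheckSketch {

    abstract static class SinkBuilder {
        protected String query;
        protected String keyspace;

        SinkBuilder setQuery(String query) {
            this.query = query;
            return this;
        }

        SinkBuilder setDefaultKeyspace(String keyspace) {
            this.keyspace = keyspace;
            return this;
        }

        /** Each builder decides which options are legal for its input type. */
        abstract void sanityCheck();
    }

    /** POJO input: statements come from the mapper, so a query is rejected. */
    static final class PojoBuilder extends SinkBuilder {
        @Override
        void sanityCheck() {
            if (query != null) {
                throw new IllegalArgumentException("A query is not allowed for POJO input.");
            }
        }
    }

    /** Tuple input: a query is required and a default keyspace is rejected. */
    static final class TupleBuilder extends SinkBuilder {
        @Override
        void sanityCheck() {
            if (query == null || query.isEmpty()) {
                throw new IllegalArgumentException("A query is required for tuple input.");
            }
            if (keyspace != null) {
                throw new IllegalArgumentException(
                    "A default keyspace is only allowed for POJO input.");
            }
        }
    }
}
```

Keeping both checks in one overridable method means a misconfigured builder fails at build time rather than when the first record is written.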


---


[GitHub] flink issue #5964: [FLINK-8655] [Cassandra Connector] add keyspace in cassan...

2018-05-08 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5964
  
Oh, so that's where it is used...


---


[GitHub] flink issue #5964: [FLINK-8655] [Cassandra Connector] add keyspace in cassan...

2018-05-08 Thread ctamisier
Github user ctamisier commented on the issue:

https://github.com/apache/flink/pull/5964
  
> The keyspace field in the sink isn't used anywhere

It is used in `CassandraPojoSinkBuilder.createSink()`: the keyspace is an 
argument of `new CassandraPojoSink<>(...)`.
```
@Override
public CassandraSink createSink() throws Exception {
    return new CassandraSink<>(input.addSink(
        new CassandraPojoSink<>(typeInfo.getTypeClass(), builder, mapperOptions, keyspace))
        .name("Cassandra Sink"));
}
```

> and the configuration operations don't have any effect either

The keyspace is stored in the configuration under the `"keyspace"` key:
```
if (keyspace != null && !keyspace.isEmpty()) {
    configuration.setString("keyspace", keyspace);
}
```
which is then used to connect to Cassandra with this keyspace:
`cluster.connect(configuration.getString("keyspace", null));`

Am I missing something?

PS:
Instead of using the configuration object, maybe we could just use the 
`session` that has just been created and execute 
`session.executeAsync("USE " + keyspace);`, as is done in 
`Cluster.connectAsync(final String keyspace)`?
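
To make the round trip concrete, here is a minimal plain-Java sketch. A `Map` stands in for the configuration object (an assumption made only to keep the example self-contained), and the method names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the keyspace round trip; a Map stands in for the configuration. */
final class KeyspaceConfigSketch {

    /** Stores the keyspace only when one was actually provided. */
    static void storeKeyspace(Map<String, String> configuration, String keyspace) {
        if (keyspace != null && !keyspace.isEmpty()) {
            configuration.put("keyspace", keyspace);
        }
    }

    /** Returns the keyspace to connect with, or null to connect without one. */
    static String keyspaceForConnect(Map<String, String> configuration) {
        return configuration.getOrDefault("keyspace", null);
    }

    /** Convenience helper: store, then read back what the connection would see. */
    static String roundTrip(String keyspace) {
        Map<String, String> configuration = new HashMap<>();
        storeKeyspace(configuration, keyspace);
        return keyspaceForConnect(configuration);
    }
}
```

An empty or missing keyspace never reaches the configuration, so the lookup yields `null` and the connection is opened without a default keyspace.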



---


[GitHub] flink issue #5964: [FLINK-8655] [Cassandra Connector] add keyspace in cassan...

2018-05-08 Thread ctamisier
Github user ctamisier commented on the issue:

https://github.com/apache/flink/pull/5964
  
@zentol, @StephanEwen, @Bekreth
> We would have to pass the keyspace via the constructor as the 
Configuration approach doesn't work for streaming.

I'm not sure I understand. The new unit test 
(`testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink`) is in the streaming 
package (`org.apache.flink.streaming.connectors.cassandra`) and it seems to 
work. Am I missing something about the general Flink setup?

> Generally speaking it isn't a problem to set the keyspace when creating 
the connection. But I would like to know what happens if a POJO comes along 
that explicitly sets the keyspace; is it ignored, respected or will it cause an 
exception?

It takes the keyspace from the `@Table` annotation; see the 
`AnnotationParser.parseEntity` method, which creates the `EntityMapper` using 
`ksName`.
This seems to be the default behavior of Cassandra, so I'm not sure we can do 
anything here.
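
The precedence I'm describing can be sketched as follows. `TableStub` is a hypothetical stand-in for the driver's `@Table` annotation, and the fallback to the default keyspace is my reading of the behavior, not the driver's actual code:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Sketch of keyspace precedence; TableStub is NOT the driver's @Table. */
final class KeyspaceResolutionSketch {

    /** Hypothetical annotation carrying an optional keyspace and a table name. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface TableStub {
        String keyspace() default "";
        String name();
    }

    /** POJO that names its keyspace explicitly. */
    @TableStub(keyspace = "flink", name = "message")
    static final class AnnotatedMessage {}

    /** POJO that leaves the keyspace to the connection default. */
    @TableStub(name = "message")
    static final class NoKeyspaceMessage {}

    /** The annotated keyspace wins; otherwise the default keyspace is used. */
    static String resolveKeyspace(Class<?> pojo, String defaultKeyspace) {
        TableStub table = pojo.getAnnotation(TableStub.class);
        if (table != null && !table.keyspace().isEmpty()) {
            return table.keyspace();
        }
        return defaultKeyspace;
    }
}
```

Under this reading, a POJO that sets its keyspace explicitly is respected and the default only applies when the annotation leaves it empty.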


---