Re: poolingOptions not serializable?

2017-11-06 Thread Andrea Giordano
Without pooling options I have no errors and all works correctly (with a
light throughput ).

Trying to raise it, flink gave me a pool busy error about Cassandra So I
used pooling options. Now when I start the program I have the problem
described here
El 6 nov. 2017 9:48, "Nicolas Guyomar"  escribió:

> Hi Andrea,
>
> Do you have the error using the builder ?
>
> PoolingOptions poolingOptions = new PoolingOptions();
> poolingOptions
> .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
> .setMaxRequestsPerConnection(HostDistance.REMOTE, 1);
>
>
> Builder builder = Cluster.builder();
> builder.addContactPoint(CASSANDRA_ADDRESS);
> builder.withPort(CASSANDRA_PORT);
> builder.withPoolingOptions(poolingOptions);
>
>
> sinkBuilderNormalStream
> .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
> + " (user, sensor, timestamp, rdf_stream, observed_value, value)"
> + " VALUES (?, ?, ?, ?, ?, ?);")
> .setClusterBuilder(builder)
> .build();
>
>
> On 4 November 2017 at 19:27, Andrea Giordano <
> andrea.giordano@gmail.com> wrote:
>
>> Hi,
>> I’m using datastax driver to use Cassandra as sink for some data streams
>> with Apache Flink:
>> I have a problem executing my application raising an error about the full
>> queue. I discovered that the default value is 256, probably too low for my
>> load, so I have raised it using poolingOptions setting
>> maxRequestsPerConnection as suggested here: http://docs.datastax.com
>> /en/developer/java-driver/3.1/manual/pooling/.
>>
>> Unfortunately with the following code I obtain the following error when I
>> launch it:
>>
>> The implementation of the ClusterBuilder is not serializable.
>> The object probably contains or references non serializable fields.
>>
>>
>> My code:
>>
>>
>> PoolingOptions poolingOptions = new PoolingOptions();
>> poolingOptions
>>   .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
>>   .setMaxRequestsPerConnection(HostDistance.REMOTE, 1);
>>
>>
>> ClusterBuilder cassandraBuilder = new ClusterBuilder() {
>> private static final long serialVersionUID = 1L;
>>
>> @Override
>> public Cluster buildCluster(Cluster.Builder builder) {
>> return builder.addContactPoint(CASSANDRA_ADDRESS).withPort(CASSANDRA_PORT
>> )..withPoolingOptions(poolingOptions).build();
>> }
>> };
>>
>>
>> sinkBuilderNormalStream
>> .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
>> + " (user, sensor, timestamp, rdf_stream, observed_value, value)"
>> + " VALUES (?, ?, ?, ?, ?, ?);")
>> .setClusterBuilder(cassandraBuilder)
>> .build();
>>
>>
>> How can I deal with it?
>>
>
>


Re: poolingOptions not serializable?

2017-11-06 Thread Nicolas Guyomar
Hi Andrea,

Do you have the error using the builder ?

PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions
.setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
.setMaxRequestsPerConnection(HostDistance.REMOTE, 1);


Builder builder = Cluster.builder();
builder.addContactPoint(CASSANDRA_ADDRESS);
builder.withPort(CASSANDRA_PORT);
builder.withPoolingOptions(poolingOptions);


sinkBuilderNormalStream
.setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
+ " (user, sensor, timestamp, rdf_stream, observed_value, value)"
+ " VALUES (?, ?, ?, ?, ?, ?);")
.setClusterBuilder(builder)
.build();


On 4 November 2017 at 19:27, Andrea Giordano 
wrote:

> Hi,
> I’m using datastax driver to use Cassandra as sink for some data streams
> with Apache Flink:
> I have a problem executing my application raising an error about the full
> queue. I discovered that the default value is 256, probably too low for my
> load, so I have raised it using poolingOptions setting
> maxRequestsPerConnection as suggested here: http://docs.datastax.
> com/en/developer/java-driver/3.1/manual/pooling/.
>
> Unfortunately with the following code I obtain the following error when I
> launch it:
>
> The implementation of the ClusterBuilder is not serializable.
> The object probably contains or references non serializable fields.
>
>
> My code:
>
>
> PoolingOptions poolingOptions = new PoolingOptions();
> poolingOptions
>   .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
>   .setMaxRequestsPerConnection(HostDistance.REMOTE, 1);
>
>
> ClusterBuilder cassandraBuilder = new ClusterBuilder() {
> private static final long serialVersionUID = 1L;
>
> @Override
> public Cluster buildCluster(Cluster.Builder builder) {
> return builder.addContactPoint(CASSANDRA_ADDRESS).withPort(CASSANDRA_PORT
> )..withPoolingOptions(poolingOptions).build();
> }
> };
>
>
> sinkBuilderNormalStream
> .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
> + " (user, sensor, timestamp, rdf_stream, observed_value, value)"
> + " VALUES (?, ?, ?, ?, ?, ?);")
> .setClusterBuilder(cassandraBuilder)
> .build();
>
>
> How can I deal with it?
>