You can make them “transient” and instantiate in @Setup method of your DoFn 
(similar to what current CassandraIO's WriteFn does [1]).

[1] 
https://github.com/apache/beam/blob/8b84720631c8f454881d20fc1aa7cec2bc380edf/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java#L1139

> On 20 Jul 2020, at 12:49, wang Wu <[email protected]> wrote:
> 
> But unfortunately that way will not work as Session and/or Cluster is not 
> serialisable.
> 
> Regards
> Dinh
> 
>> On 20 Jul BE 2563, at 17:42, wang Wu <[email protected] 
>> <mailto:[email protected]>> wrote:
>> 
>> Hi,
>> 
>> We are thinking of tuning connection pooling like this:
>> https://docs.datastax.com/en/developer/java-driver/3.4/manual/pooling/ 
>> <https://docs.datastax.com/en/developer/java-driver/3.4/manual/pooling/>
>> 
>> I agree that current CassandraIO code does not open up for such 
>> modification/extension. Thus, we are trying to use DoFn instead.
>> 
>> public class CustomCassandraWriteFn extends DoFn<CassandraBatch, Void> {
>>   Cluster cluster;
>>   Session session;
>> 
>>   public CustomCassandraWriteFn(CassandraConfig cassandraConfig) {
>>     PoolingOptions poolingOptions = new PoolingOptions();
>>     this.cluster = getCluster(
>>         config,
>>      poolingOptions
>>     );
>>     this.session = this.cluster.newSession();
>>   }
>> 
>>   @ProcessElement
>>   public void processElement(ProcessContext context) {
>>     CassandraBatch batch = context.element();
>>     for (CassandraMutation o : batch.rows) {
>>       this.session.executeAsync("xxx");
>>     }
>> 
>>   }
>> }
>> 
>> 
>> Regards
>> Dinh
>> 
>>> On 20 Jul BE 2563, at 17:34, Alexey Romanenko <[email protected] 
>>> <mailto:[email protected]>> wrote:
>>> 
>>> Hi,
>>> 
>>> Could you tell, what kind of driver customisation you’d like to implement? 
>>> 
>>> Taking a look on current implementation of CassandraIO, I think that one of 
>>> the option could be just to add another configuration “withSomeOption(...)” 
>>> method and pass it to new Cluster instance initialisation method. 
>>> 
>>> Another one, more sophisticated, is to implement a “withClusterProvider(…)” 
>>> method, which will allow to user to implement and provide custom Cluster 
>>> instance with all required configuration.
>>> 
>>> In both cases, it will require CassandraIO modification.
>>> 
>>> 
>>>> On 18 Jul 2020, at 13:11, wang Wu <[email protected] 
>>>> <mailto:[email protected]>> wrote:
>>>> 
>>>> I notice that the standard Cassandra IO setup Cluster with basics 
>>>> settings. Is it possible to implement custom Cassandra IO in which I can 
>>>> customise Datastax driver? Any sample code will be helpful. Thanks
>>> 
>> 
> 

Reply via email to