You can make them “transient” and instantiate them in the @Setup method of your DoFn (similar to what the current CassandraIO WriteFn does [1]).
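
To illustrate the idea, here is a minimal sketch that uses only the JDK so it can be run on its own. It is not the CassandraIO code: FakeSession is a hypothetical stand-in for the driver's non-serialisable Session, and setup()/teardown() stand in for the methods you would annotate with @Setup/@Teardown in a real DoFn. The serialisation round trip only imitates what happens when a function object is shipped to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientSetupSketch {

  /** Hypothetical stand-in for the driver's Session; deliberately NOT Serializable. */
  static class FakeSession {
    final String contactPoint;

    FakeSession(String contactPoint) {
      this.contactPoint = contactPoint;
    }

    void close() {
      // A real session would release its connections here.
    }
  }

  /** Mirrors the shape of a DoFn: serialisable config plus a transient connection. */
  static class WriteFn implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String contactPoint;      // plain config, serialised with the function
    private transient FakeSession session;  // skipped by Java serialisation

    WriteFn(String contactPoint) {
      this.contactPoint = contactPoint;
    }

    // Assumption: in a real DoFn this method would carry the @Setup annotation.
    void setup() {
      session = new FakeSession(contactPoint);
    }

    // Assumption: in a real DoFn this method would carry the @Teardown annotation.
    void teardown() {
      if (session != null) {
        session.close();
        session = null;
      }
    }

    boolean hasSession() {
      return session != null;
    }
  }

  /** Serialises and deserialises the function, as if it were sent to a worker. */
  static WriteFn roundTrip(WriteFn fn) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(fn);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (WriteFn) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    WriteFn original = new WriteFn("127.0.0.1");
    original.setup();                             // session exists on the "driver" side

    WriteFn onWorker = roundTrip(original);       // no NotSerializableException is thrown
    System.out.println(onWorker.hasSession());    // false: the transient field was dropped

    onWorker.setup();                             // re-create the session on the "worker"
    System.out.println(onWorker.hasSession());    // true
    onWorker.teardown();
  }
}
```

In your case the serialisable part would be your CassandraConfig (assuming it is itself serialisable), and the PoolingOptions, Cluster and Session would all be built inside the setup method rather than in the constructor.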

[1] https://github.com/apache/beam/blob/8b84720631c8f454881d20fc1aa7cec2bc380edf/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java#L1139

> On 20 Jul 2020, at 12:49, wang Wu <[email protected]> wrote:
>
> But unfortunately that way will not work as Session and/or Cluster is not
> serialisable.
>
> Regards
> Dinh
>
>> On 20 Jul BE 2563, at 17:42, wang Wu <[email protected]> wrote:
>>
>> Hi,
>>
>> We are thinking of tuning connection pooling like this:
>> https://docs.datastax.com/en/developer/java-driver/3.4/manual/pooling/
>>
>> I agree that current CassandraIO code does not open up for such
>> modification/extension. Thus, we are trying to use DoFn instead.
>>
>> public class CustomCassandraWriteFn extends DoFn<CassandraBatch, Void> {
>>     Cluster cluster;
>>     Session session;
>>
>>     public CustomCassandraWriteFn(CassandraConfig cassandraConfig) {
>>         PoolingOptions poolingOptions = new PoolingOptions();
>>         this.cluster = getCluster(
>>             config,
>>             poolingOptions
>>         );
>>         this.session = this.cluster.newSession();
>>     }
>>
>>     @ProcessElement
>>     public void processElement(ProcessContext context) {
>>         CassandraBatch batch = context.element();
>>         for (CassandraMutation o : batch.rows) {
>>             this.session.executeAsync("xxx");
>>         }
>>
>>     }
>> }
>>
>>
>> Regards
>> Dinh
>>
>>> On 20 Jul BE 2563, at 17:34, Alexey Romanenko <[email protected]> wrote:
>>>
>>> Hi,
>>>
>>> Could you tell, what kind of driver customisation you’d like to implement?
>>>
>>> Taking a look on current implementation of CassandraIO, I think that one of
>>> the option could be just to add another configuration “withSomeOption(...)”
>>> method and pass it to new Cluster instance initialisation method.
>>>
>>> Another one, more sophisticated, is to implement a “withClusterProvider(…)”
>>> method, which will allow to user to implement and provide custom Cluster
>>> instance with all required configuration.
>>>
>>> In both cases, it will require CassandraIO modification.
>>>
>>>
>>>> On 18 Jul 2020, at 13:11, wang Wu <[email protected]> wrote:
>>>>
>>>> I notice that the standard Cassandra IO setup Cluster with basics
>>>> settings. Is it possible to implement custom Cassandra IO in which I can
>>>> customise Datastax driver? Any sample code will be helpful. Thanks
>>>
>>
>
