Thank you. It works with the code below. Just two questions:
1. Why, if I initialise the session inside the FeatureToCassandraDoFn constructor,
is it null every time I access it inside processElement?
2. Is this function applied in parallel to the elements of an unbounded collection?
The pipeline looks like this:

pipeline.apply(KafkaIO.read()).apply(TransformKafkaMessageToFeature).apply(ParDo.of(new FeatureToCassandraDoFn(cassandraConfig)))

I am wondering whether we write to Cassandra in parallel for all elements of
the unbounded collection. If so, how can we control the parallelism?

Regards
Dinh
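On question 2, with some hedging: Beam runners generally execute a ParDo on many DoFn instances in parallel, and the overall degree of parallelism is decided by the runner and its configuration, not by the DoFn. The Beam programming guide does state that a single DoFn instance is accessed by only one thread at a time, but because processElement calls executeAsync, one instance can still have an unbounded number of writes in flight. A common way to cap that is a semaphore: acquire a permit before each asynchronous write and release it when the write completes. The sketch below uses only the Java standard library; BoundedAsyncWriter and simulateWrite are hypothetical names, and the executor task merely stands in for session.executeAsync(...). With the DataStax 3.x driver, the release would instead go in a callback on the returned ResultSetFuture (a Guava ListenableFuture), and flush() corresponds to waiting in a @FinishBundle method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: caps the number of asynchronous writes in flight at once.
public class BoundedAsyncWriter implements AutoCloseable {
  private final Semaphore permits;
  // Stands in for the driver's own async I/O threads (assumption for this sketch).
  private final ExecutorService executor = Executors.newFixedThreadPool(16);
  private final List<CompletableFuture<Void>> pending = new ArrayList<>();
  private final AtomicInteger inFlight = new AtomicInteger();
  private final AtomicInteger maxObserved = new AtomicInteger();

  public BoundedAsyncWriter(int maxInFlight) {
    this.permits = new Semaphore(maxInFlight);
  }

  // Called once per element (like processElement); blocks while maxInFlight writes are outstanding.
  public void write(String row) throws InterruptedException {
    permits.acquire();
    CompletableFuture<Void> write =
        CompletableFuture.runAsync(() -> simulateWrite(row), executor)
            // Release the permit when the write finishes, whether it succeeded or failed.
            .whenComplete((ignored, error) -> permits.release());
    pending.add(write);
  }

  // Placeholder for session.executeAsync(...): records concurrency and sleeps briefly.
  private void simulateWrite(String row) {
    int now = inFlight.incrementAndGet();
    maxObserved.accumulateAndGet(now, Math::max);
    try {
      Thread.sleep(5);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      inFlight.decrementAndGet();
    }
  }

  // Like a @FinishBundle method: wait for every outstanding write; join() rethrows failures.
  public void flush() {
    CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
    pending.clear();
  }

  public int maxObservedInFlight() {
    return maxObserved.get();
  }

  @Override
  public void close() {
    executor.shutdown();
  }

  public static void main(String[] args) throws InterruptedException {
    try (BoundedAsyncWriter writer = new BoundedAsyncWriter(4)) {
      for (int i = 0; i < 50; i++) {
        writer.write("row-" + i);
      }
      writer.flush();
      System.out.println("max writes in flight: " + writer.maxObservedInFlight());
    }
  }
}
```

Lowering maxInFlight trades throughput for less pressure on the Cassandra cluster. The limit applies per DoFn instance, so the cluster-wide load is roughly the limit multiplied by the number of parallel instances the runner creates.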

---

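import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

public class FeatureToCassandraDoFn extends DoFn<FeatureRow, Void> {

  // Plain configuration values are serialized together with the DoFn.
  private final int port;
  private final List<String> hosts;

  // Driver objects are not Serializable: keep them transient and create them in @Setup.
  private transient Cluster cluster;
  private transient Session session;
  private transient PreparedStatement ps;

  public FeatureToCassandraDoFn(CassandraConfig cassandraConfig) {
    this.port = cassandraConfig.getPort();
    this.hosts = Arrays.asList(cassandraConfig.getBootstrapHosts().split(","));
  }

  @Setup
  public void setup() {
    // Cassandra
    this.cluster = getCluster(this.hosts, this.port);
    this.session = cluster.newSession();
    this.ps = session.prepare(
        "insert into metis.store (entities, feature, value) values (?, ?, ?) using TTL ?");
  }

  @ProcessElement
  public void processElement(ProcessContext context) {
    FeatureRow row = context.element();
    BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED);
    // Add rows to batchStatement ...

    // Execute it asynchronously. The returned future is not checked here,
    // so write failures are not reported back to Beam.
    ResultSetFuture resultSetFuture = this.session.executeAsync(batchStatement);
  }

  @Teardown
  public void teardown() {
    // Release driver resources when the runner discards this DoFn instance.
    if (session != null) {
      session.close();
    }
    if (cluster != null) {
      cluster.close();
    }
  }

  private Cluster getCluster(List<String> hosts, int port) {...}
}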
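On question 1: Beam serializes the DoFn and deserializes copies of it on the workers. Java serialization skips transient fields and does not run the constructor of a Serializable class when it deserializes one, so whatever the constructor stored in a transient field comes back as null, which is why the session has to be created in @Setup. The standard-library sketch below reproduces that behaviour; WriterFn, FakeSession and roundTrip are hypothetical stand-ins for the DoFn, the driver Session and the runner's serialization step, not Beam or DataStax classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {

  // Hypothetical stand-in for a non-serializable driver Session.
  static final class FakeSession {
    final String name;

    FakeSession(String name) {
      this.name = name;
    }
  }

  // Hypothetical stand-in for the DoFn: configuration is serialized, the session is transient.
  static final class WriterFn implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String hosts;            // travels with the serialized object
    private transient FakeSession session; // skipped by serialization

    WriterFn(String hosts) {
      this.hosts = hosts;
      this.session = new FakeSession("created-in-constructor");
    }

    // Plays the role of @Setup: runs on the deserialized copy.
    void setup() {
      this.session = new FakeSession("created-in-setup:" + hosts);
    }

    FakeSession session() {
      return session;
    }

    String hosts() {
      return hosts;
    }
  }

  // Serialize and deserialize, roughly what happens when a DoFn is shipped to a worker.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    WriterFn onWorker = roundTrip(new WriterFn("10.0.0.1,10.0.0.2"));
    System.out.println("after deserialization: " + onWorker.session());
    onWorker.setup();
    System.out.println("after setup: " + onWorker.session().name);
  }
}
```

Running main prints "after deserialization: null" followed by "after setup: created-in-setup:10.0.0.1,10.0.0.2": the hosts string survives the round trip, while the session only exists once setup() has run on the copy.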

> On 20 Jul BE 2563, at 22:10, Alexey Romanenko <[email protected]> 
> wrote:
> 
> You can make them “transient” and instantiate them in the @Setup method of your DoFn
> (similar to what the current CassandraIO WriteFn does [1]).
> 
> [1] 
> https://github.com/apache/beam/blob/8b84720631c8f454881d20fc1aa7cec2bc380edf/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java#L1139
> 
>> On 20 Jul 2020, at 12:49, wang Wu <[email protected]> wrote:
>> 
>> Unfortunately, that approach will not work, because Session and Cluster are not
>> serialisable.
>> 
>> Regards
>> Dinh
>> 
>>> On 20 Jul BE 2563, at 17:42, wang Wu <[email protected]> wrote:
>>> 
>>> Hi,
>>> 
>>> We are thinking of tuning connection pooling like this:
>>> https://docs.datastax.com/en/developer/java-driver/3.4/manual/pooling/ 
>>> 
>>> I agree that the current CassandraIO code is not open to such
>>> modification or extension, so we are trying to use a custom DoFn instead.
>>> 
>>> public class CustomCassandraWriteFn extends DoFn<CassandraBatch, Void> {
>>>   Cluster cluster;
>>>   Session session;
>>> 
>>>   public CustomCassandraWriteFn(CassandraConfig cassandraConfig) {
>>>     PoolingOptions poolingOptions = new PoolingOptions();
>>>     this.cluster = getCluster(cassandraConfig, poolingOptions);
>>>     this.session = this.cluster.newSession();
>>>   }
>>> 
>>>   @ProcessElement
>>>   public void processElement(ProcessContext context) {
>>>     CassandraBatch batch = context.element();
>>>     for (CassandraMutation o : batch.rows) {
>>>       this.session.executeAsync("xxx");
>>>     }
>>> 
>>>   }
>>> }
>>> 
>>> 
>>> Regards
>>> Dinh
>>> 
>>>> On 20 Jul BE 2563, at 17:34, Alexey Romanenko <[email protected]> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> Could you tell us what kind of driver customisation you’d like to implement?
>>>> 
>>>> Looking at the current implementation of CassandraIO, I think one option
>>>> could be to add another configuration method, “withSomeOption(...)”, and
>>>> pass its value to the method that initialises the new Cluster instance.
>>>> 
>>>> Another, more sophisticated, option is to implement a
>>>> “withClusterProvider(…)” method, which would allow the user to implement and
>>>> provide a custom Cluster instance with all the required configuration.
>>>> 
>>>> In both cases, CassandraIO would need to be modified.
>>>> 
>>>> 
>>>>> On 18 Jul 2020, at 13:11, wang Wu <[email protected]> wrote:
>>>>> 
>>>>> I notice that the standard CassandraIO sets up the Cluster with basic
>>>>> settings. Is it possible to implement a custom Cassandra IO in which I can
>>>>> customise the DataStax driver? Any sample code would be helpful. Thanks
>>>> 
>>> 
>> 
> 
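The “withClusterProvider(…)” idea from the thread can also be applied inside a custom DoFn: ship a small serializable factory with the DoFn and let it build the non-serializable client on the worker, so custom settings such as pooling options travel as plain captured values. This is a sketch under stated assumptions: ClientProvider, FakeCluster, WriteFn and roundTrip are hypothetical names that only stand in for a provider interface, the driver's Cluster and a write DoFn; they are not Beam or DataStax APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class ProviderDemo {

  // Hypothetical provider interface, in the spirit of the suggested withClusterProvider(...).
  public interface ClientProvider<T> extends Supplier<T>, Serializable {}

  // Hypothetical stand-in for a driver Cluster built with a custom pooling setting.
  public static final class FakeCluster {
    final int coreConnectionsPerHost;

    FakeCluster(int coreConnectionsPerHost) {
      this.coreConnectionsPerHost = coreConnectionsPerHost;
    }
  }

  // Hypothetical write DoFn stand-in: only the provider is serialized.
  public static final class WriteFn implements Serializable {
    private static final long serialVersionUID = 1L;
    private final ClientProvider<FakeCluster> provider;
    private transient FakeCluster cluster;

    public WriteFn(ClientProvider<FakeCluster> provider) {
      this.provider = provider;
    }

    // Plays the role of @Setup: build the client from the user's provider on the worker.
    public void setup() {
      this.cluster = provider.get();
    }

    public FakeCluster cluster() {
      return cluster;
    }
  }

  // Serialize and deserialize, roughly what happens when a DoFn is shipped to a worker.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    int coreConnections = 4; // user-chosen setting captured by the lambda
    WriteFn onWorker = roundTrip(new WriteFn(() -> new FakeCluster(coreConnections)));
    onWorker.setup();
    System.out.println("core connections per host: " + onWorker.cluster().coreConnectionsPerHost);
  }
}
```

Running main prints "core connections per host: 4". A lambda whose target type is a functional interface extending Serializable is itself serializable as long as everything it captures is, so only the int setting travels with the DoFn here; the client is built afresh from it in setup().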
