Thank you. It works with the code below. I have just two questions:
1. Why is the session null every time I access it inside processElement if I
initialise it inside the FeatureToCassandraDoFn constructor?
2. Is this function applied in parallel to the elements of an unbounded collection?
The pipeline looks like this:
KafkaIO.read().apply(TransformKafkaMessageToFeature).apply(FeatureToCassandraDoFn)
I am wondering whether we write to Cassandra in parallel for all elements of the
unbounded collection. If so, how can we control the parallelism?
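The behaviour in question 1 can be reproduced without Beam or Cassandra. This is a minimal pure-Java sketch. It assumes the usual Beam behaviour, in which the DoFn object you construct is serialised and each worker runs a deserialised copy. Java serialisation skips transient fields, so a value assigned to one in the constructor is null again in that copy. A @Setup method runs on the copy itself, so anything it creates survives. TransientFieldDemo, FakeSession, MyFn and roundTrip are hypothetical names. roundTrip imitates the shipping step with ObjectOutputStream and ObjectInputStream. If the field were not transient, writeObject would instead fail with java.io.NotSerializableException, because the stand-in session class is not Serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {
  // Hypothetical stand-in for a non-serialisable driver object such as a Cassandra Session.
  static class FakeSession {
    final String id;

    FakeSession(String id) {
      this.id = id;
    }
  }

  // Hypothetical stand-in for a DoFn (Beam DoFns are Serializable).
  static class MyFn implements Serializable {
    private static final long serialVersionUID = 1L;
    final String hosts;            // plain data: serialised with the object
    transient FakeSession session; // transient: skipped by Java serialisation

    MyFn(String hosts) {
      this.hosts = hosts;
      this.session = new FakeSession("created in constructor");
    }

    // Plays the role of a @Setup method, which runs on the deserialised copy.
    void setup() {
      this.session = new FakeSession("created in setup for " + hosts);
    }
  }

  // Imitates shipping the object to a worker: serialise it, then deserialise a copy.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    MyFn onWorker = roundTrip(new MyFn("host1,host2"));
    System.out.println(onWorker.session == null); // true: the constructor's value was not shipped
    onWorker.setup();
    System.out.println(onWorker.session.id);      // created in setup for host1,host2
  }
}
```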
Regards
Dinh
---
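import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

public class FeatureToCassandraDoFn extends DoFn<FeatureRow, Void> {
  // Serialisable configuration, captured in the constructor and shipped to the workers.
  private final int port;
  private final List<String> hosts;
  // Non-serialisable driver objects: transient, and created on each worker in @Setup.
  private transient Cluster cluster;
  private transient Session session;
  private transient PreparedStatement ps;
  public FeatureToCassandraDoFn(CassandraConfig cassandraConfig) {
    this.port = cassandraConfig.getPort();
    this.hosts = Arrays.asList(cassandraConfig.getBootstrapHosts().split(","));
  }
  @Setup
  public void setup() {
    // Cassandra
    this.cluster = getCluster(this.hosts, this.port);
    this.session = cluster.connect();
    this.ps = session.prepare(
        "insert into metis.store (entities, feature, value) values (?, ?, ?) using TTL ?");
  }
  @ProcessElement
  public void processElement(ProcessContext context) {
    FeatureRow row = context.element();
    BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED);
    // Add rows to batchStatement ...
    // Execute it asynchronously. The future is not awaited here, so write failures
    // are not surfaced and the number of in-flight writes is not bounded.
    ResultSetFuture resultSetFuture = this.session.executeAsync(batchStatement);
  }
  @Teardown
  public void teardown() {
    // Release driver resources when the runner discards this DoFn instance.
    if (session != null) session.close();
    if (cluster != null) cluster.close();
  }
  private Cluster getCluster(List<String> hosts, int port) {...}
}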
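On question 2, the DoFn does not decide how many of its instances process elements concurrently. The runner decides that, and for an unbounded KafkaIO source it is usually tied to how the source is split across Kafka partitions. The code above therefore cannot set that number directly. What it can bound is how many executeAsync calls each instance has outstanding, since processElement currently fires them without waiting.

The sketch below makes these assumptions:

- BoundedAsyncWriter and maxInFlight are hypothetical names.
- A java.util.concurrent.Semaphore caps the number of in-flight writes.
- CompletableFuture stands in for the driver's ResultSetFuture, so the example runs without Cassandra.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical helper: caps how many asynchronous writes one instance has in flight.
public class BoundedAsyncWriter {
  private final int maxInFlight;
  private final Semaphore permits;
  // First failure reported by any write; rethrown from flush() so errors are not lost.
  private final AtomicReference<Throwable> firstError = new AtomicReference<>();

  public BoundedAsyncWriter(int maxInFlight) {
    this.maxInFlight = maxInFlight;
    this.permits = new Semaphore(maxInFlight);
  }

  // Blocks while maxInFlight writes are outstanding, then starts one more.
  public void submit(Supplier<CompletableFuture<?>> write) throws InterruptedException {
    permits.acquire();
    CompletableFuture<?> future;
    try {
      future = write.get();
    } catch (RuntimeException e) {
      permits.release(); // the write never started, so give the permit back
      throw e;
    }
    // Release the permit when the write finishes, successfully or not.
    future.whenComplete((result, error) -> {
      if (error != null) {
        firstError.compareAndSet(null, error);
      }
      permits.release();
    });
  }

  // Waits until all submitted writes have finished, then rethrows the first failure, if any.
  public void flush() throws InterruptedException {
    permits.acquire(maxInFlight); // succeeds only once every permit has been returned
    permits.release(maxInFlight);
    Throwable error = firstError.getAndSet(null);
    if (error != null) {
      throw new RuntimeException("asynchronous write failed", error);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    BoundedAsyncWriter writer = new BoundedAsyncWriter(4);
    AtomicInteger completed = new AtomicInteger();
    for (int i = 0; i < 20; i++) {
      // Stand-in for session.executeAsync(...): a trivial task on the common pool.
      writer.submit(() -> CompletableFuture.runAsync(completed::incrementAndGet));
    }
    writer.flush();
    System.out.println(completed.get()); // 20
  }
}
```

Here is how it could be wired into FeatureToCassandraDoFn:

- Create the helper in @Setup.
- Call submit from @ProcessElement with a lambda that adapts session.executeAsync(batchStatement) to a CompletableFuture.
- Call flush from @FinishBundle, so that a bundle does not complete before its writes do.

The adapter is not shown, because in driver 3.x ResultSetFuture is a Guava ListenableFuture rather than a CompletableFuture.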
> On 20 Jul BE 2563, at 22:10, Alexey Romanenko wrote:
>
> You can make them “transient” and instantiate them in the @Setup method of your
> DoFn (similar to what the current CassandraIO WriteFn does [1]).
>
> [1]
> https://github.com/apache/beam/blob/8b84720631c8f454881d20fc1aa7cec2bc380edf/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java#L1139
>
>> On 20 Jul 2020, at 12:49, wang Wu wrote:
>>
>> Unfortunately, that approach will not work, as Session and/or Cluster is not
>> serialisable.
>>
>> Regards
>> Dinh
>>
>>> On 20 Jul BE 2563, at 17:42, wang Wu wrote:
>>>
>>> Hi,
>>>
>>> We are thinking of tuning connection pooling like this:
>>> https://docs.datastax.com/en/developer/java-driver/3.4/manual/pooling/
>>>
>>> I agree that the current CassandraIO code is not open to such
>>> modification/extension, so we are trying to use a custom DoFn instead.
>>>
>>> public class CustomCassandraWriteFn extends DoFn<CassandraBatch, Void> {
>>>   Cluster cluster;
>>>   Session session;
>>>
>>>   public CustomCassandraWriteFn(CassandraConfig cassandraConfig) {
>>>     PoolingOptions poolingOptions = new PoolingOptions();
>>>     this.cluster = getCluster(cassandraConfig, poolingOptions);
>>>     this.session = this.cluster.newSession();
>>>   }
>>>
>>>   @ProcessElement
>>>   public void processElement(ProcessContext context) {
>>>     CassandraBatch batch = context.element();
>>>     for (CassandraMutation o : batch.rows) {
>>>       this.session.executeAsync("xxx");
>>>     }
>>>   }
>>> }
>>>
>>>
>>> Regards
>>> Dinh
>>>
>>>> On 20 Jul BE 2563, at 17:34, Alexey Romanenko wrote:
>>>>
>>>> Hi,
>>>>
>>>> Could you tell us what kind of driver customisation you’d like to implement?
>>>>
>>>> Looking at the current implementation of CassandraIO, I think one option
>>>> could be simply to add another configuration method, “withSomeOption(...)”,
>>>> and pass the option to the method that initialises the new Cluster instance.
>>>>
>>>> Another, more sophisticated option is to implement a
>>>> “withClusterProvider(…)” method, which would allow the user to implement and
>>>> provide a custom Cluster instance with all the required configuration.
>>>>
>>>> Both options would require modifying CassandraIO.
>>>>
>>>>
>>>>> On 18 Jul 2020, at 13:11, wang Wu wrote:
>>>>>
>>>>> I notice that the standard CassandraIO sets up the Cluster with basic
>>>>> settings. Is it possible to implement a custom Cassandra IO in which I can
>>>>> customise the DataStax driver? Any sample code would be helpful. Thanks
>>>>
>>>
>>
>