Please see my answers inline.

> On 23 Jul 2020, at 17:10, wang Wu <[email protected]> wrote:
>
> Thank you. It works with the code below. Just 2 questions:
> 1. Why, if I initialise session inside the constructor of FeatureToCassandraDoFn,
> does it become null every time I access it inside processElement?

Because it’s defined as “transient”. When your ParDo transform executes the DoFn on a worker, it uses a serialised instance of it, so transient members are not serialised and come back as null on the worker. That is why it’s recommended to use the @Setup method to initialise non-serialisable members. Also, the same DoFn instance can be used to process one or more bundles on the same worker, but there is no guarantee on how many. I’d recommend taking a look at the “DoFn lifecycle” sequence diagram on the ParDo doc page for more details [1].

> 2. Is this function applied in parallel for elements of unbounded collection?
> The pipeline looks like this:

Yes. The elements of the input PCollection (bounded or unbounded) will be divided into bundles, and every bundle will be processed on a specific worker by a DoFn instance. The ParDo Javadoc explains this in more detail [2].

>
> KafkaIO.read().apply(TransformKafkaMessageToFeature).apply(FeatureToCassandraDoFn)
>
> I am wondering if we write in parallel to Cassandra for all elements of
> unbounded collection. If yes, how to control the parallelism.

If you mean the number of parallel instances, then that is the responsibility of your data processing engine and runner. Beam doesn’t expose it to the user, afaik.
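On the first question, here is a minimal sketch in plain Java (no Beam or Cassandra involved) of why the constructor-initialised member ends up null. The names TransientDemo, FakeDoFn and roundTrip are hypothetical, the Object field is only a stand-in for Session, and the round trip through ObjectOutputStream only approximates how a runner ships a DoFn to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {

    // Hypothetical stand-in for a DoFn: Serializable, with one transient member.
    static class FakeDoFn implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String hosts;       // plain config: survives serialisation
        private transient Object session; // stand-in for Session: skipped by serialisation

        FakeDoFn(String hosts) {
            this.hosts = hosts;
            this.session = new Object();  // initialised in the constructor, as in question 1
        }

        // Plays the role of a @Setup method: runs on the deserialised copy.
        void setup() {
            this.session = new Object();
        }

        boolean hasSession() {
            return session != null;
        }

        String hosts() {
            return hosts;
        }
    }

    // Serialise and deserialise the instance, roughly what happens before a worker uses it.
    static FakeDoFn roundTrip(FakeDoFn fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (FakeDoFn) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialisation round trip failed", e);
        }
    }

    public static void main(String[] args) {
        FakeDoFn original = new FakeDoFn("host1,host2");
        FakeDoFn onWorker = roundTrip(original);

        System.out.println("original has session: " + original.hasSession());
        System.out.println("copy has session before setup: " + onWorker.hasSession());
        onWorker.setup();
        System.out.println("copy has session after setup: " + onWorker.hasSession());
        System.out.println("copy hosts: " + onWorker.hosts());
    }
}
```

The constructor is not run again when the copy is deserialised, so only the non-transient hosts field carries over. The session has to be recreated on the copy, which is the job @Setup does in a real DoFn.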

[1] https://beam.apache.org/documentation/programming-guide/#pardo
[2] https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/transforms/ParDo.html

Alexey

>
> Regards
> Dinh
>
> ---
>
> public class FeatureToCassandraDoFn extends DoFn<FeatureRow, Void> {
>
>   private transient Session session;
>   private transient PreparedStatement ps;
>
>   public FeatureToCassandraDoFn(CassandraConfig cassandraConfig) {
>     this.port = cassandraConfig.getPort();
>     this.hosts = Arrays.asList(cassandraConfig.getBootstrapHosts().split(","));
>   }
>
>   @Setup
>   public void setup() {
>     // Cassandra
>     Cluster cluster = getCluster(this.hosts, this.port);
>     this.session = cluster.newSession();
>     this.ps = session.prepare("insert into metis.store (entities, feature, value) values(?,?,?) using TTL ?");
>   }
>
>   @ProcessElement
>   public void processElement(ProcessContext context) {
>     Rows rows = context.element();
>     BatchStatement batchStatement = new BatchStatement(Type.UNLOGGED);
>     // Add rows to batchStatement ...
>
>     // Execute it
>     ResultSetFuture resultSetFuture = this.session.executeAsync(batchStatement);
>   }
>
>   private Cluster getCluster() {...}
> }
>
>> On 20 Jul BE 2563, at 22:10, Alexey Romanenko <[email protected]> wrote:
>>
>> You can make them “transient” and instantiate in @Setup method of your DoFn
>> (similar to what current CassandraIO's WriteFn does [1]).
>>
>> [1]
>> https://github.com/apache/beam/blob/8b84720631c8f454881d20fc1aa7cec2bc380edf/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java#L1139
>>
>>> On 20 Jul 2020, at 12:49, wang Wu <[email protected]> wrote:
>>>
>>> But unfortunately that way will not work, as Session and/or Cluster is not
>>> serialisable.
>>>
>>> Regards
>>> Dinh
>>>
>>>> On 20 Jul BE 2563, at 17:42, wang Wu <[email protected]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> We are thinking of tuning connection pooling like this:
>>>> https://docs.datastax.com/en/developer/java-driver/3.4/manual/pooling/
>>>>
>>>> I agree that the current CassandraIO code does not open up for such
>>>> modification/extension. Thus, we are trying to use a DoFn instead.
>>>>
>>>> public class CustomCassandraWriteFn extends DoFn<CassandraBatch, Void> {
>>>>   Cluster cluster;
>>>>   Session session;
>>>>
>>>>   public CustomCassandraWriteFn(CassandraConfig cassandraConfig) {
>>>>     PoolingOptions poolingOptions = new PoolingOptions();
>>>>     this.cluster = getCluster(
>>>>       cassandraConfig,
>>>>       poolingOptions
>>>>     );
>>>>     this.session = this.cluster.newSession();
>>>>   }
>>>>
>>>>   @ProcessElement
>>>>   public void processElement(ProcessContext context) {
>>>>     CassandraBatch batch = context.element();
>>>>     for (CassandraMutation o : batch.rows) {
>>>>       this.session.executeAsync("xxx");
>>>>     }
>>>>   }
>>>> }
>>>>
>>>>
>>>> Regards
>>>> Dinh
>>>>
>>>>> On 20 Jul BE 2563, at 17:34, Alexey Romanenko <[email protected]> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> Could you tell me what kind of driver customisation you’d like to
>>>>> implement?
>>>>>
>>>>> Taking a look at the current implementation of CassandraIO, I think that one
>>>>> of the options could be just to add another configuration
>>>>> “withSomeOption(...)” method and pass it to the new Cluster instance
>>>>> initialisation method.
>>>>>
>>>>> Another one, more sophisticated, is to implement a
>>>>> “withClusterProvider(…)” method, which will allow the user to implement
>>>>> and provide a custom Cluster instance with all required configuration.
>>>>>
>>>>> In both cases, it will require CassandraIO modification.
>>>>>
>>>>>
>>>>>> On 18 Jul 2020, at 13:11, wang Wu <[email protected]> wrote:
>>>>>>
>>>>>> I notice that the standard Cassandra IO sets up the Cluster with basic
>>>>>> settings. Is it possible to implement a custom Cassandra IO in which I can
>>>>>> customise the Datastax driver? Any sample code will be helpful. Thanks
>>>>>
>>>>
>>>
>>
>
