Yes my issue is the lag increasing. We are using Spark Runner. Source is
Kafka and Sink is Cassandra. We tune the batch interval and max records per
batch but the batch interval still less than the processing time of each
batch. So it causes the latency. We tried to apply Reshuffle withRandomKey
on the collection after we read from Kafka but it does not help.

On Tue, Jul 28, 2020, 00:27 Chamikara Jayalath <[email protected]> wrote:

> Probably you should apply the Partition[1] transform on the output
> PCollection of your read. Note though that the exact parallelization is
> runner dependent (for example, runner might autoscale up resulting in more
> writers).
> Did you run into issues when just reading from Kafka and writing to Cassadra
> (without manually controlling the parallelization) ?
>
> Thanks,
> Cham
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
>
> On Thu, Jul 23, 2020 at 5:01 AM wang Wu <[email protected]> wrote:
>
>> Hi,
>> Supposed that a Kafka topic has 3 partitions only. Now we want to
>> partition it into 20 partition, each one will produce an output collection.
>> The purpose is to write to the sink in parallel from all 20 output
>> collections.
>>
>> Will this code achieve that purpose?
>>
>>  KafkaIO.Read<byte[], Long> reader =
>>       KafkaIO.<byte[], Long>read()
>>           .withConsumerFactoryFn(
>>               new ConsumerFactoryFn(
>>                   topic, 10, numElements, OffsetResetStrategy.EARLIEST))
>> // 10 partitions
>>
>>   PCollection<Long> input =
>> p.apply(reader.withoutMetadata()).apply(KafkaToCassandraRow).apply(CassadraIO.write);
>>
>> Regards
>> Dinh
>>
>>

Reply via email to