Arvindan,
Based on what you have, it looks like a shuffle is not needed between
Kafka->ParquetWriter. The decision to use parallel partition should
ideally be based on whether you need to shuffle. If no shuffle is needed,
option [1] should not be used per se; why even bother to shuffle if you do
not need to?

Assuming the ask is really between options [2] and [3], the bottleneck in
your mini-DAGs (each parallel partition) will dictate the number of
partitions. So here are the questions I would ask:
1. What is the minimum number of ParquetWriters you need to meet your SLA?
// benchmark each to find out
2. Are you using 120 KafkaConsumers because there are 120 topics? Or is it
N topics spread across 120 consumers (where N >> 120) to balance the load?
i.e. do you have the choice to use more Kafka partitions?

In general, for parallel partition, the number of partitions should be set
per the bottleneck operator. Assuming in this case it is Parquet and the
answer is 200 partitions, why would 200 Kafka -> 200 Parquet with a
container-local setup not be OK? If they share containers, the memory may
actually be the same or lower than 300 separate containers (100 Kafka +
200 Parquet). Kafka and Parquet most likely do not compete for the same
resource (I/O vs. CPU), so option [4] as follows should work:

4.
- Figure out the minimum number of Parquet writers needed to meet your SLA
- Bump Kafka up to that many partitions
- Use container-local + parallel partition
- Experiment with container memory size to bring net memory down
In this case, you will save on the I/O of routing Kafka->Parquet through
the NIC, and there is no serialization between Kafka->Parquet. There is a
small chance that thread-local between Kafka->Parquet may work, which would
move the back-pressure (from topic skew, spikes, ...) to the Kafka
publisher.
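The recipe in [4] maps onto Apex's standard configuration attributes. A
minimal sketch of the properties involved is below; the operator names
kafkaInput and parquetWriter, the stream name kafkaToParquet, the port name
input, and the memory sizes are placeholders for whatever your application
actually uses:

```
# Partition the input side 200 ways (hypothetical count from above).
# Note: the Kafka input operator typically partitions itself based on the
# number of Kafka partitions, in which case this line is unnecessary and
# bumping the topic to 200 Kafka partitions is what drives the fan-out.
dt.operator.kafkaInput.attr.PARTITIONER=com.datatorrent.common.partitioner.StatelessPartitioner:200

# Parallel partition: the writer's input port inherits the upstream
# partitioning, so each consumer partition feeds its own writer instance
dt.operator.parquetWriter.inputport.input.attr.PARTITION_PARALLEL=true

# Deploy each Kafka->Parquet pair in the same container
dt.stream.kafkaToParquet.locality=CONTAINER_LOCAL
# If thread-local turns out to work, switch to:
# dt.stream.kafkaToParquet.locality=THREAD_LOCAL

# Experiment with these to bring the net memory footprint down
dt.operator.kafkaInput.attr.MEMORY_MB=1024
dt.operator.parquetWriter.attr.MEMORY_MB=2048
```

Adjust the operator/port names to match your DAG; PARTITION_PARALLEL on the
writer's input port is what makes the writer count follow the consumer
count instead of requiring a separate shuffle.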

Thanks
Amol



On Thu, Dec 22, 2016 at 11:06 AM, Pramod Immaneni <[email protected]>
wrote:

> Arvindan,
>
> When you had the MxN case with 100 Kafka consumers sending to 120 Parquet
> writers, what was the CPU utilization of the Parquet containers? Was it
> close to 100% or did you have spare cycles? I am trying to determine if it
> is an I/O bottleneck or processing.
>
> Thanks
>
> On Thu, Dec 22, 2016 at 10:50 AM, Arvindan Thulasinathan <
> [email protected]> wrote:
>
>> Hi,
>>   We have an Apex Application which has a DAG structure like this:
>>
>> KafkaConsumer —>  ParquetWriter
>>
>> The KafkaConsumer is running at a scale where we have 100 containers for
>> consumer consuming from a Kafka-Cluster with an incoming rate of 300K
>> msg/sec and each message is about 1KB (Each message is a highly nested Avro
>> message). We arrived at the 100 container number for the consumer in order
>> to keep-up with the incoming traffic. The ParquetWriter is a bit CPU
>> intensive in our case and we thought we may require about 120-130
>> containers.
>>
>>
>> We have 3 different observations here:
>>   1. Using 100 KafkaConsumer and 120 ParquetWriter Without Partition
>> Parallel:
>>                  In this case, Apex automatically introduces a
>> pass-through unifier.  In this scenario, we have seen that invariably
>> ParquetWriter processes tuples at a lower rate than KafkaConsumer’s emit
>> rate. That is, if Consumer emits Tuples at the rate of 20 million/sec, the
>> ParquetWriter will only write at the rate of 17 million/sec. Also, it
>> invariably leads to backpressure and makes the consumer consume at a lower
>> rate. I have tried going beyond 120 containers as well. I believe a
>> possible reason could be that the Unifier and Writer are in the same
>> container and presumably share the same core, and hence they are slower. Is
>> this observation correct? I tried tuning by increasing the
>> Writer.Inputport.QUEUE_SIZE to 10K. The queue is not even getting half
>> full, but still the back-pressure is created for the consumer. Is there any
>> additional tune-up that I can do, to:
>>    A. Make the writer process tuples at almost the same pace as Consumer
>> without backpressure on the Consumer
>>
>>   2. Using Partition Parallel with 120 KafkaConsumer and 120
>> ParquetWriter:
>>         In this scenario as well, we have seen that ParquetWriter
>> processes tuples at a lower rate than KafkaConsumer’s emit rate. That is,
>> if Consumer emits Tuples at the rate of 20 million/sec, the ParquetWriter
>> will only write at the rate of 19 million/sec. This behavior is true even
>> if we keep increasing the Consumers and writers to 130 or 140 containers. I
>> believe this behavior is because the writer is a bit CPU intensive.
>>
>> 3. Using a different DAG structure like this:
>>
>>      KafkaConsumer —> ParquetWriter1
>>               |
>>               |——————> ParquetWriter2
>>
>>   In this case, both ParquetWriter1 and ParquetWriter2 are the same writers.
>> But each have a Partition_Parallel relationship with KafkaConsumer. Each is
>> of 100 containers. So 300 containers in total. The KafkaConsumer sends 1
>> half of messages to ParquetWriter1 and the other half to ParquetWriter2.
>> That is, out of 20 million/sec emitted by Consumer, 10Million is handled by
>> each Writer group per sec. This setup works fine for our traffic. But, in
>> the place where we would have required 20-30 additional Writer containers,
>> we are using 100 because partition_parallel seems to avoid all the slowness
>> that we faced in approach[1].
>>
>> We wanted to know if there is a way to optimize approach [1] or [2] and
>> hence use lesser containers than go with [3].
>>
>>
>> Thanks,
>> Aravindan
>>
