Thanks Saurabh for the clarification.

You are describing a common challenge in distributed systems like Spark
Streaming: stragglers. These are slow-running tasks that significantly
impact the overall performance of a job. In the context of Spark Streaming,
a slow executor processing a batch can delay the entire stream processing
pipeline. Let me think about your design and come back.
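
For stragglers specifically, the first knob people usually reach for is
speculative execution, so that a slow copy of a task can be re-launched on
another executor. A minimal, illustrative sketch of the relevant settings
(values are examples only, not a recommendation for your workload, and note
that speculated tasks can run more than once, so publishing to Kafka would
need to tolerate duplicates):

    // Illustrative only: re-launch suspiciously slow tasks elsewhere.
    // Speculated tasks can run twice, so Kafka output must tolerate duplicates.
    val conf = new org.apache.spark.SparkConf()
      .set("spark.speculation", "true")          // off by default
      .set("spark.speculation.multiplier", "3")  // "slow" = > 3x median task duration
      .set("spark.speculation.quantile", "0.9")  // only after 90% of tasks have finished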

HTH

Mich Talebzadeh,
Architect | Data Science | Financial Crime | Forensic Analysis | GDPR

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>





On Mon, 27 Jan 2025 at 07:18, Saurabh Agrawal <agrawalsaurabh...@gmail.com>
wrote:

> Thanks Mich for the response,
>
> We are using Spark DStream with Kafka Integration here.
>
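> For reference, the consumer side is essentially the standard direct
> stream. A simplified sketch of the kind of setup described here (topic
> names, bootstrap servers and the group id are placeholders, not our real
> values):
>
>     // Simplified sketch of the DStream + Kafka direct integration (placeholders).
>     import org.apache.spark.SparkConf
>     import org.apache.spark.streaming.{Seconds, StreamingContext}
>     import org.apache.spark.streaming.kafka010._
>     import org.apache.kafka.common.serialization.StringDeserializer
>
>     val conf = new SparkConf().setAppName("stream-etl")
>     val ssc  = new StreamingContext(conf, Seconds(3))  // batch interval (0.5-3 sec in our case)
>
>     val kafkaParams = Map[String, Object](
>       "bootstrap.servers"  -> "broker1:9092",
>       "key.deserializer"   -> classOf[StringDeserializer],
>       "value.deserializer" -> classOf[StringDeserializer],
>       "group.id"           -> "stream-etl-group",
>       "auto.offset.reset"  -> "latest"
>     )
>
>     val stream = KafkaUtils.createDirectStream[String, String](
>       ssc,
>       LocationStrategies.PreferConsistent,
>       ConsumerStrategies.Subscribe[String, String](Seq("source-topic"), kafkaParams)
>     )
>     // business logic filters the messages, then publishes to the target
>     // topics inside foreachRDD / foreachPartition
>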
> This behavior is not observed on other environments running the same
> application business logic, so we are not sure whether it is related to
> the application logic itself.
>
> One query: we have 1200 partitions on this topic and 120 executors
> processing it. We also have Dynamic Resource Allocation (DRA) enabled
> here, so executors scale up and down with load.
> Due to this scaling up/down we also see delays in processing, possibly
> because topic partitions get redistributed over executors as consumers
> are added/removed.
> Do you see this as a cause of the above problem? We tried disabling
> DRA and got better results, so just asking.
>
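> For reference, the DRA-related knobs involved are roughly the ones below
> (standard property names, illustrative values rather than our exact
> config):
>
>     // Illustrative dynamic-allocation settings (not our exact values).
>     // Scaling between min and max is what adds/removes executors under load.
>     val conf = new org.apache.spark.SparkConf()
>       .set("spark.dynamicAllocation.enabled", "true")
>       .set("spark.dynamicAllocation.minExecutors", "40")
>       .set("spark.dynamicAllocation.maxExecutors", "120")
>       .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
>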
> Also, we need your thoughts on the design. Spark DStream works on
> batches, which are created based on the polling time (in our case 0.5 -
> 3 sec). Until one batch completes, the next batch is not picked up for
> processing.
> So if one executor is slow processing a task of a batch, even though the
> other executors have finished their tasks, the job has to wait for that
> executor to complete before the next batch can be picked up. Do you have
> any thoughts / suggestions on this?
> Thanks & Regards,
> Saurabh Agrawal
> Cell: (+91) 9049700365
>
>
> On Sun, Jan 26, 2025 at 8:12 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Ok let us have a look
>>
>> Your response is verbose. My assumption is that you are using Spark
>> Structured Streaming (SSS), and that something in that pipeline is causing
>> the high CPU utilization on executors when processing messages. So SSS is
>> being used as a streaming ETL pipeline: it consumes messages from Kafka,
>> performs some processing, and publishes the results to other Kafka topics.
>>
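>> Something along these lines is what I have in mind as the shape of the
>> pipeline (a minimal sketch of my assumption, not your actual code; topic
>> names, servers and the checkpoint path are placeholders):
>>
>>     // Minimal sketch of the assumed SSS Kafka-to-Kafka ETL (placeholders only).
>>     val df = spark.readStream
>>       .format("kafka")
>>       .option("kafka.bootstrap.servers", "broker1:9092")
>>       .option("subscribe", "source-topic")
>>       .load()
>>
>>     val filtered = df
>>       .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
>>       .filter("value IS NOT NULL")   // stand-in for the real business logic
>>
>>     filtered.writeStream
>>       .format("kafka")
>>       .option("kafka.bootstrap.servers", "broker1:9092")
>>       .option("topic", "target-topic")
>>       .option("checkpointLocation", "/tmp/checkpoints/etl")
>>       .start()
>>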
>> Observations:
>>
>>
>>    1. Small batches, high processing time: Smaller batches seem to take
>>    longer to process than larger batches. This is counterintuitive for
>>    streaming applications, where smaller batches would generally be
>>    expected to process faster.
>>    2. Increased processing time over time: Even with consistent input
>>    traffic, the processing time per batch increases over time, leading to lag
>>    buildup in Kafka.
>>    3. High CPU usage for some executors: Not all executors experience
>>    high CPU utilization, suggesting some tasks within the batches might be
>>    causing delays.
>>
>> From my experience, these observations point towards potential issues
>> with the Spark application's logic or configuration.
>>
>>
>>    - Complex Transformations: If the business logic applied to messages
>>    within SSS is complex, it could lead to increased processing time,
>>    especially for smaller batches. This might explain why smaller batches
>>    take longer than expected.
>>    - State Management: Spark Structured Streaming can maintain state
>>    information across micro-batches. If the state management is not
>>    optimized, it could lead to performance degradation over time,
>>    especially when dealing with continuous data streams.
>>    - Skewed Data or Slow Tasks: If there is skewed data distribution, or
>>    specific tasks within the processing logic are slow, some executors
>>    could be overloaded while others remain idle. This could explain the
>>    uneven CPU utilization across executors.
>>    - Network Timeouts: Your write-up mentions Kafka connection timeout
>>    exceptions during message publishing. These timeouts could further
>>    contribute to increased processing time and lag build-up.
>>
>> While your write-up does not explicitly mention Spark Structured
>> Streaming, the characteristics of the application and the observed
>> behaviour strongly suggest its involvement. The high CPU utilization on
>> executors is likely a consequence of application logic or configuration
>> issues within SSS that lead to slow processing of messages.
>>
>> My suggestions
>>
>>    - Analyze the business logic applied to messages within SSS to
>>    identify any potential bottlenecks or inefficiencies. Consider
>>    simplifying complex transformations or optimizing state management if
>>    necessary. Use the Spark UI on port 4040 to spot obvious issues such
>>    as slow or skewed tasks that might be overloading specific executors,
>>    and optimize or redistribute those tasks for better load balancing.
>>    - Investigate network timeouts: address the root cause of the Kafka
>>    connection timeout exceptions to prevent delays during message
>>    publishing.
>>
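>> On the timeout point, the producer-side settings below are usually the
>> first ones to review; illustrative values only, the right numbers depend
>> on your brokers and network:
>>
>>     // Kafka producer settings commonly involved in publish/metadata timeouts
>>     // (example values, not a recommendation):
>>     import java.util.Properties
>>     val props = new Properties()
>>     props.put("bootstrap.servers", "broker1:9092")
>>     props.put("request.timeout.ms", "30000")    // per-request timeout
>>     props.put("delivery.timeout.ms", "120000")  // overall send deadline, includes retries
>>     props.put("retries", "5")
>>     props.put("retry.backoff.ms", "500")
>>     props.put("max.block.ms", "60000")          // how long send()/metadata fetch may block
>>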
>> HTH
>>
>> Mich Talebzadeh,
>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>>
>>
>> On Sat, 25 Jan 2025 at 13:31, Saurabh Agrawal <
>> agrawalsaurabh...@gmail.com> wrote:
>>
>>>
>>>
>>>
>>>
>>> Hi Team,
>>>
>>> I have been using the Spark v3.5 Spark Streaming functionality for the
>>> use case below. I am observing the issue described below on one of the
>>> environments with Spark Streaming. Any assistance with this would be of
>>> valuable help.
>>>
>>> *Use case:*
>>>
>>> Smart is used as a Spark Streaming application to consume messages from
>>> one source topic, apply business logic to filter out some of these
>>> messages, and publish them accordingly to target topics.
>>>
>>>
>>>
>>> *Initial Configuration:*
>>>
>>> Kafka Source topic: 24 brokers, 400 partitions
>>>
>>> Kafka Target topic: 200 partitions
>>>
>>> executorsInstances: 15
>>>
>>> executors cores: 3
>>>
>>> Input rate: 200-250 messages/sec
>>>
>>> BackPressure=true
>>>
>>> BackPressureInitialRate: 200000 (default in pom)
>>>
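>>> To make the knobs concrete, these map to roughly the following Spark
>>> properties (a sketch only, using the standard property names rather
>>> than our internal config keys, and not our exact submit command):
>>>
>>>     // Rough mapping of the configuration above to Spark properties (sketch).
>>>     val conf = new org.apache.spark.SparkConf()
>>>       .set("spark.executor.instances", "15")
>>>       .set("spark.executor.cores", "3")
>>>       .set("spark.streaming.backpressure.enabled", "true")
>>>       .set("spark.streaming.backpressure.initialRate", "200000")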
>>>
>>>
>>>
>>>
>>> Now there is a requirement that a ban refresh will occasionally be
>>> triggered, which pushes the incoming rate to 800-1000 messages/sec.
>>>
>>>
>>>
>>> *New Configuration done:*
>>>
>>>
>>>
>>> *Stage 1:*
>>>
>>> Kafka Source topic: 24 brokers, 1200 partitions
>>>
>>> Kafka Target topic: 800 partitions
>>>
>>> executorsInstances: 120
>>>
>>> executors cores: 10
>>>
>>> Input rate: 200-250 messages/sec, spiking to 800-1000 messages/sec
>>>
>>> BackPressure=true
>>>
>>> BackPressureInitialRate: 100
>>>
>>> Spark batch schedule time – 0.5 sec
>>>
>>> maxRatePerPartition: 1000
>>>
>>>
>>>
>>>
>>>
>>> All 120 executors come up at startup due to the lag already piled up
>>> on Kafka.
>>>
>>>
>>>
>>> Problem:
>>>
>>>    1. When there is already huge lag in the system, then on application
>>>    startup, due to the backpressure initial rate, only 1 message was
>>>    picked up from each partition, so the batch size in the Spark stream
>>>    was only 1200 records.
>>>    2. Over a period of time these delays get cleared, and the system
>>>    works fine without scheduling delays for a few hours with incoming
>>>    traffic consistent at 100-200 tps. At this point the batch size goes
>>>    to 50-100 records/batch.
>>>    3. Then suddenly a ban refresh gets triggered and the incoming rate
>>>    increases to 800-1000 tps. Now the executors are not able to cope and
>>>    the lag keeps increasing on the Kafka side. The processing rate of
>>>    batches keeps dropping, which eventually builds up scheduling delay
>>>    and reduces the batch size to almost 15-20 records/batch.
>>>    4. We observed that small batches were taking more processing time
>>>    than big batches.
>>>    5. Another issue: a NETWORK_CONNECTION error when fetching metadata
>>>    from Kafka while publishing.
>>>
>>>
>>>
>>>
>>>
>>> Solution:
>>>
>>>    1. Removed the backpressure initial-rate parameter so that when
>>>    there are millions of records already piled up on Kafka, the batches
>>>    created contain more records/batch than the original 1200
>>>    records/batch. This is because batches with more records seem to have
>>>    better processing time than small batches.
>>>    2. Also disabled backpressure (though not ideal), so that we have a
>>>    consistent records/batch size to deal with and can improve processing
>>>    time.
>>>    3. Handled the NETWORK_CONNECTION issue.
>>>
>>>
>>>
>>>
>>>
>>> *Stage 2:*
>>>
>>> Kafka Source topic: 24 brokers, 1200 partitions
>>>
>>> Kafka Target topic: 800 partitions
>>>
>>> executorsInstances: 120
>>>
>>> executors cores: 10
>>>
>>> Input rate: 200-250 messages/sec, spiking to 800-1000 messages/sec
>>>
>>> BackPressure=false
>>>
>>> maxRatePerPartition: 1000
>>>
>>> Spark batch schedule time – 0.5 sec
>>>
>>>
>>>
>>> What went well:
>>>
>>>    1. The already-accumulated lag gets cleared faster with more records
>>>    per batch (in the millions).
>>>    2. NETWORK_CONNECTION issue resolved.
>>>
>>>
>>>
>>> Problems persist:
>>>
>>>    1. Once the lag is cleared and input traffic is consistent, after a
>>>    period of time the processing time still keeps increasing and lag
>>>    starts to build up on Kafka.
>>>    2. At times of ban refresh, when traffic increases, the lag keeps
>>>    accumulating.
>>>    3. When batches have many records, in the initial stage, the
>>>    processing rate from Smart is 2k-4k messages per sec. But over a
>>>    period of time, when the incoming data flow drops to 200-400 mps and
>>>    batches are ~300 records, this processing rate reduces to 100-200
>>>    messages per sec.
>>>    4. This also leads to an increase in lag when we have a high load,
>>>    as the processing rate is reduced.
>>>    5. *Not all executors are at high CPU usage; some are at high CPU
>>>    and some at low CPU.*
>>>
>>> Solution:
>>>
>>>    1. As batches with more records have better processing throughput
>>>    (even though the overall processing time per batch is higher),
>>>    increase the batch interval from 0.5 sec to 3 sec so that batches of
>>>    600-700 records get created.
>>>    2. Reduced executorsInstances to 40 to see the effect on CPU
>>>    utilization and processing.
>>>
>>>
>>>
>>>
>>>
>>> *Stage 3:*
>>>
>>> Kafka Source topic: 24 brokers, 1200 partitions
>>>
>>> Kafka Target topic: 800 partitions
>>>
>>> executorsInstances: 40
>>>
>>> executors cores: 10
>>>
>>> Input rate: 200-250 messages/sec, spiking to 800-1000 messages/sec
>>>
>>> BackPressure=false
>>>
>>> maxRatePerPartition: 1000
>>>
>>> Spark batch polling time – 3 sec
>>>
>>>
>>>
>>> What went well:
>>>
>>>    1. The already-accumulated lag gets cleared faster with more records
>>>    per batch (in the millions).
>>>    2. With the 3 sec polling time, fewer batches get created, with more
>>>    records per batch.
>>>
>>>
>>>
>>> Problems persist:
>>>
>>>    1. As we increase the polling time of batches, when the load is
>>>    lower and the *processing time / batch duration* ratio drops, some
>>>    executors scale down due to Dynamic Resource Allocation being enabled
>>>    on Smart.
>>>    2. Once again, if the input load increases, new executors do scale
>>>    up, but we do not see them reach the high CPU usage of the other
>>>    executors.
>>>    3. *We also see heartbeat interval exceptions on the driver-executor
>>>    connection for some executors.*
>>>    4. *Some tasks in a batch take 30 sec or more on one of the
>>>    executors, impacting the overall processing time and delaying the
>>>    next batch. This leads to increased lag in Kafka.*
>>>
>>>
>>>
>>> Reason for connection timeout:
>>>
>>>    1. For the 30 sec tasks in a batch, we checked the logs and found
>>>    Kafka connection timeout exceptions happening on publish in these
>>>    executors, which delay the overall batch processing time. We need to
>>>    resolve this.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Overall memory utilization was not found to be causing any issue. CPU
>>> utilization does go high on the executors.
>>>
>>>
>>>
>>> Looking forward to your response.
>>> Thanks & Regards,
>>> Saurabh Agrawal
>>> Cell: (+91) 9049700365
>>>
>>
>>
