[ 
https://issues.apache.org/jira/browse/KAFKA-4169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152281#comment-17152281
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-4169:
-----------------------------------------------------

I'm wondering if this is still a valid issue.

>From Joel's comments: Point 1 seems to be already solved. Comment now 
>mentions: "cap on the maximum uncompressed record batch size"

Current implementation main issue is "fail-fast" approach: if message size is 
higher than max, then fail.

The other alternative seems to be harder to implement as it will give feedback 
once final batch size is known–right before sending msg.

 

I have been exploring a simple way to improve the current implementation:

Keep the current fail fast approach with an optimistic validation based on 
CompressionRationEstimator value:
{code:java}
private void ensureValidRecordSize(String topic, CompressionType 
compressionType, int size) {
    float ratio = CompressionRatioEstimator.estimation(topic, compressionType);
    float expectedSize = size * ratio;
    if (expectedSize > maxRequestSize)
        throw new RecordTooLargeException("The message expected size using 
compression type " + compressionType + " is " + (int) expectedSize +
            " bytes when serialized which is larger than " + maxRequestSize + 
", which is the value of the " +
            ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration. (original 
size = " + size + " and expected ratio = " + ratio + ")");
    if (expectedSize > totalMemorySize)
        throw new RecordTooLargeException("The message expected size using 
compression type " + compressionType + " is " + (int) expectedSize +
            " bytes when serialized which is larger than the total memory 
buffer you have configured with the " +
            ProducerConfig.BUFFER_MEMORY_CONFIG +
            " configuration. (original size = " + size + " and expected ratio = 
" + ratio + ")");
}
{code}
This way, if a msg is at the edge of max message size, and previous msgs have 
give enough feedback, a msg will pass and be sent to Kafka.

There could be scenarios where msgs pass this validation but fail on the broker 
as compression didn't compress as much as expected.

 

This seems to be a better than the current approach but won't give consistent 
results as Estimator needs to warm up.

I'd like to check if this approach would provide enough value to be 
implemented, or if looking into the ProducerBatch would be the way to move 
forward here.

> Calculation of message size is too conservative for compressed messages
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-4169
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4169
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.10.0.1
>            Reporter: Dustin Cote
>            Priority: Major
>
> Currently the producer uses the uncompressed message size to check against 
> {{max.request.size}} even if a {{compression.type}} is defined.  This can be 
> reproduced as follows:
> {code}
> # dd if=/dev/zero of=/tmp/outsmaller.dat bs=1024 count=1000
> # cat /tmp/out.dat | bin/kafka-console-producer --broker-list localhost:9092 
> --topic tester --producer-property compression.type=gzip
> {code}
> The above code creates a file that is the same size as the default for 
> {{max.request.size}} and the added overhead of the message pushes the 
> uncompressed size over the limit.  Compressing the message ahead of time 
> allows the message to go through.  When the message is blocked, the following 
> exception is produced:
> {code}
> [2016-09-14 08:56:19,558] ERROR Error when sending message to topic tester 
> with key: null, value: 1048576 bytes with error: 
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1048610 bytes when serialized which is larger than the maximum request size 
> you have configured with the max.request.size configuration.
> {code}
> For completeness, I have confirmed that the console producer is setting 
> {{compression.type}} properly by enabling DEBUG so this appears to be a 
> problem in the size estimate of the message itself.  I would suggest we 
> compress before we serialize instead of the other way around to avoid this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to