Hi, I was 100% sure that the Kafka broker didn't compress data, and I didn't think I had to upgrade my broker to 0.8.2.2.
I tried the upgrade and it works now! I still don't understand why the broker needs to compress the data again (if the compression is already done in the producer). Do you have a link to a wiki, documentation, or anything else to share about that?

Anyway, thanks for your help solving this issue.

Regards,
Jérôme

2015-10-20 1:26 GMT+02:00 Jun Rao <j...@confluent.io>:

> You will need to upgrade the broker to 0.8.2.2. The broker currently
> recompresses messages. In 0.8.2.1, the snappy jar has a bug that causes
> data explosion. We fixed the snappy jar in 0.8.2.2. If you upgrade the
> broker to 0.8.2.2, it will pick up the fixed snappy jar.
>
> Thanks,
>
> Jun
>
> On Sat, Oct 17, 2015 at 1:21 AM, Jérôme BAROTIN <jer...@barotin.fr> wrote:
>
>> Hello,
>>
>> I want to check whether snappy compression works well with the Java
>> Kafka client.
>>
>> To check this, I set up a small program. The program generates 1024
>> messages of readable data, 1024 bytes each. I send these messages to
>> three new topics, and afterwards I check the size of these topics
>> directly on the broker filesystem.
>>
>> You can find the program in the following Java code:
>>
>> package unit_test.testCompress;
>>
>> import java.util.HashMap;
>> import java.util.Map;
>> import java.util.Random;
>> import java.util.concurrent.Future;
>>
>> import org.apache.kafka.clients.producer.KafkaProducer;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>> import org.apache.kafka.clients.producer.RecordMetadata;
>>
>> /**
>>  * Can be used to run some unit tests on compression.
>>  */
>> public class TestCompress {
>>
>>     public static void compress(String type, String version) {
>>         Map<String, Object> configs = new HashMap<String, Object>();
>>         configs.put("key.serializer",
>>                 "org.apache.kafka.common.serialization.StringSerializer");
>>         configs.put("producer.type", "async");
>>         configs.put("compression.type", type);
>>         configs.put("value.serializer",
>>                 "org.apache.kafka.common.serialization.ByteArraySerializer");
>>         configs.put("partitioner.class",
>>                 "com.astellia.astkafkaproducer.RecordPartitioner");
>>         configs.put("bootstrap.servers", "kafka:9092");
>>
>>         KafkaProducer<String, byte[]> producer =
>>                 new KafkaProducer<String, byte[]>(configs);
>>
>>         Random r = new Random(15415485);
>>         int size = 1024; // 1 KiB per message
>>         byte[] buffer = new byte[size];
>>         for (int i = 0; i < size; i++) {
>>             buffer[i] = (byte) ('A' + r.nextInt(26)); // readable bytes
>>         }
>>         buffer[size - 1] = 0;
>>         // send the same 1 KiB message 1024 times
>>         for (int i = 0; i < size; i++) {
>>             Future<RecordMetadata> result = producer.send(new
>>                     ProducerRecord<String, byte[]>(
>>                     "unit_test_compress_" + version + "_" + type, buffer));
>>         }
>>
>>         producer.close();
>>     }
>>
>>     public static void main(String[] args) {
>>         String version = "v10";
>>         compress("snappy", version);
>>         compress("gzip", version);
>>         compress("none", version);
>>     }
>> }
>>
>> I'm compiling this code with the following Maven POM file:
>>
>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>>                              http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>     <modelVersion>4.0.0</modelVersion>
>>
>>     <groupId>unit_test</groupId>
>>     <artifactId>testCompress</artifactId>
>>     <version>0.0.1-SNAPSHOT</version>
>>     <packaging>jar</packaging>
>>
>>     <name>testCompress</name>
>>     <url>http://maven.apache.org</url>
>>
>>     <properties>
>>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>     </properties>
>>
>>     <dependencies>
>>         <dependency>
>>             <groupId>org.apache.kafka</groupId>
>>             <artifactId>kafka_2.10</artifactId>
>>             <version>0.8.2.2</version>
>>         </dependency>
>>     </dependencies>
>> </project>
>>
>> This program runs fine on my computer.
>>
>> But when I check the space taken by each topic directly on my Kafka
>> broker with the command-line tool "du", I find:
>> - the gzip topic is compressed: OK
>> - the none topic is not compressed: OK
>> - but the snappy topic is not compressed: not OK
>> (a screenshot can be found here: http://i.stack.imgur.com/7W1f5.png)
>>
>> I also checked the stored file with vi, and the data is still in the clear.
>>
>> I'm aware of this issue in Kafka 0.8.2.1:
>> https://issues.apache.org/jira/browse/KAFKA-2189
>>
>> But I'm using Kafka 0.8.2.2 on the producer and Kafka 0.8.2.1 on the broker.
>>
>> I checked the snappy dependency as well; I'm using 1.1.1.7.
>>
>> Do you have an idea of how to enable snappy compression in Kafka?
>> Did I forget a parameter to enable snappy compression?
>> Are my Kafka versions incompatible?
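
To see what a working codec should do with this payload, here is a small stdlib-only sketch (it is not part of the original thread, and it uses gzip rather than snappy, since `java.util.zip` ships with the JDK): it gzip-compresses 1024 copies of the same kind of fixed-seed 1024-byte readable message and prints the raw and compressed sizes. Because the identical block repeats, the compressed output should be a small fraction of the 1 MiB raw total, which is roughly the shrinkage the thread observes for the gzip topic but not the broken snappy one.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Random;
import java.util.zip.GZIPOutputStream;

public class CompressionRatioDemo {

    // gzip `copies` repetitions of `message` and return the compressed size
    static int gzipSize(byte[] message, int copies) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(sink)) {
            for (int i = 0; i < copies; i++) {
                gz.write(message);
            }
        }
        return sink.size();
    }

    public static void main(String[] args) throws IOException {
        // Same shape of payload as the test program: 1024 readable bytes,
        // fixed seed so the run is reproducible.
        Random r = new Random(15415485);
        byte[] buffer = new byte[1024];
        for (int i = 0; i < buffer.length; i++) {
            buffer[i] = (byte) ('A' + r.nextInt(26));
        }

        int raw = 1024 * buffer.length; // 1 MiB uncompressed
        int compressed = gzipSize(buffer, 1024);
        System.out.println("raw bytes:        " + raw);
        System.out.println("compressed bytes: " + compressed);
    }
}
```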
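
The `du` check described in the question can also be scripted. The sketch below (a minimal illustration, not from the thread) sums the on-disk size of every regular file under a topic-partition directory using only the Java standard library; the log directory path is a hypothetical example, since the real location depends on the broker's `log.dirs` setting and the partition number.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class TopicDiskUsage {

    // Sum the bytes of every regular file under a directory,
    // roughly what `du` reports for a topic-partition log directory.
    static long diskUsage(Path dir) throws IOException {
        try (Stream<Path> files = Files.walk(dir)) {
            return files.filter(Files::isRegularFile)
                        .mapToLong(p -> {
                            try {
                                return Files.size(p);
                            } catch (IOException e) {
                                return 0L; // file removed mid-walk
                            }
                        })
                        .sum();
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical path: adjust to your broker's log.dirs setting.
        Path topicDir =
                Paths.get("/var/kafka-logs/unit_test_compress_v10_snappy-0");
        System.out.println(topicDir + ": " + diskUsage(topicDir) + " bytes");
    }
}
```

Comparing this total across the three topics gives the same signal as the screenshot: the gzip topic should be far smaller than the uncompressed one, and a working snappy topic should be too.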