Re: Re: kafka performance question

2014-05-26 Thread svante karlsson
Do you read from the file in the callback from Kafka? I just implemented
C++ bindings, and in one of the tests I did I got the following results:

1000 messages per batch (fairly small messages, ~150 bytes each), then waiting
for the network layer to ack the sends (not a server-side ack) before putting
another message on the TCP socket. This gives me an average latency of about
17 ms and a throughput of about 10 MB/s.

If you are serializing your requests and reading data from disk between
calls to Kafka, that would easily explain a few added milliseconds per call
and thus reduced throughput. Partitioning will not reduce latency.

/svante
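
A minimal sketch of that pattern, assuming the 0.8.x javaapi producer; the
broker address and topic name below are placeholders. The idea is to prepare
the whole batch in memory first and hand it to the producer in one call,
rather than doing a disk read between individual sends.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class BatchSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");               // placeholder broker
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");  // raw byte[] payloads
        props.put("request.required.acks", "0");

        Producer<byte[], byte[]> producer =
            new Producer<byte[], byte[]>(new ProducerConfig(props));

        // ~1000 small (~150 byte) messages, prepared up front.
        List<KeyedMessage<byte[], byte[]>> batch =
            new ArrayList<KeyedMessage<byte[], byte[]>>(1000);
        byte[] payload = new byte[150];
        for (int i = 0; i < 1000; i++) {
            batch.add(new KeyedMessage<byte[], byte[]>("test-topic", payload)); // placeholder topic
        }
        producer.send(batch);   // one batched call instead of 1000 serialized round trips
        producer.close();
    }
}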






2014-05-26 6:18 GMT+02:00 Zhujie (zhujie, Smartcare) 
first.zhu...@huawei.com:

 Only one broker, and eight partitions, async mode.

 Increasing batch.num.messages does not help.

 We split the whole file into 1 KB blocks.


 -----Original Message-----
 From: robairrob...@gmail.com [mailto:robairrob...@gmail.com] On Behalf Of Robert Turner
 Sent: 16 May 2014 13:45
 To: users@kafka.apache.org
 Subject: Re: kafka performance question

 A couple of thoughts spring to mind: are you sending the whole file as one
 message, and is your producer code using sync or async mode?

 Cheers
Rob.


 On 14 May 2014 15:49, Jun Rao jun...@gmail.com wrote:

  How many brokers and partitions do you have? You may try increasing
  batch.num.messages.
 
  Thanks,
 
  Jun
 
 
  On Tue, May 13, 2014 at 5:56 PM, Zhujie (zhujie, Smartcare) 
  first.zhu...@huawei.com wrote:
 
   Dear all,
  
   We want to use Kafka to collect and dispatch data files, but the
   performance seems lower than we expected.

   In our cluster there is a provider and a broker. We use one thread to
   read a file from the provider's local disk and send it to the broker.
   The average throughput is only 3-4 MB/s.
   But if we just use the Java NIO API to send the file, the throughput can
   exceed 200 MB/s.
   Why is the Kafka performance so bad in our test? Are we missing
   something?
  
  
  
   Our server:
   Cpu: Intel(R) Xeon(R) CPU E5-4650 0 @ 2.70GHz*4 Mem:300G Disk:600G
   15K RPM SAS*8
  
   Configuration of provider:
   props.put("serializer.class", "kafka.serializer.NullEncoder");
   props.put("metadata.broker.list", "169.10.35.57:9092");
   props.put("request.required.acks", "0");
   props.put("producer.type", "async"); // async
   props.put("queue.buffering.max.ms", "500");
   props.put("queue.buffering.max.messages", "10");
   props.put("batch.num.messages", "1200");
   props.put("queue.enqueue.timeout.ms", "-1");
   props.put("send.buffer.bytes", "10240");
  
   Configuration of broker:
  
   # Licensed to the Apache Software Foundation (ASF) under one or more
   # contributor license agreements.  See the NOTICE file distributed
   with # this work for additional information regarding copyright
 ownership.
   # The ASF licenses this file to You under the Apache License,
   Version 2.0 # (the License); you may not use this file except in
   compliance with # the License.  You may obtain a copy of the License
   at #
   #http://www.apache.org/licenses/LICENSE-2.0
   #
   # Unless required by applicable law or agreed to in writing,
   software # distributed under the License is distributed on an AS
   IS BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
   express or
  implied.
   # See the License for the specific language governing permissions
   and # limitations under the License.
   # see kafka.server.KafkaConfig for additional details and defaults
  
   # Server Basics
   #
  
   # The id of the broker. This must be set to a unique integer for
   each broker.
   broker.id=0
  
   # Socket Server Settings
   #
  
   # The port the socket server listens on
   port=9092
  
   # Hostname the broker will bind to. If not set, the server will bind
   to all interfaces #host.name=localhost
  
   # Hostname the broker will advertise to producers and consumers. If
   not set, it uses the # value for host.name if configured.
   Otherwise, it will use the value returned from #
   java.net.InetAddress.getCanonicalHostName().
   #advertised.host.name=hostname routable by clients
  
   # The port to publish to ZooKeeper for clients to use. If this is
   not
  set,
   # it will publish the same port that the broker binds to.
   #advertised.port=port accessible by clients
  
   # The number of threads handling network requests
   #num.network.threads=2
   # The number of threads doing disk I/O
   #num.io.threads=8
  
   # The send buffer (SO_SNDBUF) used by the socket server
   #socket.send.buffer.bytes=1048576
  
   # The receive buffer (SO_RCVBUF) used by the socket server
   #socket.receive.buffer.bytes=1048576
  
   # The maximum size of a request that the socket server will accept
   (protection against OOM)
   #socket.request.max.bytes=104857600
  
  
   # Log Basics
   #
  
   # A comma separated

Re: kafka performance question

2014-05-25 Thread Zhujie (zhujie, Smartcare)
Only one broker, and eight partitions, async mode.

Increasing batch.num.messages does not help.

We split the whole file into 1 KB blocks.
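
At 1 KB per message, 3-4 MB/s is only about 3,000-4,000 messages per second, so
per-message overhead is probably what dominates rather than disk or network
bandwidth. A rough sketch, assuming the 0.8.x javaapi producer (the broker
address and topic name are placeholders), that reads the file in 64 KB blocks so
roughly 64 times fewer messages carry the same bytes:

import java.io.FileInputStream;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class BlockedFileProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");               // placeholder broker
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");  // raw byte[] payloads
        props.put("producer.type", "async");
        props.put("request.required.acks", "0");

        Producer<byte[], byte[]> producer =
            new Producer<byte[], byte[]>(new ProducerConfig(props));

        FileInputStream in = new FileInputStream(args[0]);
        try {
            byte[] block = new byte[64 * 1024];   // 64 KB blocks instead of 1 KB
            int n;
            while ((n = in.read(block)) > 0) {
                byte[] payload = new byte[n];
                System.arraycopy(block, 0, payload, 0, n);
                producer.send(new KeyedMessage<byte[], byte[]>("file-topic", payload)); // placeholder topic
            }
        } finally {
            in.close();
            producer.close();
        }
    }
}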

 
-----Original Message-----
From: robairrob...@gmail.com [mailto:robairrob...@gmail.com] On Behalf Of Robert Turner
Sent: 16 May 2014 13:45
To: users@kafka.apache.org
Subject: Re: kafka performance question

A couple of thoughts spring to mind: are you sending the whole file as one
message, and is your producer code using sync or async mode?

Cheers
   Rob.


On 14 May 2014 15:49, Jun Rao jun...@gmail.com wrote:

 How many brokers and partitions do you have? You may try increasing 
 batch.num.messages.

 Thanks,

 Jun


 On Tue, May 13, 2014 at 5:56 PM, Zhujie (zhujie, Smartcare)  
 first.zhu...@huawei.com wrote:

  Dear all,
 
  We want to use Kafka to collect and dispatch data files, but the
  performance seems lower than we expected.

  In our cluster there is a provider and a broker. We use one thread to
  read a file from the provider's local disk and send it to the broker.
  The average throughput is only 3-4 MB/s.
  But if we just use the Java NIO API to send the file, the throughput can
  exceed 200 MB/s.
  Why is the Kafka performance so bad in our test? Are we missing
  something?
 
 
 
  Our server:
  Cpu: Intel(R) Xeon(R) CPU E5-4650 0 @ 2.70GHz*4 Mem:300G Disk:600G 
  15K RPM SAS*8
 
  Configuration of provider:
  props.put("serializer.class", "kafka.serializer.NullEncoder");
  props.put("metadata.broker.list", "169.10.35.57:9092");
  props.put("request.required.acks", "0");
  props.put("producer.type", "async"); // async
  props.put("queue.buffering.max.ms", "500");
  props.put("queue.buffering.max.messages", "10");
  props.put("batch.num.messages", "1200");
  props.put("queue.enqueue.timeout.ms", "-1");
  props.put("send.buffer.bytes", "10240");
 
  Configuration of broker:
 
  # Licensed to the Apache Software Foundation (ASF) under one or more 
  # contributor license agreements.  See the NOTICE file distributed 
  with # this work for additional information regarding copyright ownership.
  # The ASF licenses this file to You under the Apache License, 
  Version 2.0 # (the License); you may not use this file except in 
  compliance with # the License.  You may obtain a copy of the License 
  at #
  #http://www.apache.org/licenses/LICENSE-2.0
  #
  # Unless required by applicable law or agreed to in writing, 
  software # distributed under the License is distributed on an AS 
  IS BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
  express or
 implied.
  # See the License for the specific language governing permissions 
  and # limitations under the License.
  # see kafka.server.KafkaConfig for additional details and defaults
 
  # Server Basics 
  #
 
  # The id of the broker. This must be set to a unique integer for 
  each broker.
  broker.id=0
 
  # Socket Server Settings 
  #
 
  # The port the socket server listens on
  port=9092
 
  # Hostname the broker will bind to. If not set, the server will bind 
  to all interfaces #host.name=localhost
 
  # Hostname the broker will advertise to producers and consumers. If 
  not set, it uses the # value for host.name if configured.  
  Otherwise, it will use the value returned from # 
  java.net.InetAddress.getCanonicalHostName().
  #advertised.host.name=hostname routable by clients
 
  # The port to publish to ZooKeeper for clients to use. If this is 
  not
 set,
  # it will publish the same port that the broker binds to.
  #advertised.port=port accessible by clients
 
  # The number of threads handling network requests
  #num.network.threads=2
  # The number of threads doing disk I/O
  #num.io.threads=8
 
  # The send buffer (SO_SNDBUF) used by the socket server
  #socket.send.buffer.bytes=1048576
 
  # The receive buffer (SO_RCVBUF) used by the socket server
  #socket.receive.buffer.bytes=1048576
 
  # The maximum size of a request that the socket server will accept 
  (protection against OOM)
  #socket.request.max.bytes=104857600
 
 
  # Log Basics 
  #
 
  # A comma separated list of directories under which to store log files
  log.dirs=/data/kafka-logs
 
  # The default number of log partitions per topic. More partitions 
  allow greater # parallelism for consumption, but this will also 
  result in more files across # the brokers.
  #num.partitions=2
 
  # Log Flush Policy 
  #
 
  # Messages are immediately written to the filesystem but by default 
  we only fsync() to sync # the OS cache lazily. The following 
  configurations control the flush of data to disk.
  # There are a few important trade-offs here:
  #1. Durability: Unflushed data may be lost if you are not using
  replication.
  #2. Latency: Very large flush intervals may lead to latency spikes
  when the flush does occur as there will be a lot

Re: kafka performance question

2014-05-16 Thread Robert Turner
A couple of thoughts spring to mind: are you sending the whole file as one
message, and is your producer code using sync or async mode?

Cheers
   Rob.
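
For reference, a sketch of the sync/async distinction asked about above,
assuming the 0.8.x producer configuration (the values are illustrative only and
the broker address is a placeholder): in sync mode each send() issues its
produce request on the calling thread, while in async mode send() only enqueues
and a background thread ships messages in batches.

import java.util.Properties;
import kafka.producer.ProducerConfig;

public class SyncVsAsyncConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");   // placeholder broker

        // Sync mode: each send() makes its produce request on the caller's
        // thread before returning.
        props.put("producer.type", "sync");
        new ProducerConfig(props);

        // Async mode: send() only enqueues; a background thread drains the
        // queue and ships messages in batches of batch.num.messages.
        props.put("producer.type", "async");
        props.put("batch.num.messages", "200");
        props.put("queue.buffering.max.ms", "500");
        new ProducerConfig(props);

        System.out.println("Both configurations validated.");
    }
}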


On 14 May 2014 15:49, Jun Rao jun...@gmail.com wrote:

 How many brokers and partitions do you have? You may try increasing
 batch.num.messages.

 Thanks,

 Jun


 On Tue, May 13, 2014 at 5:56 PM, Zhujie (zhujie, Smartcare) 
 first.zhu...@huawei.com wrote:

  Dear all,
 
  We want to use Kafka to collect and dispatch data files, but the
  performance seems lower than we expected.

  In our cluster there is a provider and a broker. We use one thread to
  read a file from the provider's local disk and send it to the broker.
  The average throughput is only 3-4 MB/s.
  But if we just use the Java NIO API to send the file, the throughput can
  exceed 200 MB/s.
  Why is the Kafka performance so bad in our test? Are we missing
  something?
 
 
 
  Our server:
  Cpu: Intel(R) Xeon(R) CPU E5-4650 0 @ 2.70GHz*4
  Mem:300G
  Disk:600G 15K RPM SAS*8
 
  Configuration of provider:
  props.put("serializer.class", "kafka.serializer.NullEncoder");
  props.put("metadata.broker.list", "169.10.35.57:9092");
  props.put("request.required.acks", "0");
  props.put("producer.type", "async"); // async
  props.put("queue.buffering.max.ms", "500");
  props.put("queue.buffering.max.messages", "10");
  props.put("batch.num.messages", "1200");
  props.put("queue.enqueue.timeout.ms", "-1");
  props.put("send.buffer.bytes", "10240");
 
  Configuration of broker:
 
  # Licensed to the Apache Software Foundation (ASF) under one or more
  # contributor license agreements.  See the NOTICE file distributed with
  # this work for additional information regarding copyright ownership.
  # The ASF licenses this file to You under the Apache License, Version 2.0
  # (the License); you may not use this file except in compliance with
  # the License.  You may obtain a copy of the License at
  #
  #http://www.apache.org/licenses/LICENSE-2.0
  #
  # Unless required by applicable law or agreed to in writing, software
  # distributed under the License is distributed on an AS IS BASIS,
  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 implied.
  # See the License for the specific language governing permissions and
  # limitations under the License.
  # see kafka.server.KafkaConfig for additional details and defaults
 
  # Server Basics #
 
  # The id of the broker. This must be set to a unique integer for each
  broker.
  broker.id=0
 
  # Socket Server Settings
  #
 
  # The port the socket server listens on
  port=9092
 
  # Hostname the broker will bind to. If not set, the server will bind to
  all interfaces
  #host.name=localhost
 
  # Hostname the broker will advertise to producers and consumers. If not
  set, it uses the
  # value for host.name if configured.  Otherwise, it will use the value
  returned from
  # java.net.InetAddress.getCanonicalHostName().
  #advertised.host.name=hostname routable by clients
 
  # The port to publish to ZooKeeper for clients to use. If this is not
 set,
  # it will publish the same port that the broker binds to.
  #advertised.port=port accessible by clients
 
  # The number of threads handling network requests
  #num.network.threads=2
  # The number of threads doing disk I/O
  #num.io.threads=8
 
  # The send buffer (SO_SNDBUF) used by the socket server
  #socket.send.buffer.bytes=1048576
 
  # The receive buffer (SO_RCVBUF) used by the socket server
  #socket.receive.buffer.bytes=1048576
 
  # The maximum size of a request that the socket server will accept
  (protection against OOM)
  #socket.request.max.bytes=104857600
 
 
  # Log Basics #
 
  # A comma separated list of directories under which to store log files
  log.dirs=/data/kafka-logs
 
  # The default number of log partitions per topic. More partitions allow
  greater
  # parallelism for consumption, but this will also result in more files
  across
  # the brokers.
  #num.partitions=2
 
  # Log Flush Policy
  #
 
  # Messages are immediately written to the filesystem but by default we
  only fsync() to sync
  # the OS cache lazily. The following configurations control the flush of
  data to disk.
  # There are a few important trade-offs here:
  #1. Durability: Unflushed data may be lost if you are not using
  replication.
  #2. Latency: Very large flush intervals may lead to latency spikes
  when the flush does occur as there will be a lot of data to flush.
  #3. Throughput: The flush is generally the most expensive operation,
  and a small flush interval may lead to excessive seeks.
  # The settings below allow one to configure the flush policy to flush
 data
  after a period of time or
  # every N messages (or both). This can be done globally and overridden on
  a per-topic basis.
 
  # The 

Re: kafka performance question

2014-05-15 Thread Jun Rao
How many brokers and partitions do you have? You may try increasing
batch.num.messages.

Thanks,

Jun
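
A sketch of the async batching settings that work together with
batch.num.messages on the 0.8.x producer, with illustrative values only (the
broker address is a placeholder):

import java.util.Properties;
import kafka.producer.ProducerConfig;

public class BatchTuningSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");      // placeholder broker
        props.put("producer.type", "async");
        props.put("batch.num.messages", "5000");                  // messages sent per async batch
        props.put("queue.buffering.max.ms", "500");                // max time to buffer before a batch goes out
        props.put("queue.buffering.max.messages", "20000");        // queue should hold at least a few batches
        props.put("queue.enqueue.timeout.ms", "-1");               // block the caller instead of dropping when full

        new ProducerConfig(props);   // validates the settings
        System.out.println("batch.num.messages = " + props.getProperty("batch.num.messages"));
    }
}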


On Tue, May 13, 2014 at 5:56 PM, Zhujie (zhujie, Smartcare) 
first.zhu...@huawei.com wrote:

 Dear all,

 We want to use Kafka to collect and dispatch data files, but the
 performance seems lower than we expected.

 In our cluster there is a provider and a broker. We use one thread to
 read a file from the provider's local disk and send it to the broker.
 The average throughput is only 3-4 MB/s.
 But if we just use the Java NIO API to send the file, the throughput can
 exceed 200 MB/s.
 Why is the Kafka performance so bad in our test? Are we missing something?



 Our server:
 Cpu: Intel(R) Xeon(R) CPU E5-4650 0 @ 2.70GHz*4
 Mem:300G
 Disk:600G 15K RPM SAS*8

 Configuration of provider:
 props.put("serializer.class", "kafka.serializer.NullEncoder");
 props.put("metadata.broker.list", "169.10.35.57:9092");
 props.put("request.required.acks", "0");
 props.put("producer.type", "async"); // async
 props.put("queue.buffering.max.ms", "500");
 props.put("queue.buffering.max.messages", "10");
 props.put("batch.num.messages", "1200");
 props.put("queue.enqueue.timeout.ms", "-1");
 props.put("send.buffer.bytes", "10240");

 Configuration of broker:

 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the License); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
 #
 #http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an AS IS BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 # see kafka.server.KafkaConfig for additional details and defaults

 # Server Basics #

 # The id of the broker. This must be set to a unique integer for each
 broker.
 broker.id=0

 # Socket Server Settings
 #

 # The port the socket server listens on
 port=9092

 # Hostname the broker will bind to. If not set, the server will bind to
 all interfaces
 #host.name=localhost

 # Hostname the broker will advertise to producers and consumers. If not
 set, it uses the
 # value for host.name if configured.  Otherwise, it will use the value
 returned from
 # java.net.InetAddress.getCanonicalHostName().
 #advertised.host.name=hostname routable by clients

 # The port to publish to ZooKeeper for clients to use. If this is not set,
 # it will publish the same port that the broker binds to.
 #advertised.port=port accessible by clients

 # The number of threads handling network requests
 #num.network.threads=2
 # The number of threads doing disk I/O
 #num.io.threads=8

 # The send buffer (SO_SNDBUF) used by the socket server
 #socket.send.buffer.bytes=1048576

 # The receive buffer (SO_RCVBUF) used by the socket server
 #socket.receive.buffer.bytes=1048576

 # The maximum size of a request that the socket server will accept
 (protection against OOM)
 #socket.request.max.bytes=104857600


 # Log Basics #

 # A comma separated list of directories under which to store log files
 log.dirs=/data/kafka-logs

 # The default number of log partitions per topic. More partitions allow
 greater
 # parallelism for consumption, but this will also result in more files
 across
 # the brokers.
 #num.partitions=2

 # Log Flush Policy
 #

 # Messages are immediately written to the filesystem but by default we
 only fsync() to sync
 # the OS cache lazily. The following configurations control the flush of
 data to disk.
 # There are a few important trade-offs here:
 #1. Durability: Unflushed data may be lost if you are not using
 replication.
 #2. Latency: Very large flush intervals may lead to latency spikes
 when the flush does occur as there will be a lot of data to flush.
 #3. Throughput: The flush is generally the most expensive operation,
 and a small flush interval may lead to excessive seeks.
 # The settings below allow one to configure the flush policy to flush data
 after a period of time or
 # every N messages (or both). This can be done globally and overridden on
 a per-topic basis.

 # The number of messages to accept before forcing a flush of data to disk
 #log.flush.interval.messages=1

 # The maximum amount of time a message can sit in a log before we force a
 flush
 #log.flush.interval.ms=1000

 # Log Retention Policy
 #

 # The following configurations control 

kafka performance question

2014-05-13 Thread Zhujie (zhujie, Smartcare)
Dear all,

We want to use Kafka to collect and dispatch data files, but the performance
seems lower than we expected.

In our cluster there is a provider and a broker. We use one thread to read a file
from the provider's local disk and send it to the broker. The average throughput
is only 3-4 MB/s.
But if we just use the Java NIO API to send the file, the throughput can exceed 200 MB/s.
Why is the Kafka performance so bad in our test? Are we missing something?



Our server:
Cpu: Intel(R) Xeon(R) CPU E5-4650 0 @ 2.70GHz*4
Mem:300G
Disk:600G 15K RPM SAS*8

Configuration of provider:
props.put("serializer.class", "kafka.serializer.NullEncoder");
props.put("metadata.broker.list", "169.10.35.57:9092");
props.put("request.required.acks", "0");
props.put("producer.type", "async"); // async
props.put("queue.buffering.max.ms", "500");
props.put("queue.buffering.max.messages", "10");
props.put("batch.num.messages", "1200");
props.put("queue.enqueue.timeout.ms", "-1");
props.put("send.buffer.bytes", "10240");

Configuration of broker:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the License); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an AS IS BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

# Server Basics #

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# Socket Server Settings 
#

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all 
interfaces
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it 
uses the
# value for host.name if configured.  Otherwise, it will use the value 
returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=hostname routable by clients

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=port accessible by clients

# The number of threads handling network requests
#num.network.threads=2
# The number of threads doing disk I/O
#num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
#socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
#socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection 
against OOM)
#socket.request.max.bytes=104857600


# Log Basics #

# A comma separated list of directories under which to store log files
log.dirs=/data/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
#num.partitions=2

# Log Flush Policy #

# Messages are immediately written to the filesystem but by default we only 
fsync() to sync
# the OS cache lazily. The following configurations control the flush of data 
to disk.
# There are a few important trade-offs here:
#1. Durability: Unflushed data may be lost if you are not using replication.
#2. Latency: Very large flush intervals may lead to latency spikes when the 
flush does occur as there will be a lot of data to flush.
#3. Throughput: The flush is generally the most expensive operation, and a 
small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data 
after a period of time or
# every N messages (or both). This can be done globally and overridden on a 
per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=1

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

# Log Retention Policy #

# The following configurations control the disposal of log segments. The policy 
can
# be set to delete segments after a period of time, or after a given size has 
accumulated.
# A segment will be deleted whenever *either* of these criteria are met. 
Deletion always happens
# from the end of the log.

# The minimum age of a