Hi, I have just one consumer per topic. But I have quite a lot of topics.
>Also, to which servers are you seeing the timeouts - Zookeeper or kafka? Zookeeper. >Also, any reason why you are lowering the zk session timeout in >your code? It was copy pasted from some tutorial. Cheers, Klaus On Thu, Jan 9, 2014 at 4:04 PM, Joel Koshy <jjkosh...@gmail.com> wrote: > Are all the consumers in the same group? If so there could be longer > rebalance latencies; but in general you should be able to have that > many consumers. > > Also, to which servers are you seeing the timeouts - Zookeeper or > kafka? Also, any reason why you are lowering the zk session timeout in > your code? Can you share some of your logs? > > Joel > > On Thu, Jan 9, 2014 at 5:07 AM, Klaus Schaefers > <klaus.schaef...@ligatus.com> wrote: > > Hi, > > > > how many consumers can I connect to a kafka cluster? I am running a kafka > > server with a separate zookeeper server in my development box. For > testing > > purpose I am now trying to create a a< large number of consumers, e.g. > 100 > > or so. But once I created the 40th consumer I start to get timeouts. > Also I > > have noticed that the conencton time get longer an longer. > > > > > > My consumer code looks like this: > > > > > > > > public class KafkaConsumer implements Runnable{ > > > > public long count = 0; > > > > public long errors = 0; > > > > private IMessageQueue context; > > > > private ConsumerConnector consumer; > > > > private int a_numThreads = 1; > > > > private KafkaStream<byte[], byte[]> stream; > > > > private volatile boolean running = true; > > > > private String topic; > > > > > > @Override > > public void run() { > > Logger.log(KafkaConsumer.class, "run()", "enter > " + > this.topic); > > ConsumerIterator<byte[], byte[]> it = stream.iterator(); > > while (running && it.hasNext()){ > > try { > > > > // do something > > count++; > > > > } catch (Exception e) { > > errors++; > > } > > } > > > > } > > > > > > > > > > > > public KafkaConsumer(IMQListener<ILogEvent> fct, IMessageQueue > context, > > String zookeper, String topic) { > > this.fct = fct; > > this.context = context; > > this.topic = topic; > > > > > > /** > > * get the kafka streams > > */ > > ConsumerConfig consumerConfig = createConsumerConfig(zookeper, > > "group1"); > > consumer = > > kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); > > Map<String, Integer> topicCountMap = new HashMap<String, > Integer>(); > > topicCountMap.put(topic, new Integer(a_numThreads)); > > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = > > consumer.createMessageStreams(topicCountMap); > > List<KafkaStream<byte[], byte[]>> streams = > consumerMap.get(topic); > > this.stream = streams.get(0); > > > > > > /** > > * start one thread reading from the queue > > */ > > Thread t = new Thread(this); > > t.start(); > > > > > > } > > > > > > > > > > public void shutdown() { > > Logger.log(KafkaConsumer.class, "shutdown()", "enter > " + > > this.topic); > > > > if (consumer != null) > > consumer.shutdown(); > > > > > > running = false; > > } > > > > > > private static ConsumerConfig createConsumerConfig(String > a_zookeeper, > > String a_groupId) { > > Properties props = new Properties(); > > props.put("zookeeper.connect", a_zookeeper); > > props.put("group.id", a_groupId); > > props.put("zookeeper.session.timeout.ms", "400"); > > props.put("zookeeper.sync.time.ms", "200"); > > props.put("auto.commit.interval.ms", "1000"); > > > > return new ConsumerConfig(props); > > } > > > > > > > > } > > > > > > > > Cheers, > > > > KLaus > > > > -- > > > > -- > > > > Klaus Schaefers > > Senior Optimization Manager > > > > Ligatus GmbH > > Hohenstaufenring 30-32 > > D-50674 Köln > > > > Tel.: +49 (0) 221 / 56939 -784 > > Fax: +49 (0) 221 / 56 939 - 599 > > E-Mail: klaus.schaef...@ligatus.com > > Web: www.ligatus.de > > > > HRB Köln 56003 > > Geschäftsführung: > > Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann, > > Dipl.-Wirtschaftsingenieur Arne Wolter > -- -- Klaus Schaefers Senior Optimization Manager Ligatus GmbH Hohenstaufenring 30-32 D-50674 Köln Tel.: +49 (0) 221 / 56939 -784 Fax: +49 (0) 221 / 56 939 - 599 E-Mail: klaus.schaef...@ligatus.com Web: www.ligatus.de HRB Köln 56003 Geschäftsführung: Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann, Dipl.-Wirtschaftsingenieur Arne Wolter