KeyedMessage<String, byte[]> kmsg = new KeyedMessage<String,
byte[]>(topic, message); You don't have a key here, and Producer.send
partitions data by key. Since no key is present, all the data goes into
one partition. Please check the example here
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
KeyedMessage<String, String> data = new KeyedMessage<String,
String>("page_visits", ip, msg); in place of ip you can probably add a
random UUID for better distribution among partitions. Here is a bit more
info:
"In Kafka producer, a partition key can be specified to indicate the
destination partition of the message. By default, a hashing-based
partitioner is used to determine the partition id given the key, and
people can use customized partitioners also. To reduce # of open
sockets, in 0.8.0 (https://issues.apache.org/jira/browse/KAFKA-1017),
when the partitioning key is not specified or null, a producer will pick
a random partition and stick to it for some time (default is 10 mins)
before switching to another one. So, if there are fewer producers than
partitions, at a given point of time, some partitions may not receive
any data. To alleviate this problem, one can either reduce the metadata
refresh interval or specify a message key and a customized random
partitioner."
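As a sketch of why a random key spreads the load: the 0.8 default partitioner effectively takes a non-negative hash of the key modulo the partition count. The `partitionFor` method below is a toy illustration of that logic (it is not the actual kafka.producer.DefaultPartitioner class), showing that fresh UUID keys reach all 7 partitions of a topic like yours:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class PartitionSpread {
    // Toy version of the 0.8 default partitioning logic: a
    // non-negative hash of the key, modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 7; // the topic in this thread has 7 partitions
        Set<Integer> hit = new HashSet<Integer>();
        for (int i = 0; i < 1000; i++) {
            // A fresh UUID per message makes the hash, and hence the
            // chosen partition, vary message to message.
            hit.add(partitionFor(UUID.randomUUID().toString(), numPartitions));
        }
        System.out.println("partitions hit: " + hit.size());
    }
}
```

Applied to your send method, that would mean constructing the message with the three-argument form, new KeyedMessage<String, byte[]>(topic, UUID.randomUUID().toString(), message), instead of the key-less two-argument form.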
On Fri, Nov 14, 2014, at 02:10 AM, 张炜 wrote:
> Hi Harsha, I think you are right. I checked that my Kafka Producer
> is not writing to all the partitions, which I never know until you
> gave a clue.
>
> My producer property looks like the one below:
>
> metadata.broker.list=C0045557:9092, C0045558:9092, C0045559:9093
> compression.codec=snappy
> request.required.acks=1
> producer.type=sync
>
> And I initialize one producer and use it to send all messages:
>
> Part 1)
> private Producer<String, byte[]> inner;
> private static Properties properties = new Properties();
>
> public RecordProducer() {
>     getProperties();
>     ProducerConfig config = new ProducerConfig(properties);
>     inner = new Producer<String, byte[]>(config);
> }
>
> Part 2) send method
> public void send(String topic, byte[] message) {
>     if (topic == null || message == null) { return; }
>     KeyedMessage<String, byte[]> kmsg = new KeyedMessage<String, byte[]>(topic, message);
>     inner.send(kmsg);
> }
>
> Part 3) Call the send method
>
> for (File file : files) {
>     byte[] content = readFile(file);
>     send(topic, content);
> }
>
> I didn't write a partitioner class, because I think by default it will
> use random partitioning and go to all 7 partitions of the topic.
>
> Could someone tell me why it only goes to one partition? Thank you
> very much.
>
> Regards, Sai
>
>
> On Fri, Nov 14, 2014 at 11:46 AM, Harsha <[email protected]> wrote:
>> Hi Sai, kafkaSpout config and topic partitions looks good to me. Are
>> you sure from your kafka producer you are writing to all the
>> partitions in your topic. -Harsha
>>
>>
>> On Thu, Nov 13, 2014, at 07:39 PM, 张炜 wrote:
>>> I have a question about the Kafka Spout
>>> (https://github.com/apache/storm/tree/master/external/storm-kafka)
>>> that needs your help.
>>>
>>> I built the Kafka Spout and set it to use 7 workers to receive Kafka
>>> messages:
>>> builder.setSpout(Utils.TOPIC_DEFAULT, new KafkaSpout(spoutConfig_Default), 7);
>>>
>>> And finally I find that all messages go to only one worker, which is
>>> shown in the attached screenshot (Screenshot.png).
>>> Actually my Kafka topic has multiple partitions.
>>>
>>> $ bin/kafka-topics.sh --topic general_dev --describe --zookeeper localhost:2181
>>> Topic:general_dev  PartitionCount:7  ReplicationFactor:1  Configs:
>>>     Topic: general_dev  Partition: 0  Leader: 1  Replicas: 1  Isr: 1
>>>     Topic: general_dev  Partition: 1  Leader: 2  Replicas: 2  Isr: 2
>>>     Topic: general_dev  Partition: 2  Leader: 3  Replicas: 3  Isr: 3
>>>     Topic: general_dev  Partition: 3  Leader: 4  Replicas: 4  Isr: 4
>>>     Topic: general_dev  Partition: 4  Leader: 5  Replicas: 5  Isr: 5
>>>     Topic: general_dev  Partition: 5  Leader: 6  Replicas: 6  Isr: 6
>>>     Topic: general_dev  Partition: 6  Leader: 7  Replicas: 7  Isr: 7
>>>
>>> 1. Does the Kafka Spout support multiple workers, and how do I
>>> configure it?
>>>
>>> 2. My topic has multiple partitions, and I want each partition to be
>>> consumed by one worker for maximum performance. If the current
>>> KafkaSpout cannot do this, could you please give some suggestions on
>>> how to achieve it?
>>>
>>>
>>> --
>>> Regards, Sai
>>>
>>> Email had 1 attachment:
>>> * Screenshot.png 39k (image/png)
>>
>
>
>
> --
> 流水不争先 ("Flowing water does not strive to be first") o00
>
> Email had 1 attachment:
> * Screenshot.png 39k (image/png)