Hi thanks for the quick answer,

If I understand correctly the producer needs to be configured before use:


Properties props = new Properties();
props.put("zookeeper.connect", zookeeperhost);
props.put("metadata.broker.list", brokerlist);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "0");
props.put("partioner.class","kafka.producer.DefaultPartitioner"); // If I 
understood right it will produce to all partitions
ProducerConfig config = new ProducerConfig(props);
TridentKafkaState state = new TridentKafkaState();
state.prepare(props);
@SuppressWarnings("rawtypes")
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
        .withKafkaTopicSelector(new DefaultTopicSelector(outputTopic))
        .withTridentTupleToKafkaMapper(
                new FieldNameBasedTupleToKafkaMapper("key", "msg"));

rePosition.shuffle().parallelismHint(3)
        .partitionPersist(stateFactory, new Fields("key", "msg"),
                new TridentKafkaUpdater(), new Fields())
        .parallelismHint(3);

However, I don't know how to get the ProducerConfig configuration into the TridentKafkaStateFactory so that it is actually used. Or did you have something else in mind for producing to Kafka?
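
My best guess so far is to put the producer properties into the topology configuration and hope the state factory picks them up from there, along the lines of the untested sketch below (the "kafka.broker.properties" key is only my assumption from skimming the storm-kafka source, and "my-topology"/topology are placeholders for my actual topology):

// Untested guess: hand the producer Properties to the topology config and let
// TridentKafkaStateFactory/TridentKafkaState read them from there at prepare time.
Config conf = new Config();
conf.put("kafka.broker.properties", props); // assumed key, not verified
StormSubmitter.submitTopology("my-topology", conf, topology.build());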



Best regards,

Jonas Sandström

________________________________
From: Andrew Neilson <[email protected]>
Sent: April 7, 2015 20:23
To: [email protected]
Subject: Re: Partitioning from Storm Trident to Kafka

You'll need to make sure your Kafka producer is configured to partition the way 
you are expecting when you write to your topic. By default it will publish to 
the same partition for 10 minutes at a time then switch to a new one. It looks 
like you are trying to pass a partition key to the producer but the producer 
needs to be set up to use it.
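
To illustrate what I mean, here is a rough sketch against the old 0.8 producer API (the broker and topic names are placeholders, and the property names are from memory, so double-check them against your version):

// kafka.javaapi.producer.Producer, kafka.producer.KeyedMessage, kafka.producer.ProducerConfig
Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
// kafka.producer.DefaultPartitioner hashes the message key, so a given key always
// lands on the same partition. With no key, the 0.8 producer instead sticks to one
// partition until topic.metadata.refresh.interval.ms (10 minutes by default)
// elapses, which is the behaviour described above.
props.put("partitioner.class", "kafka.producer.DefaultPartitioner");

Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
producer.send(new KeyedMessage<String, String>("your-topic", "some-key", "some-message"));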

On Tue, Apr 7, 2015 at 7:55 AM Jonas P Sandström <[email protected]> wrote:

Hi,


I'm trying to send processed data from Storm Trident to 3 Partitions of a Kafka 
topic.

However, I cannot figure out how to make Trident write to more than one partition when selecting the topic with DefaultTopicSelector().

There is a makeState implementation that takes partitionIndex and numPartitions, but I cannot find an example of how to use it or how to set the Map or IMetricsContext.


My question is whether it is possible to use TridentKafkaStateFactory for partitioned output, or whether there is some other component that can solve the problem. The ugly solution would be to create more topics and partition the stream that way.



@SuppressWarnings("rawtypes")
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
        .withKafkaTopicSelector(new DefaultTopicSelector(outputTopic))
        .withTridentTupleToKafkaMapper(
                new FieldNameBasedTupleToKafkaMapper("key", "msg"));


public State makeState(Map conf,
                       IMetricsContext metrics,
                       int partitionIndex,
                       int numPartitions)
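
As far as I can tell, makeState is something Trident calls on its own when it prepares the topology, so the Map (the topology configuration) and the IMetricsContext are handed to the factory rather than set by me. If I had to write my own factory, I imagine it would be a skeleton like the one below (class names are made up, and this is only my reading of the interface, not working code):

import java.util.Map;
import backtype.storm.task.IMetricsContext;
import storm.trident.state.State;
import storm.trident.state.StateFactory;

// Hypothetical skeleton only, to show where the makeState arguments come from.
class MyKafkaStateFactory implements StateFactory {
    @SuppressWarnings("rawtypes")
    @Override
    public State makeState(Map conf, IMetricsContext metrics,
                           int partitionIndex, int numPartitions) {
        // conf is the topology configuration, metrics comes from Storm, and
        // partitionIndex/numPartitions describe this task's slice of the Trident
        // state (not, as far as I understand, the Kafka topic partitions).
        return new MyKafkaState();
    }

    static class MyKafkaState implements State {
        @Override public void beginCommit(Long txid) { }
        @Override public void commit(Long txid) { }
        // a custom state updater would do the actual producing to Kafka here
    }
}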

Best regards,

Jonas Sandström
