Hi, thanks for the quick answer.
If I understand correctly, the producer needs to be configured before use:

Properties props = new Properties();
props.put("zookeeper.connect", zookeeperhost);
props.put("metadata.broker.list", brokerlist);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "0");
props.put("partitioner.class", "kafka.producer.DefaultPartitioner"); // If I understood right, it will produce to all partitions

ProducerConfig config = new ProducerConfig(props);

TridentKafkaState state = new TridentKafkaState();
state.prepare(props);
@SuppressWarnings("rawtypes")
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
        .withKafkaTopicSelector(new DefaultTopicSelector(outputTopic))
        .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "msg"));

rePosition.shuffle().parallelismHint(3)
        .partitionPersist(stateFactory, new Fields("key", "msg"),
                new TridentKafkaUpdater(), new Fields())
        .parallelismHint(3);
However, I don't know how to make the TridentKafkaStateFactory pick up this ProducerConfig.
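My best guess (and it is only a guess, I have not verified the property key) is that the producer settings should go into the topology configuration under KafkaBolt.KAFKA_BROKER_PROPERTIES, and that TridentKafkaState builds its own producer from them, so no ProducerConfig would be needed on my side. Roughly like this, where "topology" is my TridentTopology (not shown above):

import java.util.Properties;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import storm.kafka.bolt.KafkaBolt;

// Same producer settings as above, but handed to Storm instead of to a ProducerConfig
Properties props = new Properties();
props.put("metadata.broker.list", brokerlist);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "0");
props.put("partitioner.class", "kafka.producer.DefaultPartitioner");

// Assumption: TridentKafkaState.prepare() looks these up in the topology config
// under KafkaBolt.KAFKA_BROKER_PROPERTIES ("kafka.broker.properties")
Config conf = new Config();
conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);

StormSubmitter.submitTopology("trident-to-kafka", conf, topology.build());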
Or did you have something else in mind for producing to Kafka?
Best regards,
Jonas Sandström
________________________________
From: Andrew Neilson <[email protected]>
Sent: 7 April 2015 20:23
To: [email protected]
Subject: Re: Partitioning from Storm Trident to Kafka
You'll need to make sure your Kafka producer is configured to partition the way you expect when you write to your topic. By default it will publish to the same partition for 10 minutes at a time and then switch to a new one. It looks like you are trying to pass a partition key to the producer, but the producer needs to be set up to use it.
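For example, with the plain 0.8 producer API it is the message key plus a partitioner that decides the partition; keyless messages get the "sticky" behaviour above until the next metadata refresh (10 minutes by default). A rough, untested sketch with placeholder broker/topic names:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092,broker2:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
// Only used when messages carry a key; keyless messages stick to one partition
props.put("partitioner.class", "kafka.producer.DefaultPartitioner");

Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
// The key ("user-42" here) is hashed to pick one of the topic's partitions
producer.send(new KeyedMessage<String, String>("my-topic", "user-42", "the message"));
producer.close();

In your Trident setup the same idea should apply: as far as I can tell, the "key" field you give FieldNameBasedTupleToKafkaMapper is what ends up as the message key.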
On Tue, Apr 7, 2015 at 7:55 AM Jonas P Sandström <[email protected]> wrote:
Hi,
I'm trying to send processed data from Storm Trident to 3 partitions of a Kafka topic.
However, I cannot figure out how to make Trident write to more than one partition when selecting the topic with DefaultTopicSelector().
There is a makeState implementation that takes partitionIndex and numPartitions, but I cannot find an example of how to use it, or how to set the Map and IMetricsContext arguments.
My question is whether it is possible to use TridentKafkaStateFactory for partitioned output, or whether there is some other component that can solve the problem. The ugly workaround would be to create more topics and partition the stream that way.
@SuppressWarnings("rawtypes")
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
        .withKafkaTopicSelector(new DefaultTopicSelector(outputTopic))
        .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "msg"));
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions)
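For what it's worth, my reading is that makeState is called once for each partitionPersist task, so partitionIndex/numPartitions would refer to Trident's own state partitions rather than Kafka partitions. A made-up factory just to illustrate the signature (the class name is invented and I have not run this):

import java.util.Map;

import backtype.storm.task.IMetricsContext;
import storm.trident.state.State;
import storm.trident.state.StateFactory;

// Invented example class, only to show where partitionIndex/numPartitions come from
public class MyKafkaStateFactory implements StateFactory {
    @SuppressWarnings("rawtypes")
    @Override
    public State makeState(Map conf, IMetricsContext metrics,
                           int partitionIndex, int numPartitions) {
        // One call per partitionPersist task
        System.out.println("makeState for task " + partitionIndex + " of " + numPartitions);
        return null; // a real factory would return a State that writes to Kafka
    }
}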
Best regards,
Jonas Sandström