You'll need to make sure your Kafka producer is configured to partition the
way you are expecting when you write to your topic. By default it will
publish to the same partition for 10 minutes at a time then switch to a new
one. It looks like you are trying to pass a partition key to the producer
but the producer needs to be set up to use it.

On Tue, Apr 7, 2015 at 7:55 AM Jonas P Sandström <[email protected]>
wrote:

>  Hi,
>
>
>  I'm trying to send processed data from Storm Trident to 3 Partitions of
> a Kafka topic.
>
> However, I cannot figure out how to make Trident write to more than one
> partition, by selecting the topic with DefaultTopicSelector().
>
> There is a makeState implementation that includes  partitionIndex and
> numPartitions​, but I cannot find an example of how to use it
>
> or set the Map or IMetricsContext.
>
>
>  My question is if it is possible to use TridentKafkaStateFactory for
> partitioned output or if there is some other entity that
>
> can solve the problem? The ugly solution would be to create more topics
> and partition the stream thereby.
>
>
>
>  @SuppressWarnings("rawtypes")
> TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
> .withKafkaTopicSelector(new DefaultTopicSelector(outputTopic))
>  .withTridentTupleToKafkaMapper(
> new FieldNameBasedTupleToKafkaMapper("key", "msg"));
>
>
>  public State 
> <https://storm.apache.org/javadoc/apidocs/storm/trident/state/State.html> 
> makeState(Map 
> <http://docs.oracle.com/javase/6/docs/api/java/util/Map.html?is-external=true>
>  conf,
>                        IMetricsContext 
> <https://storm.apache.org/javadoc/apidocs/backtype/storm/task/IMetricsContext.html>
>  metrics,
>                        int partitionIndex,
>                        int numPartitions)
>
> Best regards,
>
> Jonas Sandström
>

Reply via email to