Hi,
I think the problem is that you have set up one partition for the topic,
but you are trying to consume it with 10 Kafka spout tasks.
Check this line:
builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
The 10 is the parallelism hint for the spout. For a Kafka spout it should
match the number of partitions you have set up for the Kafka topic; as the
warnings in your log show, the other nine tasks are left idle.
You use more than one Kafka partition when you want to consume the data
from the topic in parallel. Please check the very good documentation on
Kafka partitions on the Confluent site.
In my opinion, setting your parallelism hint to 1 would solve the problem.
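To illustrate why only one task gets any work, here is a rough sketch in plain Java. It assumes partitions are dealt out round-robin across the spout tasks; the class and method names are made up for illustration and this is not the storm-kafka source, just a simplified model that is consistent with the "Task [n/10]" lines in your log.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration only; hypothetical names, not the storm-kafka source.
class PartitionAssignmentSketch {

    // Returns the partition ids that the task at taskIndex (0-based) would read,
    // assuming partitions are dealt out round-robin across totalTasks tasks.
    static List<Integer> partitionsForTask(int taskIndex, int totalTasks, int numPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = taskIndex; p < numPartitions; p += totalTasks) {
            assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        int totalTasks = 10;   // the parallelism hint passed to builder.setSpout(...)
        int numPartitions = 1; // the single partition added with addPartition(0, ...)
        for (int task = 0; task < totalTasks; task++) {
            List<Integer> assigned = partitionsForTask(task, totalTasks, numPartitions);
            System.out.println("Task [" + (task + 1) + "/" + totalTasks + "] "
                    + (assigned.isEmpty() ? "no partitions assigned" : "assigned " + assigned));
        }
    }
}
```

With one partition and a hint of 1, the single task simply owns partition 0 and nothing is left idle.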
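The max spout pending setting has a different meaning: it limits how many
un-acked tuples a spout task may have in flight, not how many partitions
are read.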
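As a rough picture of what TOPOLOGY_MAX_SPOUT_PENDING controls, here is a toy model in plain Java. The class is hypothetical and is not how Storm implements it; it only shows the idea that a spout task stops emitting once the cap of pending tuples is reached and resumes when one is acked.

```java
// Toy model only; hypothetical class, not Storm's implementation.
class MaxSpoutPendingSketch {
    private final int maxPending; // plays the role of TOPOLOGY_MAX_SPOUT_PENDING
    private int pending = 0;      // tuples emitted but not yet acked or failed

    MaxSpoutPendingSketch(int maxPending) {
        this.maxPending = maxPending;
    }

    // Emit only while the number of in-flight tuples is below the cap.
    boolean tryEmit() {
        if (pending >= maxPending) {
            return false;
        }
        pending++;
        return true;
    }

    // An ack (or a fail) frees one slot.
    void ack() {
        if (pending > 0) {
            pending--;
        }
    }

    int pending() {
        return pending;
    }

    public static void main(String[] args) {
        MaxSpoutPendingSketch spout = new MaxSpoutPendingSketch(1);
        System.out.println(spout.tryEmit()); // first tuple goes out
        System.out.println(spout.tryEmit()); // held back: one tuple is still pending
        spout.ack();
        System.out.println(spout.tryEmit()); // slot freed, next tuple goes out
    }
}
```

So changing that value from 1 to 2 only changes how many tuples can be in flight at once; it does not affect which task reads which partition.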
Regards,
Florin
On Wednesday, March 30, 2016, david kavanagh <[email protected]> wrote:
> I am only creating one partition in code here:
> GlobalPartitionInformation hostsAndPartitions = new GlobalPartitionInformation();
> hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
> BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
> I hope that answers your question. I am new to both Storm and Kafka so I
> am not sure exactly how it works.
> If I am understanding you correctly, the line you told me to add in the
> first email should work because I am only creating one partition?
> config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
> Thanks again for the help :-)
> David
> ________________________________
> Date: Wed, 30 Mar 2016 15:36:19 +0530
> Subject: Re: Storm KafkaSpout Integration
> From: [email protected]
> To: [email protected]
>
>
> Hi David,
>
> Can I know how many partitions you have?
> The statement I gave you is the default. If you are running with several
> partitions, make sure you give the same number, e.g. if you are running
> with two partitions, change the number to 2 in the statement:
> config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);
>
> Best regards,
> K.Sai Dilip Reddy.
> On Wed, Mar 30, 2016 at 3:00 PM, david kavanagh <[email protected]> wrote:
>
> Thanks for the reply!
> I added the line as you suggested but unfortunately there is still no
> difference.
> I am just guessing at this stage, but judging by the output below it
> seems to be something to do with the partitioning or the offset.
> The warnings start by stating that there are more tasks than partitions.
> Task 1 is assigned the partition that is created in the code (highlighted
> in green), then the rest of the tasks are not assigned any partitions.
> Eventually it states 'Read partition information from:
> /twitter/twitter-topic-id/partition_0 --> null'
> So it seems like it is not reading data from Kafka at all. I really don't
> understand what is going on here.
> Any ideas?
>
> Kind Regards
> David
> --------------------------------------------------
> Storm Output:
> Thread-9-print] INFO backtype.storm.daemon.executor - Prepared bolt print:(2)
> 32644 [Thread-11-words] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
> 32685 [Thread-19-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-19-words] WARN storm.kafka.KafkaUtils - Task [5/10] no partitions assigned
> 32686 [Thread-17-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-15-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-17-words] WARN storm.kafka.KafkaUtils - Task [4/10] no partitions assigned
> 32686 [Thread-15-words] WARN storm.kafka.KafkaUtils - Task [3/10] no partitions assigned
> 32686 [Thread-11-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-11-words] INFO storm.kafka.KafkaUtils - Task [1/10] assigned [Partition{host=127.0.0.1:9092, partition=0}]
> 32687 [Thread-29-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32697 [Thread-19-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-25-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-29-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-13-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-27-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-15-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32689 [Thread-19-words] INFO backtype.storm.daemon.executor - Opened spout words:(7)
> 32689 [Thread-25-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32689 [Thread-15-words] INFO backtype.storm.daemon.executor - Opened spout words:(5)
> 32738 [Thread-25-words] WARN storm.kafka.KafkaUtils - Task [8/10] no partitions assigned
> 32738 [Thread-25-words] INFO backtype.storm.daemon.executor - Opened spout words:(10)
> 32689 [Thread-17-words] INFO backtype.storm.daemon.executor - Opened spout words:(6)
> 32688 [Thread-13-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32688 [Thread-21-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32739 [Thread-21-words] WARN storm.kafka.KafkaUtils - Task [6/10] no partitions assigned
> 32739 [Thread-21-words] INFO backtype.storm.daemon.executor - Opened spout words:(8)
> 32687 [Thread-27-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32740 [Thread-27-words] WARN storm.kafka.KafkaUtils - Task [9/10] no partitions assigned
> 32740 [Thread-27-words] INFO backtype.storm.daemon.executor - Opened spout words:(11)
> 32687 [Thread-23-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32740 [Thread-23-words] WARN storm.kafka.KafkaUtils - Task [7/10] no partitions assigned
> 32736 [Thread-29-words] WARN storm.kafka.KafkaUtils - Task [10/10] no partitions assigned
> 32742 [Thread-17-words] INFO backtype.storm.daemon.executor - Activating spout words:(6)
> 32872 [Thread-29-words] INFO backtype.storm.daemon.executor - Opened spout words:(12)
> 32742 [Thread-25-words] INFO backtype.storm.daemon.executor - Activating spout words:(10)
> 32742 [Thread-21-words] INFO backtype.storm.daemon.executor - Activating spout words:(8)
> 32742 [Thread-27-words] INFO backtype.storm.daemon.executor - Activating spout words:(11)
> 32742 [Thread-19-words] INFO backtype.storm.daemon.executor - Activating spout words:(7)
> 32741 [Thread-15-words] INFO backtype.storm.daemon.executor - Activating spout words:(5)
> 32740 [Thread-13-words] WARN storm.kafka.KafkaUtils - Task [2/10] no partitions assigned
> 32873 [Thread-29-words] INFO backtype.storm.daemon.executor - Activating spout words:(12)
> 32872 [Thread-23-words] INFO backtype.storm.daemon.executor - Opened spout words:(9)
> 32873 [Thread-13-words] INFO backtype.storm.daemon.executor - Opened spout words:(4)
> 32873 [Thread-23-words] INFO backtype.storm.daemon.executor - Activating spout words:(9)
> 32873 [Thread-13-words] INFO backtype.storm.daemon.executor - Activating spout words:(4)
> 37756 [Thread-23-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37757 [Thread-17-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37757 [Thread-21-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37757 [Thread-11-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37773 [Thread-11-words] INFO storm.kafka.PartitionManager - Read partition information from: /twitter/twitter-topic-id/partition_0 --> null
> 37915 [Thread-11-words] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
> 37915 [Thread-11-words] INFO storm.kafka.PartitionManager - Starting Kafka 127.0.0.1:0 from offset 185
> 37916 [Thread-11-words] INFO backtype.storm.daemon.executor - Opened spout words:(3)
> 37917 [Thread-11-words] INFO backtype.storm.daemon.executor - Activating spout words:(3)
> 62005 [Thread-11-words] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
> 62013 [Thread-13-words] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
>
> ________________________________
> Date: Wed, 30 Mar 2016 10:10:54 +0530
> Subject: Re: Storm KafkaSpout Integration
> From: [email protected]
> To: [email protected]
>
> Hi David,
>
> I think everything is good, but you are missing a statement. Add
> config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
> after the line
> config.setDebug(true);
>
> Best regards,
> K.Sai Dilip Reddy.
> On Tue, Mar 29, 2016 at 10:03 PM, david kavanagh <[email protected]> wrote:
>
> Hi all,
> I am currently trying to use TestTopologyStaticHosts to connect the
> KafkaSpout to a Kafka topic. I have a ZooKeeper and a Kafka instance
> running on my localhost. I have a topic named "twitter-topic" that has
> some tweets in it. This is all working as expected. I can run the
> consumer in the terminal and it returns the tweets. I want to use the
> KafkaSpout to connect to the Kafka topic and pull the tweets into a
> topology. I have been working on this for a few days now with no success.
> So far I have learned that when Storm is run in local mode it uses an
> in-memory ZooKeeper on port 2000, which would not allow it to connect to
> the Kafka topic. I have tried to get around this using the following
> syntax that I found online:
> LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
> It is still not working, but it seems to be connecting to Kafka, as it
> gives a 'closed socket connection' message when I cancel the operation
> (after it does not work and hangs open). It also says in the Storm output
> that it is connected to localhost 2181, so it seems to be getting that
> far. I have included the full output from Storm in a txt file attached.
> Here is the code I am using in the TestTopologyStaticHosts class:
> public static void main(String[] args) throws Exception {
>     //String zkConnString = "localhost:2181";
>     GlobalPartitionInformation hostsAndPartitions = new GlobalPartitionInformation();
>     hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
>     BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
>     // BrokerHosts brokerHosts = new ZkHosts(zkConnString, "/brokers");
>     SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "twitter-topic", "/twitter", "twitter-topic-id");
>     kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>     //kafkaConfig.forceStartOffsetTime(-2);
>     TopologyBuilder builder = new TopologyBuilder();
>     builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
>     builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");
>     LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
>     Config config = new Config();
>     config.setDebug(true);
>     // config.put("storm.zookeeper.servers", "localhost");
>     // config.put("storm.zookeeper.port", "2181");
>     cluster.submitTopology("kafka-test", config, builder.createTopology());
>     Thread.sleep(600000);
> }
> Judging by the output, it seems that there is a problem with connecting
> to the Kafka partitions.
> I have tried many different things to get it to work but no luck. I have
> also been looking at using the KafkaSpoutTestTopology class, but it is
> expecting arguments including 'dockerIp', which I don't understand.
> Should I be using Storm in local mode?
> Should I be using the TestTopologyStaticHosts class, or would the
> KafkaSpoutTestTopology class be better?
> Any help at all would be greatly appreciated because I am really stuck.
> Kind Regards
> David Kavanagh
>
>
>