Hi david,

Can I know how many partitions you are having?
statement I have given to you is default.if you are  running with no of
partitions make sure you give same number eg: if you are running with two
partitions change the number to 2 in the statement .
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,2 );


*Best regards,*
*K.Sai Dilip Reddy.*

On Wed, Mar 30, 2016 at 3:00 PM, david kavanagh <[email protected]>
wrote:

> Thanks for the reply!
>
> I added the line as you suggested but there is still no difference
> unfortunately.
> I am just guessing at this stage but judging by the output below it, it
> seems like it is something to do with the partitioning or the offset.
> The warnings start by staying that  there are more tasks than partitions.
> Task 1 is assigned the partition that is created in the code (highlighted
> in green), then the rest of the tasks are not assigned any partitions.
> Eventually is states 'Read partition information from:
> /twitter/twitter-topic-id/partition_0  --> null'
>
> So it seems like it is not reading data from Kafka at all. I really don't
> understand what is going on here.
> Any ideas?
>
>
> Kind Regards
>
> David
>
> --------------------------------------------------
>
> *Storm Output:*
>
> Thread-9-print] INFO  backtype.storm.daemon.executor - Prepared bolt
> print:(2)
> 32644 [Thread-11-words] INFO
>  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
> 32685 [Thread-19-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-19-words] WARN  storm.kafka.KafkaUtils - Task [5/10] no
> partitions assigned
> 32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - Task [4/10] no
> partitions assigned
> 32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - Task [3/10] no
> partitions assigned
> 32686 [Thread-11-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-11-words] INFO  storm.kafka.KafkaUtils - Task [1/10]
> assigned [Partition{host=127.0.0.1:9092, partition=0}]
> 32687 [Thread-29-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32697 [Thread-19-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32697 [Thread-25-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32697 [Thread-29-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32697 [Thread-13-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32697 [Thread-27-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32697 [Thread-15-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 32689 [Thread-19-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(7)
> 32689 [Thread-25-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32689 [Thread-15-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(5)
> 32738 [Thread-25-words] WARN  storm.kafka.KafkaUtils - Task [8/10] no
> partitions assigned
> 32738 [Thread-25-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(10)
> 32689 [Thread-17-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(6)
> 32688 [Thread-13-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32688 [Thread-21-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32739 [Thread-21-words] WARN  storm.kafka.KafkaUtils - Task [6/10] no
> partitions assigned
> 32739 [Thread-21-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(8)
> 32687 [Thread-27-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32740 [Thread-27-words] WARN  storm.kafka.KafkaUtils - Task [9/10] no
> partitions assigned
> 32740 [Thread-27-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(11)
> 32687 [Thread-23-words] WARN  storm.kafka.KafkaUtils - there are more
> tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32740 [Thread-23-words] WARN  storm.kafka.KafkaUtils - Task [7/10] no
> partitions assigned
> 32736 [Thread-29-words] WARN  storm.kafka.KafkaUtils - Task [10/10] no
> partitions assigned
> 32742 [Thread-17-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(6)
> 32872 [Thread-29-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(12)
> 32742 [Thread-25-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(10)
> 32742 [Thread-21-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(8)
> 32742 [Thread-27-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(11)
> 32742 [Thread-19-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(7)
> 32741 [Thread-15-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(5)
> 32740 [Thread-13-words] WARN  storm.kafka.KafkaUtils - Task [2/10] no
> partitions assigned
> 32873 [Thread-29-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(12)
> 32872 [Thread-23-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(9)
> 32873 [Thread-13-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(4)
> 32873 [Thread-23-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(9)
> 32873 [Thread-13-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(4)
> 37756 [Thread-23-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 37757 [Thread-17-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 37757 [Thread-21-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 37757 [Thread-11-words-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 37773 [Thread-11-words] INFO  storm.kafka.PartitionManager - Read
> partition information from: /twitter/twitter-topic-id/partition_0  --> null
> 37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - No partition
> information found, using configuration to determine offset
> 37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - Starting
> Kafka 127.0.0.1:0 from offset 185
> 37916 [Thread-11-words] INFO  backtype.storm.daemon.executor - Opened
> spout words:(3)
> 37917 [Thread-11-words] INFO  backtype.storm.daemon.executor - Activating
> spout words:(3)
> 62005 [Thread-11-words] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
> 62013 [Thread-13-words] INFO  backtype.storm.daemon.executor - Processing
> received message source: __system:-1, stream: __tick, id: {}, [30]
>
>
> ------------------------------
> Date: Wed, 30 Mar 2016 10:10:54 +0530
> Subject: Re: Storm KafkaSpout Integration
> From: [email protected]
> To: [email protected]
>
>
> Hi david,
>
> I think everything is good but you are missing a statement
> config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); add it after the line
> config.setDebug(true);
>
> *Best regards,*
> *K.Sai Dilip Reddy.*
>
> On Tue, Mar 29, 2016 at 10:03 PM, david kavanagh <[email protected]>
> wrote:
>
> Hi all,
>
> I am currently trying use TestTopologyStaticHosts to try connect the
> KafkaSpout to a Kafka topic. I have a ZooKeeper and a Kafka instance
> running on my localhost. I have a topic named "twitter-topic" that has some
> tweets in it. This is all working as expected. I can run the consumer in
> the terminal and it returns the tweets. I want to use the KafkaSpout to
> connect to the Kafka topic and pull the tweets into a topology. I have
> been working on this a few days now and no success.
>
> So far i have learned that when Storm is run in local mode that it uses an
> in memory zookeeper on port 2000, which would not allow it to connect to
> the Kafka topic. I have tried to get around this using the following syntax
> that i found online:
>
> LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
>
> It is still not working but it seems to be connecting to Kafka as it gives
> a 'closed socket connection' message when i cancel the operation (after it
> does not work and hangs open). It also says in the storm output that it is
> connected to localhost 2181 so it seems to be getting that far. I have
> included the full output from Storm in a txt file attached.
>
> Here is the code i am using in the TestTopologyStaticHosts class:
>
>  public static void main(String[] args) throws Exception {
>
>         //String zkConnString = "localhost:2181";
>
>         GlobalPartitionInformation hostsAndPartitions = new
> GlobalPartitionInformation();
>         hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
>         BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
>         // BrokerHosts brokerHosts = new ZkHosts(zkConnString, "/brokers");
>
>         SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts,
> "twitter-topic","/twitter","twitter-topic-id");
>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>         //kafkaConfig.forceStartOffsetTime(-2);
>
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
>         builder.setBolt("print", new
> PrinterBolt()).shuffleGrouping("words");
>         LocalCluster cluster = new LocalCluster("localhost", new
> Long(2181));
>         Config config = new Config();
>         config.setDebug(true);
>         // config.put("storm.zookeeper.servers", "localhost");
>         // config.put("storm.zookeeper.port", "2181");
>         cluster.submitTopology("kafka-test", config,
> builder.createTopology());
>
>         Thread.sleep(600000);
>
>     }
>
> Judging by the output it seems that there is a problem with connecting to
> the Kafka partitions.
> I have tried many different things to get it to work but no luck. I have
> also been looking at using the KafkaSpoutTestTopology class but it is
> expecting arguments including 'dockerIp' which i don't understand.
>
> Should i be using Storm in localmode?
> Should i be using the TestTopologyStaticHosts class or would the 
> KafkaSpoutTestTopology
> class be better?
>
> Any help at all would be greatly appreciated because i am really stuck.
>
> Kind Regards
> David Kavanagh
>
>
>

Reply via email to