After some internet and forum searching, it looks like my local mode isn't
connecting to an external ZooKeeper. Does that sound familiar? If so, how
come I can still see some spouts?

If that's the case, any idea how I can connect to an external ZooKeeper in
local mode?
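For reference, here is a sketch of the conf keys I believe control this. I'm
not certain local mode honors them, since LocalCluster normally starts its own
in-process ZooKeeper, so treat this as a guess; I'm using the raw key strings
(what Config.STORM_ZOOKEEPER_SERVERS / Config.STORM_ZOOKEEPER_PORT resolve to)
in a plain map so the expected value types are visible:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ExternalZkConf {
    public static Map<String, Object> build() {
        Map<String, Object> conf = new HashMap<>();
        // Note the types Storm expects: a List of host strings, and an
        // Integer port (not the string "2181").
        conf.put("storm.zookeeper.servers", Arrays.asList("127.0.0.1"));
        conf.put("storm.zookeeper.port", 2181);
        return conf;
    }

    public static void main(String[] args) {
        System.out.println(build().get("storm.zookeeper.servers"));
        System.out.println(build().get("storm.zookeeper.port"));
    }
}
```

Getting the types right matters here; putting the port in as a String is the
kind of thing that surfaces later as a ClassCastException.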

Martin

On Fri, Oct 3, 2014 at 3:01 PM, Martin Arrowsmith <
[email protected]> wrote:

> Sorry again, just wanted to provide more info.
>
> The other threads seem fine - it's always Thread 11 that errors. Here is
> Thread 13 for instance:
>
> 7040 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Opened spout
> kafka:(3)
> 7044 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Activating
> spout kafka:(3)
> 7045 [Thread-13-kafka] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Refreshing partition manager connections
> 7047 [Thread-13-kafka-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 7047 [ConnectionStateManager-0] WARN
>  org.apache.curator.framework.state.ConnectionStateManager - There are no
> ConnectionStateListeners registered.
> 7052 [Thread-13-kafka] INFO  storm.kafka.DynamicBrokersReader - Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=localhost:9092,
> 1=localhost:9092}}
> 7054 [Thread-13-kafka] INFO  storm.kafka.KafkaUtils - Task [1/1] assigned
> [Partition{host=localhost:9092, partition=0},
> Partition{host=localhost:9092, partition=1}]
> 7054 [Thread-13-kafka] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Deleted partition managers: []
> 7054 [Thread-13-kafka] INFO  storm.kafka.ZkCoordinator - Task [1/1] New
> partition managers: [Partition{host=localhost:9092, partition=1},
> Partition{host=localhost:9092, partition=0}]
>
> On Fri, Oct 3, 2014 at 2:57 PM, Martin Arrowsmith <
> [email protected]> wrote:
>
>> Hi again,
>>
>> Does my zkRoot need to equal the value that is actually in ZooKeeper?
>> (I'm also new to ZooKeeper.)
>>
>> Here is Zookeeper:
>>
>> [zk: 127.0.0.1:2181(CONNECTED) 8] ls /brokers/topics/mytopic/partitions
>> [1, 0]
>>
>> And here is what I pass into spoutConfig:
>>
>>   SpoutConfig spoutConfig = new SpoutConfig(hosts, "mytopic",
>> "/brokers/mytopic", "1");
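As far as I can tell (and I could be wrong), zkRoot doesn't have to match an
existing Kafka path: it's where the spout writes its own committed offsets,
under zkRoot + "/" + id. The "--> null" lines in the log would then just mean
no offset has been committed yet, which is why it falls back to the configured
start offset. A sketch of the path construction with these values
(committedPath is my own illustrative helper, not a storm-kafka API):

```java
public class OffsetPath {
    // Hypothetical helper mirroring how storm-kafka's PartitionManager
    // appears to build the ZooKeeper path it reads committed offsets from:
    // zkRoot + "/" + id + "/partition_" + partition.
    static String committedPath(String zkRoot, String id, int partition) {
        return zkRoot + "/" + id + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // With zkRoot "/brokers/mytopic" and id "1" from the SpoutConfig:
        System.out.println(committedPath("/brokers/mytopic", "1", 1));
    }
}
```

One side effect worth noting: with zkRoot set to "/brokers/mytopic", the
spout's offsets land inside Kafka's own /brokers subtree, which may not be
what you want.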
>>
>> Right before the crash, debugging statements say:
>>
>> 7188 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - Read
>> partition information from: /brokers/mytopic/1/partition_1  --> null
>> 7332 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - No partition
>> information found, using configuration to determine offset
>> 7332 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - Starting
>> Kafka localhost:1 from offset 60
>> 7334 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - Read
>> partition information from: /brokers/mytopic/1/partition_0  --> null
>> 7335 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - No partition
>> information found, using configuration to determine offset
>> 7335 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - Starting
>> Kafka localhost:0 from offset 60
>> 7335 [Thread-13-kafka] INFO  storm.kafka.ZkCoordinator - Task [1/1]
>> Finished refreshing
>> 17001 [Thread-11-exclaim1] INFO  backtype.storm.daemon.executor -
>> Processing received message source: __system:-1, stream: __tick, id: {},
>> [10]
>> 17007 [Thread-11-exclaim1] ERROR backtype.storm.util - Async loop died!
>> java.lang.RuntimeException: java.lang.ClassCastException:
>> java.lang.Integer cannot be cast to java.lang.String
>> ....
>>
>>
>>
>> On Fri, Oct 3, 2014 at 11:20 AM, Martin Arrowsmith <
>> [email protected]> wrote:
>>
>>> Hi Storm Experts!
>>>
>>> I'm running Storm 0.9.2-incubating and kafka_2.10-0.8.1.1
>>>
>>> I'm trying to get the ExclamationTopology to run with a Kafka spout
>>> (using the storm-kafka project included in the Storm GitHub repo - not
>>> storm-kafka-starter).
>>>
>>> Here is the relevant portion of my ExclamationTopology:
>>>
>>>     BrokerHosts hosts = new ZkHosts("localhost:2181");
>>>     SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/test",
>>> "discovery");
>>>     spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>     KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>>>
>>>     builder.setSpout("kafka", kafkaSpout);
>>>
>>>     builder.setBolt("exclaim1", new
>>> ExclamationBolt()).shuffleGrouping("kafka");
>>>
>>> When I run the topology in local mode for 1 minute, it works for the
>>> first 5 seconds, and then it breaks.
>>>
>>> My scheme line is:
>>>
>>> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>
>>> So I'm not sure why it says I'm using a String on an Int, and why it
>>> works at first but not now.
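A hedged guess at the exception: both crashes happen immediately after a
__tick tuple whose payload is [10], an Integer. If anything on the bolt's
path treats every tuple value as a String (ExclamationBolt's
tuple.getString(0), for example), that cast would fail exactly this way. A
minimal pure-Java reproduction of the cast itself:

```java
public class CastDemo {
    public static void main(String[] args) {
        // The __tick tuple in the log carries [10] - an Integer, not a String.
        Object tickValue = Integer.valueOf(10);
        try {
            // Assuming every incoming value is a String amounts to this cast:
            String s = (String) tickValue;
            System.out.println(s);
        } catch (ClassCastException e) {
            System.out.println("caught " + e.getClass().getSimpleName());
        }
    }
}
```

If that is the cause, checking the tuple's source stream (and skipping
__system/__tick tuples) before calling getString might be worth a try.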
>>>
>>> In my kafka server.properties:
>>>
>>> num.network.threads=2
>>> num.io.threads=8
>>> num.partitions=2
>>>
>>> Should I be doing something like this?
>>>
>>> builder.setSpout("kafka", kafkaSpout, 2);
>>>
>>> Here is the output for when it's working, and then when it starts to
>>> break. Any help would be much appreciated!!
>>>
>>> As you can see - it starts to die on Thread 11
>>>
>>> Best,
>>>
>>> Martin
>>>
>>> =================
>>> ...
>>>
>>> 13684 [Thread-13-kafka] INFO  backtype.storm.daemon.task - Emitting:
>>> kafka default [why will you]
>>> 13684 [Thread-13-kafka] INFO  backtype.storm.daemon.task - Emitting:
>>> kafka __ack_init [-2810730987442558521 -8063320413415537181 3]
>>> 13684 [Thread-11-exclaim1] INFO  backtype.storm.daemon.executor -
>>> Processing received message source: kafka:3, stream: default, id:
>>> {-2810730987442558521=-8063320413415537181}, [why will you]
>>> 13684 [Thread-11-exclaim1] INFO  backtype.storm.daemon.task - Emitting:
>>> exclaim1 default [why will you!!!]
>>> 13685 [Thread-11-exclaim1] INFO  backtype.storm.daemon.task - Emitting:
>>> exclaim1 __ack_ack [-2810730987442558521 -8063320413415537181]
>>> 13685 [Thread-17-__acker] INFO  backtype.storm.daemon.executor -
>>> Processing received message source: kafka:3, stream: __ack_init, id: {},
>>> [-2810730987442558521 -8063320413415537181 3]
>>> 13685 [Thread-17-__acker] INFO  backtype.storm.daemon.executor -
>>> Processing received message source: exclaim1:2, stream: __ack_ack, id: {},
>>> [-2810730987442558521 -8063320413415537181]
>>> 13686 [Thread-17-__acker] INFO  backtype.storm.daemon.task - Emitting
>>> direct: 3; __acker __ack_ack [-2810730987442558521]
>>> 13687 [Thread-13-kafka] INFO  backtype.storm.daemon.executor -
>>> Processing received message source: __acker:1, stream: __ack_ack, id: {},
>>> [-2810730987442558521]
>>> 13687 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Acking
>>> message storm.kafka.PartitionManager$KafkaMessageId@6fec0cbf
>>> 15686 [Thread-13-kafka] INFO  backtype.storm.daemon.task - Emitting:
>>> kafka default [hit soon]
>>> 15687 [Thread-11-exclaim1] INFO  backtype.storm.daemon.executor -
>>> Processing received message source: kafka:3, stream: default, id:
>>> {2311641896827057642=-6472974563298617532}, [hit soon]
>>> 15687 [Thread-13-kafka] INFO  backtype.storm.daemon.task - Emitting:
>>> kafka __ack_init [2311641896827057642 -6472974563298617532 3]
>>> 15687 [Thread-11-exclaim1] INFO  backtype.storm.daemon.task - Emitting:
>>> exclaim1 default [hit soon!!!]
>>> 15687 [Thread-11-exclaim1] INFO  backtype.storm.daemon.task - Emitting:
>>> exclaim1 __ack_ack [2311641896827057642 -6472974563298617532]
>>> 15688 [Thread-17-__acker] INFO  backtype.storm.daemon.executor -
>>> Processing received message source: kafka:3, stream: __ack_init, id: {},
>>> [2311641896827057642 -6472974563298617532 3]
>>> 15689 [Thread-17-__acker] INFO  backtype.storm.daemon.executor -
>>> Processing received message source: exclaim1:2, stream: __ack_ack, id: {},
>>> [2311641896827057642 -6472974563298617532]
>>> 15689 [Thread-17-__acker] INFO  backtype.storm.daemon.task - Emitting
>>> direct: 3; __acker __ack_ack [2311641896827057642]
>>> 15691 [Thread-13-kafka] INFO  backtype.storm.daemon.executor -
>>> Processing received message source: __acker:1, stream: __ack_ack, id: {},
>>> [2311641896827057642]
>>> 15692 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Acking
>>> message storm.kafka.PartitionManager$KafkaMessageId@5638c9df
>>> 17490 [Thread-11-exclaim1] INFO  backtype.storm.daemon.executor -
>>> Processing received message source: __system:-1, stream: __tick, id: {},
>>> [10]
>>> 17496 [Thread-11-exclaim1] ERROR backtype.storm.util - Async loop died!
>>> java.lang.RuntimeException: java.lang.ClassCastException:
>>> java.lang.Integer cannot be cast to java.lang.String
>>> at
>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>>> at
>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>>> at
>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>>> at
>>> backtype.storm.daemon.executor$fn__3353$fn__3365$fn__3412.invoke(executor.clj:746)
>>> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>>> at backtype.storm.util$async_loop$fn__450.invoke(util.clj:431)
>>> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>>> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
>>> ...
>>>
>>
>>
>
