Sorry again, just wanted to provide more info.

The other threads seem fine - it's always Thread 11 that errors. Here is
Thread 13 for instance:

7040 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Opened spout
kafka:(3)
7044 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Activating
spout kafka:(3)
7045 [Thread-13-kafka] INFO  storm.kafka.ZkCoordinator - Task [1/1]
Refreshing partition manager connections
7047 [Thread-13-kafka-EventThread] INFO
 org.apache.curator.framework.state.ConnectionStateManager - State change:
CONNECTED
7047 [ConnectionStateManager-0] WARN
 org.apache.curator.framework.state.ConnectionStateManager - There are no
ConnectionStateListeners registered.
7052 [Thread-13-kafka] INFO  storm.kafka.DynamicBrokersReader - Read
partition info from zookeeper:
GlobalPartitionInformation{partitionMap={0=localhost:9092,
1=localhost:9092}}
7054 [Thread-13-kafka] INFO  storm.kafka.KafkaUtils - Task [1/1] assigned
[Partition{host=localhost:9092, partition=0},
Partition{host=localhost:9092, partition=1}]
7054 [Thread-13-kafka] INFO  storm.kafka.ZkCoordinator - Task [1/1] Deleted
partition managers: []
7054 [Thread-13-kafka] INFO  storm.kafka.ZkCoordinator - Task [1/1] New
partition managers: [Partition{host=localhost:9092, partition=1},
Partition{host=localhost:9092, partition=0}]
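
A guess about the Thread 11 failure: in the trace quoted below, the exception
comes right after exclaim1 receives a tuple from __system on the __tick stream
carrying [10]. If the bolt calls getString(0) on every tuple, that Integer
would produce exactly this ClassCastException. Below is a minimal,
self-contained sketch of skipping such tuples. FakeTuple, isTick and exclaim
are made-up stand-ins for illustration, not Storm classes, and I have not
checked this against the real ExclamationBolt.

```java
import java.util.Arrays;
import java.util.List;

public class TickGuardSketch {
    // Values taken from the log line "source: __system:-1, stream: __tick".
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    // Hypothetical stand-in for a Storm tuple (assumption, not the real API).
    static final class FakeTuple {
        final String sourceComponent;
        final String streamId;
        final List<Object> values;

        FakeTuple(String sourceComponent, String streamId, List<Object> values) {
            this.sourceComponent = sourceComponent;
            this.streamId = streamId;
            this.values = values;
        }

        // Same failure mode as in the trace: casting an Integer to String throws.
        String getString(int i) {
            return (String) values.get(i);
        }
    }

    // True when the tuple looks like a system tick rather than a data tuple.
    static boolean isTick(FakeTuple t) {
        return SYSTEM_COMPONENT_ID.equals(t.sourceComponent)
                && SYSTEM_TICK_STREAM_ID.equals(t.streamId);
    }

    // Returns the exclaimed word, or null when the tuple is a tick and is skipped.
    static String exclaim(FakeTuple t) {
        if (isTick(t)) {
            return null;
        }
        return t.getString(0) + "!!!";
    }

    public static void main(String[] args) {
        FakeTuple word = new FakeTuple("kafka", "default", Arrays.<Object>asList("hit soon"));
        FakeTuple tick = new FakeTuple("__system", "__tick", Arrays.<Object>asList(10));
        System.out.println(exclaim(word)); // prints "hit soon!!!"
        System.out.println(exclaim(tick)); // prints "null" (tick skipped, no cast attempted)
    }
}
```

In a real bolt the same check would presumably compare tuple.getSourceComponent()
and tuple.getSourceStreamId() against the SYSTEM_COMPONENT_ID and
SYSTEM_TICK_STREAM_ID constants in backtype.storm.Constants.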

On Fri, Oct 3, 2014 at 2:57 PM, Martin Arrowsmith <
[email protected]> wrote:

> Hi again,
>
> Does my zkRoot need to match the path that actually exists in ZooKeeper?
> (I'm also new to ZooKeeper.)
>
> Here is Zookeeper:
>
> [zk: 127.0.0.1:2181(CONNECTED) 8] ls /brokers/topics/mytopic/partitions
> [1, 0]
>
> And here is what I pass into spoutConfig:
>
>   SpoutConfig spoutConfig = new SpoutConfig(hosts, "mytopic",
> "/brokers/mytopic", "1");
>
> Right before the crash, debugging statements say:
>
> 7188 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - Read partition
> information from: /brokers/mytopic/1/partition_1  --> null
> 7332 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - No partition
> information found, using configuration to determine offset
> 7332 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - Starting Kafka
> localhost:1 from offset 60
> 7334 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - Read partition
> information from: /brokers/mytopic/1/partition_0  --> null
> 7335 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - No partition
> information found, using configuration to determine offset
> 7335 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - Starting Kafka
> localhost:0 from offset 60
> 7335 [Thread-13-kafka] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Finished refreshing
> 17001 [Thread-11-exclaim1] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __tick, id: {},
> [10]
> 17007 [Thread-11-exclaim1] ERROR backtype.storm.util - Async loop died!
> java.lang.RuntimeException: java.lang.ClassCastException:
> java.lang.Integer cannot be cast to java.lang.String
> ....
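
On the zkRoot question: the PartitionManager lines above read from
/brokers/mytopic/1/partition_1, which looks like zkRoot + "/" + id +
"/partition_" + partition number. That suggests zkRoot is where the spout keeps
its own offsets, rather than the /brokers/topics/... path that Kafka writes, so
the two may not need to match. A small sketch of that composition, assuming the
pattern inferred from the log (offsetPath is a hypothetical helper, not a
storm-kafka method):

```java
public class OffsetPathSketch {
    // Hypothetical helper: mirrors the path shape seen in the PartitionManager log lines.
    static String offsetPath(String zkRoot, String id, int partition) {
        return zkRoot + "/" + id + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // zkRoot and id are the values passed to SpoutConfig in this message.
        System.out.println(offsetPath("/brokers/mytopic", "1", 1)); // prints "/brokers/mytopic/1/partition_1"
        System.out.println(offsetPath("/brokers/mytopic", "1", 0)); // prints "/brokers/mytopic/1/partition_0"
    }
}
```

If that reading is right, the "--> null" in the log would just mean no offset has
been stored under that path yet, not that the path is wrong.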
>
>
>
> On Fri, Oct 3, 2014 at 11:20 AM, Martin Arrowsmith <
> [email protected]> wrote:
>
>> Hi Storm Experts!
>>
>> I'm running Storm 0.9.2-incubating and kafka_2.10-0.8.1.1
>>
>> I'm trying to get the ExclamationTopology to run with a Kafka spout (using
>> the storm-kafka project included in the Storm GitHub repository, not
>> storm-kafka-starter).
>>
>> Here is the relevant portion of my ExclamationTopology:
>>
>>     BrokerHosts hosts = new ZkHosts("localhost:2181");
>>     SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/test",
>> "discovery");
>>     spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>     KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>>
>>     builder.setSpout("kafka", kafkaSpout);
>>
>>     builder.setBolt("exclaim1", new
>> ExclamationBolt()).shuffleGrouping("kafka");
>>
>> When I run the topology in local mode for 1 minute, it works for the
>> first 5 seconds, and then it breaks.
>>
>> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>
>> So I'm not sure why it complains about casting an Integer to a String, or
>> why it works at first but not now.
>>
>> In my kafka server.properties:
>>
>> num.network.threads=2
>> num.io.threads=8
>> num.partitions=2
>>
>> Should I be doing something like this?
>>
>> builder.setSpout("kafka", kafkaSpout, 2);
>>
>> Here is the output for when it's working, and then when it starts to
>> break. Any help would be much appreciated!!
>>
>> As you can see - it starts to die on Thread 11
>>
>> Best,
>>
>> Martin
>>
>> =================
>> ...
>>
>> 13684 [Thread-13-kafka] INFO  backtype.storm.daemon.task - Emitting:
>> kafka default [why will you]
>> 13684 [Thread-13-kafka] INFO  backtype.storm.daemon.task - Emitting:
>> kafka __ack_init [-2810730987442558521 -8063320413415537181 3]
>> 13684 [Thread-11-exclaim1] INFO  backtype.storm.daemon.executor -
>> Processing received message source: kafka:3, stream: default, id:
>> {-2810730987442558521=-8063320413415537181}, [why will you]
>> 13684 [Thread-11-exclaim1] INFO  backtype.storm.daemon.task - Emitting:
>> exclaim1 default [why will you!!!]
>> 13685 [Thread-11-exclaim1] INFO  backtype.storm.daemon.task - Emitting:
>> exclaim1 __ack_ack [-2810730987442558521 -8063320413415537181]
>> 13685 [Thread-17-__acker] INFO  backtype.storm.daemon.executor -
>> Processing received message source: kafka:3, stream: __ack_init, id: {},
>> [-2810730987442558521 -8063320413415537181 3]
>> 13685 [Thread-17-__acker] INFO  backtype.storm.daemon.executor -
>> Processing received message source: exclaim1:2, stream: __ack_ack, id: {},
>> [-2810730987442558521 -8063320413415537181]
>> 13686 [Thread-17-__acker] INFO  backtype.storm.daemon.task - Emitting
>> direct: 3; __acker __ack_ack [-2810730987442558521]
>> 13687 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Processing
>> received message source: __acker:1, stream: __ack_ack, id: {},
>> [-2810730987442558521]
>> 13687 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Acking
>> message storm.kafka.PartitionManager$KafkaMessageId@6fec0cbf
>> 15686 [Thread-13-kafka] INFO  backtype.storm.daemon.task - Emitting:
>> kafka default [hit soon]
>> 15687 [Thread-11-exclaim1] INFO  backtype.storm.daemon.executor -
>> Processing received message source: kafka:3, stream: default, id:
>> {2311641896827057642=-6472974563298617532}, [hit soon]
>> 15687 [Thread-13-kafka] INFO  backtype.storm.daemon.task - Emitting:
>> kafka __ack_init [2311641896827057642 -6472974563298617532 3]
>> 15687 [Thread-11-exclaim1] INFO  backtype.storm.daemon.task - Emitting:
>> exclaim1 default [hit soon!!!]
>> 15687 [Thread-11-exclaim1] INFO  backtype.storm.daemon.task - Emitting:
>> exclaim1 __ack_ack [2311641896827057642 -6472974563298617532]
>> 15688 [Thread-17-__acker] INFO  backtype.storm.daemon.executor -
>> Processing received message source: kafka:3, stream: __ack_init, id: {},
>> [2311641896827057642 -6472974563298617532 3]
>> 15689 [Thread-17-__acker] INFO  backtype.storm.daemon.executor -
>> Processing received message source: exclaim1:2, stream: __ack_ack, id: {},
>> [2311641896827057642 -6472974563298617532]
>> 15689 [Thread-17-__acker] INFO  backtype.storm.daemon.task - Emitting
>> direct: 3; __acker __ack_ack [2311641896827057642]
>> 15691 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Processing
>> received message source: __acker:1, stream: __ack_ack, id: {},
>> [2311641896827057642]
>> 15692 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Acking
>> message storm.kafka.PartitionManager$KafkaMessageId@5638c9df
>> 17490 [Thread-11-exclaim1] INFO  backtype.storm.daemon.executor -
>> Processing received message source: __system:-1, stream: __tick, id: {},
>> [10]
>> 17496 [Thread-11-exclaim1] ERROR backtype.storm.util - Async loop died!
>> java.lang.RuntimeException: java.lang.ClassCastException:
>> java.lang.Integer cannot be cast to java.lang.String
>> at
>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>> at
>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>> at
>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>> at
>> backtype.storm.daemon.executor$fn__3353$fn__3365$fn__3412.invoke(executor.clj:746)
>> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>> at backtype.storm.util$async_loop$fn__450.invoke(util.clj:431)
>> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
>> ...
>>
>
>
