Is that 60000ms internal setting? where can I see this configuration? thanks
Alec On Wed, Aug 13, 2014 at 9:44 AM, Sa Li <[email protected]> wrote: > Hi, Siddharth > > I only used trident topology for kafka spout, since I thought I can easily > to add .each function to parse the stream. I do add two types of > storm-kafka packages in my pom, See this pom > > <!-- Storm-Kafka compiled --> > <dependency> > <artifactId>storm-kafka</artifactId> > <groupId>org.apache.storm</groupId> > <version>0.9.2-incubating</version> > <!-- > <scope>*compile*</scope> > --> > <!-- exclude the zookeeper package from storm-Kafka --> > <exclusions> > <exclusion> > <groupId>org.apache.zookeeper</groupId> > <artifactId>zookeeper</artifactId> > </exclusion> > </exclusions> > </dependency> > > <dependency> > <groupId>storm</groupId> > <artifactId>storm-kafka</artifactId> > <version>0.9.0-wip16a-scala292</version> > <!-- exclude the zookeeper package from > storm-Kafka --> > <exclusions> > <exclusion> > <groupId>org.apache.zookeeper</groupId> > <artifactId>zookeeper</artifactId> > </exclusion> > </exclusions> > </dependency> > > When I firstly running the kafkaSpout, I got stuck for a while, after > talking to developers here back and forth, I realize the version conflict > is really an issue we need to pay attention, you must be make the > zookeeper, storm, kafka version consistent, otherwise you will have > problem, or you need to exclude it in pom. > > Thanks > > Alec > > > On Wed, Aug 13, 2014 at 7:20 AM, siddharth ubale < > [email protected]> wrote: > >> hi , >> >> Just curious, did u face any isue with using kafka Spout if u did not use >> trident? >> Are u also able to implement the KafkaSpout packaged with Storm ? >> I am asking cos i am unable to use the kafkaSpout(SpoutConfig) to read >> from kafka topic. i am using kafka 8.1.1 and storm is 9.0.1 .... >> Can you lemme know about any issue u faced?? >> >> i get no error while i submitting my program but only a never ending >> sequence of the follwing: >> 68582 [Thread-33-words] INFO backtype.storm.daemon.executor - Processing >> received message source: __system:-1, stream: __metrics_tick, id: {}, [60] >> 68588 [Thread-33-words] INFO backtype.storm.daemon.task - Emitting: >> words __metrics [#<TaskInfo >> backtype.storm.metric.api.IMetricsConsumer$TaskInfo@a92bbf9> >> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, >> read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__complete-latency >> = {}]> #<DataPoint [__receive = {write_pos=3, read_pos=2, capacity=1024, >> population=1}]> #<DataPoint [kafkaOffset = {totalLatestTime=0, >> totalSpoutLag=0, totalLatestEmittedOffset=0}]> #<DataPoint >> [__transfer-count = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint >> [__emit-count = {}]>]] >> 68649 [Thread-33-words] INFO storm.kafka.ZkCoordinator - Refreshing >> partition manager connections >> 68649 [Thread-34-words] INFO storm.kafka.ZkCoordinator - Refreshing >> partition manager connections >> 68664 [Thread-33-words] INFO storm.kafka.ZkCoordinator - Deleted >> partition managers: [] >> 68664 [Thread-33-words] INFO storm.kafka.ZkCoordinator - New partition >> managers: [] >> 68664 [Thread-33-words] INFO storm.kafka.ZkCoordinator - Finished >> refreshing >> 68665 [Thread-34-words] INFO storm.kafka.ZkCoordinator - Deleted >> partition managers: [] >> 68665 [Thread-34-words] INFO storm.kafka.ZkCoordinator - New partition >> managers: [] >> 68665 [Thread-34-words] INFO storm.kafka.ZkCoordinator - Finished >> refreshing >> >> Thanks, >> Siddharth >> >> >> >> On Wed, Aug 13, 2014 at 5:33 AM, Sa Li <[email protected]> wrote: >> >>> Hi, All >>> >>> I am reading the messages from producer and print the "time" and >>> "userhostaddress", but I am getting such warning once in a while: >>> >>> 184.146.220.124 >>> 1403070062 >>> 24.79.224.172 >>> 1403070063 >>> 71.199.4.138 >>> 2644780 [Thread-16-spout0] WARN storm.artemis.kafka.KafkaUtils - No >>> data found in Kafka Partition partition_2 >>> 1403070064 >>> 172.4.221.83 >>> 2647191 [Thread-16-spout0] INFO >>> storm.artemis.kafka.trident.ZkBrokerReader - brokers need refreshing >>> because 60000ms have expired >>> 2647195 [Thread-16-spout0] INFO >>> storm.artemis.kafka.DynamicBrokersReader - Read partition info from >>> zookeeper: GlobalPartitionInformation{partitionMap={0=10.100.70.128:9092, >>> 1=10.100.70.128:9092, 2=10.100.70.128:9092, 3=10.100.70.128:9092, 4= >>> 10.100.70.128:9092}} >>> 2648569 [Thread-8-$spoutcoord-spout0] INFO >>> storm.artemis.kafka.trident.ZkBrokerReader - brokers need refreshing >>> because 60000ms have expired >>> 2648573 [Thread-8-$spoutcoord-spout0] INFO >>> storm.artemis.kafka.DynamicBrokersReader - Read partition info from >>> zookeeper: GlobalPartitionInformation{partitionMap={0=10.100.70.128:9092, >>> 1=10.100.70.128:9092, 2=10.100.70.128:9092, 3=10.100.70.128:9092, 4= >>> 10.100.70.128:9092}} >>> 1403070068 >>> 24.85.157.225 >>> 1403070070 >>> 24.114.78.75 >>> 1403070070 >>> 76.219.198.176 >>> 1403070071 >>> 142.166.228.205 >>> 1403070071 >>> 76.66.155.166 >>> 1403070071 >>> 172.56.10.86 >>> 1403070071 >>> >>> . >>> . >>> . >>> It says brokers need refreshing because 60000ms have expired, I didn't >>> see any 60000ms being configured anywhere, I wonder what this issue is. In >>> addition, I started 3 kafka brokers, and 5 partitions for the "topictest", >>> >>> topic: topictest partition: 0 leader: 1 replicas: 1,3,2 >>> isr: 1,2,3 >>> topic: topictest partition: 1 leader: 1 replicas: 2,1,3 >>> isr: 1,2,3 >>> topic: topictest partition: 2 leader: 1 replicas: 3,2,1 >>> isr: 1,2,3 >>> topic: topictest partition: 3 leader: 1 replicas: 1,2,3 >>> isr: 1,2,3 >>> topic: topictest partition: 4 leader: 1 replicas: 2,3,1 >>> isr: 1,2,3 >>> >>> What I don't understand is that I couldn't see BrokerReader for the >>> brokers 9093 and 9094. And also what that means "No data found in Kafka >>> Partition partition_2". >>> >>> >>> thanks >>> >>> Alec >>> >>> >> >
