If I remove the while loop in my spout, everything works fine (well, except for data loss in the spout).
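
That fits how Storm drives a spout: nextTuple(), ack() and fail() are all called from the same thread, so nextTuple() is expected to return quickly. As far as I understand it, Trident also needs nextTuple() to return before it can close a batch, which would explain why nothing downstream runs while the loop is spinning. Below is a minimal sketch of one way to keep the blocking read without blocking nextTuple(): a background thread reads lines into a bounded queue, and each nextTuple() call emits at most one buffered line. Everything here is an assumption for illustration: the class name NonBlockingLineSource, the buffer size, and the Consumer<String> parameter, which stands in for _collector.emit(new Values(line)) so the sketch compiles without Storm on the classpath.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Sketch of a spout body whose nextTuple() never blocks (all names are hypothetical). */
class NonBlockingLineSource {
    // Lines read from the socket wait here until nextTuple() picks them up.
    // The capacity of 10,000 is an arbitrary choice for this sketch.
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>(10_000);

    // In a real spout this setup would live in open(); 'in' stands in for the socket reader.
    NonBlockingLineSource(BufferedReader in) {
        Thread reader = new Thread(() -> {
            try {
                String line;
                // The blocking read loop runs on this thread, not inside nextTuple().
                while ((line = in.readLine()) != null) {
                    pending.put(line); // blocks only this reader thread when the buffer is full
                }
            } catch (IOException e) {
                // Log and let the thread end; reconnect logic is out of scope for this sketch.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "socket-reader");
        reader.setDaemon(true);
        reader.start();
    }

    /**
     * Emits at most one buffered line and returns immediately.
     * 'emit' stands in for _collector.emit(new Values(line)).
     * Returns true if a line was emitted, false if nothing was buffered.
     */
    boolean nextTuple(Consumer<String> emit) {
        String line = pending.poll(); // non-blocking: returns null when the queue is empty
        if (line == null) {
            return false;
        }
        emit.accept(line);
        return true;
    }
}
```

In an actual BaseRichSpout, the body of nextTuple() would do the same poll() and call _collector.emit(...) directly. Lines that arrive between calls stay in the queue instead of being dropped, although anything still buffered is lost if the worker dies, so this is not a replay guarantee.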

On Mon, Feb 17, 2014 at 2:46 PM, Chen Wang <[email protected]> wrote:

> It's just a BaseRichSpout.
>
>
> On Mon, Feb 17, 2014 at 1:06 PM, Danijel Schiavuzzi <[email protected]> wrote:
>
>> What interface does your spout implement? Is it IBatchSpout?
>>
>>
>> On Mon, Feb 17, 2014 at 8:49 PM, Chen Wang <[email protected]> wrote:
>>
>>> Hi,
>>> I might be missing something obvious. In my spout's nextTuple method, I
>>> have an infinite loop to read from our socket server:
>>>
>>>     @Override
>>>     public void nextTuple() {
>>>         try {
>>>             String line;
>>>             while ((line = socket.readLine()) != null) {
>>>                 System.out.println("emitting new entries");
>>>                 _collector.emit(new Values(line));
>>>             }
>>>         } catch (Exception e) {
>>>             // TODO
>>>             // log the exception with Storm
>>>         }
>>>     }
>>>
>>> Then in my Trident topology, I am doing something like this (pseudocode):
>>>
>>>     topology.newStream(myspout)
>>>         .each(filter logic)
>>>         .groupBy(mykey)
>>>         .persistentAggregate(
>>>
>>> When I run it in local mode, it seems that Trident does not pick up
>>> any tuple emitted from my spout. The topology just spins on the spout
>>> emitting tuples, while the rest of the topology never gets executed. How
>>> would I fix that?
>>>
>>> With a normal bolt/topology, the bolts do receive the tuples.
>>>
>>> Thanks,
>>>
>>> Chen
>>>
>>
>>
>>
>> --
>> Danijel Schiavuzzi
>>
>
>
