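Exactly, you must never block in nextTuple(). Storm calls nextTuple(), ack() and fail() on the same thread, so a loop that never returns keeps the spout from doing anything else. Read your socket asynchronously (in a separate thread) and put the data into a queue. Then, in nextTuple(), emit at most what is already waiting in that queue and return immediately.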
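Here is a minimal sketch of that queue hand-off in plain Java, without the Storm classes so that it compiles on its own. The names LineQueue, pollLine() and awaitReaderFinished(), and the capacity of 10000, are made up for illustration; they are not part of Storm. The comments describe where I would expect each piece to go in a BaseRichSpout, which you should treat as an assumption about your setup rather than a drop-in implementation.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: a background thread reads lines and hands them
// over through a bounded queue, so the consumer never has to block.
final class LineQueue {
    // Bounded, so a slow consumer throttles the reader instead of
    // letting the queue grow without limit. 10000 is an arbitrary choice.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10_000);
    private final Thread reader;

    LineQueue(BufferedReader source) {
        reader = new Thread(() -> {
            try {
                String line;
                // Blocking reads are fine here: this is not the spout thread.
                while ((line = source.readLine()) != null) {
                    queue.put(line); // waits only while the queue is full
                }
            } catch (IOException e) {
                // Assumption: log and reconnect here in a real spout.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop when asked to
            }
        }, "socket-reader");
        reader.setDaemon(true); // do not keep the JVM alive on shutdown
    }

    // In a spout, this would be called once from open().
    void start() {
        reader.start();
    }

    // Non-blocking: returns null right away when nothing is queued.
    // In a spout, nextTuple() would call this once, emit the line if it
    // is non-null, and otherwise just return.
    String pollLine() {
        return queue.poll();
    }

    // Only for demos and tests: waits up to millis for the reader to hit
    // end of input. Returns true if the reader thread has finished.
    boolean awaitReaderFinished(long millis) {
        try {
            reader.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !reader.isAlive();
    }
}
```

With something like this, nextTuple() shrinks to a single pollLine() call followed by _collector.emit(new Values(line)) when the result is not null. Emitting one line per call keeps each invocation short; when the queue is empty you can simply return, or sleep for a millisecond first to avoid spinning.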

On Tue, Feb 18, 2014 at 12:05 AM, Chen Wang <[email protected]> wrote:

> If I remove the while loop in my spout, then everything works fine.(well
> except data loss in the spout..)
>
>
> On Mon, Feb 17, 2014 at 2:46 PM, Chen Wang <[email protected]> wrote:
>
>> Its just a BaseRichSpout.
>>
>>
>> On Mon, Feb 17, 2014 at 1:06 PM, Danijel Schiavuzzi
>> <[email protected]> wrote:
>>
>>> What interface does your spout implement? Is it IBatchSpout?
>>>
>>>
>>> On Mon, Feb 17, 2014 at 8:49 PM, Chen Wang
>>> <[email protected]> wrote:
>>>
>>>> Hi,
>>>> I might be missing something apparent. In my spout, next tuple method,
>>>> I have an infinite loop to read from our socket server:
>>>>
>>>> @Override
>>>> public void nextTuple() {
>>>>     try {
>>>>         String line;
>>>>         while ((line = socket.readLine()) != null) {
>>>>             System.out.println("emitting new entries");
>>>>             _collector.emit(new Values(line));
>>>>         }
>>>>     } catch (Exception e) {
>>>>         // TODO
>>>>         // log the exception with Storm
>>>>     }
>>>> }
>>>>
>>>>
>>>> Then in my trident topology, i am doing something like:(psedu code)
>>>>
>>>> topology.newStream(
>>>>         myspout).
>>>>         each(filter logic)
>>>>         .groupBy( mykey)
>>>>         .persistentAggregate(
>>>>
>>>>
>>>> When I run it in local mode, it seems that the trident will not catch
>>>> any tuple emitted from my spout. The topology will just spin on the spout
>>>> emitting tuples, while the rest of the topology never get executed. How
>>>> would I fix that?
>>>>
>>>>
>>>> Using normal bolt/topology will be able to receive the tuple.
>>>>
>>>> Thanks,
>>>>
>>>> Chen
>>>>
>>>
>>>
>>>
>>> --
>>> Danijel Schiavuzzi
>>>
>>
>>
>

--
Danijel Schiavuzzi
