Also, Trident is batch-based, so in nextTuple() you should really emit batches of tuples, not single tuples, for maximum performance.
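
For the queue approach described further down in this thread, a minimal sketch might look like the following. It is plain Java with no Storm dependency so it can run on its own. `QueueBackedReader`, `emitPending`, the queue capacity of 10,000 and the `Consumer<String>` standing in for the collector are all assumptions made for illustration; none of them are Storm API.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of Storm): reads lines on a background
 * thread and hands them out in bounded, non-blocking chunks.
 */
class QueueBackedReader {
    // Bounded so a slow topology cannot exhaust memory; the capacity is an arbitrary choice.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10_000);
    private final Thread reader;

    QueueBackedReader(BufferedReader source) {
        reader = new Thread(() -> {
            try {
                String line;
                // The blocking read loop lives here, off the spout's thread.
                while ((line = source.readLine()) != null) {
                    queue.put(line); // blocks only this reader thread when the queue is full
                }
            } catch (IOException e) {
                // In a real spout, log the failure here.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "socket-reader");
        reader.setDaemon(true);
    }

    /** Intended to be called once, e.g. from the spout's open(). */
    void start() {
        reader.start();
    }

    /**
     * Intended as the body of nextTuple(): passes at most maxBatch queued
     * lines to emit and returns immediately, even if the queue is empty.
     * Returns the number of lines handed over.
     */
    int emitPending(int maxBatch, Consumer<String> emit) {
        List<String> batch = new ArrayList<>(maxBatch);
        queue.drainTo(batch, maxBatch); // never blocks
        batch.forEach(emit);
        return batch.size();
    }
}
```

In the spout from the original question, nextTuple() would then shrink to something like `reader.emitPending(100, line -> _collector.emit(new Values(line)));`, where `reader` is a field created and started in open() and 100 is an arbitrary cap.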

On Tue, Feb 18, 2014 at 12:47 AM, Danijel Schiavuzzi <[email protected]> wrote:

> Exactly, you must never block in nextTuple().
>
> Read your socket asynchronously (in a separate thread) and fill the data
> into a queue. Then in nextTuple() emit data from that queue.
>
> On Tue, Feb 18, 2014 at 12:05 AM, Chen Wang <[email protected]> wrote:
>
>> If I remove the while loop in my spout, then everything works fine (well,
>> except for data loss in the spout).
>>
>> On Mon, Feb 17, 2014 at 2:46 PM, Chen Wang <[email protected]> wrote:
>>
>>> It's just a BaseRichSpout.
>>>
>>> On Mon, Feb 17, 2014 at 1:06 PM, Danijel Schiavuzzi <[email protected]> wrote:
>>>
>>>> What interface does your spout implement? Is it IBatchSpout?
>>>>
>>>> On Mon, Feb 17, 2014 at 8:49 PM, Chen Wang <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>> I might be missing something obvious. In my spout's nextTuple() method,
>>>>> I have an infinite loop to read from our socket server:
>>>>>
>>>>>     @Override
>>>>>     public void nextTuple() {
>>>>>         try {
>>>>>             String line;
>>>>>             while ((line = socket.readLine()) != null) {
>>>>>                 System.out.println("emitting new entries");
>>>>>                 _collector.emit(new Values(line));
>>>>>             }
>>>>>         } catch (Exception e) {
>>>>>             // TODO
>>>>>             // log the exception with Storm
>>>>>         }
>>>>>     }
>>>>>
>>>>> Then in my Trident topology, I am doing something like this (pseudocode):
>>>>>
>>>>>     topology.newStream(myspout)
>>>>>         .each(filter logic)
>>>>>         .groupBy(mykey)
>>>>>         .persistentAggregate(
>>>>>
>>>>> When I run it in local mode, it seems that Trident never picks up any
>>>>> tuple emitted from my spout. The topology just spins on the spout
>>>>> emitting tuples, while the rest of the topology never gets executed.
>>>>> How would I fix that?
>>>>>
>>>>> A normal (non-Trident) bolt/topology is able to receive the tuples.
>>>>>
>>>>> Thanks,
>>>>> Chen

--
Danijel Schiavuzzi
