Exactly, you must never block in nextTuple().

Read your socket asynchronously (in a separate thread) and fill the data
into a queue. Then in nextTuple() emit data from that queue.
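
A minimal sketch of that pattern, using only the JDK so it runs without Storm on the classpath. The class name QueueBackedReader, the queue capacity, and the Consumer<String> callback are illustrative assumptions; the callback stands in for _collector.emit(new Values(line)). In a real BaseRichSpout you would presumably start the reader thread in open() and do the non-blocking poll in nextTuple().

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical helper: a background thread reads lines, nextTuple() only polls. */
class QueueBackedReader {
    // Bounded so a slow topology cannot exhaust memory (capacity is an assumption).
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10_000);
    private final Thread reader;

    QueueBackedReader(BufferedReader source) {
        reader = new Thread(() -> {
            try {
                String line;
                while ((line = source.readLine()) != null) {
                    queue.put(line); // blocks only this reader thread when the queue is full
                }
            } catch (IOException e) {
                // a real spout would log this through its logger
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "socket-reader");
        reader.setDaemon(true);
    }

    /** Starts the background reader; corresponds to the spout's open(). */
    void open() {
        reader.start();
    }

    /** Emits at most one queued line and returns immediately; true if something was emitted. */
    boolean nextTuple(Consumer<String> emit) {
        String line = queue.poll(); // non-blocking: null when nothing is queued
        if (line == null) {
            return false;
        }
        emit.accept(line);
        return true;
    }
}
```

Emitting at most one line per call keeps each nextTuple() invocation short, so the caller gets control back between calls.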

On Tue, Feb 18, 2014 at 12:05 AM, Chen Wang <[email protected]> wrote:

> If I remove the while loop in my spout, then everything works fine (well,
> except for data loss in the spout).
>
>
> On Mon, Feb 17, 2014 at 2:46 PM, Chen Wang <[email protected]> wrote:
>
>> It's just a BaseRichSpout.
>>
>>
>> On Mon, Feb 17, 2014 at 1:06 PM, Danijel Schiavuzzi
>> <[email protected]> wrote:
>>
>>> What interface does your spout implement? Is it IBatchSpout?
>>>
>>>
>>> On Mon, Feb 17, 2014 at 8:49 PM, Chen Wang
>>> <[email protected]> wrote:
>>>
>>>> Hi,
>>>> I might be missing something obvious. In my spout's nextTuple() method,
>>>> I have an infinite loop to read from our socket server:
>>>>
>>>> @Override
>>>> public void nextTuple() {
>>>>   try {
>>>>     String line;
>>>>     while ((line = socket.readLine()) != null) {
>>>>       System.out.println("emitting new entries");
>>>>       _collector.emit(new Values(line));
>>>>     }
>>>>   } catch (Exception e) {
>>>>     // TODO
>>>>     // log the exception with Storm
>>>>   }
>>>> }
>>>>
>>>>
>>>> Then in my Trident topology, I am doing something like this (pseudocode):
>>>>
>>>> topology.newStream(myspout)
>>>>   .each(filter logic)
>>>>   .groupBy(mykey)
>>>>   .persistentAggregate(
>>>>
>>>>
>>>> When I run it in local mode, it seems that Trident does not receive
>>>> any tuple emitted from my spout. The topology just spins on the spout
>>>> emitting tuples, while the rest of the topology never gets executed. How
>>>> would I fix that?
>>>>
>>>>
>>>> A normal bolt/topology is able to receive the tuples.
>>>>
>>>> Thanks,
>>>>
>>>> Chen
>>>>
>>>
>>>
>>>
>>> --
>>> Danijel Schiavuzzi
>>>
>>
>>
>


-- 
Danijel Schiavuzzi
