Hi Chen,
You should remove the while() loop from nextTuple(), move the socket-opening
code into your spout's open() method (which gets called only once), and in
nextTuple() have something like this:
String line;
if ((line = socket.readLine()) != null) {
    _collector.emit(new Values(line));
}
....
..
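To make the "one line per call" idea concrete, here is a minimal sketch written against plain Java stand-ins so it runs without Storm. The class name LineSpoutSketch, the Consumer used in place of _collector, and the BufferedReader used in place of your socket are all assumptions for illustration, not Storm's API. The ready() check is also my addition: Storm's documentation asks for nextTuple() to be non-blocking, but ready() only tells you that some input has arrived, so readLine() can still wait for the rest of a line.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a spout; it does not implement Storm's real interfaces.
class LineSpoutSketch {
    private BufferedReader reader;      // stands in for the socket reader
    private Consumer<String> collector; // stands in for _collector.emit(new Values(line))

    // Called once: one-time setup (such as opening the socket) belongs here.
    void open(BufferedReader reader, Consumer<String> collector) {
        this.reader = reader;
        this.collector = collector;
    }

    // Called repeatedly by the framework: emit at most one line, then return.
    void nextTuple() {
        try {
            if (!reader.ready()) {
                return; // no input available yet; return instead of waiting
            }
            String line = reader.readLine();
            if (line != null) {
                collector.accept(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        LineSpoutSketch spout = new LineSpoutSketch();
        spout.open(new BufferedReader(new StringReader("a\nb\n")), emitted::add);
        spout.nextTuple();
        spout.nextTuple();
        spout.nextTuple(); // input exhausted: returns without emitting
        System.out.println(emitted); // [a, b]
    }
}
```

The point is that every call returns quickly, so the framework gets control back between tuples instead of being stuck inside a loop.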
And if you want to batch the tuples before emitting, you can maintain a
counter at class level, which gets incremented on every line != null check in
nextTuple(), and emit only when that count equals BATCH_SIZE, e.g.:
String line;
if ((line = socket.readLine()) != null) {
    // ....... add the line to a List, etc.
    lineCount++;
    if (lineCount == BATCH_SIZE) {
        // emits only the latest line; emit the List instead to send the whole batch
        _collector.emit(new Values(line));
        lineCount = 0;
    }
}
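The batching idea can be sketched on its own, again in plain Java so it runs without Storm. LineBatcher, the Consumer standing in for _collector, and the BATCH_SIZE value of 3 are assumptions made up for this example. It keeps the lines in a List and hands over the whole List once the counter reaches BATCH_SIZE.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of Storm: collects lines and hands over full batches.
class LineBatcher {
    private static final int BATCH_SIZE = 3; // assumed value for illustration
    private final List<String> batch = new ArrayList<>();
    private final Consumer<List<String>> collector; // stands in for _collector.emit(new Values(batch))
    private int lineCount = 0;

    LineBatcher(Consumer<List<String>> collector) {
        this.collector = collector;
    }

    // Call once for every non-null line read in nextTuple().
    void add(String line) {
        batch.add(line);
        lineCount++;
        if (lineCount == BATCH_SIZE) {
            collector.accept(new ArrayList<>(batch)); // hand over a copy so clear() does not touch it
            batch.clear();
            lineCount = 0;
        }
    }

    public static void main(String[] args) {
        List<List<String>> emitted = new ArrayList<>();
        LineBatcher batcher = new LineBatcher(emitted::add);
        for (String line : new String[] {"a", "b", "c", "d"}) {
            batcher.add(line);
        }
        System.out.println(emitted); // [[a, b, c]]; "d" waits for the next full batch
    }
}
```

Note that a partial batch just sits in the List until enough lines arrive, so on a slow stream you may also want to flush after some time has passed.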
Hope this helps.
On Tue, Feb 18, 2014 at 1:19 AM, Chen Wang wrote:
> Hi,
> I might be missing something obvious. In my spout's nextTuple() method, I
> have an infinite loop to read from our socket server:
>
> @Override
> public void nextTuple() {
>     try {
>         String line;
>         while ((line = socket.readLine()) != null) {
>             System.out.println("emitting new entries");
>             _collector.emit(new Values(line));
>         }
>     } catch (Exception e) {
>         // TODO
>         // log the exception with Storm
>     }
> }
>
>
> Then in my Trident topology, I am doing something like this (pseudocode):
>
> topology.newStream(myspout)
>     .each(filter logic)
>     .groupBy(mykey)
>     .persistentAggregate(
>
> When I run it in local mode, it seems that Trident never picks up any of
> the tuples emitted by my spout. The topology just spins on the spout
> emitting tuples, while the rest of the topology never gets executed. How
> would I fix that?
>
>
> With a normal (non-Trident) bolt/topology, the tuples are received.
>
> Thanks,
>
> Chen
>