This is the collection type we use to share data between the spout class and the thread it spawns:

    protected transient SortedSet<OurKeyObject> expiredKeysSet =
        Collections.synchronizedSortedSet(new TreeSet<OurKeyObject>());

In the spout's open() method we call:

    expiredStateWorker = new ExpiredStateDBPollerWorker(spoutConfig);
    expiredStateWorker.setSharedSet(expiredKeysSet);
    workerThread = new Thread(expiredStateWorker, "Expired State Worker Thread");
    workerThread.start();

where ExpiredStateDBPollerWorker's run() method is a loop that does its processing and then calls Thread.sleep(). The loop also checks things like whether the spout's stop/close methods have been called, so that it can clean up and exit. We also have checks to make sure the worker thread doesn't endlessly fill the collection if the spout isn't pulling items off on the other end.

Hope this helps.

On Mon, Sep 21, 2015 at 9:52 AM, Nick R. Katsipoulakis wrote:

> Stephen,
>
> I do not have any problems spawning the thread inside the spout thread.
> The only issue is that the spawned thread does not seem to run most of the
> time.
>
> To give you more details, I have the spawned thread read tuples from a
> file and add them to an ArrayBlockingQueue, so that data are available
> (cached in-memory) for the main Spout thread to emit. Unfortunately, the
> spawned thread does not seem to fill the buffer as fast as it should.
> Therefore, the spout thread blocks and does not send as many tuples as it
> needs to.
>
> Do you have similar performance problems?
>
> Thank you,
> Nick
>
> On Fri, Sep 18, 2015 at 4:16 PM, Stephen Powis wrote:
>
>> Reading the documentation on spouts
>> (https://storm.apache.org/documentation/Concepts.html#spouts), we took the
>> following sentence -- "It is imperative that nextTuple does not block
>> for any spout implementation, because Storm calls all the spout methods on
>> the same thread." -- to indicate it would be a good idea to fire up a
>> separate thread in our custom spouts, in what sounds like a similar way to
>> yours. So far we haven't had any issues doing this.
>>
>> On Fri, Sep 18, 2015 at 9:51 AM, Nick R. Katsipoulakis wrote:
>>
>>> Hello all,
>>>
>>> I have spouts that read input from files and send the data inside my
>>> topology. In order to achieve higher input rates, I do some buffering of
>>> data, by having them read by a thread, spawned after the spout is initiated
>>> (in the open() function). The data are stored in an ArrayBlockingQueue of
>>> fixed size.
>>>
>>> Unfortunately, it seems that the thread is starving and does not execute
>>> as it would in a stand-alone JVM.
>>>
>>> First of all, is my approach (spawning a thread in my spout) considered
>>> a good practice for Storm? If not, how else do you suggest I could overcome
>>> the IO delay from reading the data directly from the file?
>>>
>>> Thank you,
>>> Nick
>
> --
> Nikolaos Romanos Katsipoulakis,
> University of Pittsburgh, PhD student
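For anyone wanting to try the shared-collection pattern described at the top of this thread, here is a minimal, self-contained Java sketch. It is not the poster's actual code: Storm is left out so it compiles on its own, and the class name SharedSetSpoutSketch, the PollerWorker class, fetchNextKey(), pollNextKey(), the String key type, the cap of 100 and the 5 ms sleep are all hypothetical. open() and close() only stand in for the spout lifecycle methods; a real spout would presumably call pollNextKey() from nextTuple() and emit the result. What it tries to illustrate is the rule quoted from the Concepts page: the consumer side never blocks (it returns null when the set is empty), while the worker caps the set size and exits when asked to stop.

```java
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical stand-in for a spout; no Storm dependency, so it compiles alone.
class SharedSetSpoutSketch {

    // Shared between the "spout" thread and the worker thread.
    final SortedSet<String> expiredKeysSet =
            Collections.synchronizedSortedSet(new TreeSet<String>());

    private PollerWorker worker;
    private Thread workerThread;

    /** Worker loop: produce a key if below the cap, sleep, exit when stopped. */
    static class PollerWorker implements Runnable {
        private final SortedSet<String> shared;
        private final int maxSize;
        private final long sleepMillis;
        private volatile boolean running = true;
        private int counter = 0; // only touched by the worker thread

        PollerWorker(SortedSet<String> shared, int maxSize, long sleepMillis) {
            this.shared = shared;
            this.maxSize = maxSize;
            this.sleepMillis = sleepMillis;
        }

        void stop() {
            running = false;
        }

        @Override
        public void run() {
            while (running) {
                // Only add while below the cap, so the set cannot grow without
                // bound if the consumer stops draining it.
                if (shared.size() < maxSize) {
                    shared.add(fetchNextKey());
                }
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // interrupted by close(): exit the loop
                }
            }
        }

        // Hypothetical placeholder for the real work (e.g. polling a DB).
        private String fetchNextKey() {
            return String.format("key-%06d", counter++);
        }
    }

    /** Stand-in for Spout.open(): start the background worker. */
    public void open() {
        worker = new PollerWorker(expiredKeysSet, 100, 5);
        workerThread = new Thread(worker, "Expired State Worker Thread");
        workerThread.setDaemon(true); // don't keep the JVM alive if close() is skipped
        workerThread.start();
    }

    /** Non-blocking take: returns the smallest key, or null if none is buffered. */
    public String pollNextKey() {
        // first() + remove() is a compound action, so lock the synchronized set.
        synchronized (expiredKeysSet) {
            if (expiredKeysSet.isEmpty()) {
                return null;
            }
            String key = expiredKeysSet.first();
            expiredKeysSet.remove(key);
            return key;
        }
    }

    /** Stand-in for Spout.close(): stop the worker and wait for it to finish. */
    public void close() {
        worker.stop();
        workerThread.interrupt();
        try {
            workerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because only the worker adds to the set, checking size() before add() is enough to keep it at or below the cap. The first()/remove() pair is wrapped in synchronized (expiredKeysSet) because Collections.synchronizedSortedSet only makes individual calls atomic. An ArrayBlockingQueue setup like Nick's maps onto the same shape: if the spout currently waits with take(), calling poll() instead returns null immediately when the buffer is empty rather than blocking nextTuple().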
