Hello Istvan,
I recommend limiting the size of the input queue used by the stage
driver. The DedicatedThreadStageDriverFactory uses a blocking queue. If
you are configuring your pipeline with a Digester configuration file, the
XML would look like this (look for the word "capacity" near the end):
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Document   : conf_examplepipeline.xml
  Description: Configuration file for data loading pipeline
-->
<pipeline>
  <driverFactory
      className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
      id="df1" faultToleranceLevel="ALL"/>

  <!-- Find the data file(s), surveyData.YYYY-MM-DD.
       ** FAST STAGE **
       Input:  starting directory
       Output: Data File object
  -->
  <!-- ================================================================== -->
  <stage className="org.apache.commons.pipeline.stage.FileFinderStage"
         filePattern="surveyData.*"
         driverFactoryId="df1" />
  <feed>
    <value>/data/prod/ingestNow</value>
  </feed>

  <!-- Unpack the data file. Each input file becomes a bunch of data
       bean objects.
       ** FAST STAGE **
       Input:  Data File object
       Output: Data point object
  -->
  <!-- ================================================================== -->
  <stage className="gov.noaa.eds.SurveyReaderStage"
         driverFactoryId="df1" />

  <!-- Write the data beans to the database
       ** SLOW STAGE - Set ArrayBlockingQueueFactory capacity to 10 **
       The queueFactory must be a blocking type that implements the
       java.util.concurrent.BlockingQueue interface.
       Input:  Data point object
       Output: Data point object
  -->
  <!-- ================================================================== -->
  <stage className="gov.noaa.eds.WriteSurveyToDatabaseStage"
         driverFactoryId="df1" >
    <property propName="queueFactory"
        className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
        capacity="10" fair="false"/>
  </stage>
</pipeline>
Alternatively, you could set up the driver factory with the same
property so that all stages have the same input queue size:
<driverFactory
    className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
    id="df1" faultToleranceLevel="ALL">
  <property propName="queueFactory"
      className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
      capacity="10" fair="false"/>
</driverFactory>
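To see why a small capacity helps, here is a rough sketch using only java.util.concurrent, not the Commons Pipeline classes. It assumes the producer hands items over with put(), which blocks while the queue is full; I am not claiming that is exactly what the stage driver does internally. The class name BoundedQueueDemo and the 2 ms delay are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of Commons Pipeline.
class BoundedQueueDemo {

    /**
     * Runs a fast producer against a slow consumer that share a bounded queue
     * and returns the largest queue size the producer observed.
     */
    static int run(int capacity, int items) {
        // Same kind of queue the XML above asks for: bounded, non-fair.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity, false);
        AtomicInteger maxDepth = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.take();
                    Thread.sleep(2); // stand-in for a slow stage, e.g. a database write
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 0; i < items; i++) {
                queue.put(i); // blocks while the queue already holds `capacity` items
                maxDepth.accumulateAndGet(queue.size(), Math::max);
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while producing", e);
        }
        return maxDepth.get();
    }

    public static void main(String[] args) {
        // The reported depth can never exceed the capacity of 10.
        System.out.println("max queue depth: " + run(10, 50));
    }
}
```

The point is that the backlog stays bounded by the capacity no matter how many items are fed in, because the fast side is made to wait instead of piling objects up in memory.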
Aquator wrote:
Hi,
I am playing with pipeline, and I have some questions regarding its usage.
What is the suggested method of avoiding a "traffic jam" in the pipe? I mean,
when a stage produces results fast and is followed by a long-running stage, I will run out of
stack space with a large amount of input data.
Currently, my solution uses the context raise/registerListener methods. The slow stage
notifies its "feeder" stage that new data can be processed. Are there any
better ideas for this problem?
My other issue is about branches. Is there a way to join separate branches
together? For example, a stage needs input from two different branches. Is
there any solution for a synchronized data flow? (Let's say I have two
branches, one produces A-s and the other produces B-s. I want a stage that is
fed by those two branches and produces a sequence of ABABAB...) Is there
an implementation for such behaviour?
Normally each branch should carry just one type of data; otherwise all
the stages on a branch of the pipeline end up having the same
conditional logic at the beginning to sort out what to do with the
incoming objects. If you do need to combine information back into
another branch, you should do it by raising events, which can bring
along with them the data transport beans you wish to combine. The receiving
stage needs to register a listener to bring those in. I have some
documentation on this, but it's not publicly available yet. If you want,
I can email you the HTML directly.
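For the strict ABABAB ordering itself, the receiving side has to wait for one item of each kind in turn. The sketch below shows only that interleaving logic with plain java.util.concurrent queues; it is not the pipeline's event/listener mechanism, and the names AlternatingMerge, merge and feed are hypothetical. In a real stage, the listener would be what deposits the B items into the second queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper, not part of Commons Pipeline.
class AlternatingMerge {

    /** Takes one item from a, then one from b, `pairs` times, blocking as needed. */
    static List<String> merge(BlockingQueue<String> a, BlockingQueue<String> b, int pairs) {
        List<String> out = new ArrayList<>();
        try {
            for (int i = 0; i < pairs; i++) {
                out.add(a.take()); // wait for the next A
                out.add(b.take()); // then wait for the next B
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while merging", e);
        }
        return out;
    }

    /** Starts a thread that puts prefix0, prefix1, ... into the queue. */
    static Thread feed(BlockingQueue<String> queue, String prefix, int count) {
        Thread t = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(prefix + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        return t;
    }

    public static void main(String[] args) {
        BlockingQueue<String> as = new ArrayBlockingQueue<>(10);
        BlockingQueue<String> bs = new ArrayBlockingQueue<>(10);
        feed(as, "A", 3);
        feed(bs, "B", 3);
        System.out.println(merge(as, bs, 3)); // prints [A0, B0, A1, B1, A2, B2]
    }
}
```

Note that this ties the two branches together: if one branch stalls, the merging side stalls too, so bounded queues on both inputs are a sensible default.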
Finally, I am interested in the various StageDrivers. I'd like some more
detailed information
than the API gives, especially usage advice and samples, to help choose the best
StageDriver for a given stage.
Thanks in advance for your time,
Istvan Cseh
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]
---------------------------------------------------------------------