Hello Istvan,

I recommend limiting the size of the input queue used by the stage driver. The DedicatedThreadStageDriverFactory uses a blocking queue, so with a bounded capacity a fast upstream stage has to wait whenever the slow stage's queue is full. If you are configuring your pipeline with a Digester configuration file, the XML would look like this (look for the word "capacity" near the end):

<?xml version="1.0" encoding="UTF-8"?>

<!--
   Document   : conf_examplepipeline.xml
   Description:
       Configuration file for data loading pipeline
-->

<pipeline>
   <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
       id="df1" faultToleranceLevel="ALL"/>

   <!-- Find the data file(s), surveyData.YYYY-MM-DD.
        ** FAST STAGE **
        Input:  starting directory
        Output: Data File object
   -->
<!-- ================================================================== -->
   <stage className="org.apache.commons.pipeline.stage.FileFinderStage"
          filePattern="surveyData.*"
          driverFactoryId="df1" />
   <feed>
       <value>/data/prod/ingestNow</value>
   </feed>

<!-- Unpack the data file. Each input file becomes a bunch of data bean objects.
        ** FAST STAGE **
        Input: Data File object
        Output: Data point object
   -->
<!-- ================================================================== -->
   <stage className="gov.noaa.eds.SurveyReaderStage"
          driverFactoryId="df1" />

   <!-- Write the data beans to the database.
        ** SLOW STAGE - Set ArrayBlockingQueueFactory capacity to 10 **
        The queueFactory must be a blocking type that implements the
        java.util.concurrent.BlockingQueue interface.
        Input: Data point object
        Output: Data point object
   -->
<!-- ================================================================== -->
   <stage className="gov.noaa.eds.WriteSurveyToDatabaseStage"
          driverFactoryId="df1" >
       <property propName="queueFactory"
                 className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
                 capacity="10" fair="false"/>
   </stage>

</pipeline>

Alternatively, you could set up the driver factory with the same property so that all stages have the same input queue size:

   <driverFactory className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
       id="df1" faultToleranceLevel="ALL">
       <property propName="queueFactory"
                 className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
                 capacity="10" fair="false"/>
   </driverFactory>
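
If it helps to see why a capacity limit cures the traffic jam, here is a minimal sketch using only java.util.concurrent, not the pipeline classes themselves. The class name BoundedQueueDemo, the run method, and the 1 ms sleep standing in for a database write are all made up for illustration. The point is only that put() on a full ArrayBlockingQueue blocks the fast side, so the backlog never grows beyond the configured capacity.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; it does not use or mimic the commons-pipeline API.
public class BoundedQueueDemo {

    /**
     * Runs a fast producer against a slow consumer through a bounded queue
     * and returns the largest queue depth the consumer observed.
     */
    static int run(int capacity, int items) throws InterruptedException {
        // Same parameters as the XML above: a capacity and a fairness flag.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity, false);
        int[] maxDepth = {0};

        Thread slowStage = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    maxDepth[0] = Math.max(maxDepth[0], queue.size());
                    queue.take();
                    Thread.sleep(1); // stand-in for a slow database write
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        slowStage.start();

        // The "fast stage": put() blocks whenever the queue is full,
        // so the producer is throttled to the consumer's pace.
        for (int i = 0; i < items; i++) {
            queue.put(i);
        }

        slowStage.join(); // join() makes the consumer's writes visible here
        return maxDepth[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Largest queue depth seen: " + run(10, 200));
    }
}
```

With an unbounded queue the same producer would enqueue all items immediately, which is the memory growth you are seeing on large inputs.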

Aquator wrote:
Hi,

I am playing with pipeline, and I have some questions regarding to usage.

What is the suggested method of avoiding a "traffic jam" in the pipe? I mean, 
when a stage produces results fast and is followed by a long-running stage. I will run out of 
stack space with a large amount of input data.

Currently, my solution uses the context raise/registerListener methods. The slow stage 
notifies its "feeder" stage that new data can be processed. Are there any 
better ideas for this problem?

My other issue is about branches. Is there a way to join separate branches 
together? For example, a stage needs input from two different branches. Is 
there any solution for a synchronized data flow? (Let's say I have two 
branches, one produces As and the other produces Bs. I want a stage that is 
fed by those two branches and produces a sequence of ABABAB...) Is there 
an implementation for such behaviour?

Normally each branch should carry just one type of data; otherwise all the stages on a branch of the pipeline end up having the same conditional logic at the beginning to sort out what to do with the incoming objects. If you do need to combine information back into another branch, you should do it by raising events, which can carry along the data transport beans you wish to combine. The receiving stage needs to register a listener to bring those in. I have some documentation on this, but it's not publicly available yet. If you want, I can email you the HTML directly.

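
To make the ABABAB ordering itself concrete, here is a rough sketch in plain java.util.concurrent. It is not the event/listener mechanism described above and it does not use the pipeline API; the class AlternatingMergeDemo and its feeder and merge methods are hypothetical names for illustration. The idea is that the combining side keeps one blocking queue per source and takes strictly in turn, so a fast branch simply waits for the slower one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; plain Java queues stand in for the two branches.
public class AlternatingMergeDemo {

    /** Starts a thread that puts prefix0, prefix1, ... onto the given queue. */
    static Thread feeder(BlockingQueue<String> queue, String prefix, int count) {
        Thread t = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(prefix + i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        return t;
    }

    /** Takes one item from a, then one from b, repeated for the given number of pairs. */
    static List<String> merge(BlockingQueue<String> a, BlockingQueue<String> b, int pairs)
            throws InterruptedException {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < pairs; i++) {
            out.add(a.take()); // waits until branch A has produced something
            out.add(b.take()); // then waits for branch B
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> branchA = new ArrayBlockingQueue<>(10);
        BlockingQueue<String> branchB = new ArrayBlockingQueue<>(10);
        Thread fa = feeder(branchA, "A", 3);
        Thread fb = feeder(branchB, "B", 3);
        System.out.println(merge(branchA, branchB, 3));
        fa.join();
        fb.join();
    }
}
```

In a pipeline, the listener registered by the receiving stage would play the role of one of these queues, buffering the beans that arrive by event until the matching object shows up on the stage's normal input.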
Finally, I am interested in the various StageDrivers. I'd like some more 
detailed information
than the API provides, especially usage advice and samples, to help choose the best 
StageDriver for particular stages.

Thanks in advance for your time,
Istvan Cseh


---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]



