
You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.

The following page has been changed by Arun C Murthy:

The comment on the change is:
First-cut of Pig Streaming design

New page:
= Pig Streaming 1.0 - Design =

The main goal of Pig-Streaming 1.0 is to support a form of processing in which 
the entire portion of the dataset that corresponds to a task is sent to the 
task, and the task's output is streamed back out. There is no temporal or causal 
correspondence between an input record and specific output records.
This document specs out the high-level design of how Pig will support the 
''Streaming'' concept. It builds on the functional spec documented at: 
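To make this processing model concrete, here is a minimal in-process sketch in plain Java with no Pig or Hadoop classes. The class `StreamingTaskDemo` and the method `wordCountScript` are hypothetical; in Pig the "script" would be an external executable, and this function only models its input/output behaviour. It reads the task's entire input before emitting anything, so neither the number nor the order of output records lines up with individual input records.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class StreamingTaskDemo {
    // Hypothetical user "script": consumes every input record first, then
    // emits one "word<TAB>count" line per distinct word, sorted by word.
    public static List<String> wordCountScript(Iterator<String> input) {
        Map<String, Integer> counts = new TreeMap<>();
        while (input.hasNext()) {
            for (String word : input.next().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            output.add(e.getKey() + "\t" + e.getValue());
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> taskInput = List.of("b a", "a c", "a b", "a");
        // Four input records yield three output records; no output record
        // maps back to a single input record.
        System.out.println(wordCountScript(taskInput.iterator()));
    }
}
```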


Main Components:
1. User-facing changes (e.g. Pig Latin)
2. Logical Layer
3. Physical Layer
4. Streaming Implementation

== 1. User-facing changes ==

The main changes include the addition of the new '''STREAM''' operator and the 
enhancement of the DEFINE operator to allow aliasing the actual command to 
which data is streamed. (See the wiki for details.)

There are two affected components:

a) `QueryParser`

Straightforward changes to `QueryParser` include parsing the STREAM operator 
and then saving the relevant details in a '''StreamEvalSpec'''. `StreamEvalSpec` is a 
sub-class of `org.apache.pig.impl.eval.EvalSpec`; it works similarly to the other 
Eval operators (FILTER|FOREACH) in the sense that it just takes a bag of tuples 
and does one operation on each tuple. It also ensures that the STREAM operator 
can be ''chained'' with other Evals in exactly the same manner as in Pig today 
(by constructing `CompositeEvalSpecs`).
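The chaining idea can be sketched with simplified stand-ins. The `Spec` interface and the `map`, `filter` and `composite` helpers below are hypothetical and are not Pig's `EvalSpec`/`CompositeEvalSpec` API. They only show how stages compose: each stage wraps the consumer of the stage after it, and the composite wires its stages back to front so that records pushed into the head flow through every stage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

public class EvalChainDemo {
    // Hypothetical eval stage: given the downstream consumer, return the
    // consumer that this stage's input should be pushed into.
    public interface Spec {
        Consumer<String> setupPipe(Consumer<String> downstream);
    }

    // Per-record transformation, similar in spirit to FOREACH.
    public static Spec map(Function<String, String> f) {
        return downstream -> record -> downstream.accept(f.apply(record));
    }

    // Per-record predicate, similar in spirit to FILTER.
    public static Spec filter(Predicate<String> p) {
        return downstream -> record -> {
            if (p.test(record)) {
                downstream.accept(record);
            }
        };
    }

    // Composite: wire stages back to front so the first stage sees the input.
    public static Spec composite(List<Spec> specs) {
        return downstream -> {
            Consumer<String> pipe = downstream;
            for (int i = specs.size() - 1; i >= 0; i--) {
                pipe = specs.get(i).setupPipe(pipe);
            }
            return pipe;
        };
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        Spec chain = composite(List.of(
                filter(s -> !s.isEmpty()),
                map(String::toUpperCase)));
        Consumer<String> head = chain.setupPipe(out::add);
        for (String s : List.of("a", "", "b")) {
            head.accept(s);
        }
        System.out.println(out);
    }
}
```

In this model a stream stage would be one more `Spec` whose consumer hands each record to the external command instead of emitting exactly one downstream record per call.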

`StreamEvalSpec` also contains necessary details such as:
i. The actual ''command'' and its arguments, if any.
ii. Information about the ''ship-spec'' and ''cache-spec'', which will go through 
Hadoop's `DistributedCache`.
iii. `Serializer`/`Deserializer` information.
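The details listed above could be held in a small value object along these lines. This is a sketch only: the class name `StreamingCommandSpec`, its fields, the naive whitespace splitting of the command line and the use of plain strings for the ship/cache entries and serializer/deserializer names are all assumptions, not the actual contents of `StreamEvalSpec`.

```java
import java.util.List;

// Hypothetical holder for the per-command details carried by a stream eval.
public final class StreamingCommandSpec {
    private final String executable;       // the command itself
    private final List<String> arguments;  // its arguments, possibly empty
    private final List<String> shipFiles;  // ship-spec entries (to go via DistributedCache)
    private final List<String> cacheFiles; // cache-spec entries (to go via DistributedCache)
    private final String serializer;       // how tuples are written to the command's stdin
    private final String deserializer;     // how the command's stdout is parsed into tuples

    public StreamingCommandSpec(String commandLine, List<String> shipFiles,
                                List<String> cacheFiles, String serializer,
                                String deserializer) {
        // Naive whitespace split; a real parser would need to honour quoting.
        List<String> parts = List.of(commandLine.trim().split("\\s+"));
        this.executable = parts.get(0);
        this.arguments = parts.subList(1, parts.size());
        this.shipFiles = List.copyOf(shipFiles);
        this.cacheFiles = List.copyOf(cacheFiles);
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    public String getExecutable() { return executable; }
    public List<String> getArguments() { return arguments; }
    public List<String> getShipFiles() { return shipFiles; }
    public List<String> getCacheFiles() { return cacheFiles; }
    public String getSerializer() { return serializer; }
    public String getDeserializer() { return deserializer; }
}
```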

b) `PigScriptParser`

The `PigScriptParser` also needs to be enhanced to process the new constructs 
supported by the DEFINE operator. The one change we need to make to 
`PigContext` is to add a ''PigContext.registerStreamingCommand'' API to enable 
the `PigScriptParser` to store the streaming command and relevant information 
to be passed along to `QueryParser` and other components.
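One way the `registerStreamingCommand` API could work is as a simple alias table that `PigScriptParser` fills while processing DEFINE and `QueryParser` consults when a STREAM statement names an alias. The class `StreamingCommandRegistry`, the `getStreamingCommand` lookup, the string-typed command and the "later DEFINE replaces earlier one" rule are assumptions made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the streaming-command table kept in PigContext.
public class StreamingCommandRegistry {
    private final Map<String, String> commands = new HashMap<>();

    // Called while parsing DEFINE; in this sketch a later DEFINE of the
    // same alias replaces the earlier one.
    public void registerStreamingCommand(String alias, String commandLine) {
        if (alias == null || alias.isEmpty()) {
            throw new IllegalArgumentException("alias must be non-empty");
        }
        commands.put(alias, commandLine);
    }

    // Called when a STREAM statement refers to an alias.
    public String getStreamingCommand(String alias) {
        String command = commands.get(alias);
        if (command == null) {
            throw new IllegalArgumentException("undefined streaming alias: " + alias);
        }
        return command;
    }

    public static void main(String[] args) {
        StreamingCommandRegistry registry = new StreamingCommandRegistry();
        registry.registerStreamingCommand("X", "python filter.py --strict");
        System.out.println(registry.getStreamingCommand("X"));
    }
}
```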


  `StreamEvalSpec` (extends `EvalSpec`)
  `PigContext` (add `registerStreamingCommand`)

== 2. Logical Layer ==

Since 'streaming' ''is'' an eval on each record in the dataset, it should still 
be a logical Eval operator; i.e., `LOEval` should suffice for streaming 
operations too.

== 3. Physical Layer ==

Pig's MapReduce physical layer shouldn't be affected at all, since the 
`StreamEvalSpec` neatly fits into the map/reduce pipeline as just another 
`EvalSpec` within a `CompositeEvalSpec`. (`StreamEvalSpec.setupDefaultPipe` is 
the critical knob.)

== 4. Streaming Implementation ==

The main infrastructure to support the notion of data processing by sending a 
dataset to a task's input and collecting its output is a generic manager that 
takes care of setup/teardown of the streaming task, manages its 
stdin/stderr/stdout streams and also does post-processing. The plan is to 
implement a {{{org.apache.hadoop.mapred.lib.external.ExecutableManager}}} to 
take over the aforementioned responsibilities. It is deliberately kept 
separate from Hadoop's Streaming component (in contrib/streaming) so that Pig 
has no extraneous dependency on Hadoop Streaming, and it is placed in 
org.apache.hadoop.mapred.lib so that Pig depends on Hadoop Core only.
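The responsibilities above map fairly directly onto `java.lang.ProcessBuilder`. The sketch below is an illustration under several assumptions, not the planned Hadoop class: the class name `SimpleExecutableManager` is hypothetical, it exchanges newline-terminated `String` records instead of `Writable`s, it discards the command's stderr rather than post-processing it, its `teardown` returns the exit code, and it returns all collected output at once (`getOutputs`) instead of one record at a time. Stdout is drained on a background thread so that a command producing lots of output cannot stall on a full pipe while input is still being written.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SimpleExecutableManager {
    private final List<String> command;
    private final List<String> outputs = Collections.synchronizedList(new ArrayList<>());
    private Process process;
    private BufferedWriter stdin;
    private Thread stdoutReader;

    public SimpleExecutableManager(List<String> command) {
        this.command = command;
    }

    // Start the external command plus a thread that collects its stdout.
    public void setup() throws IOException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectError(ProcessBuilder.Redirect.DISCARD); // Java 9+
        process = pb.start();
        stdin = new BufferedWriter(new OutputStreamWriter(
                process.getOutputStream(), StandardCharsets.UTF_8));
        stdoutReader = new Thread(() -> {
            try (BufferedReader r = new BufferedReader(new InputStreamReader(
                    process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = r.readLine()) != null) {
                    outputs.add(line);
                }
            } catch (IOException e) {
                // Stream closed underneath us; stop collecting.
            }
        });
        stdoutReader.start();
    }

    // Send one input record to the command's stdin.
    public void setInput(String record) throws IOException {
        stdin.write(record);
        stdin.newLine();
    }

    // Close stdin (end of input), wait for the command to exit and for all
    // of its output to be read, then return its exit code.
    public int teardown() throws IOException, InterruptedException {
        stdin.close();
        int exitCode = process.waitFor();
        stdoutReader.join();
        return exitCode;
    }

    public List<String> getOutputs() {
        synchronized (outputs) {
            return new ArrayList<>(outputs);
        }
    }

    public static void main(String[] args) throws Exception {
        SimpleExecutableManager m = new SimpleExecutableManager(List.of("sort"));
        m.setup();
        for (String s : List.of("pear", "apple", "fig")) {
            m.setInput(s);
        }
        int rc = m.teardown();
        System.out.println(rc + " " + m.getOutputs());
    }
}
```

Using `sort` as the command shows the point of the design: it cannot emit its first line until it has read all of its input.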

The `ExecutableManager` is also responsible for dealing with multiple outputs 
of the streaming tasks (refer to the functional spec in the wiki).


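{{{
package org.apache.hadoop.mapred.lib.external;
import org.apache.hadoop.io.Writable;
public abstract class ExecutableManager {
  public abstract void setup() throws Exception;     // start the streaming task
  public abstract void teardown() throws Exception;  // shut the task down, clean up
  public abstract void setInput(Writable w);         // send a record to the task's stdin
  public abstract Writable getOutput();              // fetch a record from the task's stdout
}
}}}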

The important deviation from the current Pig infrastructure is that there is no 
longer a one-to-one mapping between input and output records, since the 
user script could (potentially) consume ''all'' of its input before it emits 
''any'' output records. Hence, `StreamEvalSpec.add` will only call 
{{{ExecutableManager.setInput((Writable)(d))}}}, while `StreamEvalSpec` 
separately collects output from the task ({{{ExecutableManager.getOutput()}}}) 
and passes it along to the next ''eval'' in the pipeline.
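The decoupling described above, where `add` only feeds input and output is collected independently and forwarded to the next eval, can be sketched as a producer/consumer pair. Here an in-process thread stands in for the external task (a hypothetical script that emits nothing until its input ends, then emits its records in reverse), and the class `DecoupledStreamDemo`, its `finish` method and the end-of-stream marker are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class DecoupledStreamDemo {
    private static final String EOS = "\u0000EOS"; // hypothetical end-of-stream marker

    private final BlockingQueue<String> toTask = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> fromTask = new LinkedBlockingQueue<>();
    private final Thread task;
    private final Thread forwarder;

    public DecoupledStreamDemo(Consumer<String> nextEval) {
        // Simulated user script: buffers all input, then emits it reversed.
        task = new Thread(() -> {
            try {
                List<String> seen = new ArrayList<>();
                for (String r = toTask.take(); !r.equals(EOS); r = toTask.take()) {
                    seen.add(r);
                }
                for (int i = seen.size() - 1; i >= 0; i--) {
                    fromTask.put(seen.get(i));
                }
                fromTask.put(EOS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Output side: forward whatever the task emits to the next eval.
        forwarder = new Thread(() -> {
            try {
                for (String r = fromTask.take(); !r.equals(EOS); r = fromTask.take()) {
                    nextEval.accept(r);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        task.start();
        forwarder.start();
    }

    // Input side: like StreamEvalSpec.add, it only hands the record over.
    public void add(String record) throws InterruptedException {
        toTask.put(record);
    }

    // Signal end of input and wait until all output has been forwarded.
    public void finish() throws InterruptedException {
        toTask.put(EOS);
        task.join();
        forwarder.join();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> downstream = new ArrayList<>(); // only read after finish()
        DecoupledStreamDemo stream = new DecoupledStreamDemo(downstream::add);
        for (String s : List.of("r1", "r2", "r3")) {
            stream.add(s);
        }
        stream.finish();
        System.out.println(downstream);
    }
}
```

Because `add` never waits for a matching output record, the same structure works whether the script emits output per record, in bursts, or only at the end of its input.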
