[ 
https://issues.apache.org/jira/browse/PIG-94?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12569435#action_12569435
 ] 

acmurthy edited comment on PIG-94 at 3/4/08 12:06 AM:
-----------------------------------------------------------

I've put up a first cut here: http://wiki.apache.org/pig/PigStreamingDesign.

Summary:

----

Pig Streaming 1.0 - Design

The main goal of Pig-Streaming 1.0 is to support a form of processing in which 
the entire portion of the dataset that corresponds to a task is sent to the 
task, and its output is streamed out. There is no temporal or causal 
correspondence between an input record and specific output records.

This document specs out the high-level design of how Pig will support the 
Streaming concept. It builds off the functional spec documented at: 
http://wiki.apache.org/pig/PigStreamingFunctionalSpec.

Main Components: 
1. User-facing changes (e.g. Pig Latin) 
2. Logical Layer 
3. Physical Layer 
4. Streaming Implementation


1. User-facing changes

The main changes include the addition of the new STREAM operator and the 
enhancement of the DEFINE operator to allow alias-ing the actual command to 
which data is streamed. (See the wiki for details.)

There are two affected components:

a) QueryParser

Straight-forward changes to QueryParser include parsing the STREAM operator and 
saving the relevant details in a StreamEvalSpec. StreamEvalSpec is a sub-class 
of org.apache.pig.impl.eval.EvalSpec; it works similarly to other Eval 
operators (FILTER|FOREACH) in the sense that it just takes a bag of tuples and 
performs one operation on each tuple. This also ensures that the STREAM 
operator can be _chained_ with other Evals in exactly the same manner as in Pig 
today (by constructing CompositeEvalSpecs).
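
As an illustration of that chaining, here is a minimal runnable sketch using 
hypothetical stand-in types (EvalSpec reduced to a single method over lists of 
strings; the real Pig classes are richer than this):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stripped-down stand-in for org.apache.pig.impl.eval.EvalSpec:
// one operation applied to a bag of tuples (modeled here as strings).
interface EvalSpec {
    List<String> apply(List<String> tuples);
}

// Chains EvalSpecs so each one's output feeds the next, the way
// CompositeEvalSpecs let FILTER/FOREACH/STREAM steps compose in a pipeline.
class CompositeEvalSpec implements EvalSpec {
    private final List<EvalSpec> specs = new ArrayList<>();

    CompositeEvalSpec add(EvalSpec spec) {
        specs.add(spec);
        return this;
    }

    @Override
    public List<String> apply(List<String> tuples) {
        List<String> current = tuples;
        for (EvalSpec spec : specs) {
            current = spec.apply(current);
        }
        return current;
    }
}
```

A StreamEvalSpec would simply be one more EvalSpec in such a chain.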

StreamEvalSpec also contains necessary details such as: 
i. The actual _command_ and its arguments, if any. 
ii. Information about the _ship-spec_ and _cache-spec_ which will go through 
Hadoop's DistributedCache. 
iii. Serializer/Deserializer information.

b) PigScriptParser

The PigScriptParser also needs to be enhanced to process the newer constructs 
supported by the DEFINE operator. The one change we need to make to PigContext 
is to add a PigContext.registerStreamingCommand API to enable the 
PigScriptParser to store the streaming command and related information to be 
passed along to QueryParser and other components.
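
A minimal sketch of what that PigContext hook might look like (the signature 
and the backing storage are assumptions; the design note does not pin them 
down):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative fragment of PigContext showing only the proposed
// registerStreamingCommand API; everything else is elided.
class PigContext {
    // Maps a DEFINE'd alias to the streaming command it stands for.
    private final Map<String, String> streamingCommands = new HashMap<>();

    // Called by PigScriptParser when it parses a DEFINE for a streaming command.
    public void registerStreamingCommand(String alias, String command) {
        streamingCommands.put(alias, command);
    }

    // Later consulted by QueryParser when the alias appears in a STREAM clause.
    public String getStreamingCommand(String alias) {
        return streamingCommands.get(alias);
    }
}
```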

Summary:

New:
      StreamEvalSpec.java (extends EvalSpec)

Modify:
      QueryParser.jjt 
      PigScriptParser.jj 
      PigContext.java (add registerStreamingCommand)

2. Logical Layer

Since 'streaming' is an eval on each record in the dataset, it should still be 
a logical Eval operator, i.e. LOEval should suffice for streaming operations too.

3. Physical Layer

Pig's MapReduce physical layer shouldn't be affected at all, since the 
StreamEvalSpec neatly fits into the map/reduce pipeline as another 
CompositeEvalSpec. (StreamEvalSpec.setupDefaultPipe is the critical knob.)

4. Streaming Implementation

The main infrastructure to support this notion of data processing (sending the 
dataset to a task's input and collecting its output) is a generic manager that 
takes care of setup/teardown of the streaming task, manages its 
stdin/stderr/stdout streams and also does post-processing. The plan is to 
implement an org.apache.pig.backend.streaming.PigExecutableManager to take over 
the aforementioned responsibilities. It is kept separate from Hadoop's 
Streaming component (in contrib/streaming) to ensure that Pig has no 
extraneous dependency on Hadoop streaming.

The PigExecutableManager is also responsible for dealing with multiple outputs 
of the streaming tasks (refer to the functional spec in the wiki).

New:
      org.apache.pig.backend.streaming.PigExecutableManager

{noformat}
class PigExecutableManager {

    // Configure the executable-manager
    void configure() throws IOException;

    // Runtime
    void run() throws IOException;

    // Clean-up hook (e.g. for handling multiple outputs)
    void close() throws IOException;

    // Send the Datum to the executable
    void add(Datum d);
}
{noformat}
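
To make that lifecycle concrete, here is a runnable sketch backed by 
java.lang.ProcessBuilder, with String records standing in for Datum; this is 
an illustrative assumption, not the actual implementation, and run() from the 
outline above is folded into close() for brevity:

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;

// Sketch of an executable manager: forks the streaming command, feeds
// records to its stdin, and collects whatever it writes to stdout.
class PigExecutableManager {
    private final List<String> command;
    private Process process;
    private BufferedWriter stdin;
    private final List<String> output = new ArrayList<>();

    PigExecutableManager(List<String> command) {
        this.command = command;
    }

    // Set up: launch the external streaming task.
    void configure() throws IOException {
        process = new ProcessBuilder(command).start();
        stdin = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));
    }

    // Send one record (a String here, a Datum in the real design) to the task.
    void add(String datum) throws IOException {
        stdin.write(datum);
        stdin.newLine();
    }

    // Clean-up: signal EOF on stdin, then drain stdout. Note that the output
    // records need not correspond one-to-one with the inputs.
    void close() throws IOException, InterruptedException {
        stdin.close();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                output.add(line);
            }
        }
        process.waitFor();
    }

    List<String> getOutput() {
        return output;
    }
}
```

A real manager would pump data and drain stderr concurrently, so a verbose 
task cannot block on a full pipe.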

The important deviation from the current Pig infrastructure is that there is no 
longer a one-to-one mapping between input and output records, since the 
user-script could (potentially) consume all the input before it emits any 
output records. 
The way to get around this is to wrap the DataCollector, and hence the next 
successor in the pipeline, in an OutputCollector and pass it along to the 
ExecutableManager.
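
That wrapping can be sketched as a small adapter (the type names here are 
illustrative stand-ins, not the real DataCollector/OutputCollector classes):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the next DataCollector stage in the eval pipeline.
interface DataCollector {
    void add(String tuple);
}

// Adapter that presents the downstream DataCollector as an output collector,
// so the ExecutableManager can emit records whenever the external command
// produces them, independent of when the inputs were consumed.
class DataCollectorOutputAdapter {
    private final DataCollector successor;

    DataCollectorOutputAdapter(DataCollector successor) {
        this.successor = successor;
    }

    // Called by the ExecutableManager for each output record it reads.
    public void collect(String record) {
        successor.add(record);
    }
}
```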

> Pig Streaming functional spec proposal
> --------------------------------------
>
>                 Key: PIG-94
>                 URL: https://issues.apache.org/jira/browse/PIG-94
>             Project: Pig
>          Issue Type: New Feature
>            Reporter: Olga Natkovich
>
> This issue is for discussion about Pig streaming functional spec.
> http://wiki.apache.org/pig/PigStreamingFunctionalSpec
