= Pig Streaming Functional Specification =

Streaming can have three separate meanings in the context of the Pig project:

 1. A specific way of submitting jobs to Hadoop: Hadoop Streaming
 2. A form of processing in which the entire portion of the dataset that 
corresponds to a task is sent to the task and the output streams out. There is 
no temporal or causal correspondence between an input record and a specific 
output record.
 3. The use of non-Java functions with Hadoop.

The goal of Pig with respect to streaming is to support #2 for (a) Java UDFs, 
(b) non-Java UDFs and (c) user-specified binaries/scripts. We will start with 
(c) since it would be most beneficial to users. It is not our goal to be 
feature-by-feature compatible with Hadoop streaming as it is too open-ended and 
might force us to implement features that we don't necessarily want in Pig.

This document proposes the functional specification to fulfill 2(c).

== Feature Specification ==

=== 1 Computation Specification ===

==== 1.1 Ability to specify streaming via binary/script. ====
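
A minimal sketch of this form, assuming a user-provided script `stream.pl` (a 
placeholder name used throughout this document):

A = load 'data';
B = stream A through `stream.pl`;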

==== 1.2 Ability to specify parameters to streaming ====
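
Arguments are simply part of the command inside the backticks (a sketch; the 
script and its flag are placeholders):

A = load 'data';
B = stream A through `stream.pl -n 5`;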

==== 1.3 Separation of streaming specification from stream operator. ====

For simple specifications, it is convenient for the user to specify the 
computation directly within the stream operator:

A = load 'data';
B = stream A through `stream.pl`;

However, for more complex specifications involving a serializer/deserializer 
or input/output via files, the user will be required to use the `define` 
command:

<define command> ::= define <alias> <computation spec>
<alias> ::= pig identifier
<computation spec> ::= <UDF spec> | <command spec>
<UDF spec> ::= pig standard function spec
<command spec> ::= `<command>` [<input spec>] [<output spec>] [<ship spec>] 
[<cache spec>]
<command> ::= standard Unix command including the arguments
<input spec> ::= input (<input stream spec> [using <serializer>] {, <input 
stream spec> [using <serializer>]})
<output spec> ::= output (<output stream spec> [using <deserializer>] {, 
<output stream spec> [using <deserializer>]})
<ship spec> ::= ship(<file spec> {, <file spec>})
<cache spec> ::= cache(<file spec> {, <file spec>})
<input stream spec> ::= stdin | <file spec>
<output stream spec> ::= stdout | <file spec>
<file spec> ::= unix file path
<serializer> ::= <UDF spec>
<deserializer> ::= <UDF spec>

Note that we use backticks to enclose the streaming command. This is because 
we want to reserve quotes for use within the command without the need to 
escape them. Backticks should be reasonably intuitive for users since in Unix 
they are associated with the notion of "execute this".

Given the definition above, the stream command will look as follows:

<stream command> ::= <alias> = stream <alias> {, <alias>} through <streaming 
spec> [<schema spec>]
<streaming spec> ::= <alias> | <UDF spec> | <command>
<schema spec> ::= standard pig schema spec, e.g. as (x,y,z)

Within the stream operator, if the streaming specification is enclosed in 
backticks, it is assumed to be a streaming command specified inline. 
Otherwise, the string is looked up in the alias hash; if found, it is assumed 
to be an alias, and if not, an error is reported.
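
For instance, the inline form, using the hypothetical `stream.pl` as the 
command:

S0 = stream A through `stream.pl`;

The alias form, by contrast, defines the command first: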

A = load 'data';
define cmd `stream_cmd -input file.dat`
S1 = stream A through cmd;
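
A fuller define exercising the optional clauses from the grammar above (a 
sketch; `mycmd.pl` and the serializer names are placeholders, not part of 
pig):

define cmd2 `perl mycmd.pl` input(stdin using MySerializer) output(stdout 
using MyDeserializer) ship('mycmd.pl')
S2 = stream A through cmd2;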

More examples appear in the later sections.

==== 1.4 Data Guarantees ====

The guarantees made to the user will be determined by the position of the 
streaming operator in the pig script. The following types will be supported:

   * *unordered* - no guarantees on the order in which the data is delivered 
to the streaming application
   * *grouped* - the data for the same key is guaranteed to be processed 
contiguously on a single node
   * *grouped and ordered* - the data is grouped and, within each group, 
sorted on a user-specified key

In addition to position, the data grouping and ordering can be determined by 
the data itself. For now, users would need to know the properties of the data 
to be able to take advantage of its structure; eventually, this should be 
part of the metadata.

'''Example 1''': The data given to streaming is _unordered_

A = load 'data';
B = stream A through `stream.pl`;

'''Example 2''': The data is _grouped_

A = load 'data';
B = group A by $1;
C = foreach B generate flatten(A);
D = stream C through `stream.pl`;

'''Example 3''': data is _grouped and ordered_

A = load 'data';
B = group A by $1;
C = foreach B {
    D = order A by ($3, $4);
    generate D;
}
E = stream C through `stream.pl`;

=== 2 Job Management ===

==== 2.1 Ability to ship computation and support data ====

We need to be able to ship the streaming binary and supporting files, if any, 
from the client box to the compute nodes.

A user can specify the files to ship via the `ship` clause in the define 
statement. For instance,

define X `stream.pl foo.txt` ship('stream.pl', 'foo.txt')

If `ship` and `cache` options are not specified, pig will attempt to ship the 
binary in the following way:

   * If the first word of the streaming command is `perl` or `python`, pig 
would assume that the binary is the first string it encounters that does not 
start with a dash (see the sketch after this list).
   * Otherwise, pig will attempt to ship the first string from the command 
line as long as it does not come from `/bin, /usr/bin, /usr/local/bin, 
/home/y/bin`. It will determine this by scanning the path if an absolute path 
is provided or by executing `which`.
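
For example, with no `ship` or `cache` clause, a define like the following (a 
sketch; the script name is a placeholder) would cause pig to ship 
`stream.pl`: `perl` is the first word, and `stream.pl` is the first string 
that does not start with a dash.

define X `perl stream.pl -n 5`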

To prevent a command from being shipped, an empty list can be passed to the 
`ship` clause.
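
A sketch, assuming empty parentheses are how the empty list is written; 
without the `ship()` clause, pig would try to auto-ship `mylocal.sh`:

define X `mylocal.sh` ship()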

==== 2.2 Ability to cache data ====

The approach described above works fine for binaries/jars and small data sets. 
For larger datasets, loading them at run time for every execution can have 
serious performance consequences. 

As in 2.1, a user will be able to specify files to cache via the `cache` 
clause in the define statement. For instance,

define X `stream.pl foo.gz` ship('stream.pl') cache('foo.gz')

=== 3 Input/Output Handling ===

==== 3.1 Data serialization/deserialization ====

By default, the data going into the streaming command and the data coming out 
of it are assumed to be tab-delimited.

S = stream A through `stream.pl`;

In the example above, the elements of A are concatenated with tabs and passed 
to `stream.pl`. The output of streaming is processed one line at a time and 
split on tabs. The user would be able to provide an alternative delimiter to 
the default (de)serializer via the `define` command:

define X `stream.pl` input(stdin using DefaultSerializer('^A')) output 
(stdout using DefaultDeserializer('^A'));
S = stream A through X;

Users will be able to provide a custom serializer and deserializer. This 
would look something like the example below and is similar to the load/store 
commands. In fact, the load function is identical to the deserializer and the 
store function to the serializer.

define X `stream.pl` input(stdin using MySerializer) output (stdout using 
MyDeserializer);
S = stream A through X;

The following serializers/deserializers will be part of the pig distribution:

 1. !DefaultSerializer, !DefaultDeserializer as described above
 2. !FlattenSerializer - it would take a bag and flatten it before passing it 
to the streaming application.
 3. !PythonSerializer, !PythonDeserializer 
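
As a sketch of how the flattening serializer might be plugged in (the exact 
signature is an assumption):

define X `stream.pl` input(stdin using FlattenSerializer)
S = stream A through X;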

==== 3.2 Ability to declare schema for streaming output ====

Just like with load, rather than relying on position, the user can give names 
to the output columns:

S = stream A through `stream.pl` as (a, b, c);
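
The declared names can then be referenced in later statements, for example:

T = foreach S generate a, c;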

==== 3.3 Ability to let streaming read from file(s) on disk ====

This is useful in two cases:

 1. To use existing binaries that expect input in a file rather than stdin.
 2. To provide multiple inputs.

In the initial release, we would only address the first case. Support for 
multiple inputs would be delayed until later versions.

In the case of a single input coming from a file, the command would look as 
follows:

define Y `stream.pl foo` input('foo' using MySerializer) output (stdout using 
MyDeserializer);
Z = stream X through Y;

This statement will cause the content of `X` to be written to the file `foo` 
using MySerializer.
==== 3.4 Ability to write output(s) to the disk ====

The motivation here is similar to the ability to read data from disk 
described in 3.3:

 1. To accommodate existing applications that write output to the disk rather 
than to stdout.
 2. To provide the ability to create multiple outputs.

In the initial release, only limited support for multiple outputs would be 
provided, as described below.

To accommodate the first case, the following command can be used:

define Z `stream.pl outputfile` output('outputfile' using MyDeserializer);
Y = stream X through Z;

This tells pig that the streaming application stored its complete output in a 
file called _outputfile_ and that the content of that file should be 
deserialized into Y using !MyDeserializer. (The `outputfile` is written by 
`stream.pl` using standard open/write/close functions. Pig might need to 
capture the data and store it as part files in DFS.)

A user can specify multiple outputs, but only the first one will be 
automatically loaded; the rest will be stored in DFS using the file names 
specified in the output clause as absolute paths:

define A `stream.pl` output('output1', '/data/mydata/output2' using 
MyDeserializer);
Y = stream X through A;

In this example, output1 would be loaded into Y while the second output would 
be stored in DFS in the file '/data/mydata/output2'.
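
The second output could later be read back with an ordinary load (a sketch):

W = load '/data/mydata/output2';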

=== 4 Non-streaming Features ===

This section describes a set of features that are not directly related to 
streaming but would be very useful in the context of streaming.

==== 4.1 Logging/Debugging/Error handling ====

===== 4.1.1 Logging =====

The stderr of a streaming application needs to be captured and presented to 
the user in an easily digestible format. The user will be presented with the 
output of each streaming task separately, with a header that includes the 
following information: task name, task result code, start time, and end time.

===== 4.1.2 Error Handling =====

If a streaming application fails, pig will report the error, including the 
return code and any stderr produced by the streaming application.

If available, it would also report on the amount of data consumed so far.

==== 4.2 Ability to templatize pig scripts ====

See the separate specification for details.

==== 4.3 Ability to process binary data ====

Sometimes, applications need to consume an entire data file without any 
parsing. All we would need in this case is a custom loader function that 
reads the data as is.

A = load 'data' using AsIsLoader();
B = stream A through `stream.pl`;

==== 4.4 Working with scalars ====

Several users need a way to compute a scalar value using pig and use it in 
later computations.

They want to be able to do something like this:

A = load 'data1';
B = group A all;
C = foreach B generate COUNT(A);
D = load 'data2';
E = foreach D generate $1/C;

This does not work in pig because C is a relation, not a scalar. The proposed 
short-term solution is the following:

A = load 'data1';
B = group A all;
C = foreach B generate COUNT(A);
store C into 'count';
D = load 'data2';
E = foreach D generate $1/GetScalar('count');

`GetScalar` is a UDF that reads the file on its first invocation and produces 
its content in the form of a value. This UDF will be provided as part of the 
pig distribution until a better solution is available.

=== 5 Performance ===

We should have a performance target in mind as compared to Hadoop streaming. I 
think for the initial release it would make sense to aim for '''30%''' overhead 
for streaming in Pig.

A number of optimizations to reach this target have been suggested.
