[ 
https://issues.apache.org/jira/browse/PIG-1285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12845458#action_12845458
 ] 

Pradeep Kamath commented on PIG-1285:
-------------------------------------

A couple of comments:
 * Instead of the code below, I think the implementation of write() should be 
inlined into SingleTupleBag.write(). (I guess DefaultDataBag.write() and 
SingleTupleBag.write() could call a common method to implement write().)
{noformat}
+        DataBag bag = bagFactory.newDefaultBag();
+        bag.addAll(this);
+        bag.write(out);
{noformat}

The reason is that bagFactory.newDefaultBag() registers the bag with the 
SpillableMemoryManager, which in turn puts a weak reference to the bag on a 
linked list. In the past we have seen this list grow large and cause memory 
issues, which was one of the main motivations for creating SingleTupleBag.

 * There is an implementation for write() but not for read(). Reading through 
the code, I guess this is because SingleTupleBag.read() is never called during 
deserialization; DefaultDataBag.read() is called instead. I am wondering 
whether leaving SingleTupleBag.read() as-is is confusing, since it throws an 
exception with the message "SingleTupleBag should never be serialized or 
deserialized."
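
To make both points concrete, here is a rough sketch of the shape I have in mind. It is only an illustration, not the patch: SingleItemBag, ListBag, writeItems() and the count-then-items wire format are hypothetical stand-ins, not Pig's actual classes or on-disk format. The single-item bag writes through a shared helper without allocating a general-purpose bag, and the bytes are read back as the general-purpose bag type, so the single-item type never needs a working read().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT Pig's classes.
final class BagWriteSketch {

    interface Bag {
        void write(DataOutput out) throws IOException;
    }

    // Common routine that both bag types call.
    // Assumed wire format: item count, then each item.
    static void writeItems(DataOutput out, long size, Iterator<String> items)
            throws IOException {
        out.writeLong(size);
        while (items.hasNext()) {
            out.writeUTF(items.next());
        }
    }

    // Plays the role of SingleTupleBag: one item, registered nowhere.
    static final class SingleItemBag implements Bag {
        private final String item;

        SingleItemBag(String item) {
            this.item = item;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            // No temporary general-purpose bag is created here.
            writeItems(out, 1L, Collections.singletonList(item).iterator());
        }
    }

    // Plays the role of DefaultDataBag: the type produced on deserialization.
    static final class ListBag implements Bag {
        private final List<String> items = new ArrayList<>();

        void add(String item) {
            items.add(item);
        }

        List<String> items() {
            return items;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            writeItems(out, items.size(), items.iterator());
        }

        static ListBag read(DataInput in) throws IOException {
            ListBag bag = new ListBag();
            long count = in.readLong();
            for (long i = 0; i < count; i++) {
                bag.add(in.readUTF());
            }
            return bag;
        }
    }

    static byte[] toBytes(Bag bag) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            bag.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    static ListBag fromBytes(byte[] bytes) {
        try (DataInputStream in =
                 new DataInputStream(new ByteArrayInputStream(bytes))) {
            return ListBag.read(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ListBag general = new ListBag();
        general.add("t");
        byte[] fromSingle = toBytes(new SingleItemBag("t"));
        byte[] fromGeneral = toBytes(general);
        // Both bag types should produce the same bytes for the same content.
        System.out.println(Arrays.equals(fromSingle, fromGeneral));
        // Bytes written by the single-item bag come back as a ListBag.
        System.out.println(fromBytes(fromSingle).items());
    }
}
```

Because the reader only ever sees the shared format, it cannot tell which bag type wrote the bytes. That is why a read() on the single-item type would be dead code in this sketch, and why its exception message would need rewording rather than a real implementation.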


> Allow SingleTupleBag to be serialized
> -------------------------------------
>
>                 Key: PIG-1285
>                 URL: https://issues.apache.org/jira/browse/PIG-1285
>             Project: Pig
>          Issue Type: Improvement
>            Reporter: Dmitriy V. Ryaboy
>            Assignee: Dmitriy V. Ryaboy
>             Fix For: 0.7.0
>
>         Attachments: PIG-1285.patch
>
>
> Currently, Pig uses a SingleTupleBag for efficiency when a full-blown 
> spillable bag implementation is not needed in the Combiner optimization.
> Unfortunately this can create problems. The Initial.exec() code below fails 
> at run time with the message that a SingleTupleBag cannot be serialized:
> {code}
> @Override
> public Tuple exec(Tuple in) throws IOException {
>       // single record. just copy.
>       if (in == null) return null;
>       try {
>         Tuple resTuple = tupleFactory_.newTuple(in.size());
>         for (int i=0; i< in.size(); i++) {
>           resTuple.set(i, in.get(i));
>         }
>         return resTuple;
>       } catch (IOException e) {
>         log.warn(e);
>         return null;
>       }
>     }
> {code}
> The code below can fix the problem in the UDF, but it seems like something 
> that should be handled transparently, not requiring UDF authors to know about 
> SingleTupleBags.
> {code}
> @Override
> public Tuple exec(Tuple in) throws IOException {
>       // single record. just copy.
>       if (in == null) return null;   
>       
>       /*
>        * Unfortunately SingleTupleBags are not serializable. We cache whether
>        * a given index contains a bag in the map below, and copy all bags
>        * into DefaultBags before returning to avoid serialization exceptions.
>        */
>       Map<Integer, Boolean> isBagAtIndex = Maps.newHashMap();
>       
>       try {
>         Tuple resTuple = tupleFactory_.newTuple(in.size());
>         for (int i=0; i< in.size(); i++) {
>           Object obj = in.get(i);
>           if (!isBagAtIndex.containsKey(i)) {
>             isBagAtIndex.put(i, obj instanceof SingleTupleBag);
>           }
>           if (isBagAtIndex.get(i)) {
>             DataBag newBag = bagFactory_.newDefaultBag();
>             newBag.addAll((DataBag)obj);
>             obj = newBag;
>           }
>           resTuple.set(i, obj);
>         }
>         return resTuple;
>       } catch (IOException e) {
>         log.warn(e);
>         return null;
>       }
>     }
> {code}

