[ https://issues.apache.org/jira/browse/PIG-1285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12847721#action_12847721 ]
Pradeep Kamath commented on PIG-1285: ------------------------------------- yes > Allow SingleTupleBag to be serialized > ------------------------------------- > > Key: PIG-1285 > URL: https://issues.apache.org/jira/browse/PIG-1285 > Project: Pig > Issue Type: Improvement > Reporter: Dmitriy V. Ryaboy > Assignee: Dmitriy V. Ryaboy > Fix For: 0.7.0 > > Attachments: PIG-1285.patch > > > Currently, Pig uses a SingleTupleBag for efficiency when a full-blown > spillable bag implementation is not needed in the Combiner optimization. > Unfortunately this can create problems. The below Initial.exec() code fails > at run-time with the message that a SingleTupleBag cannot be serialized: > {code} > @Override > public Tuple exec(Tuple in) throws IOException { > // single record. just copy. > if (in == null) return null; > try { > Tuple resTuple = tupleFactory_.newTuple(in.size()); > for (int i=0; i< in.size(); i++) { > resTuple.set(i, in.get(i)); > } > return resTuple; > } catch (IOException e) { > log.warn(e); > return null; > } > } > {code} > The code below can fix the problem in the UDF, but it seems like something > that should be handled transparently, not requiring UDF authors to know about > SingleTupleBags. > {code} > @Override > public Tuple exec(Tuple in) throws IOException { > // single record. just copy. > if (in == null) return null; > > /* > * Unfortunately SingleTupleBags are not serializable. We cache whether > a given index contains a bag > * in the map below, and copy all bags into DefaultBags before > returning to avoid serialization exceptions. > */ > Map<Integer, Boolean> isBagAtIndex = Maps.newHashMap(); > > try { > Tuple resTuple = tupleFactory_.newTuple(in.size()); > for (int i=0; i< in.size(); i++) { > Object obj = in.get(i); > if (!isBagAtIndex.containsKey(i)) { > isBagAtIndex.put(i, obj instanceof SingleTupleBag); > } > if (isBagAtIndex.get(i)) { > DataBag newBag = bagFactory_.newDefaultBag(); > newBag.addAll((DataBag)obj); > obj = newBag; > } > resTuple.set(i, obj); > } > return resTuple; > } catch (IOException e) { > log.warn(e); > return null; > } > } > {code} -- This message is automatically generated by JIRA. - You can reply to this email to add a comment to the issue online.