Sorry for the delay!
There must be a better option, but I wrote a simple loader that extends
PigStorage (I reused a lot of code from PigStorage, especially its
parse/split method).
You need to complete the 'process' method: take the field or fields you need,
convert your date, and then set the right field (0?).
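For example, here is a rough sketch of what 'process' could do with the date. It is only an illustration, not part of the attached loader: the class DateFieldSketch and the helper isoToUnixOrEmpty are made-up names, it assumes the date is the last field in the row and is a full ISO-8601 timestamp with an offset (e.g. 2011-11-08T07:31:00Z), and it uses java.time, so it needs Java 8 or later. When the date cannot be parsed it writes an empty string, which readField in the loader turns into a null.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DateFieldSketch
{
    // Hypothetical helper: ISO-8601 timestamp with offset -> Unix seconds as text.
    // Returns "" when the value is missing or cannot be parsed.
    static String isoToUnixOrEmpty(String iso)
    {
        if (iso == null)
        {
            return "";
        }
        try
        {
            return Long.toString(OffsetDateTime.parse(iso.trim()).toEpochSecond());
        }
        catch (DateTimeParseException e)
        {
            return "";
        }
    }

    // Same idea as MyLoader.process, minus the parse step:
    // rewrite the date field in place and hand the list back.
    static List<String> process(List<String> fields)
    {
        if (!fields.isEmpty())
        {
            int dateIdx = fields.size() - 1; // assumption: the date is the last field
            fields.set(dateIdx, isoToUnixOrEmpty(fields.get(dateIdx)));
        }
        return fields;
    }

    public static void main(String[] args)
    {
        // one well-formed row and one whose date was overwritten by another string
        System.out.println(process(new ArrayList<String>(Arrays.asList("a", "b", "2011-11-08T07:31:00Z"))));
        System.out.println(process(new ArrayList<String>(Arrays.asList("a", "b", "2011-11-08T07:3garbage"))));
    }
}
```

With a null in that column, the bad rows can be dropped in the script with a plain IS NOT NULL filter instead of the regex.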
To compile it, you need pig-core.jar and hadoop-core.jar on your classpath,
something like:
javac -cp /usr/lib/pig/pig-core.jar:/usr/lib/hadoop/hadoop-core.jar myPackage/MyLoader.java
If you have any doubts, just let me know.
On Tue, Nov 8, 2011 at 7:31 AM, pablomar <[email protected]> wrote:
> Sorry, I read "custom log" and thought you had a custom loader.
> You can extend PigStorage and do the field replacement in its putNext
> method.
>
> I'll do an example later.
>
> On 11/8/11, Rauan Maemirov <[email protected]> wrote:
> > Yes, you understand my task right. What is putNext? I'm new to Pig and
> > haven't written custom UDFs.
> >
> > 2011/11/8 pablomar <[email protected]>
> >
> >> Sorry, I didn't understand completely.
> >>
> >> Do you want to read a line and, if the date is invalid (running
> >> IsoToUnix directly, with no regex beforehand), skip it? Is that it?
> >> If so, you can replace the field with your converted date (Unix
> >> format), and if the conversion fails, put a null or nothing.
> >>
> >> I mean, in your overridden putNext you have your individual columns;
> >> you can try to convert the date there and put your Unix date in the
> >> output.
> >>
> >> Sorry if I misunderstood your problem again.
> >>
> >> On 11/8/11, Rauan Maemirov <[email protected]> wrote:
> >> > Sure, but now I'm just omitting the rows _after_ regex matching.
> >> > What I want to do is avoid the additional regex filtering and ignore
> >> > invalid rows right after an unsuccessful IsoToUnix().
> >> >
> >> > 2011/11/8 pablomar <[email protected]>
> >> >
> >> >> Can you write something else (a null, for example) in your putNext
> >> >> method for that field when the date is invalid?
> >> >>
> >> >> On 11/8/11, Rauan Maemirov <[email protected]> wrote:
> >> >> > Well, I solved this issue via regex matching, but I wonder if it's
> >> >> > too costly.
> >> >> > Is there any way to ignore exceptions and move on just by omitting
> >> >> > the wrong tuples?
> >> >> >
> >> >> > 2011/11/8 Rauan Maemirov <[email protected]>
> >> >> >
> >> >> >> Hi, all. I've got a custom log (CSV, comma-delimited) with ISO dates.
> >> >> >> Sometimes log writing lags, and I get exceptions from a malformed
> >> >> >> ISO date.
> >> >> >> Here's the exception: https://gist.github.com/1347406. (The date is
> >> >> >> the last "parameter" in the row, and it's incorrectly overwritten at
> >> >> >> the end by another string.)
> >> >> >>
> >> >> >> The question is: how can I filter out all wrong dates, or at least
> >> >> >> force Pig to ignore them instead of failing?
> >> >> >>
> >> >> >
> >> >>
> >> >
> >>
> >
>
package myPackage;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.hadoop.io.Text;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.StorageUtil;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.builtin.PigStorage;
public class MyLoader extends PigStorage
{
    private char fieldDel = ',';
    private ArrayList<Object> mProtoTuple = null;
    private TupleFactory mTupleFactory = TupleFactory.getInstance();
    private boolean mRequiredColumnsInitialized = false;

    public MyLoader()
    {
        super();
    }

    public MyLoader(String delimiter)
    {
        this();
        fieldDel = (char)StorageUtil.parseFieldDel(delimiter);
    }

    @Override
    public Tuple getNext() throws IOException
    {
        mProtoTuple = new ArrayList<Object>();
        // same one-time setup as PigStorage.getNext
        if(!mRequiredColumnsInitialized)
        {
            if(signature != null)
            {
                Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
                mRequiredColumns = (boolean[])ObjectSerializer.deserialize(p.getProperty(signature));
            }
            mRequiredColumnsInitialized = true;
        }
        try
        {
            boolean notDone = in.nextKeyValue();
            if(!notDone)
            {
                // end of input
                return null;
            }
            Text value = (Text) in.getCurrentValue();
            // split the line and let process() rewrite the date field
            List<String> fields = process(value.toString(), fieldDel);
            for(String field : fields)
            {
                readField(field);
            }
            Tuple t = mTupleFactory.newTupleNoCopy(mProtoTuple);
            return t;
        }
        catch(InterruptedException e)
        {
            int errCode = 6018;
            String errMsg = "Error while reading input";
            throw new ExecException(errMsg, errCode, PigException.REMOTE_ENVIRONMENT, e);
        }
    }

    private List<String> process(String line, char delim)
    {
        List<String> fields = parse(line, delim);
        // TODO: transform your date field here and write the converted value
        // back with fields.set(index, newValue); index 0 is only a guess
        return fields;
    }

    private List<String> parse(String line, char delim)
    {
        List<String> values = new ArrayList<String>();
        int len = line.length();
        int start = 0;
        String token;
        for(int i = 0; i < len; i++)
        {
            // compare characters, not bytes: getBytes() indexes drift away from
            // substring() indexes when the line has multi-byte characters
            if(line.charAt(i) == delim)
            {
                //new String to avoid substring memory leak
                token = new String(line.substring(start, i));
                values.add(token);
                start = i + 1;
            }
        }
        // last field (empty when the line ends with the delimiter)
        if(start <= len)
        {
            token = new String(line.substring(start, len));
            values.add(token);
        }
        return values;
    }

    private void readField(String field)
    {
        if(field.length() == 0)
        {
            mProtoTuple.add(null);
        }
        else
        {
            mProtoTuple.add(new DataByteArray(field));
        }
    }

    // Return false and let Pig get rid of the unused columns
    @Override
    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
    {
        if(requiredFieldList == null)
            return null;
        return new RequiredFieldResponse(false);
    }

    // Two loaders are equal when they split on the same delimiter
    @Override
    public boolean equals(Object obj)
    {
        if(obj instanceof MyLoader)
            return fieldDel == ((MyLoader)obj).fieldDel;
        else
            return false;
    }
}
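
The split done in parse can also be tried on its own, outside Pig. The sketch below is a stand-alone version (SplitSketch and split are made-up names, not part of the loader) that walks the line by character with charAt; indexing by character keeps the positions in step with substring even when the line contains non-ASCII text. Like the loader, it keeps empty fields, including a trailing one.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch
{
    // Split a line on a single-character delimiter, keeping empty fields.
    static List<String> split(String line, char delim)
    {
        List<String> values = new ArrayList<String>();
        int start = 0;
        for (int i = 0; i < line.length(); i++)
        {
            if (line.charAt(i) == delim)
            {
                values.add(line.substring(start, i));
                start = i + 1;
            }
        }
        // whatever follows the last delimiter is the final field (possibly empty)
        values.add(line.substring(start));
        return values;
    }

    public static void main(String[] args)
    {
        // a non-ASCII character before the delimiters, plus an empty middle field
        System.out.println(split("caf\u00e9,,2011-11-08", ','));
    }
}
```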