Can you try loading the input files without the schema?
raw = LOAD '$log_path' using PigStorage('\t', '-noschema');
By default PigStorage looks for schema files, and that *may* be slowing things
down (given your observation that the slowness grows with the # of input dirs).
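For illustration, here is a minimal Java sketch of why that cost could track the number of input files. It is a model only and calls neither Pig nor Hadoop. It assumes, from the main-thread stack in the jstack output below (JsonMetadata.findMetaFile -> HDataStorage.asElement -> isContainer -> FileSystem.exists -> getFileInfo), that the schema search makes blocking NameNode lookups for every expanded input file (two per file in this model) and that '-noschema' skips the search. The class names, the /logs paths and the even 150-files-per-folder split are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the NameNode lookups made by PigStorage's schema search.
 * ASSUMPTION: two blocking getFileInfo round trips per expanded input file
 * (an isContainer() check on the file, then an exists() probe for .pig_schema
 * in its parent directory). This simplifies Pig's real code; no Hadoop calls are made.
 */
public class SchemaLookupModel {

    /** Stand-in for the NameNode: counts simulated getFileInfo round trips. */
    static final class FakeNameNode {
        int calls = 0;

        boolean exists(String path) {
            calls++;       // each probe would be one synchronous RPC in the real client
            return false;  // assume no schema file is present
        }
    }

    /** Returns the number of simulated lookups issued while searching for a schema. */
    static int schemaLookups(List<String> inputFiles, boolean noSchema) {
        FakeNameNode nn = new FakeNameNode();
        if (!noSchema) {                              // '-noschema' skips the search entirely
            for (String file : inputFiles) {
                nn.exists(file);                      // is this path a directory?
                String parent = file.substring(0, file.lastIndexOf('/'));
                nn.exists(parent + "/.pig_schema");   // probe for the schema file
            }
        }
        return nn.calls;
    }

    /** Builds hypothetical paths /logs/00/part-00000 ... for folders firstDir..lastDir. */
    static List<String> files(int firstDir, int lastDir, int filesPerDir) {
        List<String> out = new ArrayList<>();
        for (int dir = firstDir; dir <= lastDir; dir++) {
            for (int f = 0; f < filesPerDir; f++) {
                out.add(String.format("/logs/%02d/part-%05d", dir, f));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // ASSUMPTION: the 3600 files are spread evenly, 150 per folder.
        System.out.println("00/*           : " + schemaLookups(files(0, 0, 150), false));
        System.out.println("0?/*           : " + schemaLookups(files(0, 9, 150), false));
        System.out.println("all 24 folders : " + schemaLookups(files(0, 23, 150), false));
        System.out.println("all, -noschema : " + schemaLookups(files(0, 23, 150), true));
    }
}
```

Counting lookups instead of timing them keeps the model independent of NameNode latency: under these assumptions 00/*, 0?/* and all 24 folders need 300, 3000 and 7200 lookups, and '-noschema' needs none.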
On Mon, Jun 3, 2013 at 12:59 PM, Eugene Morozov wrote:
> Hello!
>
>
> Question #1
> A couple of days ago I noticed that my scripts had started running slower
> than usual. After some experimenting, it turns out that the "compilation"
> time depends on how many input files I give the script. By compilation I
> mean everything Pig does after it starts and before the new job appears in
> the JobTracker web UI.
>
> I have 3600 input files that live in 24 different folders named 00 to 23.
> The time Pig spends between running pig -p input_path=... my-script.pig and
> the jar-generation step depends on how many input files the script has to
> process. When I give it just one directory, like 00/*, it takes only 10-20
> seconds before the job starts. When I pass a set of directories such as
> 0?/*, it takes about 120-240 seconds. And it takes a tremendous 15 minutes
> when I use all my data.
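A rough per-file reading of these timings, as a small Java calculation. It assumes the 3600 files are spread evenly over the 24 folders (150 per folder) and takes "15 minutes" as exactly 900 s; the class and method names are hypothetical.

```java
// Hypothetical helper: per-file planning time implied by the reported timings.
public class PerFileCost {

    /** Returns {min, max} seconds per input file for a run over `files` files. */
    static double[] perFileSeconds(int files, double minSeconds, double maxSeconds) {
        return new double[] {minSeconds / files, maxSeconds / files};
    }

    static void print(String label, double[] range) {
        System.out.printf("%s: %.3f to %.3f s per file%n", label, range[0], range[1]);
    }

    public static void main(String[] args) {
        int perDir = 3600 / 24;                                  // ASSUMPTION: 150 files per folder
        print("00/*", perFileSeconds(perDir, 10, 20));           // 1 folder, 10-20 s
        print("0?/*", perFileSeconds(10 * perDir, 120, 240));    // 10 folders (00-09), 120-240 s
        print("all ", perFileSeconds(3600, 15 * 60, 15 * 60));   // 24 folders, about 15 min
    }
}
```

Under that assumption every case works out to roughly 0.07 to 0.25 s per expanded file, which is consistent with a cost paid once per input file rather than once per job.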
>
> During that period, while Pig hangs and seems to do nothing, I used
> java/bin/jstack and strace and saw that there are only two active
> threads:
> * FIRST
> epoll_wait(291, {}, 1024, 0) = 0
> read(287, "\6\10\327\205\25\20\0\0\0\0;\n9\10\2\22\0\30\254\264\264'\"\3\10\244\3*\7per"..., 8192) = 70
> futex(0x4907b534, FUTEX_WAKE_OP_PRIVATE, 1, 1, 0x4907b530, {FUTEX_OP_SET, 0, FUTEX_OP_CMP_GT, 1}) = 1
> futex(0x47d48a28, FUTEX_WAKE_PRIVATE, 1) = 1
> clock_gettime(CLOCK_REALTIME, {1370272461, 649119000}) = 0
> futex(0x4907b340, FUTEX_WAKE_PRIVATE, 1) = 1
> futex(0x4907b344, FUTEX_WAIT_PRIVATE, 689631, {9, 998984000}) = -1 EAGAIN (Resource temporarily unavailable)
> futex(0x48c25928, FUTEX_WAKE_PRIVATE, 1) = 0
> read(287, 0x2aaab1111000, 8192) = -1 EAGAIN (Resource temporarily unavailable)
> #287 is just a socket
>
> Its Java stack is:
> "IPC Client (2138196637) connection to hbase01.303net.pvt/10.0.240.16:8020 from emorozov" daemon prio=10 tid=0x00002aaab108c000 nid=0x711 runnable [0x0000000042ed9000]
> java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
> - locked <0x00000000c1aab558> (a sun.nio.ch.Util$2)
> - locked <0x00000000c1aab548> (a java.util.Collections$UnmodifiableSet)
> - locked <0x00000000c1aa4578> (a sun.nio.ch.EPollSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
> at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:336)
> at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:158)
> at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:154)
> at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:127)
> at java.io.FilterInputStream.read(FilterInputStream.java:116)
> at java.io.FilterInputStream.read(FilterInputStream.java:116)
> at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:386)
> at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
> at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
> - locked <0x00000000c1800600> (a java.io.BufferedInputStream)
> at java.io.FilterInputStream.read(FilterInputStream.java:66)
> at com.google.protobuf.AbstractMessageLite$Builder.mergeDelimitedFrom(AbstractMessageLite.java:276)
> at com.google.protobuf.AbstractMessage$Builder.mergeDelimitedFrom(AbstractMessage.java:760)
> at com.google.protobuf.AbstractMessageLite$Builder.mergeDelimitedFrom(AbstractMessageLite.java:288)
> at com.google.protobuf.AbstractMessage$Builder.mergeDelimitedFrom(AbstractMessage.java:752)
> at org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos$RpcResponseHeaderProto.parseDelimitedFrom(RpcPayloadHeaderProtos.java:985)
> at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:882)
> at org.apache.hadoop.ipc.Client$Connection.run(Client.java:813)
>
>
>
> * SECOND
> futex(0x4dd23a28, FUTEX_WAKE_PRIVATE, 1) = 0
> futex(0x4e0e9f94, FUTEX_WAKE_OP_PRIVATE, 1, 1, 0x4e0e9f90, {FUTEX_OP_SET, 0, FUTEX_OP_CMP_GT, 1}) = 1
> futex(0x2aaab105cf28, FUTEX_WAKE_PRIVATE, 1) = 1
> write(287, "\0\0\0\306\10\10\2\20\0\30\256\265j\273\1\n\vgetFileInfo\22z\nx"..., 202) = 202
> #287 is the same socket
>
>
> "main" prio=10 tid=0x000000004dd22800 nid=0x6fd in Object.wait() [0x0000000041fc9000]
> java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:485)
> at org.apache.hadoop.ipc.Client.call(Client.java:1146)
> - locked <0x00000000eda48e00> (a org.apache.hadoop.ipc.Client$Call)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
> at $Proxy9.getFileInfo(Unknown Source)
> at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
> at $Proxy9.getFileInfo(Unknown Source)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:628)
> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1507)
> at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:783)
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1257)
> at org.apache.pig.backend.hadoop.datastorage.HDataStorage.isContainer(HDataStorage.java:203)
> at org.apache.pig.backend.hadoop.datastorage.HDataStorage.asElement(HDataStorage.java:131)
> at org.apache.pig.backend.hadoop.datastorage.HDataStorage.asElement(HDataStorage.java:147)
> at org.apache.pig.backend.hadoop.datastorage.HDataStorage.asElement(HDataStorage.java:153)
> at org.apache.pig.builtin.JsonMetadata.findMetaFile(JsonMetadata.java:131)
> at org.apache.pig.builtin.JsonMetadata.getSchema(JsonMetadata.java:188)
> at org.apache.pig.builtin.PigStorage.getSchema(PigStorage.java:465)
> at org.apache.pig.newplan.logical.relational.LOLoad.getSchemaFromMetaData(LOLoad.java:151)
> ...
>
>
>
> My complete script is as follows:
> raw = LOAD '$log_path';
> raw = FOREACH raw GENERATE $40 AS userId, (long)$14 AS advEntityId,
>     (long)$17 AS adNetworkId, (chararray)$34 AS eventType,
>     ((chararray)$0 == 'ERROR' OR (chararray)$1 == 'ERROR' OR
>      (chararray)$3 == 'ERROR' OR (chararray)$5 == 'ERROR' OR
>      (chararray)$6 == 'ERROR' OR (chararray)$8 == 'ERROR' OR
>      (chararray)$16 == 'ERROR' OR (chararray)$33 == 'ERROR' OR
>      (chararray)$34 == 'ERROR' OR (chararray)$39 == 'ERROR' OR
>      (chararray)$40 == 'ERROR' OR (chararray)$41 == 'ERROR' OR
>      (chararray)$48 == 'ERROR' OR (chararray)$49 == 'ERROR' ? 1 : 0) AS skip;
> raw2 = FOREACH raw GENERATE (advEntityId is null ? -1 : advEntityId) AS advEntityId,
>     (adNetworkId is null ? -1 : adNetworkId) AS adNetworkId,
>     userId, eventType, skip;
> raw3 = FILTER raw2 BY NOT (advEntityId == -1 AND adNetworkId == -1 OR
>     eventType == 'api' OR skip == 1);
> raw4 = FOREACH raw3 GENERATE userId;
> d_raw = DISTINCT raw4;
> s_raw = FOREACH (GROUP d_raw ALL PARALLEL 1) GENERATE COUNT(d_raw.userId);
>
> Here is what we changed a couple of days ago.
> We had just one box running one ZooKeeper, the NameNode and the JobTracker,
> and I must say everything worked perfectly. Then, a couple of days ago, we
> 1. Moved the JobTracker to a separate box.
> 2. Moved ZooKeeper off this box and made it highly available (three
> ZooKeeper servers instead of one).
>
> The NameNode still lives on the same box, and I ran my script on that box.
>
> That's it. I would really, really appreciate it if anybody could give me a
> clue where to go next.
>
>
> Question #2
> The previous version of the script didn't contain the ERROR comparisons,
> and this boolean expression slows my script down even more. I would be glad
> if anyone could explain why that happens, because it is quite a simple
> expression.
>
>
> --
> Evgeny Morozov
> Developer Grid Dynamics
> Skype: morozov.evgeny
> www.griddynamics.com
>