saveasorcfile on partitioned orc
Hi, I followed the information on https://www.mail-archive.com/reviews@spark.apache.org/msg141113.html to save orc file with spark 1.2.1. I can save data to a new orc file. I wonder how to save data to an existing and partitioned orc file? Any suggestions? BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Insert overwrite to hive - ArrayIndexOutOfBoundsException
Hi, I am using spark 1.3.1. I tried to insert (a new partition) into an existing partitioned hive table, but got ArrayIndexOutOfBoundsException. Below is a code snippet and the debug log. Any suggestions please. + case class Record4Dim(key: String, date: Int, hh: Int, x: Int, y: Int, z: Int, height: Float, u: Float , v: Float, w: Float, ph: Float, phb: Float, t: Float, p: Float, pb: Float, qvapor: Float, qgraup: Float, qnice: Float, qnrain: Float, tke_pbl: Float, el_pbl: Float) def flatKeyFromWrf(x: (String, (Map[String,Float], Float))): Record4Dim = { } val varWHeightFlatRDD = varWHeightRDD.map(FlatMapUtilClass().flatKeyFromWrf).toDF() varWHeightFlatRDD.registerTempTable("table_4Dim") for (zz <- 1 to 51) hiveContext.sql("INSERT OVERWRITE table 4dim partition (zone=" + ZONE + ",z=" + zz + ",year=" + YEAR + ",month=" + MONTH + ") " + "select date, hh, x, y, height, u, v, w, ph, phb, t, pb, pb, qvapor, qgraup, qnice, tke_pbl, el_pbl from table_4Dim where z=" + zz); + 15/06/01 21:07:20 DEBUG YarnHistoryService: Enqueue [1433192840040]: SparkListenerTaskEnd(4,0,ResultTask,ExceptionFailure(java.lang.ArrayIndexOutOfBoundsException,18,[Ljava.lang.StackTraceElement;@5783ce22,java.lang.ArrayIndexOutOfBoundsException: 18 at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:79) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:103) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:100) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:100) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:82) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:82) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
ERROR cluster.YarnScheduler: Lost executor
Hi, What can be the cause of this ERROR cluster.YarnScheduler: Lost executor? How can I fix it? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
MetaException(message:java.security.AccessControlException: Permission denied
Hi, I was running a spark job to insert overwrite hive table and got Permission denied. My question is why spark job did the insert by using user 'hive', not myself who ran the job? How can I fix the problem? val hiveContext = new HiveContext(sc) import hiveContext.implicits._ hiveContext.sql("INSERT OVERWRITE table 4dim ... ") Caused by: MetaException(message:java.security.AccessControlException: Permission denied: user=hive, access=WRITE, inode="/apps/hive/warehouse/wrf_tables/4dim/zone=2/z=1/year=2009/month=1":patcharee:hdfs:drwxr-xr-x at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:185) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6795) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6777) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6702) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAccess(FSNamesystem.java:9529) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.checkAccess(NameNodeRpcServer.java:1516) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.checkAccess(ClientNamenodeProtocolServerSideTranslatorPB.java:1433) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033) ) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_partition_result$alter_partition_resultStandardScheme.read(ThriftHiveMetastore.java) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_partition_result$alter_partition_resultStandardScheme.read(ThriftHiveMetastore.java) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_partition_result.read(ThriftHiveMetastore.java) at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_alter_partition(ThriftHiveMetastore.java:2033) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.alter_partition(ThriftHiveMetastore.java:2018) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.alter_partition(HiveMetaStoreClient.java:1091) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89) at com.sun.proxy.$Proxy37.alter_partition(Unknown Source) at org.apache.hadoop.hive.ql.metadata.Hive.alterPartition(Hive.java:469) ... 26 more BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: ERROR cluster.YarnScheduler: Lost executor
This is log I can get> 15/06/02 16:37:31 INFO shuffle.RetryingBlockFetcher: Retrying fetch (2/3) for 4 outstanding blocks after 5000 ms 15/06/02 16:37:36 INFO client.TransportClientFactory: Found inactive connection to compute-10-3.local/10.10.255.238:33671, creating a new one. 15/06/02 16:37:36 WARN server.TransportChannelHandler: Exception in connection from /10.10.255.238:35430 java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:192) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311) at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881) at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:744) 15/06/02 16:37:36 ERROR server.TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1033433133943, chunkIndex=1}, buffer=FileSegmentManagedBuffer{file=/hdisk3/hadoop/yarn/local/usercache/patcharee/appcache/application_1432633634512_0213/blockmgr-12d59e6b-0895-4a0e-9d06-152d2f7ee855/09/shuffle_0_56_0.data, offset=896, length=1132499356}} to /10.10.255.238:35430; closing connection java.nio.channels.ClosedChannelException 15/06/02 16:37:38 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 4 outstanding blocks (after 2 retries) java.io.IOException: Failed to connect to compute-10-3.local/10.10.255.238:33671 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.net.ConnectException: Connection refused: compute-10-3.local/10.10.255.238:33671 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) ... 1 more Best, Patcharee On 03. juni 2015 09:21, Akhil Das wrote: You need to look into your executor/worker logs to see whats going on. Thanks Best Regards On Wed, Jun 3, 2015 at 12:01 PM, patcharee <mailto:patcharee.thong...@uni.no>> wrote: Hi, What can be the cause of this ERROR cluster.YarnScheduler: Lost executor? How can I fix it? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org <mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org
Re: ERROR cluster.YarnScheduler: Lost executor
(NioEventLoop.java:528) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) ... 1 more ) I am using spark 1.3.1, is the problem from the https://issues.apache.org/jira/browse/SPARK-4516? Best, Patcharee On 03. juni 2015 10:11, Akhil Das wrote: Which version of spark? Looks like you are hitting this one https://issues.apache.org/jira/browse/SPARK-4516 Thanks Best Regards On Wed, Jun 3, 2015 at 1:06 PM, patcharee <mailto:patcharee.thong...@uni.no>> wrote: This is log I can get> 15/06/02 16:37:31 INFO shuffle.RetryingBlockFetcher: Retrying fetch (2/3) for 4 outstanding blocks after 5000 ms 15/06/02 16:37:36 INFO client.TransportClientFactory: Found inactive connection to compute-10-3.local/10.10.255.238:33671 <http://10.10.255.238:33671>, creating a new one. 15/06/02 16:37:36 WARN server.TransportChannelHandler: Exception in connection from /10.10.255.238:35430 <http://10.10.255.238:35430> java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:192) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311) at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881) at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:744) 15/06/02 16:37:36 ERROR server.TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1033433133943, chunkIndex=1}, buffer=FileSegmentManagedBuffer{file=/hdisk3/hadoop/yarn/local/usercache/patcharee/appcache/application_1432633634512_0213/blockmgr-12d59e6b-0895-4a0e-9d06-152d2f7ee855/09/shuffle_0_56_0.data, offset=896, length=1132499356}} to /10.10.255.238:35430 <http://10.10.255.238:35430>; closing connection java.nio.channels.ClosedChannelException 15/06/02 16:37:38 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 4 outstanding blocks (after 2 retries) java.io.IOException: Failed to connect to compute-10-3.local/10.10.255.238:33671 <http://10.10.255.238:33671> at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.net.ConnectException: Connection refused: compute-10-3.local/10.10.255.238:33671 <http://10.10.255.238:33671> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208) at
NullPointerException SQLConf.setConf
Hi, I am using Hive 0.14 and spark 0.13. I got java.lang.NullPointerException when inserted into hive. Any suggestions please. hiveContext.sql("INSERT OVERWRITE table 4dim partition (zone=" + ZONE + ",z=" + zz + ",year=" + YEAR + ",month=" + MONTH + ") " + "select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim where z=" + zz); java.lang.NullPointerException at org.apache.spark.sql.SQLConf.setConf(SQLConf.scala:196) at org.apache.spark.sql.SQLContext.setConf(SQLContext.scala:74) at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:251) at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:250) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:95) at no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$main$3$$anonfun$apply$1.apply(LoadWrfIntoHiveOptReduce1.scala:110) at no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$main$3$$anonfun$apply$1.apply(LoadWrfIntoHiveOptReduce1.scala:107) at scala.collection.immutable.Range.foreach(Range.scala:141) at no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$main$3.apply(LoadWrfIntoHiveOptReduce1.scala:107) at no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$main$3.apply(LoadWrfIntoHiveOptReduce1.scala:107) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1511) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1511) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: FetchFailed Exception
Hi, I has this problem before, and in my case it is because the executor/container was killed by yarn when it used more memory than allocated. You can check if your case is the same by checking yarn node manager log. Best, Patcharee On 05. juni 2015 07:25, ÐΞ€ρ@Ҝ (๏̯͡๏) wrote: I see this Is this a problem with my code or the cluster ? Is there any way to fix it ? FetchFailed(BlockManagerId(2, phxdpehdc9dn2441.stratus.phx.ebay.com <http://phxdpehdc9dn2441.stratus.phx.ebay.com>, 59574), shuffleId=1, mapId=80, reduceId=20, message= org.apache.spark.shuffle.FetchFailedException: Failed to connect to phxdpehdc9dn2441.stratus.phx.ebay.com/10.115.81.47:59574 <http://phxdpehdc9dn2441.stratus.phx.ebay.com/10.115.81.47:59574> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Failed to connect to phxdpehdc9dn2441.stratus.phx.ebay.com/10.115.81.47:59574 <http://phxdpehdc9dn2441.stratus.phx.ebay.com/10.115.81.47:59574> at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43) at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) ... 3 more Caused by: java.net.ConnectException: Connection refused: phxdpehdc9dn2441.stratus.phx.ebay.com/10.115.81.47:59574 <http://phxdpehdc9dn2441.stratus.phx.ebay.com/10.115.81.47:59574> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287) at io.netty.channel.nio.NioEventLoop.processSelectedKey(N
write multiple outputs by key
Hi, How can I write to multiple outputs for each key? I tried to create custom partitioner or define the number of partition but does not work. There are only the few tasks/partitions (which equals to the number of all key combination) gets large datasets, data is not splitting to all tasks/partition. The job failed as the few tasks handled too far large datasets. Below is my code snippet. val varWFlatRDD = varWRDD.map(FlatMapUtilClass().flatKeyFromWrf).groupByKey() //key are (zone, z, year, month) .foreach( x => { val z = x._1._1 val year = x._1._2 val month = x._1._3 val df_table_4dim = x._2.toList.toDF() df_table_4dim.registerTempTable("table_4Dim") hiveContext.sql("INSERT OVERWRITE table 4dim partition (zone=" + ZONE + ",z=" + z + ",year=" + year + ",month=" + month + ") " + "select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim"); }) From the spark history UI, at groupByKey there are > 1000 tasks (equals to the parent's # partitions). at foreach there are > 1000 tasks as well, but 50 tasks (same as the # all key combination) gets datasets. How can I fix this problem? Any suggestions are appreciated. BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
hiveContext.sql NullPointerException
Hi, I try to insert data into a partitioned hive table. The groupByKey is to combine dataset into a partition of the hive table. After the groupByKey, I converted the iterable[X] to DB by X.toList.toDF(). But the hiveContext.sql throws NullPointerException, see below. Any suggestions? What could be wrong? Thanks! val varWHeightFlatRDD = varWHeightRDD.flatMap(FlatMapUtilClass().flatKeyFromWrf).groupByKey() .foreach( x => { val zone = x._1._1 val z = x._1._2 val year = x._1._3 val month = x._1._4 val df_table_4dim = x._2.toList.toDF() df_table_4dim.registerTempTable("table_4Dim") hiveContext.sql("INSERT OVERWRITE table 4dim partition (zone=" + zone + ",z=" + z + ",year=" + year + ",month=" + month + ") " + "select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim"); }) java.lang.NullPointerException at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:100) at no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$7.apply(LoadWrfIntoHiveOptReduce1.scala:113) at no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$7.apply(LoadWrfIntoHiveOptReduce1.scala:103) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1511) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1511) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: hiveContext.sql NullPointerException
Hi, How can I expect to work on HiveContext on the executor? If only the driver can see HiveContext, does it mean I have to collect all datasets (very large) to the driver and use HiveContext there? It will be memory overload on the driver and fail. BR, Patcharee On 07. juni 2015 11:51, Cheng Lian wrote: Hi, This is expected behavior. HiveContext.sql (and also DataFrame.registerTempTable) is only expected to be invoked on driver side. However, the closure passed to RDD.foreach is executed on executor side, where no viable HiveContext instance exists. Cheng On 6/7/15 10:06 AM, patcharee wrote: Hi, I try to insert data into a partitioned hive table. The groupByKey is to combine dataset into a partition of the hive table. After the groupByKey, I converted the iterable[X] to DB by X.toList.toDF(). But the hiveContext.sql throws NullPointerException, see below. Any suggestions? What could be wrong? Thanks! val varWHeightFlatRDD = varWHeightRDD.flatMap(FlatMapUtilClass().flatKeyFromWrf).groupByKey() .foreach( x => { val zone = x._1._1 val z = x._1._2 val year = x._1._3 val month = x._1._4 val df_table_4dim = x._2.toList.toDF() df_table_4dim.registerTempTable("table_4Dim") hiveContext.sql("INSERT OVERWRITE table 4dim partition (zone=" + zone + ",z=" + z + ",year=" + year + ",month=" + month + ") " + "select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim"); }) java.lang.NullPointerException at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:100) at no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$7.apply(LoadWrfIntoHiveOptReduce1.scala:113) at no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$7.apply(LoadWrfIntoHiveOptReduce1.scala:103) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1511) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1511) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: hiveContext.sql NullPointerException
Hi, Thanks for your guidelines. I will try it out. Btw how do you know HiveContext.sql (and also DataFrame.registerTempTable) is only expected to be invoked on driver side? Where can I find document? BR, Patcharee On 07. juni 2015 16:40, Cheng Lian wrote: Spark SQL supports Hive dynamic partitioning, so one possible workaround is to create a Hive table partitioned by zone, z, year, and month dynamically, and then insert the whole dataset into it directly. In 1.4, we also provides dynamic partitioning support for non-Hive environment, and you can do something like this: df.write.partitionBy("zone", "z", "year", "month").format("parquet").mode("overwrite").saveAsTable("tbl") Cheng On 6/7/15 9:48 PM, patcharee wrote: Hi, How can I expect to work on HiveContext on the executor? If only the driver can see HiveContext, does it mean I have to collect all datasets (very large) to the driver and use HiveContext there? It will be memory overload on the driver and fail. BR, Patcharee On 07. juni 2015 11:51, Cheng Lian wrote: Hi, This is expected behavior. HiveContext.sql (and also DataFrame.registerTempTable) is only expected to be invoked on driver side. However, the closure passed to RDD.foreach is executed on executor side, where no viable HiveContext instance exists. Cheng On 6/7/15 10:06 AM, patcharee wrote: Hi, I try to insert data into a partitioned hive table. The groupByKey is to combine dataset into a partition of the hive table. After the groupByKey, I converted the iterable[X] to DB by X.toList.toDF(). But the hiveContext.sql throws NullPointerException, see below. Any suggestions? What could be wrong? Thanks! val varWHeightFlatRDD = varWHeightRDD.flatMap(FlatMapUtilClass().flatKeyFromWrf).groupByKey() .foreach( x => { val zone = x._1._1 val z = x._1._2 val year = x._1._3 val month = x._1._4 val df_table_4dim = x._2.toList.toDF() df_table_4dim.registerTempTable("table_4Dim") hiveContext.sql("INSERT OVERWRITE table 4dim partition (zone=" + zone + ",z=" + z + ",year=" + year + ",month=" + month + ") " + "select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim"); }) java.lang.NullPointerException at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:100) at no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$7.apply(LoadWrfIntoHiveOptReduce1.scala:113) at no.uni.computing.etl.LoadWrfIntoHiveOptReduce1$$anonfun$7.apply(LoadWrfIntoHiveOptReduce1.scala:103) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1511) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1511) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: hiveContext.sql NullPointerException
Hi, Does df.write.partitionBy("partitions").format("format").mode("overwrite").saveAsTable("tbl") support orc file? I tried df.write.partitionBy("zone", "z", "year", "month").format("orc").mode("overwrite").saveAsTable("tbl"), but after the insert my table "tbl" schema has been changed to something I did not expected .. -- FROM -- CREATE EXTERNAL TABLE `4dim`(`u` float, `v` float) PARTITIONED BY (`zone` int, `z` int, `year` int, `month` int) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' TBLPROPERTIES ( 'orc.compress'='ZLIB', 'transient_lastDdlTime'='1433016475') -- TO -- CREATE TABLE `4dim`(`col` array COMMENT 'from deserializer') PARTITIONED BY (`zone` int COMMENT '', `z` int COMMENT '', `year` int COMMENT '', `month` int COMMENT '') ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.SequenceFileInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat' TBLPROPERTIES ( 'EXTERNAL'='FALSE', 'spark.sql.sources.provider'='orc', 'spark.sql.sources.schema.numParts'='1', 'spark.sql.sources.schema.part.0'='{\"type\":\"struct\",\"fields\":[{\"name\":\"u\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"v\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"zone\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"z\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"year\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"month\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}', 'transient_lastDdlTime'='1434055247') I noticed there are files stored in hdfs as *.orc, but when I tried to query from hive I got nothing. How can I fix this? Any suggestions please BR, Patcharee On 07. juni 2015 16:40, Cheng Lian wrote: Spark SQL supports Hive dynamic partitioning, so one possible workaround is to create a Hive table partitioned by zone, z, year, and month dynamically, and then insert the whole dataset into it directly. In 1.4, we also provides dynamic partitioning support for non-Hive environment, and you can do something like this: df.write.partitionBy("zone", "z", "year", "month").format("parquet").mode("overwrite").saveAsTable("tbl") Cheng On 6/7/15 9:48 PM, patcharee wrote: Hi, How can I expect to work on HiveContext on the executor? If only the driver can see HiveContext, does it mean I have to collect all datasets (very large) to the driver and use HiveContext there? It will be memory overload on the driver and fail. BR, Patcharee On 07. juni 2015 11:51, Cheng Lian wrote: Hi, This is expected behavior. HiveContext.sql (and also DataFrame.registerTempTable) is only expected to be invoked on driver side. However, the closure passed to RDD.foreach is executed on executor side, where no viable HiveContext instance exists. Cheng On 6/7/15 10:06 AM, patcharee wrote: Hi, I try to insert data into a partitioned hive table. The groupByKey is to combine dataset into a partition of the hive table. After the groupByKey, I converted the iterable[X] to DB by X.toList.toDF(). But the hiveContext.sql throws NullPointerException, see below. Any suggestions? What could be wrong? Thanks! val varWHeightFlatRDD = varWHeightRDD.flatMap(FlatMapUtilClass().flatKeyFromWrf).groupByKey() .foreach( x => { val zone = x._1._1 val z = x._1._2 val year = x._1._3 val month = x._1._4 val df_table_4dim = x._2.toList.toDF() df_table_4dim.registerTempTable("table_4Dim") hiveContext.sql("INSERT OVERWRITE table 4dim partition (zone=" + zone + ",z=" + z + ",year=" + year + ",month=" + month + ") " + "select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim"); }) java.lang.Nu
sql.catalyst.ScalaReflection scala.reflect.internal.MissingRequirementError
Hi, I use spark 0.14. I tried to create dataframe from RDD below, but got scala.reflect.internal.MissingRequirementError val partitionedTestDF2 = pairVarRDD.toDF("column1","column2","column3") //pairVarRDD is RDD[Record4Dim_2], and Record4Dim_2 is a Case Class How can I fix this? Exception in thread "main" scala.reflect.internal.MissingRequirementError: class etl.Record4Dim_2 in JavaMirror with sun.misc.Launcher$AppClassLoader@30177039 of type class sun.misc.Launcher$AppClassLoader with classpath [file:/local/spark140/conf/,file:/local/spark140/lib/spark-assembly-1.4.0-SNAPSHOT-hadoop2.6.0.jar,file:/local/spark140/lib/datanucleus-core-3.2.10.jar,file:/local/spark140/lib/datanucleus-rdbms-3.2.9.jar,file:/local/spark140/lib/datanucleus-api-jdo-3.2.6.jar,file:/etc/hadoop/conf/] and parent being sun.misc.Launcher$ExtClassLoader@52c8c6d9 of type class sun.misc.Launcher$ExtClassLoader with classpath [file:/usr/jdk64/jdk1.7.0_67/jre/lib/ext/sunec.jar,file:/usr/jdk64/jdk1.7.0_67/jre/lib/ext/sunjce_provider.jar,file:/usr/jdk64/jdk1.7.0_67/jre/lib/ext/sunpkcs11.jar,file:/usr/jdk64/jdk1.7.0_67/jre/lib/ext/zipfs.jar,file:/usr/jdk64/jdk1.7.0_67/jre/lib/ext/localedata.jar,file:/usr/jdk64/jdk1.7.0_67/jre/lib/ext/dnsns.jar] and parent being primordial classloader with boot classpath [/usr/jdk64/jdk1.7.0_67/jre/lib/resources.jar:/usr/jdk64/jdk1.7.0_67/jre/lib/rt.jar:/usr/jdk64/jdk1.7.0_67/jre/lib/sunrsasign.jar:/usr/jdk64/jdk1.7.0_67/jre/lib/jsse.jar:/usr/jdk64/jdk1.7.0_67/jre/lib/jce.jar:/usr/jdk64/jdk1.7.0_67/jre/lib/charsets.jar:/usr/jdk64/jdk1.7.0_67/jre/lib/jfr.jar:/usr/jdk64/jdk1.7.0_67/jre/classes] not found. at scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16) at scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17) at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48) at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61) at scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72) at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119) at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21) at no.uni.computing.etl.LoadWrfV14$$typecreator1$1.apply(LoadWrfV14.scala:91) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231) at org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:71) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:59) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:28) at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:410) at org.apache.spark.sql.SQLContext$implicits$.rddToDataFrameHolder(SQLContext.scala:335) BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
HiveContext saveAsTable create wrong partition
Hi, I am using spark 1.4 and HiveContext to append data into a partitioned hive table. I found that the data insert into the table is correct, but the partition(folder) created is totally wrong. Below is my code snippet>> --- val schemaString = "zone z year month date hh x y height u v w ph phb t p pb qvapor qgraup qnice qnrain tke_pbl el_pbl" val schema = StructType( schemaString.split(" ").map(fieldName => if (fieldName.equals("zone") || fieldName.equals("z") || fieldName.equals("year") || fieldName.equals("month") || fieldName.equals("date") || fieldName.equals("hh") || fieldName.equals("x") || fieldName.equals("y")) StructField(fieldName, IntegerType, true) else StructField(fieldName, FloatType, true) )) val pairVarRDD = sc.parallelize(Seq((Row(2,42,2009,3,1,0,218,365,9989.497.floatValue(),29.627113.floatValue(),19.071793.floatValue(),0.11982734.floatValue(),3174.6812.floatValue(), 97735.2.floatValue(),16.389032.floatValue(),-96.62891.floatValue(),25135.365.floatValue(),2.6476808E-5.floatValue(),0.0.floatValue(),13195.351.floatValue(), 0.0.floatValue(),0.1.floatValue(),0.0.floatValue())) )) val partitionedTestDF2 = sqlContext.createDataFrame(pairVarRDD, schema) partitionedTestDF2.write.format("org.apache.spark.sql.hive.orc.DefaultSource") .mode(org.apache.spark.sql.SaveMode.Append).partitionBy("zone","z","year","month").saveAsTable("test4DimBySpark") --- The table contains 23 columns (longer than Tuple maximum length), so I use Row Object to store raw data, not Tuple. Here is some message from spark when it saved data>> 15/06/16 10:39:22 INFO metadata.Hive: Renaming src:hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0/part-1;dest: hdfs://service-10-0.local:8020/apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1;Status:true 15/06/16 10:39:22 INFO metadata.Hive: New loading path = hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0 with partSpec {zone=13195, z=0, year=0, month=0} From the raw data (pairVarRDD) zone = 2, z = 42, year = 2009, month = 3. But spark created a partition {zone=13195, z=0, year=0, month=0}. When I queried from hive>> hive> select * from test4dimBySpark; OK 242200931.00.0218.0365.09989.497 29.62711319.0717930.11982734-3174.681297735.2 16.389032-96.6289125135.3652.6476808E-50.0 13195 0 0 0 hive> select zone, z, year, month from test4dimBySpark; OK 13195000 hive> dfs -ls /apps/hive/warehouse/test4dimBySpark/*/*/*/*; Found 2 items -rw-r--r-- 3 patcharee hdfs 1411 2015-06-16 10:39 /apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1 The data stored in the table is correct zone = 2, z = 42, year = 2009, month = 3, but the partition created was wrong "zone=13195/z=0/year=0/month=0" Is this a bug or what could be wrong? Any suggestion is appreciated. BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: HiveContext saveAsTable create wrong partition
I found if I move the partitioned columns in schemaString and in Row to the end of the sequence, then it works correctly... On 16. juni 2015 11:14, patcharee wrote: Hi, I am using spark 1.4 and HiveContext to append data into a partitioned hive table. I found that the data insert into the table is correct, but the partition(folder) created is totally wrong. Below is my code snippet>> --- val schemaString = "zone z year month date hh x y height u v w ph phb t p pb qvapor qgraup qnice qnrain tke_pbl el_pbl" val schema = StructType( schemaString.split(" ").map(fieldName => if (fieldName.equals("zone") || fieldName.equals("z") || fieldName.equals("year") || fieldName.equals("month") || fieldName.equals("date") || fieldName.equals("hh") || fieldName.equals("x") || fieldName.equals("y")) StructField(fieldName, IntegerType, true) else StructField(fieldName, FloatType, true) )) val pairVarRDD = sc.parallelize(Seq((Row(2,42,2009,3,1,0,218,365,9989.497.floatValue(),29.627113.floatValue(),19.071793.floatValue(),0.11982734.floatValue(),3174.6812.floatValue(), 97735.2.floatValue(),16.389032.floatValue(),-96.62891.floatValue(),25135.365.floatValue(),2.6476808E-5.floatValue(),0.0.floatValue(),13195.351.floatValue(), 0.0.floatValue(),0.1.floatValue(),0.0.floatValue())) )) val partitionedTestDF2 = sqlContext.createDataFrame(pairVarRDD, schema) partitionedTestDF2.write.format("org.apache.spark.sql.hive.orc.DefaultSource") .mode(org.apache.spark.sql.SaveMode.Append).partitionBy("zone","z","year","month").saveAsTable("test4DimBySpark") --- The table contains 23 columns (longer than Tuple maximum length), so I use Row Object to store raw data, not Tuple. Here is some message from spark when it saved data>> 15/06/16 10:39:22 INFO metadata.Hive: Renaming src:hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0/part-1;dest: hdfs://service-10-0.local:8020/apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1;Status:true 15/06/16 10:39:22 INFO metadata.Hive: New loading path = hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0 with partSpec {zone=13195, z=0, year=0, month=0} From the raw data (pairVarRDD) zone = 2, z = 42, year = 2009, month = 3. But spark created a partition {zone=13195, z=0, year=0, month=0}. When I queried from hive>> hive> select * from test4dimBySpark; OK 242200931.00.0218.0365.09989.497 29.62711319.0717930.11982734-3174.681297735.2 16.389032-96.6289125135.3652.6476808E-50.0 13195 00 0 hive> select zone, z, year, month from test4dimBySpark; OK 13195000 hive> dfs -ls /apps/hive/warehouse/test4dimBySpark/*/*/*/*; Found 2 items -rw-r--r-- 3 patcharee hdfs 1411 2015-06-16 10:39 /apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1 The data stored in the table is correct zone = 2, z = 42, year = 2009, month = 3, but the partition created was wrong "zone=13195/z=0/year=0/month=0" Is this a bug or what could be wrong? Any suggestion is appreciated. BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Kryo serialization of classes in additional jars
Hi, I am having this problem on spark 1.4. Do you have any ideas how to solve it? I tried to use spark.executor.extraClassPath, but it did not help BR, Patcharee On 04. mai 2015 23:47, Imran Rashid wrote: Oh, this seems like a real pain. You should file a jira, I didn't see an open issue -- if nothing else just to document the issue. As you've noted, the problem is that the serializer is created immediately in the executors, right when the SparkEnv is created, but the other jars aren't downloaded later. I think you could workaround with some combination of pushing the jars to the cluster manually, and then using spark.executor.extraClassPath On Wed, Apr 29, 2015 at 6:42 PM, Akshat Aranya <mailto:aara...@gmail.com>> wrote: Hi, Is it possible to register kryo serialization for classes contained in jars that are added with "spark.jars"? In my experiment it doesn't seem to work, likely because the class registration happens before the jar is shipped to the executor and added to the classloader. Here's the general idea of what I want to do: val sparkConf = new SparkConf(true) .set("spark.jars", "foo.jar") .setAppName("foo") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // register classes contained in foo.jar sparkConf.registerKryoClasses(Array( classOf[com.foo.Foo], classOf[com.foo.Bar]))
Re: pyspark split pair rdd to multiple
I can also use dataframe. Any suggestions? Best, Patcharee On 20. april 2016 10:43, Gourav Sengupta wrote: Is there any reason why you are not using data frames? Regards, Gourav On Tue, Apr 19, 2016 at 8:51 PM, pth001 <mailto:patcharee.thong...@uni.no>> wrote: Hi, How can I split pair rdd [K, V] to map [K, Array(V)] efficiently in Pyspark? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org <mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org <mailto:user-h...@spark.apache.org>
what contribute to Task Deserialization Time
Hi, I'm running a simple job (reading sequential file and collect data at the driver) with yarn-client mode. When looking at the history server UI, Task Deserialization Time of tasks are quite different (5 ms to 5 s). What contribute to this Task Deserialization Time? Thank you in advance! Patcharee - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
visualize data from spark streaming
Hi, How to visualize realtime data (in graph/chart) from spark streaming? Any tools? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
spark streaming input rate strange
Hi, I have a streaming application with - 1 sec interval - accept data from a simulation through MulticastSocket The simulation sent out data using multiple clients/threads every 1 sec interval. The input rate accepted by the streaming looks strange. - When clients = 10,000 the event rate raises up to 10,000, stays at 10,000 a while and drops to about 7000-8000. - When clients = 20,000 the event rate raises up to 20,000, stays at 20,000 a while and drops to about 15000-17000. The same pattern Processing time is just about 400 ms. Any ideas/suggestions? Thanks, Patcharee
streaming textFileStream problem - got only ONE line
Hi, My streaming application is receiving data from file system and just prints the input count every 1 sec interval, as the code below: val sparkConf = new SparkConf() val ssc = new StreamingContext(sparkConf, Milliseconds(interval_ms)) val lines = ssc.textFileStream(args(0)) lines.count().print() The problem is sometimes the data received from scc.textFileStream is ONLY ONE line. But in fact there are multiple lines in the new file found in that interval. See log below which shows three intervals. In the 2nd interval, the new file is: hdfs://helmhdfs/user/patcharee/cerdata/datetime_19617.txt. This file contains 6288 lines. The ssc.textFileStream returns ONLY ONE line (the header). Any ideas/suggestions what the problem is? - SPARK LOG - 16/01/25 15:11:11 INFO FileInputDStream: Cleared 1 old files that were older than 1453731011000 ms: 145373101 ms 16/01/25 15:11:11 INFO FileInputDStream: Cleared 0 old files that were older than 1453731011000 ms: 16/01/25 15:11:12 INFO FileInputDStream: Finding new files took 4 ms 16/01/25 15:11:12 INFO FileInputDStream: New files at time 1453731072000 ms: hdfs://helmhdfs/user/patcharee/cerdata/datetime_19616.txt --- Time: 1453731072000 ms --- 6288 16/01/25 15:11:12 INFO FileInputDStream: Cleared 1 old files that were older than 1453731012000 ms: 1453731011000 ms 16/01/25 15:11:12 INFO FileInputDStream: Cleared 0 old files that were older than 1453731012000 ms: 16/01/25 15:11:13 INFO FileInputDStream: Finding new files took 4 ms 16/01/25 15:11:13 INFO FileInputDStream: New files at time 1453731073000 ms: hdfs://helmhdfs/user/patcharee/cerdata/datetime_19617.txt --- Time: 1453731073000 ms --- 1 16/01/25 15:11:13 INFO FileInputDStream: Cleared 1 old files that were older than 1453731013000 ms: 1453731012000 ms 16/01/25 15:11:13 INFO FileInputDStream: Cleared 0 old files that were older than 1453731013000 ms: 16/01/25 15:11:14 INFO FileInputDStream: Finding new files took 3 ms 16/01/25 15:11:14 INFO FileInputDStream: New files at time 1453731074000 ms: hdfs://helmhdfs/user/patcharee/cerdata/datetime_19618.txt --- Time: 1453731074000 ms --- 6288 Thanks, Patcharee
Re: streaming textFileStream problem - got only ONE line
I moved them every interval to the monitored directory. Patcharee On 25. jan. 2016 22:30, Shixiong(Ryan) Zhu wrote: Did you move the file into "hdfs://helmhdfs/user/patcharee/cerdata/", or write into it directly? `textFileStream` requires that files must be written to the monitored directory by "moving" them from another location within the same file system. On Mon, Jan 25, 2016 at 6:30 AM, patcharee <mailto:patcharee.thong...@uni.no>> wrote: Hi, My streaming application is receiving data from file system and just prints the input count every 1 sec interval, as the code below: val sparkConf = new SparkConf() val ssc = new StreamingContext(sparkConf, Milliseconds(interval_ms)) val lines = ssc.textFileStream(args(0)) lines.count().print() The problem is sometimes the data received from scc.textFileStream is ONLY ONE line. But in fact there are multiple lines in the new file found in that interval. See log below which shows three intervals. In the 2nd interval, the new file is: hdfs://helmhdfs/user/patcharee/cerdata/datetime_19617.txt. This file contains 6288 lines. The ssc.textFileStream returns ONLY ONE line (the header). Any ideas/suggestions what the problem is? - SPARK LOG - 16/01/25 15:11:11 INFO FileInputDStream: Cleared 1 old files that were older than 1453731011000 ms: 145373101 ms 16/01/25 15:11:11 INFO FileInputDStream: Cleared 0 old files that were older than 1453731011000 ms: 16/01/25 15:11:12 INFO FileInputDStream: Finding new files took 4 ms 16/01/25 15:11:12 INFO FileInputDStream: New files at time 1453731072000 ms: hdfs://helmhdfs/user/patcharee/cerdata/datetime_19616.txt --- Time: 1453731072000 ms --- 6288 16/01/25 15:11:12 INFO FileInputDStream: Cleared 1 old files that were older than 1453731012000 ms: 1453731011000 ms 16/01/25 15:11:12 INFO FileInputDStream: Cleared 0 old files that were older than 1453731012000 ms: 16/01/25 15:11:13 INFO FileInputDStream: Finding new files took 4 ms 16/01/25 15:11:13 INFO FileInputDStream: New files at time 1453731073000 ms: hdfs://helmhdfs/user/patcharee/cerdata/datetime_19617.txt --- Time: 1453731073000 ms --- 1 16/01/25 15:11:13 INFO FileInputDStream: Cleared 1 old files that were older than 1453731013000 ms: 1453731012000 ms 16/01/25 15:11:13 INFO FileInputDStream: Cleared 0 old files that were older than 1453731013000 ms: 16/01/25 15:11:14 INFO FileInputDStream: Finding new files took 3 ms 16/01/25 15:11:14 INFO FileInputDStream: New files at time 1453731074000 ms: hdfs://helmhdfs/user/patcharee/cerdata/datetime_19618.txt --- Time: 1453731074000 ms --- 6288 Thanks, Patcharee
Pyspark filter not empty
Hi, In pyspark how to filter if a column of dataframe is not empty? I tried: dfNotEmpty = df.filter(df['msg']!='') It did not work. Thanks, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
kafka streaming topic partitions vs executors
Hi, I am working a streaming application integrated with Kafka by the API createDirectStream. The application streams a topic which contains 10 partitions (on Kafka). It executes with 10 workers (--num-executors 10) When it reads data from Kafka/ZooKeeper, Spark creates 10 tasks (as same as the topic's partitions). However some executors are given more than 1 tasks and work on these tasks sequentially. Why Spark does not distribute these 10 tasks to 10 executors? How to do that? Thanks, Patcharee
hiveContext sql number of tasks
Hi, I do a sql query on about 10,000 partitioned orc files. Because of the partition schema the files cannot be merged any longer (to reduce the total number). From this command hiveContext.sql(sqlText), the 10K tasks were created to handle each file. Is it possible to use less tasks? How to force the spark sql to use less tasks? BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
sql query orc slow
Hi, I am using spark sql 1.5 to query a hive table stored as partitioned orc file. We have the total files is about 6000 files and each file size is about 245MB. What is the difference between these two query methods below: 1. Using query on hive table directly hiveContext.sql("select col1, col2 from table1") 2. Reading from orc file, register temp table and query from the temp table val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1") c.registerTempTable("regTable") hiveContext.sql("select col1, col2 from regTable") When the number of files is large (query all from the total 6000 files) , the second case is much slower then the first one. Any ideas why? BR, - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: sql query orc slow
Yes, the predicate pushdown is enabled, but still take longer time than the first method BR, Patcharee On 08. okt. 2015 18:43, Zhan Zhang wrote: Hi Patcharee, Did you enable the predicate pushdown in the second method? Thanks. Zhan Zhang On Oct 8, 2015, at 1:43 AM, patcharee wrote: Hi, I am using spark sql 1.5 to query a hive table stored as partitioned orc file. We have the total files is about 6000 files and each file size is about 245MB. What is the difference between these two query methods below: 1. Using query on hive table directly hiveContext.sql("select col1, col2 from table1") 2. Reading from orc file, register temp table and query from the temp table val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1") c.registerTempTable("regTable") hiveContext.sql("select col1, col2 from regTable") When the number of files is large (query all from the total 6000 files) , the second case is much slower then the first one. Any ideas why? BR, - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: sql query orc slow
Hi Zhan Zhang Actually my query has WHERE clause "select date, month, year, hh, (u*0.9122461 - v*-0.40964267), (v*0.9122461 + u*-0.40964267), z from 4D where x = 320 and y = 117 and zone == 2 and year=2009 and z >= 2 and z <= 8", column "x", "y" is not partition column, the others are partition columns. I expected the system will use predicate pushdown. I turned on the debug and found pushdown predicate was not generated ("DEBUG OrcInputFormat: No ORC pushdown predicate") Then I tried to set the search argument explicitly (on the column "x" which is not partition column) val xs = SearchArgumentFactory.newBuilder().startAnd().equals("x", 320).end().build() hiveContext.setConf("hive.io.file.readcolumn.names", "x") hiveContext.setConf("sarg.pushdown", xs.toKryo()) this time in the log pushdown predicate was generated but results was wrong (no results at all) 15/10/09 18:36:06 INFO OrcInputFormat: ORC pushdown predicate: leaf-0 = (EQUALS x 320) expr = leaf-0 Any ideas What wrong with this? Why the ORC pushdown predicate is not applied by the system? BR, Patcharee On 09. okt. 2015 18:31, Zhan Zhang wrote: Hi Patcharee, >From the query, it looks like only the column pruning will be applied. Partition pruning and predicate pushdown does not have effect. Do you see big IO difference between two methods? The potential reason of the speed difference I can think of may be the different versions of OrcInputFormat. The hive path may use NewOrcInputFormat, but the spark path use OrcInputFormat. Thanks. Zhan Zhang On Oct 8, 2015, at 11:55 PM, patcharee wrote: Yes, the predicate pushdown is enabled, but still take longer time than the first method BR, Patcharee On 08. okt. 2015 18:43, Zhan Zhang wrote: Hi Patcharee, Did you enable the predicate pushdown in the second method? Thanks. Zhan Zhang On Oct 8, 2015, at 1:43 AM, patcharee wrote: Hi, I am using spark sql 1.5 to query a hive table stored as partitioned orc file. We have the total files is about 6000 files and each file size is about 245MB. What is the difference between these two query methods below: 1. Using query on hive table directly hiveContext.sql("select col1, col2 from table1") 2. Reading from orc file, register temp table and query from the temp table val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1") c.registerTempTable("regTable") hiveContext.sql("select col1, col2 from regTable") When the number of files is large (query all from the total 6000 files) , the second case is much slower then the first one. Any ideas why? BR, - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: sql query orc slow
I set hiveContext.setConf("spark.sql.orc.filterPushdown", "true"). But from the log No ORC pushdown predicate for my query with WHERE clause. 15/10/09 19:16:01 DEBUG OrcInputFormat: No ORC pushdown predicate I did not understand what wrong with this. BR, Patcharee On 09. okt. 2015 19:10, Zhan Zhang wrote: In your case, you manually set an AND pushdown, and the predicate is right based on your setting, : leaf-0 = (EQUALS x 320) The right way is to enable the predicate pushdown as follows. sqlContext.setConf("spark.sql.orc.filterPushdown", "true”) Thanks. Zhan Zhang On Oct 9, 2015, at 9:58 AM, patcharee <mailto:patcharee.thong...@uni.no>> wrote: Hi Zhan Zhang Actually my query has WHERE clause "select date, month, year, hh, (u*0.9122461 - v*-0.40964267), (v*0.9122461 + u*-0.40964267), z from 4D where x = 320 and y = 117 and zone == 2 and year=2009 and z >= 2 and z <= 8", column "x", "y" is not partition column, the others are partition columns. I expected the system will use predicate pushdown. I turned on the debug and found pushdown predicate was not generated ("DEBUG OrcInputFormat: No ORC pushdown predicate") Then I tried to set the search argument explicitly (on the column "x" which is not partition column) val xs = SearchArgumentFactory.newBuilder().startAnd().equals("x", 320).end().build() hiveContext.setConf("hive.io.file.readcolumn.names", "x") hiveContext.setConf("sarg.pushdown", xs.toKryo()) this time in the log pushdown predicate was generated but results was wrong (no results at all) 15/10/09 18:36:06 INFO OrcInputFormat: ORC pushdown predicate: leaf-0 = (EQUALS x 320) expr = leaf-0 Any ideas What wrong with this? Why the ORC pushdown predicate is not applied by the system? BR, Patcharee On 09. okt. 2015 18:31, Zhan Zhang wrote: Hi Patcharee, >From the query, it looks like only the column pruning will be applied. Partition pruning and predicate pushdown does not have effect. Do you see big IO difference between two methods? The potential reason of the speed difference I can think of may be the different versions of OrcInputFormat. The hive path may use NewOrcInputFormat, but the spark path use OrcInputFormat. Thanks. Zhan Zhang On Oct 8, 2015, at 11:55 PM, patcharee <mailto:patcharee.thong...@uni.no>> wrote: Yes, the predicate pushdown is enabled, but still take longer time than the first method BR, Patcharee On 08. okt. 2015 18:43, Zhan Zhang wrote: Hi Patcharee, Did you enable the predicate pushdown in the second method? Thanks. Zhan Zhang On Oct 8, 2015, at 1:43 AM, patcharee <mailto:patcharee.thong...@uni.no>> wrote: Hi, I am using spark sql 1.5 to query a hive table stored as partitioned orc file. We have the total files is about 6000 files and each file size is about 245MB. What is the difference between these two query methods below: 1. Using query on hive table directly hiveContext.sql("select col1, col2 from table1") 2. Reading from orc file, register temp table and query from the temp table val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1") c.registerTempTable("regTable") hiveContext.sql("select col1, col2 from regTable") When the number of files is large (query all from the total 6000 files) , the second case is much slower then the first one. Any ideas why? BR, - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org <mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org <mailto:user-h...@spark.apache.org>
execute native system commands in Spark
Hi, Is it possible to execute native system commands (in parallel) Spark, like scala.sys.process ? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to run parallel on each DataFrame group
Hi, I need suggestions on my coding. I would like to split DataFrame (rowDF) by a column (depth) into groups. Then sort each group, repartition and save output of each group into one file. See code below> val rowDF = sqlContext.createDataFrame(rowRDD, schema).cache() for (i <- 0 to 16) { val filterDF = rowDF.filter("depth="+i) val finalDF = filterDF.sort("xy").coalesce(1) finalDF.write.format("org.apache.spark.sql.hive.orc.DefaultSource").mode(org.apache.spark.sql.SaveMode.Append).partitionBy("depth").saveAsTable(args(3)) } The problem is each group after filtered is handled by an executor one by one. How to change the code to allow each group run in parallel? I looked at groupBy, but seem only for aggregation. Thanks, Patcharee
spark streaming count msg in batch
Hi, In spark streaming how to count the total number of message (from Socket) in one batch? Thanks, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark Streaming - History UI
Hi, On my history server UI, I cannot see "streaming" tab for any streaming jobs? I am using version 1.5.1. Any ideas? Thanks, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming - History UI
I meant there is no streaming tab at all. It looks like I need version 1.6 Patcharee On 02. des. 2015 11:34, Steve Loughran wrote: The history UI doesn't update itself for live apps (SPARK-7889) -though I'm working on it Are you trying to view a running streaming job? On 2 Dec 2015, at 05:28, patcharee wrote: Hi, On my history server UI, I cannot see "streaming" tab for any streaming jobs? I am using version 1.5.1. Any ideas? Thanks, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark applications metrics
Hi How can I see the summary of data read / write, shuffle read / write, etc of an Application, not per stage? Thanks, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark UI - Streaming Tab
Hi, We tried to get the streaming tab interface on Spark UI - https://databricks.com/blog/2015/07/08/new-visualizations-for-understanding-spark-streaming-applications.html Tested on version 1.5.1, 1.6.0-snapshot, but no such interface for streaming applications at all. Any suggestions? Do we need to configure the history UI somehow to get such interface? Thanks, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark UI - Streaming Tab
I ran streaming jobs, but no streaming tab appeared for those jobs. Patcharee On 04. des. 2015 18:12, PhuDuc Nguyen wrote: I believe the "Streaming" tab is dynamic - it appears once you have a streaming job running, not when the cluster is simply up. It does not depend on 1.6 and has been in there since at least 1.0. HTH, Duc On Fri, Dec 4, 2015 at 7:28 AM, patcharee <mailto:patcharee.thong...@uni.no>> wrote: Hi, We tried to get the streaming tab interface on Spark UI - https://databricks.com/blog/2015/07/08/new-visualizations-for-understanding-spark-streaming-applications.html Tested on version 1.5.1, 1.6.0-snapshot, but no such interface for streaming applications at all. Any suggestions? Do we need to configure the history UI somehow to get such interface? Thanks, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org <mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org <mailto:user-h...@spark.apache.org>
bad performance on PySpark - big text file
Hi, I am very new to PySpark. I have a PySpark app working on text files with different size (100M - 100G). However each task is handling the same size of input split. But workers spend very much longer time on some input splits, especially when the input splits belong to a big file. See the log of these two input splits (check python.PythonRunner: Times: total ... ) 15/12/08 07:37:15 INFO rdd.NewHadoopRDD: Input split: hdfs://helmhdfs/user/patcharee/ntap-raw-20151015-20151126/html2/budisansblog.blogspot.com.html:39728447488+134217728 15/12/08 08:49:30 INFO python.PythonRunner: Times: total = 4335010, boot = -140, init = 282, finish = 4334868 15/12/08 08:49:30 INFO storage.MemoryStore: ensureFreeSpace(125163) called with curMem=227636200, maxMem=4341293383 15/12/08 08:49:30 INFO storage.MemoryStore: Block rdd_3_1772 stored as bytes in memory (estimated size 122.2 KB, free 3.8 GB) 15/12/08 08:49:30 INFO python.PythonRunner: Times: total = 4, boot = 1, init = 0, finish = 3 15/12/08 08:49:30 INFO storage.MemoryStore: ensureFreeSpace(126595) called with curMem=227761363, maxMem=4341293383 15/12/08 08:49:30 INFO storage.MemoryStore: Block rdd_9_1772 stored as bytes in memory (estimated size 123.6 KB, free 3.8 GB) 15/12/08 08:49:30 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1 15/12/08 08:49:30 INFO datasources.DynamicPartitionWriterContainer: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 15/12/08 08:49:30 INFO output.FileOutputCommitter: Saved output of task 'attempt_201512080849_0002_m_001772_0' to hdfs://helmhdfs/user/patcharee/NTAPBlogInfo/_temporary/0/task_201512080849_0002_m_001772 15/12/08 08:49:30 INFO mapred.SparkHadoopMapRedUtil: attempt_201512080849_0002_m_001772_0: Committed 15/12/08 08:49:30 INFO executor.Executor: Finished task 1772.0 in stage 2.0 (TID 1770). 16216 bytes result sent to driver 15/12/07 20:52:24 INFO rdd.NewHadoopRDD: Input split: hdfs://helmhdfs/user/patcharee/ntap-raw-20151015-20151126/html2/bcnn1wp.wordpress.com.html:1476395008+134217728 15/12/07 20:53:06 INFO python.PythonRunner: Times: total = 41776, boot = -425, init = 432, finish = 41769 15/12/07 20:53:06 INFO storage.MemoryStore: ensureFreeSpace(1434614) called with curMem=167647961, maxMem=4341293383 15/12/07 20:53:06 INFO storage.MemoryStore: Block rdd_3_994 stored as bytes in memory (estimated size 1401.0 KB, free 3.9 GB) 15/12/07 20:53:06 INFO python.PythonRunner: Times: total = 40, boot = -20, init = 21, finish = 39 15/12/07 20:53:06 INFO storage.MemoryStore: ensureFreeSpace(1463477) called with curMem=169082575, maxMem=4341293383 15/12/07 20:53:06 INFO storage.MemoryStore: Block rdd_9_994 stored as bytes in memory (estimated size 1429.2 KB, free 3.9 GB) 15/12/07 20:53:06 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1 15/12/07 20:53:06 INFO datasources.DynamicPartitionWriterContainer: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 15/12/07 20:53:06 INFO output.FileOutputCommitter: Saved output of task 'attempt_201512072053_0002_m_000994_0' to hdfs://helmhdfs/user/patcharee/NTAPBlogInfo/_temporary/0/task_201512072053_0002_m_000994 15/12/07 20:53:06 INFO mapred.SparkHadoopMapRedUtil: attempt_201512072053_0002_m_000994_0: Committed 15/12/07 20:53:06 INFO executor.Executor: Finished task 994.0 in stage 2.0 (TID 990). 9386 bytes result sent to driver Any suggestions please Thanks, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
spark 1.5 sort slow
Hi, I found spark 1.5 sorting is very slow compared to spark 1.4. Below is my code snippet val sqlRDD = sql("select date, u, v, z from fino3_hr3 where zone == 2 and z >= 2 and z <= order by date, z") println("sqlRDD " + sqlRDD.count()) The fino3_hr3 (in the sql command) is a hive table in orc format, partitioned by zone and z. Spark 1.5 takes 4.5 mins to execute this sql, while spark 1.4 takes 1.5 mins. I noticed that dissimilar to spark 1.4 when spark 1.5 sorted, data was shuffled into few tasks, not divided for all tasks. Do I need to set any configuration explicitly? Any suggestions? BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
spark performance - executor computing time
Hi, I was running a job (on Spark 1.5 + Yarn + java 8). In a stage that lookup (org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:873)) there was an executor that took the executor computing time > 6 times of median. This executor had almost the same shuffle read size and low gc time as others. What can impact the executor computing time? Any suggestions what parameters I should monitor/configure? BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Idle time between jobs
Hi, I am using Spark 1.5. I have a spark application which is divided into some jobs. I noticed from the Event Timeline - Spark History UI, that there was idle time between jobs. See below, job 1 was submitted at 11:20:49 and finished at 11:20:52, but the job 2 was submitted "16s" after (at 11:21:08). I wonder what is going on during 16s? Any suggestions? Job IdDescription SubmittedDuration 2 saveAsTextFile at GenerateHistogram.scala:143 2015/09/16 11:21:08 0.7 s 1 collect at GenerateHistogram.scala:132 2015/09/16 11:20:49 2 s 0 count at GenerateHistogram.scala:129 2015/09/16 11:20:41 9 s Below is log 15/09/16 11:20:52 INFO DAGScheduler: Job 1 finished: collect at GenerateHistogram.scala:132, took 2.221756 s 15/09/16 11:21:08 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id 15/09/16 11:21:08 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id 15/09/16 11:21:08 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap 15/09/16 11:21:08 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition 15/09/16 11:21:08 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id 15/09/16 11:21:08 INFO FileOutputCommitter: File Output Committer Algorithm version is 1 15/09/16 11:21:08 INFO SparkContext: Starting job: saveAsTextFile at GenerateHistogram.scala:143 15/09/16 11:21:08 INFO DAGScheduler: Got job 2 (saveAsTextFile at GenerateHistogram.scala:143) with 1 output partitions 15/09/16 11:21:08 INFO DAGScheduler: Final stage: ResultStage 2(saveAsTextFile at GenerateHistogram.scala:143) BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
sparkR 3rd library
Hi, I am using spark.lapply to execute an existing R script in standalone mode. This script calls a function 'rbga' from a 3rd library 'genalg'. This rbga function works fine in sparkR env when I call it directly, but when I apply this to spark.lapply I get the error could not find function "rbga" at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108) at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala Any ideas/suggestions? BR, Patcharee - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
RDD String foreach println
Hi, I would like to print the content of RDD[String]. I tried 1) linesWithSpark.foreach(println) 2) linesWithSpark.collect().foreach(println) I submitted the job by spark-submit. 1) did not print, but 2) did. But when I used the shell, both 1) and 2) printed. Any ideas why 1) behaves differently on job submit and shell? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
method newAPIHadoopFile
Hi, I am new to spark and scala. I have a custom inputformat (used before with mapreduce) and I am trying to use it in spark. In java api (the syntax is correct): JavaPairRDD pairVarOriRDD = sc.newAPIHadoopFile( path, NetCDFFileInputFormat.class, WRFIndex.class, WRFVariable.class, jobConf); But in scala: val pairVarOriRDD = sc.newAPIHadoopFile(path, classOf[NetCDFFileInputFormat], classOf[WRFIndex], classOf[WRFVariable], jobConf) The compiler complained> inferred type arguments [no.uni.computing.io.WRFIndex,no.uni.computing.io.WRFVariable,no.uni.computing.io.input.NetCDFFileInputFormat] do not conform to method newAPIHadoopFile's type parameter bounds [K,V,F <: org.apache.hadoop.mapreduce.InputFormat[K,V]] What is the correct syntax for scala api? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: method newAPIHadoopFile
This is the declaration of my custom inputformat public class NetCDFFileInputFormat extends ArrayBasedFileInputFormat public abstract class ArrayBasedFileInputFormat extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat Best, Patcharee On 25. feb. 2015 10:15, patcharee wrote: Hi, I am new to spark and scala. I have a custom inputformat (used before with mapreduce) and I am trying to use it in spark. In java api (the syntax is correct): JavaPairRDD pairVarOriRDD = sc.newAPIHadoopFile( path, NetCDFFileInputFormat.class, WRFIndex.class, WRFVariable.class, jobConf); But in scala: val pairVarOriRDD = sc.newAPIHadoopFile(path, classOf[NetCDFFileInputFormat], classOf[WRFIndex], classOf[WRFVariable], jobConf) The compiler complained> inferred type arguments [no.uni.computing.io.WRFIndex,no.uni.computing.io.WRFVariable,no.uni.computing.io.input.NetCDFFileInputFormat] do not conform to method newAPIHadoopFile's type parameter bounds [K,V,F <: org.apache.hadoop.mapreduce.InputFormat[K,V]] What is the correct syntax for scala api? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: method newAPIHadoopFile
I tried val pairVarOriRDD = sc.newAPIHadoopFile(path, classOf[NetCDFFileInputFormat].asSubclass( classOf[org.apache.hadoop.mapreduce.lib.input.FileInputFormat[WRFIndex,WRFVariable]]), classOf[WRFIndex], classOf[WRFVariable], jobConf) The compiler does not complain. Please let me know if this solution is not good enough. Patcharee On 25. feb. 2015 10:57, Sean Owen wrote: OK, from the declaration you sent me separately: public class NetCDFFileInputFormat extends ArrayBasedFileInputFormat public abstract class ArrayBasedFileInputFormat extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat It looks like you do not declare any generic types that FileInputFormat declares for the key and value type. I think you can get away with this in the Java API with warnings, but scalac is correct that you have not given an InputFormat that matches the bounds required by the API. That is you need to extend something like ArrayBasedFileInputFormat< WRFIndex ,WRFVariable> On Wed, Feb 25, 2015 at 9:15 AM, patcharee wrote: Hi, I am new to spark and scala. I have a custom inputformat (used before with mapreduce) and I am trying to use it in spark. In java api (the syntax is correct): JavaPairRDD pairVarOriRDD = sc.newAPIHadoopFile( path, NetCDFFileInputFormat.class, WRFIndex.class, WRFVariable.class, jobConf); But in scala: val pairVarOriRDD = sc.newAPIHadoopFile(path, classOf[NetCDFFileInputFormat], classOf[WRFIndex], classOf[WRFVariable], jobConf) The compiler complained> inferred type arguments [no.uni.computing.io.WRFIndex,no.uni.computing.io.WRFVariable,no.uni.computing.io.input.NetCDFFileInputFormat] do not conform to method newAPIHadoopFile's type parameter bounds [K,V,F <: org.apache.hadoop.mapreduce.InputFormat[K,V]] What is the correct syntax for scala api? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
custom inputformat serializable problem
Hi, I am using custom inputformat and recordreader. This custom recordreader has declaration: public class NetCDFRecordReader extends RecordReaderWRFVariableText> The WRFVariableText extends Text: public class WRFVariableText extends org.apache.hadoop.io.Text The WRFVariableText overrides readFields(DataInput in) and write(DataOutput out) method. I understand that this WRFVariableText already implements serialization. But I got an exception about serialization when I ran my job using the custom inputformat and recordreader>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: no.uni.computing.io.WRFVariableText Any ideas? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
NoSuchElementException: None.get
Hi, I got NoSuchElementException when I tried to iterate through a Map which contains some elements (not null, not empty). When I debug my code (below). It seems the first part of the code which fills the Map is executed after the second part that iterates the Map. The 1st part and 2nd part belongs to a method of a case class, it should be executed sequentially? Any ideas? Best, Patcharee --- java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:313) at scala.None$.get(Option.scala:311) at no.uni.computing.etl.CalculateHeightClass$$anonfun$calculateHeightForEachZ$2.apply(LoadWrfIntoHive.scala:161) at no.uni.computing.etl.CalculateHeightClass$$anonfun$calculateHeightForEachZ$2.apply(LoadWrfIntoHive.scala:156) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174) at no.uni.computing.etl.CalculateHeightClass.calculateHeightForEachZ(LoadWrfIntoHive.scala:156) at no.uni.computing.etl.LoadWrfIntoHive$$anonfun$6.apply(LoadWrfIntoHive.scala:74) at no.uni.computing.etl.LoadWrfIntoHive$$anonfun$6.apply(LoadWrfIntoHive.scala:74) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) --- object LoadWrfIntoHive { def main(args: Array[String]) { .. val listDataHeightRDD = groupDataHeightRDD.map(CalculateHeightClass().calculateHeightForEachZ) .. } } case class CalculateHeightClass() { def calculateHeightForEachZ(x: (String, Iterable[RawDataForHeight])): (String, Map[Integer,Float]) = { var result: Map[Integer, Float] = Map() var valMap: Map[Integer, scala.collection.mutable.MutableList[Double]] = Map() val it = x._2.iterator while (it.hasNext) { //Adding element into valMap } for (currZ <- valMap.keySet) { < ERROR THROWN } (x._1, result) } } - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
insert Hive table with RDD
Hi, How can I insert an existing hive table with an RDD containing my data? Any examples? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: insert Hive table with RDD
Hi, I guess that toDF() api in spark 1.3 which is required build from source code? Patcharee On 03. mars 2015 13:42, Cheng, Hao wrote: Using the SchemaRDD / DataFrame API via HiveContext Assume you're using the latest code, something probably like: val hc = new HiveContext(sc) import hc.implicits._ existedRdd.toDF().insertInto("hivetable") or existedRdd.toDF().registerTempTable("mydata") hc.sql("insert into hivetable as select xxx from mydata") -Original Message- From: patcharee [mailto:patcharee.thong...@uni.no] Sent: Tuesday, March 3, 2015 7:09 PM To: user@spark.apache.org Subject: insert Hive table with RDD Hi, How can I insert an existing hive table with an RDD containing my data? Any examples? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
insert hive partitioned table
Hi, I tried to insert into a hive partitioned table val ZONE: Int = Integer.valueOf(args(2)) val MONTH: Int = Integer.valueOf(args(3)) val YEAR: Int = Integer.valueOf(args(4)) val weightedUVToDF = weightedUVToRecord.toDF() weightedUVToDF.registerTempTable("speeddata") hiveContext.sql("INSERT OVERWRITE table speed partition (year=" + YEAR + ",month=" + MONTH + ",zone=" + ZONE + ") select key, speed, direction from speeddata") First I registered a temporary table "speeddata". The value of the partitioned column (year, month, zone) is from user input. If I would like to get the value of the partitioned column from the temporary table, how can I do that? BR, Patcharee
Re: insert hive partitioned table
I would like to insert the table, and the value of the partition column to be inserted must be from temporary registered table/dataframe. Patcharee On 16. mars 2015 15:26, Cheng Lian wrote: Not quite sure whether I understand your question properly. But if you just want to read the partition columns, it’s pretty easy. Take the “year” column as an example, you may do this in HiveQL: |hiveContext.sql("SELECT year FROM speed") | or in DataFrame DSL: |hiveContext.table("speed").select("year") | Cheng On 3/16/15 9:59 PM, patcharee wrote: Hi, I tried to insert into a hive partitioned table val ZONE: Int = Integer.valueOf(args(2)) val MONTH: Int = Integer.valueOf(args(3)) val YEAR: Int = Integer.valueOf(args(4)) val weightedUVToDF = weightedUVToRecord.toDF() weightedUVToDF.registerTempTable("speeddata") hiveContext.sql("INSERT OVERWRITE table speed partition (year=" + YEAR + ",month=" + MONTH + ",zone=" + ZONE + ") select key, speed, direction from speeddata") First I registered a temporary table "speeddata". The value of the partitioned column (year, month, zone) is from user input. If I would like to get the value of the partitioned column from the temporary table, how can I do that? BR, Patcharee
Spark Job History Server
Hi, I am using spark 1.3. I would like to use Spark Job History Server. I added the following line into conf/spark-defaults.conf spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService spark.history.provider org.apache.spark.deploy.yarn.history.YarnHistoryProvider spark.yarn.historyServer.address sandbox.hortonworks.com:19888 But got Exception in thread "main" java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.history.YarnHistoryProvider What class is really needed? How to fix it? Br, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Job History Server
I turned it on. But it failed to start. In the log, Spark assembly has been built with Hive, including Datanucleus jars on classpath Spark Command: /usr/lib/jvm/java-1.7.0-openjdk.x86_64/bin/java -cp :/root/spark-1.3.0-bin-hadoop2.4/sbin/../conf:/root/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.3.0-hadoop2.4.0.jar:/root/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:/root/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/root/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/etc/hadoop/conf -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.history.HistoryServer 15/03/18 10:23:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Exception in thread "main" java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.history.YarnHistoryProvider at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:191) at org.apache.spark.deploy.history.HistoryServer$.main(HistoryServer.scala:183) at org.apache.spark.deploy.history.HistoryServer.main(HistoryServer.scala) Patcharee On 18. mars 2015 11:35, Akhil Das wrote: You can simply turn it on using: |./sbin/start-history-server.sh| Read more here <http://spark.apache.org/docs/1.3.0/monitoring.html>. Thanks Best Regards On Wed, Mar 18, 2015 at 4:00 PM, patcharee <mailto:patcharee.thong...@uni.no>> wrote: Hi, I am using spark 1.3. I would like to use Spark Job History Server. I added the following line into conf/spark-defaults.conf spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService spark.history.provider org.apache.spark.deploy.yarn.history.YarnHistoryProvider spark.yarn.historyServer.address sandbox.hortonworks.com:19888 <http://sandbox.hortonworks.com:19888> But got Exception in thread "main" java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.history.YarnHistoryProvider What class is really needed? How to fix it? Br, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org <mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org <mailto:user-h...@spark.apache.org>
Re: Spark Job History Server
Hi, My spark was compiled with yarn profile, I can run spark on yarn without problem. For the spark job history server problem, I checked spark-assembly-1.3.0-hadoop2.4.0.jar and found that the package org.apache.spark.deploy.yarn.history is missing. I don't know why BR, Patcharee On 18. mars 2015 11:43, Akhil Das wrote: You are not having yarn package in the classpath. You need to build your spark it with yarn. You can read these docs. <http://spark.apache.org/docs/1.3.0/running-on-yarn.html> Thanks Best Regards On Wed, Mar 18, 2015 at 4:07 PM, patcharee <mailto:patcharee.thong...@uni.no>> wrote: I turned it on. But it failed to start. In the log, Spark assembly has been built with Hive, including Datanucleus jars on classpath Spark Command: /usr/lib/jvm/java-1.7.0-openjdk.x86_64/bin/java -cp :/root/spark-1.3.0-bin-hadoop2.4/sbin/../conf:/root/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.3.0-hadoop2.4.0.jar:/root/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:/root/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/root/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/etc/hadoop/conf -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.history.HistoryServer 15/03/18 10:23:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Exception in thread "main" java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.history.YarnHistoryProvider at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:191) at org.apache.spark.deploy.history.HistoryServer$.main(HistoryServer.scala:183) at org.apache.spark.deploy.history.HistoryServer.main(HistoryServer.scala) Patcharee On 18. mars 2015 11:35, Akhil Das wrote: You can simply turn it on using: |./sbin/start-history-server.sh| Read more here <http://spark.apache.org/docs/1.3.0/monitoring.html>. Thanks Best Regards On Wed, Mar 18, 2015 at 4:00 PM, patcharee mailto:patcharee.thong...@uni.no>> wrote: Hi, I am using spark 1.3. I would like to use Spark Job History Server. I added the following line into conf/spark-defaults.conf spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService spark.history.provider org.apache.spark.deploy.yarn.history.YarnHistoryProvider spark.yarn.historyServer.address sandbox.hortonworks.com:19888 <http://sandbox.hortonworks.com:19888> But got Exception in thread "main" java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.history.YarnHistoryProvider What class is really needed? How to fix it? Br, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org <mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: user-h...@spark.apache.org <mailto:user-h...@spark.apache.org>
override log4j.properties
Hello, How to override log4j.properties for a specific spark job? BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
AccessControlException hive table created from spark shell
Hi, I found a permission problem when I created a hive table from hive context in spark shell version 1.2.1. Then I tried to update this table, but got AccessControlException because this table is owned by hive, not my account. From the hive context, hiveContext.sql("create table orc_table(key INT, value STRING) stored as orc") hiveContext.hql("INSERT INTO table orc_table select * from testtable") --> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=patcharee, access=WRITE, inode="/apps/hive/warehouse/orc_table":hive:hdfs:drwxr-xr-x From hadoop dsf, the table is owned by hive. [patcharee@machine-10-0 ~]$ hadoop fs -ls /apps/hive/warehouse/ drwxr-xr-x - hive hdfs 0 2015-05-18 11:02 /apps/hive/warehouse/orc_table Any ideas? BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
executor running time vs getting result from jupyter notebook
Hi, I am running a jupyter notebook - pyspark. I noticed from the history server UI there are some tasks spending a lot of time on either - executor running time - getting result But some tasks finished both steps very quick. All tasks however have very similar input size. What can be the factor of time spending on these steps? BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: sql query orc slow
Hi Zhan Zhang, Is my problem (which is ORC predicate is not generated from WHERE clause even though spark.sql.orc.filterPushdown=true) can be related to some factors below ? - orc file version (File Version: 0.12 with HIVE_8732) - hive version (using Hive 1.2.1.2.3.0.0-2557) - orc table is not sorted / indexed - the split strategy hive.exec.orc.split.strategy BR, Patcharee On 10/09/2015 08:01 PM, Zhan Zhang wrote: That is weird. Unfortunately, there is no debug info available on this part. Can you please open a JIRA to add some debug information on the driver side? Thanks. Zhan Zhang On Oct 9, 2015, at 10:22 AM, patcharee <mailto:patcharee.thong...@uni.no>> wrote: I set hiveContext.setConf("spark.sql.orc.filterPushdown", "true"). But from the log No ORC pushdown predicate for my query with WHERE clause. 15/10/09 19:16:01 DEBUG OrcInputFormat: No ORC pushdown predicate I did not understand what wrong with this. BR, Patcharee On 09. okt. 2015 19:10, Zhan Zhang wrote: In your case, you manually set an AND pushdown, and the predicate is right based on your setting, : leaf-0 = (EQUALS x 320) The right way is to enable the predicate pushdown as follows. sqlContext.setConf("spark.sql.orc.filterPushdown", "true”) Thanks. Zhan Zhang On Oct 9, 2015, at 9:58 AM, patcharee wrote: Hi Zhan Zhang Actually my query has WHERE clause "select date, month, year, hh, (u*0.9122461 - v*-0.40964267), (v*0.9122461 + u*-0.40964267), z from 4D where x = 320 and y = 117 and zone == 2 and year=2009 and z >= 2 and z <= 8", column "x", "y" is not partition column, the others are partition columns. I expected the system will use predicate pushdown. I turned on the debug and found pushdown predicate was not generated ("DEBUG OrcInputFormat: No ORC pushdown predicate") Then I tried to set the search argument explicitly (on the column "x" which is not partition column) val xs = SearchArgumentFactory.newBuilder().startAnd().equals("x", 320).end().build() hiveContext.setConf("hive.io.file.readcolumn.names", "x") hiveContext.setConf("sarg.pushdown", xs.toKryo()) this time in the log pushdown predicate was generated but results was wrong (no results at all) 15/10/09 18:36:06 INFO OrcInputFormat: ORC pushdown predicate: leaf-0 = (EQUALS x 320) expr = leaf-0 Any ideas What wrong with this? Why the ORC pushdown predicate is not applied by the system? BR, Patcharee On 09. okt. 2015 18:31, Zhan Zhang wrote: Hi Patcharee, >From the query, it looks like only the column pruning will be applied. Partition pruning and predicate pushdown does not have effect. Do you see big IO difference between two methods? The potential reason of the speed difference I can think of may be the different versions of OrcInputFormat. The hive path may use NewOrcInputFormat, but the spark path use OrcInputFormat. Thanks. Zhan Zhang On Oct 8, 2015, at 11:55 PM, patcharee wrote: Yes, the predicate pushdown is enabled, but still take longer time than the first method BR, Patcharee On 08. okt. 2015 18:43, Zhan Zhang wrote: Hi Patcharee, Did you enable the predicate pushdown in the second method? Thanks. Zhan Zhang On Oct 8, 2015, at 1:43 AM, patcharee wrote: Hi, I am using spark sql 1.5 to query a hive table stored as partitioned orc file. We have the total files is about 6000 files and each file size is about 245MB. What is the difference between these two query methods below: 1. Using query on hive table directly hiveContext.sql("select col1, col2 from table1") 2. Reading from orc file, register temp table and query from the temp table val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1") c.registerTempTable("regTable") hiveContext.sql("select col1, col2 from regTable") When the number of files is large (query all from the total 6000 files) , the second case is much slower then the first one. Any ideas why? BR, - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org <mailto:user-unsubscr...@spark.apache.org> For additional commands, e-mail: <mailto:user-h...@spark.apache.org>user-h...@spark.apache.org
Re: sql query orc slow
Hi Zhan Zhang, Here is the issue https://issues.apache.org/jira/browse/SPARK-11087 BR, Patcharee On 10/13/2015 06:47 PM, Zhan Zhang wrote: Hi Patcharee, I am not sure which side is wrong, driver or executor. If it is executor side, the reason you mentioned may be possible. But if the driver side didn’t set the predicate at all, then somewhere else is broken. Can you please file a JIRA with a simple reproduce step, and let me know the JIRA number? Thanks. Zhan Zhang On Oct 13, 2015, at 1:01 AM, Patcharee Thongtra mailto:patcharee.thong...@uni.no>> wrote: Hi Zhan Zhang, Is my problem (which is ORC predicate is not generated from WHERE clause even though spark.sql.orc.filterPushdown=true) can be related to some factors below ? - orc file version (File Version: 0.12 with HIVE_8732) - hive version (using Hive 1.2.1.2.3.0.0-2557) - orc table is not sorted / indexed - the split strategy hive.exec.orc.split.strategy BR, Patcharee On 10/09/2015 08:01 PM, Zhan Zhang wrote: That is weird. Unfortunately, there is no debug info available on this part. Can you please open a JIRA to add some debug information on the driver side? Thanks. Zhan Zhang On Oct 9, 2015, at 10:22 AM, patcharee wrote: I set hiveContext.setConf("spark.sql.orc.filterPushdown", "true"). But from the log No ORC pushdown predicate for my query with WHERE clause. 15/10/09 19:16:01 DEBUG OrcInputFormat: No ORC pushdown predicate I did not understand what wrong with this. BR, Patcharee On 09. okt. 2015 19:10, Zhan Zhang wrote: In your case, you manually set an AND pushdown, and the predicate is right based on your setting, : leaf-0 = (EQUALS x 320) The right way is to enable the predicate pushdown as follows. sqlContext.setConf("spark.sql.orc.filterPushdown", "true”) Thanks. Zhan Zhang On Oct 9, 2015, at 9:58 AM, patcharee wrote: Hi Zhan Zhang Actually my query has WHERE clause "select date, month, year, hh, (u*0.9122461 - v*-0.40964267), (v*0.9122461 + u*-0.40964267), z from 4D where x = 320 and y = 117 and zone == 2 and year=2009 and z >= 2 and z <= 8", column "x", "y" is not partition column, the others are partition columns. I expected the system will use predicate pushdown. I turned on the debug and found pushdown predicate was not generated ("DEBUG OrcInputFormat: No ORC pushdown predicate") Then I tried to set the search argument explicitly (on the column "x" which is not partition column) val xs = SearchArgumentFactory.newBuilder().startAnd().equals("x", 320).end().build() hiveContext.setConf("hive.io.file.readcolumn.names", "x") hiveContext.setConf("sarg.pushdown", xs.toKryo()) this time in the log pushdown predicate was generated but results was wrong (no results at all) 15/10/09 18:36:06 INFO OrcInputFormat: ORC pushdown predicate: leaf-0 = (EQUALS x 320) expr = leaf-0 Any ideas What wrong with this? Why the ORC pushdown predicate is not applied by the system? BR, Patcharee On 09. okt. 2015 18:31, Zhan Zhang wrote: Hi Patcharee, >From the query, it looks like only the column pruning will be applied. Partition pruning and predicate pushdown does not have effect. Do you see big IO difference between two methods? The potential reason of the speed difference I can think of may be the different versions of OrcInputFormat. The hive path may use NewOrcInputFormat, but the spark path use OrcInputFormat. Thanks. Zhan Zhang On Oct 8, 2015, at 11:55 PM, patcharee wrote: Yes, the predicate pushdown is enabled, but still take longer time than the first method BR, Patcharee On 08. okt. 2015 18:43, Zhan Zhang wrote: Hi Patcharee, Did you enable the predicate pushdown in the second method? Thanks. Zhan Zhang On Oct 8, 2015, at 1:43 AM, patcharee wrote: Hi, I am using spark sql 1.5 to query a hive table stored as partitioned orc file. We have the total files is about 6000 files and each file size is about 245MB. What is the difference between these two query methods below: 1. Using query on hive table directly hiveContext.sql("select col1, col2 from table1") 2. Reading from orc file, register temp table and query from the temp table val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1") c.registerTempTable("regTable") hiveContext.sql("select col1, col2 from regTable") When the number of files is large (query all from the total 6000 files) , the second case is much slower then the first one. Any ideas why? BR, - To unsubscribe, e-mail: <mailto:user-unsubscr...@spark.apache.org>user-unsubscr...@spark.apache.org For additional commands, e-mail: <mailto:user-h...@spark.apache.org>user-h...@spark.apache.org
locality level counter
Hi, I do not understand how this locality level counter work. I have an application working on unsplittable binary files in 6 nodes cluster. One file = 3 data blocks. The application reads the whole file into RDD. Why are the Local Level of all tasks (in History Server UI) NODE_LOCAL? Thanks, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
data local read counter
Hi, Is there a counter for data local read? I understood that it is locality level counter, but it seems not. Thanks, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
custom inputformat recordreader
Hi, In python how to use inputformat/custom recordreader? Thanks, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
java.lang.RuntimeException: Couldn't find function Some
Hi, In my spark application I queried a hive table and tried to take only one record, but got java.lang.RuntimeException: Couldn't find function Some val rddCoOrd = sql("SELECT date, x, y FROM coordinate where order by date limit 1") valresultCoOrd = rddCoOrd.take(1)(0) Any ideas? I tested the same code on spark shell, it worked. Best, Patcharee
bad symbolic reference. A signature in SparkContext.class refers to term conf in value org.apache.hadoop which is not available
Hi, I have built spark version 1.3 and tried to use this in my spark scala application. When I tried to compile and build the application by SBT, I got error> bad symbolic reference. A signature in SparkContext.class refers to term conf in value org.apache.hadoop which is not available It seems hadoop library is missing, but it should be referred automatically by SBT, isn't it. This application is buit-able on spark version 1.2 Here is my build.sbt name := "wind25t-v013" version := "0.1" scalaVersion := "2.10.4" unmanagedBase := baseDirectory.value / "lib" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.3.0" libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.3.0" What should I do to fix it? BR, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
No assemblies found in assembly/target/scala-2.10
Hi, I am trying to build spark 1.3 from source. After I executed| mvn -DskipTests clean package| I tried to use shell but got this error [root@sandbox spark]# ./bin/spark-shell Exception in thread "main" java.lang.IllegalStateException: No assemblies found in '/root/spark/assembly/target/scala-2.10'. at org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:228) at org.apache.spark.launcher.AbstractCommandBuilder.findAssembly(AbstractCommandBuilder.java:352) at org.apache.spark.launcher.AbstractCommandBuilder.buildClassPath(AbstractCommandBuilder.java:185) at org.apache.spark.launcher.AbstractCommandBuilder.buildJavaCommand(AbstractCommandBuilder.java:111) at org.apache.spark.launcher.SparkSubmitCommandBuilder.buildSparkSubmitCommand(SparkSubmitCommandBuilder.java:177) at org.apache.spark.launcher.SparkSubmitCommandBuilder.buildCommand(SparkSubmitCommandBuilder.java:102) at org.apache.spark.launcher.Main.main(Main.java:74) Any ideas? Patcharee